summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/rt_basic.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/rt_basic.rs')
-rw-r--r--third_party/rust/tokio/tests/rt_basic.rs296
1 files changed, 296 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/rt_basic.rs b/third_party/rust/tokio/tests/rt_basic.rs
new file mode 100644
index 0000000000..cc6ac67728
--- /dev/null
+++ b/third_party/rust/tokio/tests/rt_basic.rs
@@ -0,0 +1,296 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::runtime::Runtime;
+use tokio::sync::oneshot;
+use tokio::time::{timeout, Duration};
+use tokio_test::{assert_err, assert_ok};
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::task::{Context, Poll};
+use std::thread;
+
+mod support {
+ pub(crate) mod mpsc_stream;
+}
+
+macro_rules! cfg_metrics {
+ ($($t:tt)*) => {
+ #[cfg(tokio_unstable)]
+ {
+ $( $t )*
+ }
+ }
+}
+
+#[test]
+fn spawned_task_does_not_progress_without_block_on() {
+ let (tx, mut rx) = oneshot::channel();
+
+ let rt = rt();
+
+ rt.spawn(async move {
+ assert_ok!(tx.send("hello"));
+ });
+
+ thread::sleep(Duration::from_millis(50));
+
+ assert_err!(rx.try_recv());
+
+ let out = rt.block_on(async { assert_ok!(rx.await) });
+
+ assert_eq!(out, "hello");
+}
+
+#[test]
+fn no_extra_poll() {
+ use pin_project_lite::pin_project;
+ use std::pin::Pin;
+ use std::sync::{
+ atomic::{AtomicUsize, Ordering::SeqCst},
+ Arc,
+ };
+ use std::task::{Context, Poll};
+ use tokio_stream::{Stream, StreamExt};
+
+ pin_project! {
+ struct TrackPolls<S> {
+ npolls: Arc<AtomicUsize>,
+ #[pin]
+ s: S,
+ }
+ }
+
+ impl<S> Stream for TrackPolls<S>
+ where
+ S: Stream,
+ {
+ type Item = S::Item;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.project();
+ this.npolls.fetch_add(1, SeqCst);
+ this.s.poll_next(cx)
+ }
+ }
+
+ let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
+ let rx = TrackPolls {
+ npolls: Arc::new(AtomicUsize::new(0)),
+ s: rx,
+ };
+ let npolls = Arc::clone(&rx.npolls);
+
+ let rt = rt();
+
+ // TODO: could probably avoid this, but why not.
+ let mut rx = Box::pin(rx);
+
+ rt.spawn(async move { while rx.next().await.is_some() {} });
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled exactly once: the initial poll
+ assert_eq!(npolls.load(SeqCst), 1);
+
+ tx.send(()).unwrap();
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled twice more: once to yield Some(), then once to yield Pending
+ assert_eq!(npolls.load(SeqCst), 1 + 2);
+
+ drop(tx);
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled once more: to yield None
+ assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
+}
+
+#[test]
+fn acquire_mutex_in_drop() {
+ use futures::future::pending;
+ use tokio::task;
+
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ let rt = rt();
+
+ rt.spawn(async move {
+ let _ = rx2.await;
+ unreachable!();
+ });
+
+ rt.spawn(async move {
+ let _ = rx1.await;
+ tx2.send(()).unwrap();
+ unreachable!();
+ });
+
+ // Spawn a task that will never notify
+ rt.spawn(async move {
+ pending::<()>().await;
+ tx1.send(()).unwrap();
+ });
+
+ // Tick the loop
+ rt.block_on(async {
+ task::yield_now().await;
+ });
+
+ // Drop the rt
+ drop(rt);
+}
+
+#[test]
+fn drop_tasks_in_context() {
+ static SUCCESS: AtomicBool = AtomicBool::new(false);
+
+ struct ContextOnDrop;
+
+ impl Future for ContextOnDrop {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+ }
+
+ impl Drop for ContextOnDrop {
+ fn drop(&mut self) {
+ if tokio::runtime::Handle::try_current().is_ok() {
+ SUCCESS.store(true, Ordering::SeqCst);
+ }
+ }
+ }
+
+ let rt = rt();
+ rt.spawn(ContextOnDrop);
+ drop(rt);
+
+ assert!(SUCCESS.load(Ordering::SeqCst));
+}
+
+#[test]
+#[should_panic(expected = "boom")]
+fn wake_in_drop_after_panic() {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ struct WakeOnDrop(Option<oneshot::Sender<()>>);
+
+ impl Drop for WakeOnDrop {
+ fn drop(&mut self) {
+ self.0.take().unwrap().send(()).unwrap();
+ }
+ }
+
+ let rt = rt();
+
+ rt.spawn(async move {
+ let _wake_on_drop = WakeOnDrop(Some(tx));
+ // wait forever
+ futures::future::pending::<()>().await;
+ });
+
+ let _join = rt.spawn(async move { rx.await });
+
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ panic!("boom");
+ });
+}
+
+#[test]
+fn spawn_two() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ tokio::spawn(async move {
+ tx.send("ZOMG").unwrap();
+ });
+ });
+
+ assert_ok!(rx.await)
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(0, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(2, local);
+ }
+}
+
+#[test]
+fn spawn_remote() {
+ let rt = rt();
+
+ let out = rt.block_on(async {
+ let (tx, rx) = oneshot::channel();
+
+ let handle = tokio::spawn(async move {
+ std::thread::spawn(move || {
+ std::thread::sleep(Duration::from_millis(10));
+ tx.send("ZOMG").unwrap();
+ });
+
+ rx.await.unwrap()
+ });
+
+ handle.await.unwrap()
+ });
+
+ assert_eq!(out, "ZOMG");
+
+ cfg_metrics! {
+ let metrics = rt.metrics();
+ drop(rt);
+ assert_eq!(1, metrics.remote_schedule_count());
+
+ let mut local = 0;
+ for i in 0..metrics.num_workers() {
+ local += metrics.worker_local_schedule_count(i);
+ }
+
+ assert_eq!(1, local);
+ }
+}
+
+#[test]
+#[should_panic(
+ expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
+)]
+fn timeout_panics_when_no_time_handle() {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+ rt.block_on(async {
+ let (_tx, rx) = oneshot::channel::<()>();
+ let dur = Duration::from_millis(20);
+ let _ = timeout(dur, rx).await;
+ });
+}
+
+fn rt() -> Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}