diff options
Diffstat (limited to 'third_party/rust/tokio/tests/rt_basic.rs')
-rw-r--r-- | third_party/rust/tokio/tests/rt_basic.rs | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/rt_basic.rs b/third_party/rust/tokio/tests/rt_basic.rs new file mode 100644 index 0000000000..cc6ac67728 --- /dev/null +++ b/third_party/rust/tokio/tests/rt_basic.rs @@ -0,0 +1,296 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::oneshot; +use tokio::time::{timeout, Duration}; +use tokio_test::{assert_err, assert_ok}; + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{Context, Poll}; +use std::thread; + +mod support { + pub(crate) mod mpsc_stream; +} + +macro_rules! cfg_metrics { + ($($t:tt)*) => { + #[cfg(tokio_unstable)] + { + $( $t )* + } + } +} + +#[test] +fn spawned_task_does_not_progress_without_block_on() { + let (tx, mut rx) = oneshot::channel(); + + let rt = rt(); + + rt.spawn(async move { + assert_ok!(tx.send("hello")); + }); + + thread::sleep(Duration::from_millis(50)); + + assert_err!(rx.try_recv()); + + let out = rt.block_on(async { assert_ok!(rx.await) }); + + assert_eq!(out, "hello"); +} + +#[test] +fn no_extra_poll() { + use pin_project_lite::pin_project; + use std::pin::Pin; + use std::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }; + use std::task::{Context, Poll}; + use tokio_stream::{Stream, StreamExt}; + + pin_project! { + struct TrackPolls<S> { + npolls: Arc<AtomicUsize>, + #[pin] + s: S, + } + } + + impl<S> Stream for TrackPolls<S> + where + S: Stream, + { + type Item = S::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = self.project(); + this.npolls.fetch_add(1, SeqCst); + this.s.poll_next(cx) + } + } + + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>(); + let rx = TrackPolls { + npolls: Arc::new(AtomicUsize::new(0)), + s: rx, + }; + let npolls = Arc::clone(&rx.npolls); + + let rt = rt(); + + // TODO: could probably avoid this, but why not. + let mut rx = Box::pin(rx); + + rt.spawn(async move { while rx.next().await.is_some() {} }); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled exactly once: the initial poll + assert_eq!(npolls.load(SeqCst), 1); + + tx.send(()).unwrap(); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled twice more: once to yield Some(), then once to yield Pending + assert_eq!(npolls.load(SeqCst), 1 + 2); + + drop(tx); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled once more: to yield None + assert_eq!(npolls.load(SeqCst), 1 + 2 + 1); +} + +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + use tokio::task; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let rt = rt(); + + rt.spawn(async move { + let _ = rx2.await; + unreachable!(); + }); + + rt.spawn(async move { + let _ = rx1.await; + tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + rt.spawn(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + +#[test] +fn drop_tasks_in_context() { + static SUCCESS: AtomicBool = AtomicBool::new(false); + + struct ContextOnDrop; + + impl Future for ContextOnDrop { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for ContextOnDrop { + fn drop(&mut self) { + if tokio::runtime::Handle::try_current().is_ok() { + SUCCESS.store(true, Ordering::SeqCst); + } + } + } + + let rt = rt(); + rt.spawn(ContextOnDrop); + drop(rt); + + assert!(SUCCESS.load(Ordering::SeqCst)); +} + +#[test] +#[should_panic(expected = "boom")] +fn wake_in_drop_after_panic() { + let (tx, rx) = oneshot::channel::<()>(); + + struct WakeOnDrop(Option<oneshot::Sender<()>>); + + impl Drop for WakeOnDrop { + fn drop(&mut self) { + self.0.take().unwrap().send(()).unwrap(); + } + } + + let rt = rt(); + + rt.spawn(async move { + let _wake_on_drop = WakeOnDrop(Some(tx)); + // wait forever + futures::future::pending::<()>().await; + }); + + let _join = rt.spawn(async move { rx.await }); + + rt.block_on(async { + tokio::task::yield_now().await; + panic!("boom"); + }); +} + +#[test] +fn spawn_two() { + let rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + }); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + + cfg_metrics! { + let metrics = rt.metrics(); + drop(rt); + assert_eq!(0, metrics.remote_schedule_count()); + + let mut local = 0; + for i in 0..metrics.num_workers() { + local += metrics.worker_local_schedule_count(i); + } + + assert_eq!(2, local); + } +} + +#[test] +fn spawn_remote() { + let rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + let handle = tokio::spawn(async move { + std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(10)); + tx.send("ZOMG").unwrap(); + }); + + rx.await.unwrap() + }); + + handle.await.unwrap() + }); + + assert_eq!(out, "ZOMG"); + + cfg_metrics! { + let metrics = rt.metrics(); + drop(rt); + assert_eq!(1, metrics.remote_schedule_count()); + + let mut local = 0; + for i in 0..metrics.num_workers() { + local += metrics.worker_local_schedule_count(i); + } + + assert_eq!(1, local); + } +} + +#[test] +#[should_panic( + expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." +)] +fn timeout_panics_when_no_time_handle() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(async { + let (_tx, rx) = oneshot::channel::<()>(); + let dur = Duration::from_millis(20); + let _ = timeout(dur, rx).await; + }); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} |