diff options
Diffstat (limited to 'vendor/tokio/tests/rt_basic.rs')
-rw-r--r-- | vendor/tokio/tests/rt_basic.rs | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/vendor/tokio/tests/rt_basic.rs b/vendor/tokio/tests/rt_basic.rs new file mode 100644 index 000000000..4b1bdadc1 --- /dev/null +++ b/vendor/tokio/tests/rt_basic.rs @@ -0,0 +1,158 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::oneshot; +use tokio_test::{assert_err, assert_ok}; + +use std::thread; +use tokio::time::{timeout, Duration}; + +mod support { + pub(crate) mod mpsc_stream; +} + +#[test] +fn spawned_task_does_not_progress_without_block_on() { + let (tx, mut rx) = oneshot::channel(); + + let rt = rt(); + + rt.spawn(async move { + assert_ok!(tx.send("hello")); + }); + + thread::sleep(Duration::from_millis(50)); + + assert_err!(rx.try_recv()); + + let out = rt.block_on(async { assert_ok!(rx.await) }); + + assert_eq!(out, "hello"); +} + +#[test] +fn no_extra_poll() { + use pin_project_lite::pin_project; + use std::pin::Pin; + use std::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }; + use std::task::{Context, Poll}; + use tokio_stream::{Stream, StreamExt}; + + pin_project! { + struct TrackPolls<S> { + npolls: Arc<AtomicUsize>, + #[pin] + s: S, + } + } + + impl<S> Stream for TrackPolls<S> + where + S: Stream, + { + type Item = S::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = self.project(); + this.npolls.fetch_add(1, SeqCst); + this.s.poll_next(cx) + } + } + + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>(); + let rx = TrackPolls { + npolls: Arc::new(AtomicUsize::new(0)), + s: rx, + }; + let npolls = Arc::clone(&rx.npolls); + + let rt = rt(); + + // TODO: could probably avoid this, but why not. + let mut rx = Box::pin(rx); + + rt.spawn(async move { while rx.next().await.is_some() {} }); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled exactly once: the initial poll + assert_eq!(npolls.load(SeqCst), 1); + + tx.send(()).unwrap(); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled twice more: once to yield Some(), then once to yield Pending + assert_eq!(npolls.load(SeqCst), 1 + 2); + + drop(tx); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled once more: to yield None + assert_eq!(npolls.load(SeqCst), 1 + 2 + 1); +} + +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + use tokio::task; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let rt = rt(); + + rt.spawn(async move { + let _ = rx2.await; + unreachable!(); + }); + + rt.spawn(async move { + let _ = rx1.await; + tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + rt.spawn(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + +#[test] +#[should_panic( + expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers." +)] +fn timeout_panics_when_no_time_handle() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(async { + let (_tx, rx) = oneshot::channel::<()>(); + let dur = Duration::from_millis(20); + let _ = timeout(dur, rx).await; + }); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} |