summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/rt_metrics.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/rt_metrics.rs')
-rw-r--r--third_party/rust/tokio/tests/rt_metrics.rs718
1 files changed, 718 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/rt_metrics.rs b/third_party/rust/tokio/tests/rt_metrics.rs
new file mode 100644
index 0000000000..0fe839a2f9
--- /dev/null
+++ b/third_party/rust/tokio/tests/rt_metrics.rs
@@ -0,0 +1,718 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "full", tokio_unstable, not(tokio_wasi)))]
+
+use std::future::Future;
+use std::sync::{Arc, Mutex};
+use std::task::Poll;
+use tokio::macros::support::poll_fn;
+
+use tokio::runtime::Runtime;
+use tokio::task::consume_budget;
+use tokio::time::{self, Duration};
+
+#[test]
+fn num_workers() {
+ let rt = current_thread();
+ assert_eq!(1, rt.metrics().num_workers());
+
+ let rt = threaded();
+ assert_eq!(2, rt.metrics().num_workers());
+}
+
+#[test]
+fn num_blocking_threads() {
+ let rt = current_thread();
+ assert_eq!(0, rt.metrics().num_blocking_threads());
+ let _ = rt.block_on(rt.spawn_blocking(move || {}));
+ assert_eq!(1, rt.metrics().num_blocking_threads());
+}
+
+#[test]
+fn num_idle_blocking_threads() {
+ let rt = current_thread();
+ assert_eq!(0, rt.metrics().num_idle_blocking_threads());
+ let _ = rt.block_on(rt.spawn_blocking(move || {}));
+ rt.block_on(async {
+ time::sleep(Duration::from_millis(5)).await;
+ });
+
+ // We need to wait until the blocking thread has become idle. Usually 5ms is
+ // enough for this to happen, but not always. When it isn't enough, sleep
+ // for another second. We don't always wait for a whole second since we want
+ // the test suite to finish quickly.
+ //
+ // Note that the timeout for idle threads to be killed is 10 seconds.
+ if 0 == rt.metrics().num_idle_blocking_threads() {
+ rt.block_on(async {
+ time::sleep(Duration::from_secs(1)).await;
+ });
+ }
+
+ assert_eq!(1, rt.metrics().num_idle_blocking_threads());
+}
+
+#[test]
+fn blocking_queue_depth() {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .max_blocking_threads(1)
+ .build()
+ .unwrap();
+
+ assert_eq!(0, rt.metrics().blocking_queue_depth());
+
+ let ready = Arc::new(Mutex::new(()));
+ let guard = ready.lock().unwrap();
+
+ let ready_cloned = ready.clone();
+ let wait_until_ready = move || {
+ let _unused = ready_cloned.lock().unwrap();
+ };
+
+ let h1 = rt.spawn_blocking(wait_until_ready.clone());
+ let h2 = rt.spawn_blocking(wait_until_ready);
+ assert!(rt.metrics().blocking_queue_depth() > 0);
+
+ drop(guard);
+
+ let _ = rt.block_on(h1);
+ let _ = rt.block_on(h2);
+
+ assert_eq!(0, rt.metrics().blocking_queue_depth());
+}
+
+#[test]
+fn active_tasks_count() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ assert_eq!(0, metrics.active_tasks_count());
+ rt.spawn(async move {
+ assert_eq!(1, metrics.active_tasks_count());
+ });
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ assert_eq!(0, metrics.active_tasks_count());
+ rt.spawn(async move {
+ assert_eq!(1, metrics.active_tasks_count());
+ });
+}
+
+#[test]
+fn remote_schedule_count() {
+ use std::thread;
+
+ let rt = current_thread();
+ let handle = rt.handle().clone();
+ let task = thread::spawn(move || {
+ handle.spawn(async {
+ // DO nothing
+ })
+ })
+ .join()
+ .unwrap();
+
+ rt.block_on(task).unwrap();
+
+ assert_eq!(1, rt.metrics().remote_schedule_count());
+
+ let rt = threaded();
+ let handle = rt.handle().clone();
+ let task = thread::spawn(move || {
+ handle.spawn(async {
+ // DO nothing
+ })
+ })
+ .join()
+ .unwrap();
+
+ rt.block_on(task).unwrap();
+
+ assert_eq!(1, rt.metrics().remote_schedule_count());
+}
+
+#[test]
+fn worker_park_count() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ time::sleep(Duration::from_millis(1)).await;
+ });
+ drop(rt);
+ assert!(1 <= metrics.worker_park_count(0));
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ time::sleep(Duration::from_millis(1)).await;
+ });
+ drop(rt);
+ assert!(1 <= metrics.worker_park_count(0));
+ assert!(1 <= metrics.worker_park_count(1));
+}
+
+#[test]
+fn worker_noop_count() {
+ // There isn't really a great way to generate no-op parks as they happen as
+ // false-positive events under concurrency.
+
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ time::sleep(Duration::from_millis(1)).await;
+ });
+ drop(rt);
+ assert!(0 < metrics.worker_noop_count(0));
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ time::sleep(Duration::from_millis(1)).await;
+ });
+ drop(rt);
+ assert!(0 < metrics.worker_noop_count(0));
+ assert!(0 < metrics.worker_noop_count(1));
+}
+
+#[test]
+fn worker_steal_count() {
+ // This metric only applies to the multi-threaded runtime.
+ //
+ // We use a blocking channel to backup one worker thread.
+ use std::sync::mpsc::channel;
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+
+ rt.block_on(async {
+ let (tx, rx) = channel();
+
+ // Move to the runtime.
+ tokio::spawn(async move {
+ // Spawn the task that sends to the channel
+ tokio::spawn(async move {
+ tx.send(()).unwrap();
+ });
+
+ // Spawn a task that bumps the previous task out of the "next
+ // scheduled" slot.
+ tokio::spawn(async {});
+
+ // Blocking receive on the channel.
+ rx.recv().unwrap();
+ })
+ .await
+ .unwrap();
+ });
+
+ drop(rt);
+
+ let n: u64 = (0..metrics.num_workers())
+ .map(|i| metrics.worker_steal_count(i))
+ .sum();
+
+ assert_eq!(1, n);
+}
+
+#[test]
+fn worker_poll_count() {
+ const N: u64 = 5;
+
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {}).await.unwrap();
+ }
+ });
+ drop(rt);
+ assert_eq!(N, metrics.worker_poll_count(0));
+
+ // Does not populate the histogram
+ assert!(!metrics.poll_count_histogram_enabled());
+ for i in 0..10 {
+ assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i));
+ }
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {}).await.unwrap();
+ }
+ });
+ drop(rt);
+ // Account for the `block_on` task
+ let n = (0..metrics.num_workers())
+ .map(|i| metrics.worker_poll_count(i))
+ .sum();
+
+ assert_eq!(N, n);
+
+ // Does not populate the histogram
+ assert!(!metrics.poll_count_histogram_enabled());
+ for n in 0..metrics.num_workers() {
+ for i in 0..10 {
+ assert_eq!(0, metrics.poll_count_histogram_bucket_count(n, i));
+ }
+ }
+}
+
+#[test]
+fn worker_poll_count_histogram() {
+ const N: u64 = 5;
+
+ let rts = [
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .enable_metrics_poll_count_histogram()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
+ .build()
+ .unwrap(),
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(2)
+ .enable_all()
+ .enable_metrics_poll_count_histogram()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
+ .build()
+ .unwrap(),
+ ];
+
+ for rt in rts {
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {}).await.unwrap();
+ }
+ });
+ drop(rt);
+
+ let num_workers = metrics.num_workers();
+ let num_buckets = metrics.poll_count_histogram_num_buckets();
+
+ assert!(metrics.poll_count_histogram_enabled());
+ assert_eq!(num_buckets, 3);
+
+ let n = (0..num_workers)
+ .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
+ .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket))
+ .sum();
+ assert_eq!(N, n);
+ }
+}
+
+#[test]
+fn worker_poll_count_histogram_range() {
+ let max = Duration::from_nanos(u64::MAX);
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .enable_metrics_poll_count_histogram()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(us(50))
+ .build()
+ .unwrap();
+ let metrics = rt.metrics();
+
+ assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50));
+ assert_eq!(
+ metrics.poll_count_histogram_bucket_range(1),
+ us(50)..us(100)
+ );
+ assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max);
+
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .enable_metrics_poll_count_histogram()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(us(50))
+ .build()
+ .unwrap();
+ let metrics = rt.metrics();
+
+ let a = Duration::from_nanos(50000_u64.next_power_of_two());
+ let b = a * 2;
+
+ assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a);
+ assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b);
+ assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max);
+}
+
+#[test]
+fn worker_poll_count_histogram_disabled_without_explicit_enable() {
+ let rts = [
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
+ .build()
+ .unwrap(),
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(2)
+ .enable_all()
+ .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
+ .metrics_poll_count_histogram_buckets(3)
+ .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
+ .build()
+ .unwrap(),
+ ];
+
+ for rt in rts {
+ let metrics = rt.metrics();
+ assert!(!metrics.poll_count_histogram_enabled());
+ }
+}
+
+#[test]
+fn worker_total_busy_duration() {
+ const N: usize = 5;
+
+ let zero = Duration::from_millis(0);
+
+ let rt = current_thread();
+ let metrics = rt.metrics();
+
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {
+ tokio::task::yield_now().await;
+ })
+ .await
+ .unwrap();
+ }
+ });
+
+ drop(rt);
+
+ assert!(zero < metrics.worker_total_busy_duration(0));
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {
+ tokio::task::yield_now().await;
+ })
+ .await
+ .unwrap();
+ }
+ });
+
+ drop(rt);
+
+ for i in 0..metrics.num_workers() {
+ assert!(zero < metrics.worker_total_busy_duration(i));
+ }
+}
+
+#[test]
+fn worker_local_schedule_count() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ tokio::spawn(async {}).await.unwrap();
+ });
+ drop(rt);
+
+ assert_eq!(1, metrics.worker_local_schedule_count(0));
+ assert_eq!(0, metrics.remote_schedule_count());
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ // Move to the runtime
+ tokio::spawn(async {
+ tokio::spawn(async {}).await.unwrap();
+ })
+ .await
+ .unwrap();
+ });
+ drop(rt);
+
+ let n: u64 = (0..metrics.num_workers())
+ .map(|i| metrics.worker_local_schedule_count(i))
+ .sum();
+
+ assert_eq!(2, n);
+ assert_eq!(1, metrics.remote_schedule_count());
+}
+
+#[test]
+fn worker_overflow_count() {
+ // Only applies to the threaded worker
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ // Move to the runtime
+ tokio::spawn(async {
+ let (tx1, rx1) = std::sync::mpsc::channel();
+ let (tx2, rx2) = std::sync::mpsc::channel();
+
+ // First, we need to block the other worker until all tasks have
+ // been spawned.
+ tokio::spawn(async move {
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ });
+
+ // Bump the next-run spawn
+ tokio::spawn(async {});
+
+ rx1.recv().unwrap();
+
+ // Spawn many tasks
+ for _ in 0..300 {
+ tokio::spawn(async {});
+ }
+
+ tx2.send(()).unwrap();
+ })
+ .await
+ .unwrap();
+ });
+ drop(rt);
+
+ let n: u64 = (0..metrics.num_workers())
+ .map(|i| metrics.worker_overflow_count(i))
+ .sum();
+
+ assert_eq!(1, n);
+}
+
+#[test]
+fn injection_queue_depth() {
+ use std::thread;
+
+ let rt = current_thread();
+ let handle = rt.handle().clone();
+ let metrics = rt.metrics();
+
+ thread::spawn(move || {
+ handle.spawn(async {});
+ })
+ .join()
+ .unwrap();
+
+ assert_eq!(1, metrics.injection_queue_depth());
+
+ let rt = threaded();
+ let handle = rt.handle().clone();
+ let metrics = rt.metrics();
+
+ // First we need to block the runtime workers
+ let (tx1, rx1) = std::sync::mpsc::channel();
+ let (tx2, rx2) = std::sync::mpsc::channel();
+ let (tx3, rx3) = std::sync::mpsc::channel();
+ let rx3 = Arc::new(Mutex::new(rx3));
+
+ rt.spawn(async move { rx1.recv().unwrap() });
+ rt.spawn(async move { rx2.recv().unwrap() });
+
+ // Spawn some more to make sure there are items
+ for _ in 0..10 {
+ let rx = rx3.clone();
+ rt.spawn(async move {
+ rx.lock().unwrap().recv().unwrap();
+ });
+ }
+
+ thread::spawn(move || {
+ handle.spawn(async {});
+ })
+ .join()
+ .unwrap();
+
+ let n = metrics.injection_queue_depth();
+ assert!(1 <= n, "{}", n);
+ assert!(15 >= n, "{}", n);
+
+ for _ in 0..10 {
+ tx3.send(()).unwrap();
+ }
+
+ tx1.send(()).unwrap();
+ tx2.send(()).unwrap();
+}
+
+#[test]
+fn worker_local_queue_depth() {
+ const N: usize = 100;
+
+ let rt = current_thread();
+ let metrics = rt.metrics();
+ rt.block_on(async {
+ for _ in 0..N {
+ tokio::spawn(async {});
+ }
+
+ assert_eq!(N, metrics.worker_local_queue_depth(0));
+ });
+
+ let rt = threaded();
+ let metrics = rt.metrics();
+ rt.block_on(async move {
+ // Move to the runtime
+ tokio::spawn(async move {
+ let (tx1, rx1) = std::sync::mpsc::channel();
+ let (tx2, rx2) = std::sync::mpsc::channel();
+
+ // First, we need to block the other worker until all tasks have
+ // been spawned.
+ tokio::spawn(async move {
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ });
+
+ // Bump the next-run spawn
+ tokio::spawn(async {});
+
+ rx1.recv().unwrap();
+
+ // Spawn some tasks
+ for _ in 0..100 {
+ tokio::spawn(async {});
+ }
+
+ let n: usize = (0..metrics.num_workers())
+ .map(|i| metrics.worker_local_queue_depth(i))
+ .sum();
+
+ assert_eq!(n, N);
+
+ tx2.send(()).unwrap();
+ })
+ .await
+ .unwrap();
+ });
+}
+
+#[test]
+fn budget_exhaustion_yield() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+
+ assert_eq!(0, metrics.budget_forced_yield_count());
+
+ let mut did_yield = false;
+
+ // block on a task which consumes budget until it yields
+ rt.block_on(poll_fn(|cx| loop {
+ if did_yield {
+ return Poll::Ready(());
+ }
+
+ let fut = consume_budget();
+ tokio::pin!(fut);
+
+ if fut.poll(cx).is_pending() {
+ did_yield = true;
+ return Poll::Pending;
+ }
+ }));
+
+ assert_eq!(1, rt.metrics().budget_forced_yield_count());
+}
+
+#[test]
+fn budget_exhaustion_yield_with_joins() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+
+ assert_eq!(0, metrics.budget_forced_yield_count());
+
+ let mut did_yield_1 = false;
+ let mut did_yield_2 = false;
+
+ // block on a task which consumes budget until it yields
+ rt.block_on(async {
+ tokio::join!(
+ poll_fn(|cx| loop {
+ if did_yield_1 {
+ return Poll::Ready(());
+ }
+
+ let fut = consume_budget();
+ tokio::pin!(fut);
+
+ if fut.poll(cx).is_pending() {
+ did_yield_1 = true;
+ return Poll::Pending;
+ }
+ }),
+ poll_fn(|cx| loop {
+ if did_yield_2 {
+ return Poll::Ready(());
+ }
+
+ let fut = consume_budget();
+ tokio::pin!(fut);
+
+ if fut.poll(cx).is_pending() {
+ did_yield_2 = true;
+ return Poll::Pending;
+ }
+ })
+ )
+ });
+
+ assert_eq!(1, rt.metrics().budget_forced_yield_count());
+}
+
+#[cfg(any(target_os = "linux", target_os = "macos"))]
+#[test]
+fn io_driver_fd_count() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+
+ assert_eq!(metrics.io_driver_fd_registered_count(), 0);
+
+ let stream = tokio::net::TcpStream::connect("google.com:80");
+ let stream = rt.block_on(async move { stream.await.unwrap() });
+
+ assert_eq!(metrics.io_driver_fd_registered_count(), 1);
+ assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);
+
+ drop(stream);
+
+ assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
+ assert_eq!(metrics.io_driver_fd_registered_count(), 1);
+}
+
+#[cfg(any(target_os = "linux", target_os = "macos"))]
+#[test]
+fn io_driver_ready_count() {
+ let rt = current_thread();
+ let metrics = rt.metrics();
+
+ let stream = tokio::net::TcpStream::connect("google.com:80");
+ let _stream = rt.block_on(async move { stream.await.unwrap() });
+
+ assert_eq!(metrics.io_driver_ready_count(), 1);
+}
+
+fn current_thread() -> Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+fn threaded() -> Runtime {
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(2)
+ .enable_all()
+ .build()
+ .unwrap()
+}
+
+fn us(n: u64) -> Duration {
+ Duration::from_micros(n)
+}