path: root/third_party/rust/glean-core/src/scheduler.rs
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
commit     26a029d407be480d791972afb5975cf62c9360a6 (patch)
tree       f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/glean-core/src/scheduler.rs
parent     Initial commit. (diff)
download   firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
           firefox-26a029d407be480d791972afb5975cf62c9360a6.zip

Adding upstream version 124.0.1. (upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/glean-core/src/scheduler.rs')
-rw-r--r-- third_party/rust/glean-core/src/scheduler.rs | 560
1 file changed, 560 insertions(+), 0 deletions(-)
diff --git a/third_party/rust/glean-core/src/scheduler.rs b/third_party/rust/glean-core/src/scheduler.rs
new file mode 100644
index 0000000000..30fc956e25
--- /dev/null
+++ b/third_party/rust/glean-core/src/scheduler.rs
@@ -0,0 +1,560 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! # Metrics Ping Scheduler
+//!
+//! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings.
+//! It implements the spec described in
+//! [the docs](https://mozilla.github.io/glean/book/user/pings/metrics.html#scheduling)
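+//!
+//! A rough sketch of the expected call order from a language binding
+//! (illustrative only; `glean` stands for the initialized `Glean` instance):
+//!
+//! ```ignore
+//! // Schedule "metrics" pings before draining the pre-init task queue...
+//! scheduler::schedule(&glean);
+//! // ...and on shutdown, tell the scheduler thread to exit promptly.
+//! scheduler::cancel();
+//! ```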
+
+use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
+use crate::storage::INTERNAL_STORAGE;
+use crate::util::local_now_with_offset;
+use crate::{CommonMetricData, Glean, Lifetime};
+use chrono::prelude::*;
+use chrono::Duration;
+use once_cell::sync::Lazy;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::JoinHandle;
+
+const SCHEDULED_HOUR: u32 = 4;
+
+// Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex.
+// See https://github.com/rust-lang/rust-clippy/issues/1516
+#[allow(clippy::mutex_atomic)]
+static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
+ Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
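+// (How the cancellation handshake works: the scheduler thread parks in
+// `Condvar::wait_timeout_while` on this boolean. `cancel()` flips it to `true`
+// and calls `notify_all()`, waking the thread early so it can exit instead of
+// sleeping out the rest of its timeout.)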
+
+/// Describes the interface for a submitter of "metrics" pings.
+/// Used to decouple the implementation so we can test it.
+trait MetricsPingSubmitter {
+ /// Submits a metrics ping, updating the last sent time to `now`
+/// (which might not be _right now_ due to processing delays, or in tests)
+ fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
+}
+
+/// Describes the interface for a scheduler of "metrics" pings.
+/// Used to decouple the implementation so we can test it.
+trait MetricsPingScheduler {
+ /// Begins a recurring schedule of "metrics" ping submissions, on another thread.
+ /// `now` is used with `when` to determine the first schedule interval and
+ /// may not be _right now_ due to processing delays (or in tests).
+ fn start_scheduler(
+ &self,
+ submitter: impl MetricsPingSubmitter + Send + 'static,
+ now: DateTime<FixedOffset>,
+ when: When,
+ );
+}
+
+/// Uses Glean to submit "metrics" pings directly.
+struct GleanMetricsPingSubmitter {}
+impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
+ fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
+ glean.submit_ping_by_name("metrics", reason);
+ // Always update the collection date, irrespective of the ping being sent.
+ get_last_sent_time_metric().set_sync_chrono(glean, now);
+ }
+}
+
+/// Schedule "metrics" pings directly using the default behaviour.
+struct GleanMetricsPingScheduler {}
+impl MetricsPingScheduler for GleanMetricsPingScheduler {
+ fn start_scheduler(
+ &self,
+ submitter: impl MetricsPingSubmitter + Send + 'static,
+ now: DateTime<FixedOffset>,
+ when: When,
+ ) {
+ start_scheduler(submitter, now, when);
+ }
+}
+
+/// Performs startup checks to decide when to schedule the next "metrics" ping collection.
+/// **Must** be called before draining the preinit queue.
+/// (We're at the Language Bindings' mercy for that)
+pub fn schedule(glean: &Glean) {
+ let now = local_now_with_offset();
+
+ let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
+ if *cancelled_lock.lock().unwrap() {
+ log::debug!("Told to schedule, but already cancelled. Are we in a test?");
+ }
+ *cancelled_lock.lock().unwrap() = false; // Uncancel the thread.
+
+ let submitter = GleanMetricsPingSubmitter {};
+ let scheduler = GleanMetricsPingScheduler {};
+
+ schedule_internal(glean, submitter, scheduler, now)
+}
+
+/// Tells the scheduler task to exit quickly and cleanly.
+pub fn cancel() {
+ let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+ *cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread.
+ condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled.
+}
+
+fn schedule_internal(
+ glean: &Glean,
+ submitter: impl MetricsPingSubmitter + Send + 'static,
+ scheduler: impl MetricsPingScheduler,
+ now: DateTime<FixedOffset>,
+) {
+ let last_sent_build_metric = get_last_sent_build_metric();
+ if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
+ // If `app_build` is longer than StringMetric's max length, we will always
+ // treat it as a changed build when really it isn't.
+ // This will be externally-observable as InvalidOverflow errors on both the core
+ // `client_info.app_build` metric and the scheduler's internal metric.
+ if last_sent_build != glean.app_build {
+ last_sent_build_metric.set_sync(glean, &glean.app_build);
+ log::info!("App build changed. Sending 'metrics' ping");
+ submitter.submit_metrics_ping(glean, Some("upgrade"), now);
+ scheduler.start_scheduler(submitter, now, When::Reschedule);
+ return;
+ }
+ } else {
+ // No value in last_sent_build. Better set one.
+ last_sent_build_metric.set_sync(glean, &glean.app_build);
+ }
+
+ let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
+ if let Some(last_sent) = last_sent_time {
+ log::info!("The 'metrics' ping was last sent on {}", last_sent);
+ }
+
+ // We aim to cover 3 cases here:
+ //
+ // 1. The ping was already collected on the current calendar day;
+ // only schedule one for collection on the next calendar day at the due time.
+ // 2. The ping was NOT collected on the current calendar day AND we're later
+ // than today's due time; collect the ping immediately.
+ // 3. The ping was NOT collected on the current calendar day BUT we still have
+ // some time before the due time; schedule a submission for later today.
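+ //
+ // Worked example (SCHEDULED_HOUR = 4, local time): if the ping was last
+ // sent at any time today, case #1 applies. If it was last sent yesterday
+ // and it's now 05:00, case #2 submits an "overdue" ping immediately. If it
+ // was last sent yesterday and it's now 03:00, case #3 waits for today's
+ // 04:00 due time.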
+
+ let already_sent_today = last_sent_time.map_or(false, |d| d.date() == now.date());
+ if already_sent_today {
+ // Case #1
+ log::info!("The 'metrics' ping was already sent today, {}", now);
+ scheduler.start_scheduler(submitter, now, When::Tomorrow);
+ } else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
+ // Case #2
+ log::info!("Sending the 'metrics' ping immediately, {}", now);
+ submitter.submit_metrics_ping(glean, Some("overdue"), now);
+ scheduler.start_scheduler(submitter, now, When::Reschedule);
+ } else {
+ // Case #3
+ log::info!("The 'metrics' collection is scheduled for today, {}", now);
+ scheduler.start_scheduler(submitter, now, When::Today);
+ }
+}
+
+/// "metrics" ping scheduling deadlines.
+#[derive(Debug, PartialEq)]
+enum When {
+ Today,
+ Tomorrow,
+ Reschedule,
+}
+
+impl When {
+ /// Returns the duration from now until our deadline.
+ /// Note that std::time::Duration doesn't do negative time spans, so if
+ /// our deadline has passed, this will return zero.
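+ /// For example, with `SCHEDULED_HOUR` of 4, a `now` of 15:02:10 yields
+ /// zero for `Today` (the deadline already passed) and 46670s (12h57m50s,
+ /// until 04:00:00 tomorrow) for `Tomorrow` and `Reschedule`.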
+ fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
+ let fire_date = match self {
+ Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
+ // Doesn't actually save us from being an hour off on DST because
+ // chrono doesn't know when DST changes. :(
+ Self::Tomorrow | Self::Reschedule => {
+ (now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
+ }
+ };
+ // After rust-lang/rust#73544 can use std::time::Duration::ZERO
+ (fire_date - now)
+ .to_std()
+ .unwrap_or_else(|_| std::time::Duration::from_millis(0))
+ }
+
+ /// The "metrics" ping reason corresponding to our deadline.
+ fn reason(&self) -> &'static str {
+ match self {
+ Self::Today => "today",
+ Self::Tomorrow => "tomorrow",
+ Self::Reschedule => "reschedule",
+ }
+ }
+}
+
+fn start_scheduler(
+ submitter: impl MetricsPingSubmitter + Send + 'static,
+ now: DateTime<FixedOffset>,
+ when: When,
+) -> JoinHandle<()> {
+ let pair = Arc::clone(&TASK_CONDVAR);
+ std::thread::Builder::new()
+ .name("glean.mps".into())
+ .spawn(move || {
+ let (cancelled_lock, condvar) = &*pair;
+ let mut when = when;
+ let mut now = now;
+ loop {
+ let dur = when.until(now);
+ log::info!("Scheduling for {:?} after {}, reason {:?}", dur, now, when);
+ let mut timed_out = false;
+ {
+ match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| !*cancelled) {
+ Err(err) => {
+ log::warn!("Condvar wait failure. MPS exiting. {}", err);
+ break;
+ }
+ Ok((cancelled, wait_result)) => {
+ if *cancelled {
+ log::info!("Metrics Ping Scheduler cancelled. Exiting.");
+ break;
+ } else if wait_result.timed_out() {
+ // Can't get the global glean while holding cancelled's lock.
+ timed_out = true;
+ } else {
+ // This should be impossible. `cancelled_lock` is acquired, and
+ // `!*cancelled` is checked by the condvar before it is allowed
+ // to return from `wait_timeout_while` (I checked).
+ // So `Ok(_)` implies `*cancelled || wait_result.timed_out()`.
+ log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
+ }
+ }
+ }
+ }
+ // Safety:
+ // We are okay dropping the condvar's cancelled lock here because it only guards
+ // whether we're cancelled, and we've established that we weren't when we timed out.
+ // We might _now_ be cancelled at any time, in which case when we loop back over
+ // we'll immediately exit. But first we need to submit our "metrics" ping.
+ if timed_out {
+ log::info!("Time to submit our metrics ping, {:?}", when);
+ let glean = crate::core::global_glean().expect("Global Glean not present when trying to send scheduled 'metrics' ping?!").lock().unwrap();
+ submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
+ when = When::Reschedule;
+ }
+ now = local_now_with_offset();
+ }
+ }).expect("Unable to spawn Metrics Ping Scheduler thread.")
+}
+
+fn get_last_sent_time_metric() -> DatetimeMetric {
+ DatetimeMetric::new(
+ CommonMetricData {
+ name: "last_sent_time".into(),
+ category: "mps".into(),
+ send_in_pings: vec![INTERNAL_STORAGE.into()],
+ lifetime: Lifetime::User,
+ ..Default::default()
+ },
+ TimeUnit::Minute,
+ )
+}
+
+fn get_last_sent_build_metric() -> StringMetric {
+ StringMetric::new(CommonMetricData {
+ name: "last_sent_build".into(),
+ category: "mps".into(),
+ send_in_pings: vec![INTERNAL_STORAGE.into()],
+ lifetime: Lifetime::User,
+ ..Default::default()
+ })
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::tests::new_glean;
+ use std::sync::atomic::{AtomicU32, Ordering};
+
+ struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
+ submit_validator: F,
+ validator_run_count: Arc<AtomicU32>,
+ }
+ struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
+ schedule_validator: F,
+ validator_run_count: Arc<AtomicU32>,
+ }
+ impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
+ fn submit_metrics_ping(
+ &self,
+ _glean: &Glean,
+ reason: Option<&str>,
+ now: DateTime<FixedOffset>,
+ ) {
+ (self.submit_validator)(now, reason);
+ self.validator_run_count.fetch_add(1, Ordering::Relaxed);
+ }
+ }
+ impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
+ fn start_scheduler(
+ &self,
+ _submitter: impl MetricsPingSubmitter + Send + 'static,
+ now: DateTime<FixedOffset>,
+ when: When,
+ ) {
+ (self.schedule_validator)(now, when);
+ self.validator_run_count.fetch_add(1, Ordering::Relaxed);
+ }
+ }
+
+ fn new_proxies<
+ F1: Fn(DateTime<FixedOffset>, Option<&str>),
+ F2: Fn(DateTime<FixedOffset>, When),
+ >(
+ submit_validator: F1,
+ schedule_validator: F2,
+ ) -> (
+ ValidatingSubmitter<F1>,
+ Arc<AtomicU32>,
+ ValidatingScheduler<F2>,
+ Arc<AtomicU32>,
+ ) {
+ let submitter_count = Arc::new(AtomicU32::new(0));
+ let submitter = ValidatingSubmitter {
+ submit_validator,
+ validator_run_count: Arc::clone(&submitter_count),
+ };
+ let scheduler_count = Arc::new(AtomicU32::new(0));
+ let scheduler = ValidatingScheduler {
+ schedule_validator,
+ validator_run_count: Arc::clone(&scheduler_count),
+ };
+ (submitter, submitter_count, scheduler, scheduler_count)
+ }
+
+ // Ensure on first run that we actually set the last sent build metric.
+ // (and that we send an "overdue" ping if it's after the scheduled hour)
+ #[test]
+ fn first_run_last_sent_build() {
+ let (mut glean, _t) = new_glean(None);
+
+ glean.app_build = "a build".into();
+ let lsb_metric = get_last_sent_build_metric();
+ assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));
+
+ let fake_now = FixedOffset::east(0)
+ .ymd(2022, 11, 15)
+ .and_hms(SCHEDULED_HOUR, 0, 1);
+
+ let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+ |_, reason| assert_eq!(reason, Some("overdue")),
+ |_, when| assert_eq!(when, When::Reschedule),
+ );
+
+ schedule_internal(&glean, submitter, scheduler, fake_now);
+ assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+ assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+
+ assert_eq!(
+ Some(glean.app_build.to_string()),
+ lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
+ );
+ }
+
+ // Ensure that if we have a different build, we immediately submit an "upgrade" ping
+ // and schedule a "reschedule" ping for tomorrow.
+ #[test]
+ fn different_app_builds_submit_and_reschedule() {
+ let (mut glean, _t) = new_glean(None);
+
+ glean.app_build = "a build".into();
+ get_last_sent_build_metric().set_sync(&glean, "a different build");
+
+ let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+ |_, reason| assert_eq!(reason, Some("upgrade")),
+ |_, when| assert_eq!(when, When::Reschedule),
+ );
+
+ schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
+ assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+ assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+ }
+
+ // If we've already sent a ping today, ensure we don't send a ping but we
+ // do schedule a ping for tomorrow. ("Case #1" in schedule_internal)
+ #[test]
+ fn case_1_no_submit_but_schedule_tomorrow() {
+ let (glean, _t) = new_glean(None);
+
+ let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14);
+ get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);
+
+ let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+ |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
+ |_, when| assert_eq!(when, When::Tomorrow),
+ );
+ schedule_internal(&glean, submitter, scheduler, fake_now);
+ assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+ assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+ }
+
+ // If we haven't sent a ping today and we're after the scheduled time,
+ // ensure we send a ping and then schedule a "reschedule" ping for tomorrow.
+ // ("Case #2" in schedule_internal)
+ #[test]
+ fn case_2_submit_ping_and_reschedule() {
+ let (glean, _t) = new_glean(None);
+
+ let fake_yesterday = FixedOffset::east(0)
+ .ymd(2021, 4, 29)
+ .and_hms(SCHEDULED_HOUR, 0, 1);
+ get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
+ let fake_now = fake_yesterday + Duration::days(1);
+
+ let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+ |_, reason| assert_eq!(reason, Some("overdue")),
+ |_, when| assert_eq!(when, When::Reschedule),
+ );
+ schedule_internal(&glean, submitter, scheduler, fake_now);
+ assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+ assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+ }
+
+ // If we haven't sent a ping today and we're before the scheduled time,
+ // ensure we don't send a ping but schedule a "today" ping for today.
+ // ("Case #3" in schedule_internal)
+ #[test]
+ fn case_3_no_submit_but_schedule_today() {
+ let (glean, _t) = new_glean(None);
+
+ let fake_yesterday =
+ FixedOffset::east(0)
+ .ymd(2021, 4, 29)
+ .and_hms(SCHEDULED_HOUR - 1, 0, 1);
+ get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
+ let fake_now = fake_yesterday + Duration::days(1);
+
+ let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+ |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
+ |_, when| assert_eq!(when, When::Today),
+ );
+ schedule_internal(&glean, submitter, scheduler, fake_now);
+ assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+ assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+ }
+
+ // `When` is responsible for date math. Let's make sure it's correct.
+ #[test]
+ fn when_gets_at_least_some_date_math_correct() {
+ let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
+ // `now` is after `SCHEDULED_HOUR` so should be zero:
+ assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now));
+ // If we bring it back before `SCHEDULED_HOUR` it should give us the duration:
+ let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
+ assert_eq!(
+ std::time::Duration::from_secs(3600),
+ When::Today.until(earlier)
+ );
+
+ // `Tomorrow` and `Reschedule` should differ only in their `reason()`
+ // 46670s is 12h57m50s (aka, the time from 15:02:10 to 04:00:00
+ // (when the timezone doesn't change between them)).
+ assert_eq!(
+ std::time::Duration::from_secs(46670),
+ When::Tomorrow.until(now)
+ );
+ assert_eq!(
+ std::time::Duration::from_secs(46670),
+ When::Reschedule.until(now)
+ );
+ assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
+ assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
+ }
+
+ // Scheduler tests mutate global state and thus must not be run in parallel.
+ // Otherwise one test could cancel the other.
+ // This Mutex aims to solve that.
+ static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
+
+ // The scheduler has been designed to be cancellable. Can we cancel it?
+ #[test]
+ fn cancellable_tasks_can_be_cancelled() {
+ // First and foremost, all scheduler tests must ensure they start uncancelled.
+ // Perils of having shared state.
+ let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
+ let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+ *cancelled_lock.lock().unwrap() = false;
+
+ // Pick a time at least two hours from the next scheduled submission.
+ // (So that this test will time out if cancellation fails).
+ let now = FixedOffset::east(0)
+ .ymd(2021, 4, 30)
+ .and_hms(SCHEDULED_HOUR - 2, 0, 0);
+
+ let proxy_factory = || {
+ new_proxies(
+ |_, reason| {
+ panic!(
+ "Shouldn't submit when testing scheduler. reason: {:?}",
+ reason
+ )
+ },
+ |_, _| panic!("Not even using the scheduler this time."),
+ )
+ };
+
+ // Test Today.
+ let (submitter, submitter_count, _, _) = proxy_factory();
+ let handle = start_scheduler(submitter, now, When::Today);
+ super::cancel();
+ handle.join().unwrap(); // Should complete immediately.
+ assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+
+ // Test Tomorrow.
+ let (submitter, submitter_count, _, _) = proxy_factory();
+ *cancelled_lock.lock().unwrap() = false; // Uncancel.
+ let handle = start_scheduler(submitter, now, When::Tomorrow);
+ super::cancel();
+ handle.join().unwrap(); // Should complete immediately.
+ assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+
+ // Test Reschedule.
+ let (submitter, submitter_count, _, _) = proxy_factory();
+ *cancelled_lock.lock().unwrap() = false; // Uncancel.
+ let handle = start_scheduler(submitter, now, When::Reschedule);
+ super::cancel();
+ handle.join().unwrap(); // Should complete immediately.
+ assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+ }
+
+ // We're not keen to wait like the scheduler is, but we can test a quick schedule.
+ #[test]
+ fn immediate_task_runs_immediately() {
+ // First and foremost, all scheduler tests must ensure they start uncancelled.
+ // Perils of having shared state.
+ let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
+ let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+ *cancelled_lock.lock().unwrap() = false;
+
+ // We're actually going to submit a ping from the scheduler, which requires a global glean.
+ let (glean, _t) = new_glean(None);
+ assert!(
+ !glean.schedule_metrics_pings,
+ "Real schedulers not allowed in tests!"
+ );
+ assert!(crate::core::setup_glean(glean).is_ok());
+
+ // We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0.
+ let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0);
+
+ let (submitter, submitter_count, _, _) = new_proxies(
+ move |_, reason| {
+ assert_eq!(reason, Some("today"));
+ // After submitting the ping we expect, let's cancel this scheduler so the thread exits.
+ // (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex)
+ std::thread::spawn(super::cancel);
+ },
+ |_, _| panic!("Not using the scheduler this time."),
+ );
+
+ let handle = start_scheduler(submitter, now, When::Today);
+ handle.join().unwrap();
+ assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+ }
+}