| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
|---|---|---|
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
| commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
| tree | f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/glean-core/src/scheduler.rs | |
| parent | Initial commit. (diff) | |
| download | firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz, firefox-26a029d407be480d791972afb5975cf62c9360a6.zip | |
Adding upstream version 124.0.1. (tag: upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/glean-core/src/scheduler.rs')
| -rw-r--r-- | third_party/rust/glean-core/src/scheduler.rs | 560 |
1 file changed, 560 insertions, 0 deletions
diff --git a/third_party/rust/glean-core/src/scheduler.rs b/third_party/rust/glean-core/src/scheduler.rs
new file mode 100644
index 0000000000..30fc956e25
--- /dev/null
+++ b/third_party/rust/glean-core/src/scheduler.rs
@@ -0,0 +1,560 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! # Metrics Ping Scheduler
+//!
+//! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings.
+//! It implements the spec described in
+//! [the docs](https://mozilla.github.io/glean/book/user/pings/metrics.html#scheduling)
+
+use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
+use crate::storage::INTERNAL_STORAGE;
+use crate::util::local_now_with_offset;
+use crate::{CommonMetricData, Glean, Lifetime};
+use chrono::prelude::*;
+use chrono::Duration;
+use once_cell::sync::Lazy;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::JoinHandle;
+
+const SCHEDULED_HOUR: u32 = 4;
+
+// Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex.
+// See https://github.com/rust-lang/rust-clippy/issues/1516
+#[allow(clippy::mutex_atomic)]
+static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
+    Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
+
+/// Describes the interface for a submitter of "metrics" pings.
+/// Used to decouple the implementation so we can test it.
+trait MetricsPingSubmitter {
+    /// Submits a metrics ping, updating the last sent time to `now`
+    /// (which might not be _right now_ due to processing delays (or in tests))
+    fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
+}
+
+/// Describes the interface for a scheduler of "metrics" pings.
+/// Used to decouple the implementation so we can test it.
+trait MetricsPingScheduler {
+    /// Begins a recurring schedule of "metrics" ping submissions, on another thread.
+    /// `now` is used with `when` to determine the first schedule interval and
+    /// may not be _right now_ due to processing delays (or in tests).
+    fn start_scheduler(
+        &self,
+        submitter: impl MetricsPingSubmitter + Send + 'static,
+        now: DateTime<FixedOffset>,
+        when: When,
+    );
+}
+
+/// Uses Glean to submit "metrics" pings directly.
+struct GleanMetricsPingSubmitter {}
+impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
+    fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
+        glean.submit_ping_by_name("metrics", reason);
+        // Always update the collection date, irrespective of the ping being sent.
+        get_last_sent_time_metric().set_sync_chrono(glean, now);
+    }
+}
+
+/// Schedule "metrics" pings directly using the default behaviour.
+struct GleanMetricsPingScheduler {}
+impl MetricsPingScheduler for GleanMetricsPingScheduler {
+    fn start_scheduler(
+        &self,
+        submitter: impl MetricsPingSubmitter + Send + 'static,
+        now: DateTime<FixedOffset>,
+        when: When,
+    ) {
+        start_scheduler(submitter, now, when);
+    }
+}
+
+/// Performs startup checks to decide when to schedule the next "metrics" ping collection.
+/// **Must** be called before draining the preinit queue.
+/// (We're at the Language Bindings' mercy for that)
+pub fn schedule(glean: &Glean) {
+    let now = local_now_with_offset();
+
+    let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
+    if *cancelled_lock.lock().unwrap() {
+        log::debug!("Told to schedule, but already cancelled. Are we in a test?");
+    }
+    *cancelled_lock.lock().unwrap() = false; // Uncancel the thread.
+
+    let submitter = GleanMetricsPingSubmitter {};
+    let scheduler = GleanMetricsPingScheduler {};
+
+    schedule_internal(glean, submitter, scheduler, now)
+}
+
+/// Tells the scheduler task to exit quickly and cleanly.
+pub fn cancel() {
+    let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+    *cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread.
+    condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled.
+}
+
+fn schedule_internal(
+    glean: &Glean,
+    submitter: impl MetricsPingSubmitter + Send + 'static,
+    scheduler: impl MetricsPingScheduler,
+    now: DateTime<FixedOffset>,
+) {
+    let last_sent_build_metric = get_last_sent_build_metric();
+    if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
+        // If `app_build` is longer than StringMetric's max length, we will always
+        // treat it as a changed build when really it isn't.
+        // This will be externally-observable as InvalidOverflow errors on both the core
+        // `client_info.app_build` metric and the scheduler's internal metric.
+        if last_sent_build != glean.app_build {
+            last_sent_build_metric.set_sync(glean, &glean.app_build);
+            log::info!("App build changed. Sending 'metrics' ping");
+            submitter.submit_metrics_ping(glean, Some("upgrade"), now);
+            scheduler.start_scheduler(submitter, now, When::Reschedule);
+            return;
+        }
+    } else {
+        // No value in last_sent_build. Better set one.
+        last_sent_build_metric.set_sync(glean, &glean.app_build);
+    }
+
+    let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
+    if let Some(last_sent) = last_sent_time {
+        log::info!("The 'metrics' ping was last sent on {}", last_sent);
+    }
+
+    // We aim to cover 3 cases here:
+    //
+    // 1. The ping was already collected on the current calendar day;
+    //    only schedule one for collection on the next calendar day at the due time.
+    // 2. The ping was NOT collected on the current calendar day AND we're later
+    //    than today's due time; collect the ping immediately.
+    // 3. The ping was NOT collected on the current calendar day BUT we still have
+    //    some time to the due time; schedule for submitting the current calendar day.
+
+    let already_sent_today = last_sent_time.map_or(false, |d| d.date() == now.date());
+    if already_sent_today {
+        // Case #1
+        log::info!("The 'metrics' ping was already sent today, {}", now);
+        scheduler.start_scheduler(submitter, now, When::Tomorrow);
+    } else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
+        // Case #2
+        log::info!("Sending the 'metrics' ping immediately, {}", now);
+        submitter.submit_metrics_ping(glean, Some("overdue"), now);
+        scheduler.start_scheduler(submitter, now, When::Reschedule);
+    } else {
+        // Case #3
+        log::info!("The 'metrics' collection is scheduled for today, {}", now);
+        scheduler.start_scheduler(submitter, now, When::Today);
+    }
+}
+
+/// "metrics" ping scheduling deadlines.
+#[derive(Debug, PartialEq)]
+enum When {
+    Today,
+    Tomorrow,
+    Reschedule,
+}
+
+impl When {
+    /// Returns the duration from now until our deadline.
+    /// Note that std::time::Duration doesn't do negative time spans, so if
+    /// our deadline has passed, this will return zero.
+    fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
+        let fire_date = match self {
+            Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
+            // Doesn't actually save us from being an hour off on DST because
+            // chrono doesn't know when DST changes. : (
+            Self::Tomorrow | Self::Reschedule => {
+                (now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
+            }
+        };
+        // After rust-lang/rust#73544 can use std::time::Duration::ZERO
+        (fire_date - now)
+            .to_std()
+            .unwrap_or_else(|_| std::time::Duration::from_millis(0))
+    }
+
+    /// The "metrics" ping reason corresponding to our deadline.
+    fn reason(&self) -> &'static str {
+        match self {
+            Self::Today => "today",
+            Self::Tomorrow => "tomorrow",
+            Self::Reschedule => "reschedule",
+        }
+    }
+}
+
+fn start_scheduler(
+    submitter: impl MetricsPingSubmitter + Send + 'static,
+    now: DateTime<FixedOffset>,
+    when: When,
+) -> JoinHandle<()> {
+    let pair = Arc::clone(&TASK_CONDVAR);
+    std::thread::Builder::new()
+        .name("glean.mps".into())
+        .spawn(move || {
+            let (cancelled_lock, condvar) = &*pair;
+            let mut when = when;
+            let mut now = now;
+            loop {
+                let dur = when.until(now);
+                log::info!("Scheduling for {:?} after {}, reason {:?}", dur, now, when);
+                let mut timed_out = false;
+                {
+                    match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| !*cancelled) {
+                        Err(err) => {
+                            log::warn!("Condvar wait failure. MPS exiting. {}", err);
+                            break;
+                        }
+                        Ok((cancelled, wait_result)) => {
+                            if *cancelled {
+                                log::info!("Metrics Ping Scheduler cancelled. Exiting.");
+                                break;
+                            } else if wait_result.timed_out() {
+                                // Can't get the global glean while holding cancelled's lock.
+                                timed_out = true;
+                            } else {
+                                // This should be impossible. `cancelled_lock` is acquired, and
+                                // `!*cancelled` is checked by the condvar before it is allowed
+                                // to return from `wait_timeout_while` (I checked).
+                                // So `Ok(_)` implies `*cancelled || wait_result.timed_out`.
+                                log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
+                            }
+                        }
+                    }
+                }
+                // Safety:
+                // We are okay dropping the condvar's cancelled lock here because it only guards
+                // whether we're cancelled, and we've established that we weren't when we timed out.
+                // We might _now_ be cancelled at any time, in which case when we loop back over
+                // we'll immediately exit. But first we need to submit our "metrics" ping.
+                if timed_out {
+                    log::info!("Time to submit our metrics ping, {:?}", when);
+                    let glean = crate::core::global_glean().expect("Global Glean not present when trying to send scheduled 'metrics' ping?!").lock().unwrap();
+                    submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
+                    when = When::Reschedule;
+                }
+                now = local_now_with_offset();
+            }
+        }).expect("Unable to spawn Metrics Ping Scheduler thread.")
+}
+
+fn get_last_sent_time_metric() -> DatetimeMetric {
+    DatetimeMetric::new(
+        CommonMetricData {
+            name: "last_sent_time".into(),
+            category: "mps".into(),
+            send_in_pings: vec![INTERNAL_STORAGE.into()],
+            lifetime: Lifetime::User,
+            ..Default::default()
+        },
+        TimeUnit::Minute,
+    )
+}
+
+fn get_last_sent_build_metric() -> StringMetric {
+    StringMetric::new(CommonMetricData {
+        name: "last_sent_build".into(),
+        category: "mps".into(),
+        send_in_pings: vec![INTERNAL_STORAGE.into()],
+        lifetime: Lifetime::User,
+        ..Default::default()
+    })
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::tests::new_glean;
+    use std::sync::atomic::{AtomicU32, Ordering};
+
+    struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
+        submit_validator: F,
+        validator_run_count: Arc<AtomicU32>,
+    }
+    struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
+        schedule_validator: F,
+        validator_run_count: Arc<AtomicU32>,
+    }
+    impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
+        fn submit_metrics_ping(
+            &self,
+            _glean: &Glean,
+            reason: Option<&str>,
+            now: DateTime<FixedOffset>,
+        ) {
+            (self.submit_validator)(now, reason);
+            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+    impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
+        fn start_scheduler(
+            &self,
+            _submitter: impl MetricsPingSubmitter + Send + 'static,
+            now: DateTime<FixedOffset>,
+            when: When,
+        ) {
+            (self.schedule_validator)(now, when);
+            self.validator_run_count.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    fn new_proxies<
+        F1: Fn(DateTime<FixedOffset>, Option<&str>),
+        F2: Fn(DateTime<FixedOffset>, When),
+    >(
+        submit_validator: F1,
+        schedule_validator: F2,
+    ) -> (
+        ValidatingSubmitter<F1>,
+        Arc<AtomicU32>,
+        ValidatingScheduler<F2>,
+        Arc<AtomicU32>,
+    ) {
+        let submitter_count = Arc::new(AtomicU32::new(0));
+        let submitter = ValidatingSubmitter {
+            submit_validator,
+            validator_run_count: Arc::clone(&submitter_count),
+        };
+        let scheduler_count = Arc::new(AtomicU32::new(0));
+        let scheduler = ValidatingScheduler {
+            schedule_validator,
+            validator_run_count: Arc::clone(&scheduler_count),
+        };
+        (submitter, submitter_count, scheduler, scheduler_count)
+    }
+
+    // Ensure on first run that we actually set the last sent build metric.
+    // (and that we send an "overdue" ping if it's after the scheduled hour)
+    #[test]
+    fn first_run_last_sent_build() {
+        let (mut glean, _t) = new_glean(None);
+
+        glean.app_build = "a build".into();
+        let lsb_metric = get_last_sent_build_metric();
+        assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));
+
+        let fake_now = FixedOffset::east(0)
+            .ymd(2022, 11, 15)
+            .and_hms(SCHEDULED_HOUR, 0, 1);
+
+        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+            |_, reason| assert_eq!(reason, Some("overdue")),
+            |_, when| assert_eq!(when, When::Reschedule),
+        );
+
+        schedule_internal(&glean, submitter, scheduler, fake_now);
+        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+
+        assert_eq!(
+            Some(glean.app_build.to_string()),
+            lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
+        );
+    }
+
+    // Ensure that if we have a different build, we immediately submit an "upgrade" ping
+    // and schedule a "reschedule" ping for tomorrow.
+    #[test]
+    fn different_app_builds_submit_and_reschedule() {
+        let (mut glean, _t) = new_glean(None);
+
+        glean.app_build = "a build".into();
+        get_last_sent_build_metric().set_sync(&glean, "a different build");
+
+        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+            |_, reason| assert_eq!(reason, Some("upgrade")),
+            |_, when| assert_eq!(when, When::Reschedule),
+        );
+
+        schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
+        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+    }
+
+    // If we've already sent a ping today, ensure we don't send a ping but we
+    // do schedule a ping for tomorrow. ("Case #1" in schedule_internal)
+    #[test]
+    fn case_1_no_submit_but_schedule_tomorrow() {
+        let (glean, _t) = new_glean(None);
+
+        let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14);
+        get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);
+
+        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+            |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
+            |_, when| assert_eq!(when, When::Tomorrow),
+        );
+        schedule_internal(&glean, submitter, scheduler, fake_now);
+        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+    }
+
+    // If we haven't sent a ping today and we're after the scheduled time,
+    // ensure we send a ping and then schedule a "reschedule" ping for tomorrow.
+    // ("Case #2" in schedule_internal)
+    #[test]
+    fn case_2_submit_ping_and_reschedule() {
+        let (glean, _t) = new_glean(None);
+
+        let fake_yesterday = FixedOffset::east(0)
+            .ymd(2021, 4, 29)
+            .and_hms(SCHEDULED_HOUR, 0, 1);
+        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
+        let fake_now = fake_yesterday + Duration::days(1);
+
+        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+            |_, reason| assert_eq!(reason, Some("overdue")),
+            |_, when| assert_eq!(when, When::Reschedule),
+        );
+        schedule_internal(&glean, submitter, scheduler, fake_now);
+        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+    }
+
+    // If we haven't sent a ping today and we're before the scheduled time,
+    // ensure we don't send a ping but schedule a "today" ping for today.
+    // ("Case #3" in schedule_internal)
+    #[test]
+    fn case_3_no_submit_but_schedule_today() {
+        let (glean, _t) = new_glean(None);
+
+        let fake_yesterday = FixedOffset::east(0)
+            .ymd(2021, 4, 29)
+            .and_hms(SCHEDULED_HOUR - 1, 0, 1);
+        get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
+        let fake_now = fake_yesterday + Duration::days(1);
+
+        let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
+            |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
+            |_, when| assert_eq!(when, When::Today),
+        );
+        schedule_internal(&glean, submitter, scheduler, fake_now);
+        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+        assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
+    }
+
+    // `When` is responsible for date math. Let's make sure it's correct.
+    #[test]
+    fn when_gets_at_least_some_date_math_correct() {
+        let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
+        // `now` is after `SCHEDULED_HOUR` so should be zero:
+        assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now));
+        // If we bring it back before `SCHEDULED_HOUR` it should give us the duration:
+        let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
+        assert_eq!(
+            std::time::Duration::from_secs(3600),
+            When::Today.until(earlier)
+        );
+
+        // `Tomorrow` and `Reschedule` should differ only in their `reason()`
+        // 46670s is 12h57m10s (aka, the time from 15:02:10 to 04:00:00
+        // (when the timezone doesn't change between them)).
+        assert_eq!(
+            std::time::Duration::from_secs(46670),
+            When::Tomorrow.until(now)
+        );
+        assert_eq!(
+            std::time::Duration::from_secs(46670),
+            When::Reschedule.until(now)
+        );
+        assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
+        assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
+    }
+
+    // Scheduler tests mutate global state and thus must not be run in parallel.
+    // Otherwise one test could cancel the other.
+    // This Mutex aims to solve that.
+    static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
+
+    // The scheduler has been designed to be cancellable. Can we cancel it?
+    #[test]
+    fn cancellable_tasks_can_be_cancelled() {
+        // First and foremost, all scheduler tests must ensure they start uncancelled.
+        // Perils of having shared state.
+        let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
+        let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+        *cancelled_lock.lock().unwrap() = false;
+
+        // Pick a time at least two hours from the next scheduled submission.
+        // (So that this test will time out if cancellation fails).
+        let now = FixedOffset::east(0)
+            .ymd(2021, 4, 30)
+            .and_hms(SCHEDULED_HOUR - 2, 0, 0);
+
+        let proxy_factory = || {
+            new_proxies(
+                |_, reason| {
+                    panic!(
+                        "Shouldn't submit when testing scheduler. reason: {:?}",
+                        reason
+                    )
+                },
+                |_, _| panic!("Not even using the scheduler this time."),
+            )
+        };
+
+        // Test Today.
+        let (submitter, submitter_count, _, _) = proxy_factory();
+        let handle = start_scheduler(submitter, now, When::Today);
+        super::cancel();
+        handle.join().unwrap(); // Should complete immediately.
+        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+
+        // Test Tomorrow.
+        let (submitter, submitter_count, _, _) = proxy_factory();
+        *cancelled_lock.lock().unwrap() = false; // Uncancel.
+        let handle = start_scheduler(submitter, now, When::Tomorrow);
+        super::cancel();
+        handle.join().unwrap(); // Should complete immediately.
+        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+
+        // Test Reschedule.
+        let (submitter, submitter_count, _, _) = proxy_factory();
+        *cancelled_lock.lock().unwrap() = false; // Uncancel.
+        let handle = start_scheduler(submitter, now, When::Reschedule);
+        super::cancel();
+        handle.join().unwrap(); // Should complete immediately.
+        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
+    }
+
+    // We're not keen to wait like the scheduler is, but we can test a quick schedule.
+    #[test]
+    fn immediate_task_runs_immediately() {
+        // First and foremost, all scheduler tests must ensure they start uncancelled.
+        // Perils of having shared state.
+        let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
+        let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
+        *cancelled_lock.lock().unwrap() = false;
+
+        // We're actually going to submit a ping from the scheduler, which requires a global glean.
+        let (glean, _t) = new_glean(None);
+        assert!(
+            !glean.schedule_metrics_pings,
+            "Real schedulers not allowed in tests!"
+        );
+        assert!(crate::core::setup_glean(glean).is_ok());
+
+        // We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0.
+        let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0);
+
+        let (submitter, submitter_count, _, _) = new_proxies(
+            move |_, reason| {
+                assert_eq!(reason, Some("today"));
+                // After submitting the ping we expect, let's cancel this scheduler so the thread exits.
+                // (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex)
+                std::thread::spawn(super::cancel);
+            },
+            |_, _| panic!("Not using the scheduler this time."),
+        );
+
+        let handle = start_scheduler(submitter, now, When::Today);
+        handle.join().unwrap();
+        assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
+    }
+}
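To make the scheduling policy easier to follow at a glance, here is a minimal, self-contained sketch (not part of the patch above) of the three-case decision that `schedule_internal` makes around `SCHEDULED_HOUR`. The `Decision` enum and `decide` helper are illustrative names only; the chrono calls mirror the ones used in the file.

```rust
use chrono::prelude::*;

const SCHEDULED_HOUR: u32 = 4;

#[derive(Debug, PartialEq)]
enum Decision {
    ScheduleTomorrow,       // Case #1: already collected today
    SubmitNowAndReschedule, // Case #2: not collected today, past the due time
    ScheduleToday,          // Case #3: not collected today, before the due time
}

fn decide(last_sent: Option<DateTime<FixedOffset>>, now: DateTime<FixedOffset>) -> Decision {
    // Same checks as schedule_internal: compare calendar days, then the due time.
    let already_sent_today = last_sent.map_or(false, |d| d.date() == now.date());
    if already_sent_today {
        Decision::ScheduleTomorrow
    } else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
        Decision::SubmitNowAndReschedule
    } else {
        Decision::ScheduleToday
    }
}

fn main() {
    // 15:02:10 is after the 04:00 due time, so an unsent ping is overdue.
    let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
    assert_eq!(decide(None, now), Decision::SubmitNowAndReschedule);

    // A ping already sent earlier today pushes the next one to tomorrow.
    let sent_today = Some(now.date().and_hms(SCHEDULED_HOUR, 0, 1));
    assert_eq!(decide(sent_today, now), Decision::ScheduleTomorrow);
}
```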