// This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! # Metrics Ping Scheduler //! //! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings. //! It implements the spec described in //! [the docs](https://mozilla.github.io/glean/book/user/pings/metrics.html#scheduling) use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit}; use crate::storage::INTERNAL_STORAGE; use crate::util::local_now_with_offset; use crate::{CommonMetricData, Glean, Lifetime}; use chrono::prelude::*; use chrono::Duration; use once_cell::sync::Lazy; use std::sync::{Arc, Condvar, Mutex}; use std::thread::JoinHandle; const SCHEDULED_HOUR: u32 = 4; // Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex. // See https://github.com/rust-lang/rust-clippy/issues/1516 #[allow(clippy::mutex_atomic)] static TASK_CONDVAR: Lazy, Condvar)>> = Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new()))); /// Describes the interface for a submitter of "metrics" pings. /// Used to decouple the implementation so we can test it. trait MetricsPingSubmitter { /// Submits a metrics ping, updating the last sent time to `now` /// (which might not be _right now_ due to processing delays (or in tests)) fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime); } /// Describes the interface for a scheduler of "metrics" pings. /// Used to decouple the implementation so we can test it. trait MetricsPingScheduler { /// Begins a recurring schedule of "metrics" ping submissions, on another thread. /// `now` is used with `when` to determine the first schedule interval and /// may not be _right now_ due to processing delays (or in tests). fn start_scheduler( &self, submitter: impl MetricsPingSubmitter + Send + 'static, now: DateTime, when: When, ); } /// Uses Glean to submit "metrics" pings directly. struct GleanMetricsPingSubmitter {} impl MetricsPingSubmitter for GleanMetricsPingSubmitter { fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime) { glean.submit_ping_by_name("metrics", reason); // Always update the collection date, irrespective of the ping being sent. get_last_sent_time_metric().set_sync_chrono(glean, now); } } /// Schedule "metrics" pings directly using the default behaviour. struct GleanMetricsPingScheduler {} impl MetricsPingScheduler for GleanMetricsPingScheduler { fn start_scheduler( &self, submitter: impl MetricsPingSubmitter + Send + 'static, now: DateTime, when: When, ) { start_scheduler(submitter, now, when); } } /// Performs startup checks to decide when to schedule the next "metrics" ping collection. /// **Must** be called before draining the preinit queue. /// (We're at the Language Bindings' mercy for that) pub fn schedule(glean: &Glean) { let now = local_now_with_offset(); let (cancelled_lock, _condvar) = &**TASK_CONDVAR; if *cancelled_lock.lock().unwrap() { log::debug!("Told to schedule, but already cancelled. Are we in a test?"); } *cancelled_lock.lock().unwrap() = false; // Uncancel the thread. let submitter = GleanMetricsPingSubmitter {}; let scheduler = GleanMetricsPingScheduler {}; schedule_internal(glean, submitter, scheduler, now) } /// Tells the scheduler task to exit quickly and cleanly. pub fn cancel() { let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc *cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread. condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled. } fn schedule_internal( glean: &Glean, submitter: impl MetricsPingSubmitter + Send + 'static, scheduler: impl MetricsPingScheduler, now: DateTime, ) { let last_sent_build_metric = get_last_sent_build_metric(); if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) { // If `app_build` is longer than StringMetric's max length, we will always // treat it as a changed build when really it isn't. // This will be externally-observable as InvalidOverflow errors on both the core // `client_info.app_build` metric and the scheduler's internal metric. if last_sent_build != glean.app_build { last_sent_build_metric.set_sync(glean, &glean.app_build); log::info!("App build changed. Sending 'metrics' ping"); submitter.submit_metrics_ping(glean, Some("upgrade"), now); scheduler.start_scheduler(submitter, now, When::Reschedule); return; } } else { // No value in last_sent_build. Better set one. last_sent_build_metric.set_sync(glean, &glean.app_build); } let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE); if let Some(last_sent) = last_sent_time { log::info!("The 'metrics' ping was last sent on {}", last_sent); } // We aim to cover 3 cases here: // // 1. The ping was already collected on the current calendar day; // only schedule one for collection on the next calendar day at the due time. // 2. The ping was NOT collected on the current calendar day AND we're later // than today's due time; collect the ping immediately. // 3. The ping was NOT collected on the current calendar day BUT we still have // some time to the due time; schedule for submitting the current calendar day. let already_sent_today = last_sent_time.map_or(false, |d| d.date() == now.date()); if already_sent_today { // Case #1 log::info!("The 'metrics' ping was already sent today, {}", now); scheduler.start_scheduler(submitter, now, When::Tomorrow); } else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) { // Case #2 log::info!("Sending the 'metrics' ping immediately, {}", now); submitter.submit_metrics_ping(glean, Some("overdue"), now); scheduler.start_scheduler(submitter, now, When::Reschedule); } else { // Case #3 log::info!("The 'metrics' collection is scheduled for today, {}", now); scheduler.start_scheduler(submitter, now, When::Today); } } /// "metrics" ping scheduling deadlines. #[derive(Debug, PartialEq)] enum When { Today, Tomorrow, Reschedule, } impl When { /// Returns the duration from now until our deadline. /// Note that std::time::Duration doesn't do negative time spans, so if /// our deadline has passed, this will return zero. fn until(&self, now: DateTime) -> std::time::Duration { let fire_date = match self { Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0), // Doesn't actually save us from being an hour off on DST because // chrono doesn't know when DST changes. : ( Self::Tomorrow | Self::Reschedule => { (now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0) } }; // After rust-lang/rust#73544 can use std::time::Duration::ZERO (fire_date - now) .to_std() .unwrap_or_else(|_| std::time::Duration::from_millis(0)) } /// The "metrics" ping reason corresponding to our deadline. fn reason(&self) -> &'static str { match self { Self::Today => "today", Self::Tomorrow => "tomorrow", Self::Reschedule => "reschedule", } } } fn start_scheduler( submitter: impl MetricsPingSubmitter + Send + 'static, now: DateTime, when: When, ) -> JoinHandle<()> { let pair = Arc::clone(&TASK_CONDVAR); std::thread::Builder::new() .name("glean.mps".into()) .spawn(move || { let (cancelled_lock, condvar) = &*pair; let mut when = when; let mut now = now; loop { let dur = when.until(now); log::info!("Scheduling for {:?} after {}, reason {:?}", dur, now, when); let mut timed_out = false; { match condvar.wait_timeout_while(cancelled_lock.lock().unwrap(), dur, |cancelled| !*cancelled) { Err(err) => { log::warn!("Condvar wait failure. MPS exiting. {}", err); break; } Ok((cancelled, wait_result)) => { if *cancelled { log::info!("Metrics Ping Scheduler cancelled. Exiting."); break; } else if wait_result.timed_out() { // Can't get the global glean while holding cancelled's lock. timed_out = true; } else { // This should be impossible. `cancelled_lock` is acquired, and // `!*cancelled` is checked by the condvar before it is allowed // to return from `wait_timeout_while` (I checked). // So `Ok(_)` implies `*cancelled || wait_result.timed_out`. log::warn!("Spurious wakeup of the MPS condvar should be impossible."); } } } } // Safety: // We are okay dropping the condvar's cancelled lock here because it only guards // whether we're cancelled, and we've established that we weren't when we timed out. // We might _now_ be cancelled at any time, in which case when we loop back over // we'll immediately exit. But first we need to submit our "metrics" ping. if timed_out { log::info!("Time to submit our metrics ping, {:?}", when); let glean = crate::core::global_glean().expect("Global Glean not present when trying to send scheduled 'metrics' ping?!").lock().unwrap(); submitter.submit_metrics_ping(&glean, Some(when.reason()), now); when = When::Reschedule; } now = local_now_with_offset(); } }).expect("Unable to spawn Metrics Ping Scheduler thread.") } fn get_last_sent_time_metric() -> DatetimeMetric { DatetimeMetric::new( CommonMetricData { name: "last_sent_time".into(), category: "mps".into(), send_in_pings: vec![INTERNAL_STORAGE.into()], lifetime: Lifetime::User, ..Default::default() }, TimeUnit::Minute, ) } fn get_last_sent_build_metric() -> StringMetric { StringMetric::new(CommonMetricData { name: "last_sent_build".into(), category: "mps".into(), send_in_pings: vec![INTERNAL_STORAGE.into()], lifetime: Lifetime::User, ..Default::default() }) } #[cfg(test)] mod test { use super::*; use crate::tests::new_glean; use std::sync::atomic::{AtomicU32, Ordering}; struct ValidatingSubmitter, Option<&str>)> { submit_validator: F, validator_run_count: Arc, } struct ValidatingScheduler, When)> { schedule_validator: F, validator_run_count: Arc, } impl, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter { fn submit_metrics_ping( &self, _glean: &Glean, reason: Option<&str>, now: DateTime, ) { (self.submit_validator)(now, reason); self.validator_run_count.fetch_add(1, Ordering::Relaxed); } } impl, When)> MetricsPingScheduler for ValidatingScheduler { fn start_scheduler( &self, _submitter: impl MetricsPingSubmitter + Send + 'static, now: DateTime, when: When, ) { (self.schedule_validator)(now, when); self.validator_run_count.fetch_add(1, Ordering::Relaxed); } } fn new_proxies< F1: Fn(DateTime, Option<&str>), F2: Fn(DateTime, When), >( submit_validator: F1, schedule_validator: F2, ) -> ( ValidatingSubmitter, Arc, ValidatingScheduler, Arc, ) { let submitter_count = Arc::new(AtomicU32::new(0)); let submitter = ValidatingSubmitter { submit_validator, validator_run_count: Arc::clone(&submitter_count), }; let scheduler_count = Arc::new(AtomicU32::new(0)); let scheduler = ValidatingScheduler { schedule_validator, validator_run_count: Arc::clone(&scheduler_count), }; (submitter, submitter_count, scheduler, scheduler_count) } // Ensure on first run that we actually set the last sent build metric. // (and that we send an "overdue" ping if it's after the scheduled hour) #[test] fn first_run_last_sent_build() { let (mut glean, _t) = new_glean(None); glean.app_build = "a build".into(); let lsb_metric = get_last_sent_build_metric(); assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))); let fake_now = FixedOffset::east(0) .ymd(2022, 11, 15) .and_hms(SCHEDULED_HOUR, 0, 1); let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies( |_, reason| assert_eq!(reason, Some("overdue")), |_, when| assert_eq!(when, When::Reschedule), ); schedule_internal(&glean, submitter, scheduler, fake_now); assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed)); assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed)); assert_eq!( Some(glean.app_build.to_string()), lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)) ); } // Ensure that if we have a different build, we immediately submit an "upgrade" ping // and schedule a "reschedule" ping for tomorrow. #[test] fn different_app_builds_submit_and_reschedule() { let (mut glean, _t) = new_glean(None); glean.app_build = "a build".into(); get_last_sent_build_metric().set_sync(&glean, "a different build"); let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies( |_, reason| assert_eq!(reason, Some("upgrade")), |_, when| assert_eq!(when, When::Reschedule), ); schedule_internal(&glean, submitter, scheduler, local_now_with_offset()); assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed)); assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed)); } // If we've already sent a ping today, ensure we don't send a ping but we // do schedule a ping for tomorrow. ("Case #1" in schedule_internal) #[test] fn case_1_no_submit_but_schedule_tomorrow() { let (glean, _t) = new_glean(None); let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14); get_last_sent_time_metric().set_sync_chrono(&glean, fake_now); let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies( |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason), |_, when| assert_eq!(when, When::Tomorrow), ); schedule_internal(&glean, submitter, scheduler, fake_now); assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed)); assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed)); } // If we haven't sent a ping today and we're after the scheduled time, // ensure we send a ping and then schedule a "reschedule" ping for tomorrow. // ("Case #2" in schedule_internal) #[test] fn case_2_submit_ping_and_reschedule() { let (glean, _t) = new_glean(None); let fake_yesterday = FixedOffset::east(0) .ymd(2021, 4, 29) .and_hms(SCHEDULED_HOUR, 0, 1); get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday); let fake_now = fake_yesterday + Duration::days(1); let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies( |_, reason| assert_eq!(reason, Some("overdue")), |_, when| assert_eq!(when, When::Reschedule), ); schedule_internal(&glean, submitter, scheduler, fake_now); assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed)); assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed)); } // If we haven't sent a ping today and we're before the scheduled time, // ensure we don't send a ping but schedule a "today" ping for today. // ("Case #3" in schedule_internal) #[test] fn case_3_no_submit_but_schedule_today() { let (glean, _t) = new_glean(None); let fake_yesterday = FixedOffset::east(0) .ymd(2021, 4, 29) .and_hms(SCHEDULED_HOUR - 1, 0, 1); get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday); let fake_now = fake_yesterday + Duration::days(1); let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies( |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason), |_, when| assert_eq!(when, When::Today), ); schedule_internal(&glean, submitter, scheduler, fake_now); assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed)); assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed)); } // `When` is responsible for date math. Let's make sure it's correct. #[test] fn when_gets_at_least_some_date_math_correct() { let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10); // `now` is after `SCHEDULED_HOUR` so should be zero: assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now)); // If we bring it back before `SCHEDULED_HOUR` it should give us the duration: let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0); assert_eq!( std::time::Duration::from_secs(3600), When::Today.until(earlier) ); // `Tomorrow` and `Reschedule` should differ only in their `reason()` // 46670s is 12h57m10s (aka, the time from 15:02:10 to 04:00:00 // (when the timezone doesn't change between them)). assert_eq!( std::time::Duration::from_secs(46670), When::Tomorrow.until(now) ); assert_eq!( std::time::Duration::from_secs(46670), When::Reschedule.until(now) ); assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now)); assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason()); } // Scheduler tests mutate global state and thus must not be run in parallel. // Otherwise one test could cancel the other. // This Mutex aims to solve that. static SCHEDULER_TEST_MUTEX: Lazy> = Lazy::new(|| Mutex::new(())); // The scheduler has been designed to be cancellable. Can we cancel it? #[test] fn cancellable_tasks_can_be_cancelled() { // First and foremost, all scheduler tests must ensure they start uncancelled. // Perils of having shared state. let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap(); let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc *cancelled_lock.lock().unwrap() = false; // Pick a time at least two hours from the next scheduled submission. // (So that this test will time out if cancellation fails). let now = FixedOffset::east(0) .ymd(2021, 4, 30) .and_hms(SCHEDULED_HOUR - 2, 0, 0); let proxy_factory = || { new_proxies( |_, reason| { panic!( "Shouldn't submit when testing scheduler. reason: {:?}", reason ) }, |_, _| panic!("Not even using the scheduler this time."), ) }; // Test Today. let (submitter, submitter_count, _, _) = proxy_factory(); let handle = start_scheduler(submitter, now, When::Today); super::cancel(); handle.join().unwrap(); // Should complete immediately. assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed)); // Test Tomorrow. let (submitter, submitter_count, _, _) = proxy_factory(); *cancelled_lock.lock().unwrap() = false; // Uncancel. let handle = start_scheduler(submitter, now, When::Tomorrow); super::cancel(); handle.join().unwrap(); // Should complete immediately. assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed)); // Test Reschedule. let (submitter, submitter_count, _, _) = proxy_factory(); *cancelled_lock.lock().unwrap() = false; // Uncancel. let handle = start_scheduler(submitter, now, When::Reschedule); super::cancel(); handle.join().unwrap(); // Should complete immediately. assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed)); } // We're not keen to wait like the scheduler is, but we can test a quick schedule. #[test] fn immediate_task_runs_immediately() { // First and foremost, all scheduler tests must ensure they start uncancelled. // Perils of having shared state. let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap(); let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc *cancelled_lock.lock().unwrap() = false; // We're actually going to submit a ping from the scheduler, which requires a global glean. let (glean, _t) = new_glean(None); assert!( !glean.schedule_metrics_pings, "Real schedulers not allowed in tests!" ); assert!(crate::core::setup_glean(glean).is_ok()); // We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0. let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0); let (submitter, submitter_count, _, _) = new_proxies( move |_, reason| { assert_eq!(reason, Some("today")); // After submitting the ping we expect, let's cancel this scheduler so the thread exits. // (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex) std::thread::spawn(super::cancel); }, |_, _| panic!("Not using the scheduler this time."), ); let handle = start_scheduler(submitter, now, When::Today); handle.join().unwrap(); assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed)); } }