// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//! # Metrics Ping Scheduler
//!
//! The Metrics Ping Scheduler (MPS) is responsible for scheduling "metrics" pings.
//! It implements the spec described in
//! [the docs](https://mozilla.github.io/glean/book/user/pings/metrics.html#scheduling)
use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
use crate::storage::INTERNAL_STORAGE;
use crate::util::local_now_with_offset;
use crate::{CommonMetricData, Glean, Lifetime};
use chrono::prelude::*;
use chrono::Duration;
use once_cell::sync::Lazy;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
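/// The hour of the day (local time) at which "metrics" ping collection is due.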
const SCHEDULED_HOUR: u32 = 4;
// Clippy thinks an AtomicBool would be preferred, but Condvar requires a full Mutex.
// See https://github.com/rust-lang/rust-clippy/issues/1516
#[allow(clippy::mutex_atomic)]
static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
/// Describes the interface for a submitter of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingSubmitter {
/// Submits a metrics ping, updating the last sent time to `now`
/// (which might not be _right now_ due to processing delays (or in tests))
fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
}
/// Describes the interface for a scheduler of "metrics" pings.
/// Used to decouple the implementation so we can test it.
trait MetricsPingScheduler {
/// Begins a recurring schedule of "metrics" ping submissions, on another thread.
/// `now` is used with `when` to determine the first schedule interval and
/// may not be _right now_ due to processing delays (or in tests).
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
);
}
/// Uses Glean to submit "metrics" pings directly.
struct GleanMetricsPingSubmitter {}
impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
glean.submit_ping_by_name("metrics", reason);
// Always update the collection date, irrespective of the ping being sent.
get_last_sent_time_metric().set_sync_chrono(glean, now);
}
}
/// Schedule "metrics" pings directly using the default behaviour.
struct GleanMetricsPingScheduler {}
impl MetricsPingScheduler for GleanMetricsPingScheduler {
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) {
start_scheduler(submitter, now, when);
}
}
/// Performs startup checks to decide when to schedule the next "metrics" ping collection.
/// **Must** be called before draining the preinit queue.
/// (We're at the Language Bindings' mercy for that)
pub fn schedule(glean: &Glean) {
let now = local_now_with_offset();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
if *cancelled_lock.lock().unwrap() {
log::debug!("Told to schedule, but already cancelled. Are we in a test?");
}
*cancelled_lock.lock().unwrap() = false; // Uncancel the thread.
let submitter = GleanMetricsPingSubmitter {};
let scheduler = GleanMetricsPingScheduler {};
schedule_internal(glean, submitter, scheduler, now)
}
/// Tells the scheduler task to exit quickly and cleanly.
pub fn cancel() {
let (cancelled_lock, condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = true; // Cancel the scheduler thread.
condvar.notify_all(); // Notify any/all listening schedulers to check whether they were cancelled.
}
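/// Inner scheduling logic, parameterized over the submitter and scheduler
/// implementations so tests can inject validating doubles.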
fn schedule_internal(
glean: &Glean,
submitter: impl MetricsPingSubmitter + Send + 'static,
scheduler: impl MetricsPingScheduler,
now: DateTime<FixedOffset>,
) {
let last_sent_build_metric = get_last_sent_build_metric();
if let Some(last_sent_build) = last_sent_build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
// If `app_build` is longer than StringMetric's max length, we will always
// treat it as a changed build when really it isn't.
// This will be externally-observable as InvalidOverflow errors on both the core
// `client_info.app_build` metric and the scheduler's internal metric.
if last_sent_build != glean.app_build {
last_sent_build_metric.set_sync(glean, &glean.app_build);
log::info!("App build changed. Sending 'metrics' ping");
submitter.submit_metrics_ping(glean, Some("upgrade"), now);
scheduler.start_scheduler(submitter, now, When::Reschedule);
return;
}
} else {
// No value in last_sent_build. Better set one.
last_sent_build_metric.set_sync(glean, &glean.app_build);
}
let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
if let Some(last_sent) = last_sent_time {
log::info!("The 'metrics' ping was last sent on {}", last_sent);
}
// We aim to cover 3 cases here:
//
// 1. The ping was already collected on the current calendar day;
// only schedule one for collection on the next calendar day at the due time.
// 2. The ping was NOT collected on the current calendar day AND we're later
// than today's due time; collect the ping immediately.
    // 3. The ping was NOT collected on the current calendar day BUT there is still
    //    some time before today's due time; schedule collection for later today.
let already_sent_today = last_sent_time.map_or(false, |d| d.date() == now.date());
if already_sent_today {
// Case #1
log::info!("The 'metrics' ping was already sent today, {}", now);
scheduler.start_scheduler(submitter, now, When::Tomorrow);
} else if now > now.date().and_hms(SCHEDULED_HOUR, 0, 0) {
// Case #2
log::info!("Sending the 'metrics' ping immediately, {}", now);
submitter.submit_metrics_ping(glean, Some("overdue"), now);
scheduler.start_scheduler(submitter, now, When::Reschedule);
} else {
// Case #3
log::info!("The 'metrics' collection is scheduled for today, {}", now);
scheduler.start_scheduler(submitter, now, When::Today);
}
}
/// "metrics" ping scheduling deadlines.
#[derive(Debug, PartialEq)]
enum When {
Today,
Tomorrow,
Reschedule,
}
impl When {
/// Returns the duration from now until our deadline.
/// Note that std::time::Duration doesn't do negative time spans, so if
/// our deadline has passed, this will return zero.
fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
let fire_date = match self {
Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
// Doesn't actually save us from being an hour off on DST because
// chrono doesn't know when DST changes. : (
Self::Tomorrow | Self::Reschedule => {
(now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
}
};
// After rust-lang/rust#73544 can use std::time::Duration::ZERO
(fire_date - now)
.to_std()
.unwrap_or_else(|_| std::time::Duration::from_millis(0))
}
/// The "metrics" ping reason corresponding to our deadline.
fn reason(&self) -> &'static str {
match self {
Self::Today => "today",
Self::Tomorrow => "tomorrow",
Self::Reschedule => "reschedule",
}
}
}
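/// Spawns the "glean.mps" scheduler thread.
///
/// The thread waits on the shared condvar until either the deadline in `when`
/// elapses (submit the "metrics" ping and reschedule for the next day) or the
/// task is cancelled (exit).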
fn start_scheduler(
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) -> JoinHandle<()> {
let pair = Arc::clone(&TASK_CONDVAR);
std::thread::Builder::new()
.name("glean.mps".into())
.spawn(move || {
let (cancelled_lock, condvar) = &*pair;
let mut when = when;
let mut now = now;
loop {
let dur = when.until(now);
log::info!("Scheduling for {:?} after {}, reason {:?}", dur, now, when);
let mut timed_out = false;
{
                    match condvar.wait_timeout_while(
                        cancelled_lock.lock().unwrap(),
                        dur,
                        |cancelled| !*cancelled,
                    ) {
Err(err) => {
log::warn!("Condvar wait failure. MPS exiting. {}", err);
break;
}
Ok((cancelled, wait_result)) => {
if *cancelled {
log::info!("Metrics Ping Scheduler cancelled. Exiting.");
break;
} else if wait_result.timed_out() {
// Can't get the global glean while holding cancelled's lock.
timed_out = true;
} else {
// This should be impossible. `cancelled_lock` is acquired, and
// `!*cancelled` is checked by the condvar before it is allowed
// to return from `wait_timeout_while` (I checked).
// So `Ok(_)` implies `*cancelled || wait_result.timed_out`.
log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
}
}
}
}
// Safety:
// We are okay dropping the condvar's cancelled lock here because it only guards
// whether we're cancelled, and we've established that we weren't when we timed out.
// We might _now_ be cancelled at any time, in which case when we loop back over
// we'll immediately exit. But first we need to submit our "metrics" ping.
if timed_out {
log::info!("Time to submit our metrics ping, {:?}", when);
                    let glean = crate::core::global_glean()
                        .expect("Global Glean not present when trying to send scheduled 'metrics' ping?!")
                        .lock()
                        .unwrap();
submitter.submit_metrics_ping(&glean, Some(when.reason()), now);
when = When::Reschedule;
}
now = local_now_with_offset();
}
}).expect("Unable to spawn Metrics Ping Scheduler thread.")
}
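/// The internal datetime metric recording when the "metrics" ping was last sent.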
fn get_last_sent_time_metric() -> DatetimeMetric {
DatetimeMetric::new(
CommonMetricData {
name: "last_sent_time".into(),
category: "mps".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
},
TimeUnit::Minute,
)
}
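/// The internal string metric recording the app build for which the "metrics" ping was last sent.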
fn get_last_sent_build_metric() -> StringMetric {
StringMetric::new(CommonMetricData {
name: "last_sent_build".into(),
category: "mps".into(),
send_in_pings: vec![INTERNAL_STORAGE.into()],
lifetime: Lifetime::User,
..Default::default()
})
}
#[cfg(test)]
mod test {
use super::*;
use crate::tests::new_glean;
use std::sync::atomic::{AtomicU32, Ordering};
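    /// Test submitter that runs a validation closure on every submitted ping and counts calls.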
struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
submit_validator: F,
validator_run_count: Arc<AtomicU32>,
}
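    /// Test scheduler that runs a validation closure on every scheduling request and counts calls.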
struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
schedule_validator: F,
validator_run_count: Arc<AtomicU32>,
}
impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
fn submit_metrics_ping(
&self,
_glean: &Glean,
reason: Option<&str>,
now: DateTime<FixedOffset>,
) {
(self.submit_validator)(now, reason);
self.validator_run_count.fetch_add(1, Ordering::Relaxed);
}
}
impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
fn start_scheduler(
&self,
_submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) {
(self.schedule_validator)(now, when);
self.validator_run_count.fetch_add(1, Ordering::Relaxed);
}
}
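    /// Builds a validating submitter/scheduler pair along with their run counters.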
fn new_proxies<
F1: Fn(DateTime<FixedOffset>, Option<&str>),
F2: Fn(DateTime<FixedOffset>, When),
>(
submit_validator: F1,
schedule_validator: F2,
) -> (
ValidatingSubmitter<F1>,
Arc<AtomicU32>,
ValidatingScheduler<F2>,
Arc<AtomicU32>,
) {
let submitter_count = Arc::new(AtomicU32::new(0));
let submitter = ValidatingSubmitter {
submit_validator,
validator_run_count: Arc::clone(&submitter_count),
};
let scheduler_count = Arc::new(AtomicU32::new(0));
let scheduler = ValidatingScheduler {
schedule_validator,
validator_run_count: Arc::clone(&scheduler_count),
};
(submitter, submitter_count, scheduler, scheduler_count)
}
// Ensure on first run that we actually set the last sent build metric.
// (and that we send an "overdue" ping if it's after the scheduled hour)
#[test]
fn first_run_last_sent_build() {
let (mut glean, _t) = new_glean(None);
glean.app_build = "a build".into();
let lsb_metric = get_last_sent_build_metric();
assert_eq!(None, lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE)));
let fake_now = FixedOffset::east(0)
.ymd(2022, 11, 15)
.and_hms(SCHEDULED_HOUR, 0, 1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("overdue")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
assert_eq!(
Some(glean.app_build.to_string()),
lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE))
);
}
// Ensure that if we have a different build, we immediately submit an "upgrade" ping
// and schedule a "reschedule" ping for tomorrow.
#[test]
fn different_app_builds_submit_and_reschedule() {
let (mut glean, _t) = new_glean(None);
glean.app_build = "a build".into();
get_last_sent_build_metric().set_sync(&glean, "a different build");
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("upgrade")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, local_now_with_offset());
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we've already sent a ping today, ensure we don't send a ping but we
// do schedule a ping for tomorrow. ("Case #1" in schedule_internal)
#[test]
fn case_1_no_submit_but_schedule_tomorrow() {
let (glean, _t) = new_glean(None);
let fake_now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(14, 36, 14);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_now);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
|_, when| assert_eq!(when, When::Tomorrow),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we haven't sent a ping today and we're after the scheduled time,
// ensure we send a ping and then schedule a "reschedule" ping for tomorrow.
// ("Case #2" in schedule_internal)
#[test]
fn case_2_submit_ping_and_reschedule() {
let (glean, _t) = new_glean(None);
let fake_yesterday = FixedOffset::east(0)
.ymd(2021, 4, 29)
.and_hms(SCHEDULED_HOUR, 0, 1);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
let fake_now = fake_yesterday + Duration::days(1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| assert_eq!(reason, Some("overdue")),
|_, when| assert_eq!(when, When::Reschedule),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// If we haven't sent a ping today and we're before the scheduled time,
// ensure we don't send a ping but schedule a "today" ping for today.
// ("Case #3" in schedule_internal)
#[test]
fn case_3_no_submit_but_schedule_today() {
let (glean, _t) = new_glean(None);
let fake_yesterday =
FixedOffset::east(0)
.ymd(2021, 4, 29)
.and_hms(SCHEDULED_HOUR - 1, 0, 1);
get_last_sent_time_metric().set_sync_chrono(&glean, fake_yesterday);
let fake_now = fake_yesterday + Duration::days(1);
let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
|_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
|_, when| assert_eq!(when, When::Today),
);
schedule_internal(&glean, submitter, scheduler, fake_now);
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
assert_eq!(1, scheduler_count.swap(0, Ordering::Relaxed));
}
// `When` is responsible for date math. Let's make sure it's correct.
#[test]
fn when_gets_at_least_some_date_math_correct() {
let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);
// `now` is after `SCHEDULED_HOUR` so should be zero:
assert_eq!(std::time::Duration::from_secs(0), When::Today.until(now));
// If we bring it back before `SCHEDULED_HOUR` it should give us the duration:
let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
assert_eq!(
std::time::Duration::from_secs(3600),
When::Today.until(earlier)
);
// `Tomorrow` and `Reschedule` should differ only in their `reason()`
        // 46670s is 12h57m50s (aka, the time from 15:02:10 to 04:00:00
// (when the timezone doesn't change between them)).
assert_eq!(
std::time::Duration::from_secs(46670),
When::Tomorrow.until(now)
);
assert_eq!(
std::time::Duration::from_secs(46670),
When::Reschedule.until(now)
);
assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
}
// Scheduler tests mutate global state and thus must not be run in parallel.
// Otherwise one test could cancel the other.
// This Mutex aims to solve that.
static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
// The scheduler has been designed to be cancellable. Can we cancel it?
#[test]
fn cancellable_tasks_can_be_cancelled() {
// First and foremost, all scheduler tests must ensure they start uncancelled.
// Perils of having shared state.
let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = false;
// Pick a time at least two hours from the next scheduled submission.
// (So that this test will time out if cancellation fails).
let now = FixedOffset::east(0)
.ymd(2021, 4, 30)
.and_hms(SCHEDULED_HOUR - 2, 0, 0);
let proxy_factory = || {
new_proxies(
|_, reason| {
panic!(
"Shouldn't submit when testing scheduler. reason: {:?}",
reason
)
},
|_, _| panic!("Not even using the scheduler this time."),
)
};
// Test Today.
let (submitter, submitter_count, _, _) = proxy_factory();
let handle = start_scheduler(submitter, now, When::Today);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
// Test Tomorrow.
let (submitter, submitter_count, _, _) = proxy_factory();
*cancelled_lock.lock().unwrap() = false; // Uncancel.
let handle = start_scheduler(submitter, now, When::Tomorrow);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
// Test Reschedule.
let (submitter, submitter_count, _, _) = proxy_factory();
*cancelled_lock.lock().unwrap() = false; // Uncancel.
let handle = start_scheduler(submitter, now, When::Reschedule);
super::cancel();
handle.join().unwrap(); // Should complete immediately.
assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
}
// We're not keen to wait like the scheduler is, but we can test a quick schedule.
#[test]
fn immediate_task_runs_immediately() {
// First and foremost, all scheduler tests must ensure they start uncancelled.
// Perils of having shared state.
let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
let (cancelled_lock, _condvar) = &**TASK_CONDVAR; // One `*` for Lazy, the second for Arc
*cancelled_lock.lock().unwrap() = false;
// We're actually going to submit a ping from the scheduler, which requires a global glean.
let (glean, _t) = new_glean(None);
assert!(
!glean.schedule_metrics_pings,
"Real schedulers not allowed in tests!"
);
assert!(crate::core::setup_glean(glean).is_ok());
// We're choosing a time after SCHEDULED_HOUR so `When::Today` will give us a duration of 0.
let now = FixedOffset::east(0).ymd(2021, 4, 20).and_hms(15, 42, 0);
let (submitter, submitter_count, _, _) = new_proxies(
move |_, reason| {
assert_eq!(reason, Some("today"));
// After submitting the ping we expect, let's cancel this scheduler so the thread exits.
// (But do it on another thread because the condvar loop is currently holding `cancelled`'s mutex)
std::thread::spawn(super::cancel);
},
|_, _| panic!("Not using the scheduler this time."),
);
let handle = start_scheduler(submitter, now, When::Today);
handle.join().unwrap();
assert_eq!(1, submitter_count.swap(0, Ordering::Relaxed));
}
}