From 36d22d82aa202bb199967e9512281e9a53db42c9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 21:33:14 +0200 Subject: Adding upstream version 115.7.0esr. Signed-off-by: Daniel Baumann --- .../rust/glean-core/src/common_metric_data.rs | 149 ++ third_party/rust/glean-core/src/core/mod.rs | 942 +++++++++++ third_party/rust/glean-core/src/core_metrics.rs | 206 +++ third_party/rust/glean-core/src/coverage.rs | 47 + third_party/rust/glean-core/src/database/mod.rs | 1766 ++++++++++++++++++++ third_party/rust/glean-core/src/debug.rs | 319 ++++ .../rust/glean-core/src/dispatcher/global.rs | 232 +++ third_party/rust/glean-core/src/dispatcher/mod.rs | 589 +++++++ third_party/rust/glean-core/src/error.rs | 169 ++ third_party/rust/glean-core/src/error_recording.rs | 239 +++ .../rust/glean-core/src/event_database/mod.rs | 1299 ++++++++++++++ third_party/rust/glean-core/src/fd_logger.rs | 85 + third_party/rust/glean-core/src/glean.udl | 601 +++++++ third_party/rust/glean-core/src/glean_metrics.rs | 26 + .../rust/glean-core/src/histogram/exponential.rs | 206 +++ .../rust/glean-core/src/histogram/functional.rs | 174 ++ .../rust/glean-core/src/histogram/linear.rs | 178 ++ third_party/rust/glean-core/src/histogram/mod.rs | 139 ++ .../rust/glean-core/src/internal_metrics.rs | 261 +++ third_party/rust/glean-core/src/internal_pings.rs | 64 + third_party/rust/glean-core/src/lib.rs | 1108 ++++++++++++ third_party/rust/glean-core/src/lib_unit_tests.rs | 1080 ++++++++++++ third_party/rust/glean-core/src/metrics/boolean.rs | 134 ++ third_party/rust/glean-core/src/metrics/counter.rs | 163 ++ .../glean-core/src/metrics/custom_distribution.rs | 222 +++ .../rust/glean-core/src/metrics/datetime.rs | 327 ++++ .../rust/glean-core/src/metrics/denominator.rs | 140 ++ third_party/rust/glean-core/src/metrics/event.rs | 189 +++ .../rust/glean-core/src/metrics/experiment.rs | 266 +++ third_party/rust/glean-core/src/metrics/labeled.rs | 294 ++++ .../glean-core/src/metrics/memory_distribution.rs | 282 ++++ .../rust/glean-core/src/metrics/memory_unit.rs | 64 + .../src/metrics/metrics_enabled_config.rs | 46 + third_party/rust/glean-core/src/metrics/mod.rs | 285 ++++ .../rust/glean-core/src/metrics/numerator.rs | 94 ++ third_party/rust/glean-core/src/metrics/ping.rs | 201 +++ .../rust/glean-core/src/metrics/quantity.rs | 126 ++ third_party/rust/glean-core/src/metrics/rate.rs | 191 +++ .../glean-core/src/metrics/recorded_experiment.rs | 35 + third_party/rust/glean-core/src/metrics/string.rs | 176 ++ .../rust/glean-core/src/metrics/string_list.rs | 199 +++ third_party/rust/glean-core/src/metrics/text.rs | 180 ++ .../rust/glean-core/src/metrics/time_unit.rs | 117 ++ .../rust/glean-core/src/metrics/timespan.rs | 308 ++++ .../glean-core/src/metrics/timing_distribution.rs | 557 ++++++ third_party/rust/glean-core/src/metrics/url.rs | 312 ++++ third_party/rust/glean-core/src/metrics/uuid.rs | 159 ++ third_party/rust/glean-core/src/ping/mod.rs | 391 +++++ third_party/rust/glean-core/src/scheduler.rs | 560 +++++++ third_party/rust/glean-core/src/storage/mod.rs | 283 ++++ third_party/rust/glean-core/src/system.rs | 82 + third_party/rust/glean-core/src/traits/boolean.rs | 43 + third_party/rust/glean-core/src/traits/counter.rs | 47 + .../glean-core/src/traits/custom_distribution.rs | 58 + third_party/rust/glean-core/src/traits/datetime.rs | 52 + third_party/rust/glean-core/src/traits/event.rs | 118 ++ third_party/rust/glean-core/src/traits/labeled.rs | 40 + .../glean-core/src/traits/memory_distribution.rs | 54 + third_party/rust/glean-core/src/traits/mod.rs | 50 + .../rust/glean-core/src/traits/numerator.rs | 47 + third_party/rust/glean-core/src/traits/ping.rs | 17 + third_party/rust/glean-core/src/traits/quantity.rs | 47 + third_party/rust/glean-core/src/traits/rate.rs | 57 + third_party/rust/glean-core/src/traits/string.rs | 50 + .../rust/glean-core/src/traits/string_list.rs | 60 + third_party/rust/glean-core/src/traits/text.rs | 50 + third_party/rust/glean-core/src/traits/timespan.rs | 67 + .../glean-core/src/traits/timing_distribution.rs | 73 + third_party/rust/glean-core/src/traits/url.rs | 51 + third_party/rust/glean-core/src/traits/uuid.rs | 46 + .../rust/glean-core/src/upload/directory.rs | 420 +++++ third_party/rust/glean-core/src/upload/mod.rs | 1683 +++++++++++++++++++ third_party/rust/glean-core/src/upload/policy.rs | 112 ++ third_party/rust/glean-core/src/upload/request.rs | 289 ++++ third_party/rust/glean-core/src/upload/result.rs | 98 ++ third_party/rust/glean-core/src/util.rs | 312 ++++ 76 files changed, 20203 insertions(+) create mode 100644 third_party/rust/glean-core/src/common_metric_data.rs create mode 100644 third_party/rust/glean-core/src/core/mod.rs create mode 100644 third_party/rust/glean-core/src/core_metrics.rs create mode 100644 third_party/rust/glean-core/src/coverage.rs create mode 100644 third_party/rust/glean-core/src/database/mod.rs create mode 100644 third_party/rust/glean-core/src/debug.rs create mode 100644 third_party/rust/glean-core/src/dispatcher/global.rs create mode 100644 third_party/rust/glean-core/src/dispatcher/mod.rs create mode 100644 third_party/rust/glean-core/src/error.rs create mode 100644 third_party/rust/glean-core/src/error_recording.rs create mode 100644 third_party/rust/glean-core/src/event_database/mod.rs create mode 100644 third_party/rust/glean-core/src/fd_logger.rs create mode 100644 third_party/rust/glean-core/src/glean.udl create mode 100644 third_party/rust/glean-core/src/glean_metrics.rs create mode 100644 third_party/rust/glean-core/src/histogram/exponential.rs create mode 100644 third_party/rust/glean-core/src/histogram/functional.rs create mode 100644 third_party/rust/glean-core/src/histogram/linear.rs create mode 100644 third_party/rust/glean-core/src/histogram/mod.rs create mode 100644 third_party/rust/glean-core/src/internal_metrics.rs create mode 100644 third_party/rust/glean-core/src/internal_pings.rs create mode 100644 third_party/rust/glean-core/src/lib.rs create mode 100644 third_party/rust/glean-core/src/lib_unit_tests.rs create mode 100644 third_party/rust/glean-core/src/metrics/boolean.rs create mode 100644 third_party/rust/glean-core/src/metrics/counter.rs create mode 100644 third_party/rust/glean-core/src/metrics/custom_distribution.rs create mode 100644 third_party/rust/glean-core/src/metrics/datetime.rs create mode 100644 third_party/rust/glean-core/src/metrics/denominator.rs create mode 100644 third_party/rust/glean-core/src/metrics/event.rs create mode 100644 third_party/rust/glean-core/src/metrics/experiment.rs create mode 100644 third_party/rust/glean-core/src/metrics/labeled.rs create mode 100644 third_party/rust/glean-core/src/metrics/memory_distribution.rs create mode 100644 third_party/rust/glean-core/src/metrics/memory_unit.rs create mode 100644 third_party/rust/glean-core/src/metrics/metrics_enabled_config.rs create mode 100644 third_party/rust/glean-core/src/metrics/mod.rs create mode 100644 third_party/rust/glean-core/src/metrics/numerator.rs create mode 100644 third_party/rust/glean-core/src/metrics/ping.rs create mode 100644 third_party/rust/glean-core/src/metrics/quantity.rs create mode 100644 third_party/rust/glean-core/src/metrics/rate.rs create mode 100644 third_party/rust/glean-core/src/metrics/recorded_experiment.rs create mode 100644 third_party/rust/glean-core/src/metrics/string.rs create mode 100644 third_party/rust/glean-core/src/metrics/string_list.rs create mode 100644 third_party/rust/glean-core/src/metrics/text.rs create mode 100644 third_party/rust/glean-core/src/metrics/time_unit.rs create mode 100644 third_party/rust/glean-core/src/metrics/timespan.rs create mode 100644 third_party/rust/glean-core/src/metrics/timing_distribution.rs create mode 100644 third_party/rust/glean-core/src/metrics/url.rs create mode 100644 third_party/rust/glean-core/src/metrics/uuid.rs create mode 100644 third_party/rust/glean-core/src/ping/mod.rs create mode 100644 third_party/rust/glean-core/src/scheduler.rs create mode 100644 third_party/rust/glean-core/src/storage/mod.rs create mode 100644 third_party/rust/glean-core/src/system.rs create mode 100644 third_party/rust/glean-core/src/traits/boolean.rs create mode 100644 third_party/rust/glean-core/src/traits/counter.rs create mode 100644 third_party/rust/glean-core/src/traits/custom_distribution.rs create mode 100644 third_party/rust/glean-core/src/traits/datetime.rs create mode 100644 third_party/rust/glean-core/src/traits/event.rs create mode 100644 third_party/rust/glean-core/src/traits/labeled.rs create mode 100644 third_party/rust/glean-core/src/traits/memory_distribution.rs create mode 100644 third_party/rust/glean-core/src/traits/mod.rs create mode 100644 third_party/rust/glean-core/src/traits/numerator.rs create mode 100644 third_party/rust/glean-core/src/traits/ping.rs create mode 100644 third_party/rust/glean-core/src/traits/quantity.rs create mode 100644 third_party/rust/glean-core/src/traits/rate.rs create mode 100644 third_party/rust/glean-core/src/traits/string.rs create mode 100644 third_party/rust/glean-core/src/traits/string_list.rs create mode 100644 third_party/rust/glean-core/src/traits/text.rs create mode 100644 third_party/rust/glean-core/src/traits/timespan.rs create mode 100644 third_party/rust/glean-core/src/traits/timing_distribution.rs create mode 100644 third_party/rust/glean-core/src/traits/url.rs create mode 100644 third_party/rust/glean-core/src/traits/uuid.rs create mode 100644 third_party/rust/glean-core/src/upload/directory.rs create mode 100644 third_party/rust/glean-core/src/upload/mod.rs create mode 100644 third_party/rust/glean-core/src/upload/policy.rs create mode 100644 third_party/rust/glean-core/src/upload/request.rs create mode 100644 third_party/rust/glean-core/src/upload/result.rs create mode 100644 third_party/rust/glean-core/src/util.rs (limited to 'third_party/rust/glean-core/src') diff --git a/third_party/rust/glean-core/src/common_metric_data.rs b/third_party/rust/glean-core/src/common_metric_data.rs new file mode 100644 index 0000000000..033cbe1472 --- /dev/null +++ b/third_party/rust/glean-core/src/common_metric_data.rs @@ -0,0 +1,149 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::convert::TryFrom; +use std::sync::atomic::{AtomicU8, Ordering}; + +use crate::error::{Error, ErrorKind}; +use crate::metrics::labeled::validate_dynamic_label; +use crate::Glean; +use serde::{Deserialize, Serialize}; + +/// The supported metrics' lifetimes. +/// +/// A metric's lifetime determines when its stored data gets reset. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Default)] +#[repr(i32)] // Use i32 to be compatible with our JNA definition +#[serde(rename_all = "lowercase")] +pub enum Lifetime { + /// The metric is reset with each sent ping + #[default] + Ping, + /// The metric is reset on application restart + Application, + /// The metric is reset with each user profile + User, +} + +impl Lifetime { + /// String representation of the lifetime. + pub fn as_str(self) -> &'static str { + match self { + Lifetime::Ping => "ping", + Lifetime::Application => "app", + Lifetime::User => "user", + } + } +} + +impl TryFrom for Lifetime { + type Error = Error; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(Lifetime::Ping), + 1 => Ok(Lifetime::Application), + 2 => Ok(Lifetime::User), + e => Err(ErrorKind::Lifetime(e).into()), + } + } +} + +/// The common set of data shared across all different metric types. +#[derive(Default, Debug, Clone, Deserialize, Serialize)] +pub struct CommonMetricData { + /// The metric's name. + pub name: String, + /// The metric's category. + pub category: String, + /// List of ping names to include this metric in. + pub send_in_pings: Vec, + /// The metric's lifetime. + pub lifetime: Lifetime, + /// Whether or not the metric is disabled. + /// + /// Disabled metrics are never recorded. + pub disabled: bool, + /// Dynamic label. + /// + /// When a [`LabeledMetric`](crate::metrics::LabeledMetric) factory creates the specific + /// metric to be recorded to, dynamic labels are stored in the specific + /// label so that we can validate them when the Glean singleton is + /// available. + pub dynamic_label: Option, +} + +#[derive(Default, Debug)] +pub struct CommonMetricDataInternal { + pub inner: CommonMetricData, + pub disabled: AtomicU8, +} + +impl Clone for CommonMetricDataInternal { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + disabled: AtomicU8::new(self.disabled.load(Ordering::Relaxed)), + } + } +} + +impl From for CommonMetricDataInternal { + fn from(input_data: CommonMetricData) -> Self { + Self { + inner: input_data.clone(), + disabled: AtomicU8::new(u8::from(input_data.disabled)), + } + } +} + +impl CommonMetricDataInternal { + /// Creates a new metadata object. + pub fn new, B: Into, C: Into>( + category: A, + name: B, + ping_name: C, + ) -> CommonMetricDataInternal { + CommonMetricDataInternal { + inner: CommonMetricData { + name: name.into(), + category: category.into(), + send_in_pings: vec![ping_name.into()], + ..Default::default() + }, + disabled: AtomicU8::new(0), + } + } + + /// The metric's base identifier, including the category and name, but not the label. + /// + /// If `category` is empty, it's ommitted. + /// Otherwise, it's the combination of the metric's `category` and `name`. + pub(crate) fn base_identifier(&self) -> String { + if self.inner.category.is_empty() { + self.inner.name.clone() + } else { + format!("{}.{}", self.inner.category, self.inner.name) + } + } + + /// The metric's unique identifier, including the category, name and label. + /// + /// If `category` is empty, it's ommitted. + /// Otherwise, it's the combination of the metric's `category`, `name` and `label`. + pub(crate) fn identifier(&self, glean: &Glean) -> String { + let base_identifier = self.base_identifier(); + + if let Some(label) = &self.inner.dynamic_label { + validate_dynamic_label(glean, self, &base_identifier, label) + } else { + base_identifier + } + } + + /// The list of storages this metric should be recorded into. + pub fn storage_names(&self) -> &[String] { + &self.inner.send_in_pings + } +} diff --git a/third_party/rust/glean-core/src/core/mod.rs b/third_party/rust/glean-core/src/core/mod.rs new file mode 100644 index 0000000000..29ee1e52c7 --- /dev/null +++ b/third_party/rust/glean-core/src/core/mod.rs @@ -0,0 +1,942 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::{Arc, Mutex}; + +use chrono::{DateTime, FixedOffset}; +use once_cell::sync::OnceCell; + +use crate::database::Database; +use crate::debug::DebugOptions; +use crate::event_database::EventDatabase; +use crate::internal_metrics::{AdditionalMetrics, CoreMetrics, DatabaseMetrics}; +use crate::internal_pings::InternalPings; +use crate::metrics::{ + self, ExperimentMetric, Metric, MetricType, MetricsEnabledConfig, PingType, RecordedExperiment, +}; +use crate::ping::PingMaker; +use crate::storage::{StorageManager, INTERNAL_STORAGE}; +use crate::upload::{PingUploadManager, PingUploadTask, UploadResult, UploadTaskAction}; +use crate::util::{local_now_with_offset, sanitize_application_id}; +use crate::{ + scheduler, system, CommonMetricData, ErrorKind, InternalConfiguration, Lifetime, Result, + DEFAULT_MAX_EVENTS, GLEAN_SCHEMA_VERSION, GLEAN_VERSION, KNOWN_CLIENT_ID, +}; + +static GLEAN: OnceCell> = OnceCell::new(); + +pub fn global_glean() -> Option<&'static Mutex> { + GLEAN.get() +} + +/// Sets or replaces the global Glean object. +pub fn setup_glean(glean: Glean) -> Result<()> { + // The `OnceCell` type wrapping our Glean is thread-safe and can only be set once. + // Therefore even if our check for it being empty succeeds, setting it could fail if a + // concurrent thread is quicker in setting it. + // However this will not cause a bigger problem, as the second `set` operation will just fail. + // We can log it and move on. + // + // For all wrappers this is not a problem, as the Glean object is intialized exactly once on + // calling `initialize` on the global singleton and further operations check that it has been + // initialized. + if GLEAN.get().is_none() { + if GLEAN.set(Mutex::new(glean)).is_err() { + log::warn!( + "Global Glean object is initialized already. This probably happened concurrently." + ) + } + } else { + // We allow overriding the global Glean object to support test mode. + // In test mode the Glean object is fully destroyed and recreated. + // This all happens behind a mutex and is therefore also thread-safe.. + let mut lock = GLEAN.get().unwrap().lock().unwrap(); + *lock = glean; + } + Ok(()) +} + +/// Execute `f` passing the global Glean object. +/// +/// Panics if the global Glean object has not been set. +pub fn with_glean(f: F) -> R +where + F: FnOnce(&Glean) -> R, +{ + let glean = global_glean().expect("Global Glean object not initialized"); + let lock = glean.lock().unwrap(); + f(&lock) +} + +/// Execute `f` passing the global Glean object mutable. +/// +/// Panics if the global Glean object has not been set. +pub fn with_glean_mut(f: F) -> R +where + F: FnOnce(&mut Glean) -> R, +{ + let glean = global_glean().expect("Global Glean object not initialized"); + let mut lock = glean.lock().unwrap(); + f(&mut lock) +} + +/// Execute `f` passing the global Glean object if it has been set. +/// +/// Returns `None` if the global Glean object has not been set. +/// Returns `Some(T)` otherwise. +pub fn with_opt_glean(f: F) -> Option +where + F: FnOnce(&Glean) -> R, +{ + let glean = global_glean()?; + let lock = glean.lock().unwrap(); + Some(f(&lock)) +} + +/// The object holding meta information about a Glean instance. +/// +/// ## Example +/// +/// Create a new Glean instance, register a ping, record a simple counter and then send the final +/// ping. +/// +/// ```rust,no_run +/// # use glean_core::{Glean, InternalConfiguration, CommonMetricData, metrics::*}; +/// let cfg = InternalConfiguration { +/// data_path: "/tmp/glean".into(), +/// application_id: "glean.sample.app".into(), +/// language_binding_name: "Rust".into(), +/// upload_enabled: true, +/// max_events: None, +/// delay_ping_lifetime_io: false, +/// app_build: "".into(), +/// use_core_mps: false, +/// trim_data_to_registered_pings: false, +/// log_level: None, +/// }; +/// let mut glean = Glean::new(cfg).unwrap(); +/// let ping = PingType::new("sample", true, false, vec![]); +/// glean.register_ping_type(&ping); +/// +/// let call_counter: CounterMetric = CounterMetric::new(CommonMetricData { +/// name: "calls".into(), +/// category: "local".into(), +/// send_in_pings: vec!["sample".into()], +/// ..Default::default() +/// }); +/// +/// call_counter.add_sync(&glean, 1); +/// +/// ping.submit_sync(&glean, None); +/// ``` +/// +/// ## Note +/// +/// In specific language bindings, this is usually wrapped in a singleton and all metric recording goes to a single instance of this object. +/// In the Rust core, it is possible to create multiple instances, which is used in testing. +#[derive(Debug)] +pub struct Glean { + upload_enabled: bool, + pub(crate) data_store: Option, + event_data_store: EventDatabase, + pub(crate) core_metrics: CoreMetrics, + pub(crate) additional_metrics: AdditionalMetrics, + pub(crate) database_metrics: DatabaseMetrics, + pub(crate) internal_pings: InternalPings, + data_path: PathBuf, + application_id: String, + ping_registry: HashMap, + start_time: DateTime, + max_events: u32, + is_first_run: bool, + pub(crate) upload_manager: PingUploadManager, + debug: DebugOptions, + pub(crate) app_build: String, + pub(crate) schedule_metrics_pings: bool, + pub(crate) remote_settings_epoch: AtomicU8, + pub(crate) remote_settings_metrics_config: Arc>, +} + +impl Glean { + /// Creates and initializes a new Glean object for use in a subprocess. + /// + /// Importantly, this will not send any pings at startup, since that + /// sort of management should only happen in the main process. + pub fn new_for_subprocess(cfg: &InternalConfiguration, scan_directories: bool) -> Result { + log::info!("Creating new Glean v{}", GLEAN_VERSION); + + let application_id = sanitize_application_id(&cfg.application_id); + if application_id.is_empty() { + return Err(ErrorKind::InvalidConfig.into()); + } + + let data_path = Path::new(&cfg.data_path); + let event_data_store = EventDatabase::new(data_path)?; + + // Create an upload manager with rate limiting of 15 pings every 60 seconds. + let mut upload_manager = PingUploadManager::new(&cfg.data_path, &cfg.language_binding_name); + upload_manager.set_rate_limiter( + /* seconds per interval */ 60, /* max pings per interval */ 15, + ); + + // We only scan the pending ping directories when calling this from a subprocess, + // when calling this from ::new we need to scan the directories after dealing with the upload state. + if scan_directories { + let _scanning_thread = upload_manager.scan_pending_pings_directories(); + } + + let start_time = local_now_with_offset(); + let mut this = Self { + upload_enabled: cfg.upload_enabled, + // In the subprocess, we want to avoid accessing the database entirely. + // The easiest way to ensure that is to just not initialize it. + data_store: None, + event_data_store, + core_metrics: CoreMetrics::new(), + additional_metrics: AdditionalMetrics::new(), + database_metrics: DatabaseMetrics::new(), + internal_pings: InternalPings::new(), + upload_manager, + data_path: PathBuf::from(&cfg.data_path), + application_id, + ping_registry: HashMap::new(), + start_time, + max_events: cfg.max_events.unwrap_or(DEFAULT_MAX_EVENTS), + is_first_run: false, + debug: DebugOptions::new(), + app_build: cfg.app_build.to_string(), + // Subprocess doesn't use "metrics" pings so has no need for a scheduler. + schedule_metrics_pings: false, + remote_settings_epoch: AtomicU8::new(0), + remote_settings_metrics_config: Arc::new(Mutex::new(MetricsEnabledConfig::new())), + }; + + // Ensuring these pings are registered. + let pings = this.internal_pings.clone(); + this.register_ping_type(&pings.baseline); + this.register_ping_type(&pings.metrics); + this.register_ping_type(&pings.events); + this.register_ping_type(&pings.deletion_request); + + Ok(this) + } + + /// Creates and initializes a new Glean object. + /// + /// This will create the necessary directories and files in + /// [`cfg.data_path`](InternalConfiguration::data_path). This will also initialize + /// the core metrics. + pub fn new(cfg: InternalConfiguration) -> Result { + let mut glean = Self::new_for_subprocess(&cfg, false)?; + + // Creating the data store creates the necessary path as well. + // If that fails we bail out and don't initialize further. + let data_path = Path::new(&cfg.data_path); + glean.data_store = Some(Database::new(data_path, cfg.delay_ping_lifetime_io)?); + + // The upload enabled flag may have changed since the last run, for + // example by the changing of a config file. + if cfg.upload_enabled { + // If upload is enabled, just follow the normal code path to + // instantiate the core metrics. + glean.on_upload_enabled(); + } else { + // If upload is disabled, and we've never run before, only set the + // client_id to KNOWN_CLIENT_ID, but do not send a deletion request + // ping. + // If we have run before, and if the client_id is not equal to + // the KNOWN_CLIENT_ID, do the full upload disabled operations to + // clear metrics, set the client_id to KNOWN_CLIENT_ID, and send a + // deletion request ping. + match glean + .core_metrics + .client_id + .get_value(&glean, Some("glean_client_info")) + { + None => glean.clear_metrics(), + Some(uuid) => { + if uuid != *KNOWN_CLIENT_ID { + // Temporarily enable uploading so we can submit a + // deletion request ping. + glean.upload_enabled = true; + glean.on_upload_disabled(true); + } + } + } + } + + // We set this only for non-subprocess situations. + glean.schedule_metrics_pings = cfg.use_core_mps; + + // We only scan the pendings pings directories **after** dealing with the upload state. + // If upload is disabled, we delete all pending pings files + // and we need to do that **before** scanning the pending pings folder + // to ensure we don't enqueue pings before their files are deleted. + let _scanning_thread = glean.upload_manager.scan_pending_pings_directories(); + + Ok(glean) + } + + /// For tests make it easy to create a Glean object using only the required configuration. + #[cfg(test)] + pub(crate) fn with_options( + data_path: &str, + application_id: &str, + upload_enabled: bool, + ) -> Self { + let cfg = InternalConfiguration { + data_path: data_path.into(), + application_id: application_id.into(), + language_binding_name: "Rust".into(), + upload_enabled, + max_events: None, + delay_ping_lifetime_io: false, + app_build: "Unknown".into(), + use_core_mps: false, + trim_data_to_registered_pings: false, + log_level: None, + }; + + let mut glean = Self::new(cfg).unwrap(); + + // Disable all upload manager policies for testing + glean.upload_manager = PingUploadManager::no_policy(data_path); + + glean + } + + /// Destroys the database. + /// + /// After this Glean needs to be reinitialized. + pub fn destroy_db(&mut self) { + self.data_store = None; + } + + /// Initializes the core metrics managed by Glean's Rust core. + fn initialize_core_metrics(&mut self) { + let need_new_client_id = match self + .core_metrics + .client_id + .get_value(self, Some("glean_client_info")) + { + None => true, + Some(uuid) => uuid == *KNOWN_CLIENT_ID, + }; + if need_new_client_id { + self.core_metrics.client_id.generate_and_set_sync(self); + } + + if self + .core_metrics + .first_run_date + .get_value(self, "glean_client_info") + .is_none() + { + self.core_metrics.first_run_date.set_sync(self, None); + // The `first_run_date` field is generated on the very first run + // and persisted across upload toggling. We can assume that, the only + // time it is set, that's indeed our "first run". + self.is_first_run = true; + } + + self.set_application_lifetime_core_metrics(); + } + + /// Initializes the database metrics managed by Glean's Rust core. + fn initialize_database_metrics(&mut self) { + log::trace!("Initializing database metrics"); + + if let Some(size) = self + .data_store + .as_ref() + .and_then(|database| database.file_size()) + { + log::trace!("Database file size: {}", size.get()); + self.database_metrics + .size + .accumulate_sync(self, size.get() as i64) + } + } + + /// Signals that the environment is ready to submit pings. + /// + /// Should be called when Glean is initialized to the point where it can correctly assemble pings. + /// Usually called from the language binding after all of the core metrics have been set + /// and the ping types have been registered. + /// + /// # Arguments + /// + /// * `trim_data_to_registered_pings` - Whether we should limit to storing data only for + /// data belonging to pings previously registered via `register_ping_type`. + /// + /// # Returns + /// + /// Whether the "events" ping was submitted. + pub fn on_ready_to_submit_pings(&self, trim_data_to_registered_pings: bool) -> bool { + self.event_data_store + .flush_pending_events_on_startup(self, trim_data_to_registered_pings) + } + + /// Sets whether upload is enabled or not. + /// + /// When uploading is disabled, metrics aren't recorded at all and no + /// data is uploaded. + /// + /// When disabling, all pending metrics, events and queued pings are cleared. + /// + /// When enabling, the core Glean metrics are recreated. + /// + /// If the value of this flag is not actually changed, this is a no-op. + /// + /// # Arguments + /// + /// * `flag` - When true, enable metric collection. + /// + /// # Returns + /// + /// Whether the flag was different from the current value, + /// and actual work was done to clear or reinstate metrics. + pub fn set_upload_enabled(&mut self, flag: bool) -> bool { + log::info!("Upload enabled: {:?}", flag); + + if self.upload_enabled != flag { + if flag { + self.on_upload_enabled(); + } else { + self.on_upload_disabled(false); + } + true + } else { + false + } + } + + /// Determines whether upload is enabled. + /// + /// When upload is disabled, no data will be recorded. + pub fn is_upload_enabled(&self) -> bool { + self.upload_enabled + } + + /// Handles the changing of state from upload disabled to enabled. + /// + /// Should only be called when the state actually changes. + /// + /// The `upload_enabled` flag is set to true and the core Glean metrics are + /// recreated. + fn on_upload_enabled(&mut self) { + self.upload_enabled = true; + self.initialize_core_metrics(); + self.initialize_database_metrics(); + } + + /// Handles the changing of state from upload enabled to disabled. + /// + /// Should only be called when the state actually changes. + /// + /// A deletion_request ping is sent, all pending metrics, events and queued + /// pings are cleared, and the client_id is set to KNOWN_CLIENT_ID. + /// Afterward, the upload_enabled flag is set to false. + fn on_upload_disabled(&mut self, during_init: bool) { + // The upload_enabled flag should be true here, or the deletion ping + // won't be submitted. + let reason = if during_init { + Some("at_init") + } else { + Some("set_upload_enabled") + }; + if !self + .internal_pings + .deletion_request + .submit_sync(self, reason) + { + log::error!("Failed to submit deletion-request ping on optout."); + } + self.clear_metrics(); + self.upload_enabled = false; + } + + /// Clear any pending metrics when telemetry is disabled. + fn clear_metrics(&mut self) { + // Clear the pending pings queue and acquire the lock + // so that it can't be accessed until this function is done. + let _lock = self.upload_manager.clear_ping_queue(); + + // There is only one metric that we want to survive after clearing all + // metrics: first_run_date. Here, we store its value so we can restore + // it after clearing the metrics. + let existing_first_run_date = self + .core_metrics + .first_run_date + .get_value(self, "glean_client_info"); + + // Clear any pending pings. + let ping_maker = PingMaker::new(); + if let Err(err) = ping_maker.clear_pending_pings(self.get_data_path()) { + log::warn!("Error clearing pending pings: {}", err); + } + + // Delete all stored metrics. + // Note that this also includes the ping sequence numbers, so it has + // the effect of resetting those to their initial values. + if let Some(data) = self.data_store.as_ref() { + data.clear_all() + } + if let Err(err) = self.event_data_store.clear_all() { + log::warn!("Error clearing pending events: {}", err); + } + + // This does not clear the experiments store (which isn't managed by the + // StorageEngineManager), since doing so would mean we would have to have the + // application tell us again which experiments are active if telemetry is + // re-enabled. + + { + // We need to briefly set upload_enabled to true here so that `set` + // is not a no-op. This is safe, since nothing on the Rust side can + // run concurrently to this since we hold a mutable reference to the + // Glean object. Additionally, the pending pings have been cleared + // from disk, so the PingUploader can't wake up and start sending + // pings. + self.upload_enabled = true; + + // Store a "dummy" KNOWN_CLIENT_ID in the client_id metric. This will + // make it easier to detect if pings were unintentionally sent after + // uploading is disabled. + self.core_metrics + .client_id + .set_from_uuid_sync(self, *KNOWN_CLIENT_ID); + + // Restore the first_run_date. + if let Some(existing_first_run_date) = existing_first_run_date { + self.core_metrics + .first_run_date + .set_sync_chrono(self, existing_first_run_date); + } + + self.upload_enabled = false; + } + } + + /// Gets the application ID as specified on instantiation. + pub fn get_application_id(&self) -> &str { + &self.application_id + } + + /// Gets the data path of this instance. + pub fn get_data_path(&self) -> &Path { + &self.data_path + } + + /// Gets a handle to the database. + #[track_caller] // If this fails we're interested in the caller. + pub fn storage(&self) -> &Database { + self.data_store.as_ref().expect("No database found") + } + + /// Gets an optional handle to the database. + pub fn storage_opt(&self) -> Option<&Database> { + self.data_store.as_ref() + } + + /// Gets a handle to the event database. + pub fn event_storage(&self) -> &EventDatabase { + &self.event_data_store + } + + /// Gets the maximum number of events to store before sending a ping. + pub fn get_max_events(&self) -> usize { + self.max_events as usize + } + + /// Gets the next task for an uploader. + /// + /// This can be one of: + /// + /// * [`Wait`](PingUploadTask::Wait) - which means the requester should ask + /// again later; + /// * [`Upload(PingRequest)`](PingUploadTask::Upload) - which means there is + /// a ping to upload. This wraps the actual request object; + /// * [`Done`](PingUploadTask::Done) - which means requester should stop + /// asking for now. + /// + /// # Returns + /// + /// A [`PingUploadTask`] representing the next task. + pub fn get_upload_task(&self) -> PingUploadTask { + self.upload_manager.get_upload_task(self, self.log_pings()) + } + + /// Processes the response from an attempt to upload a ping. + /// + /// # Arguments + /// + /// * `uuid` - The UUID of the ping in question. + /// * `status` - The upload result. + pub fn process_ping_upload_response( + &self, + uuid: &str, + status: UploadResult, + ) -> UploadTaskAction { + self.upload_manager + .process_ping_upload_response(self, uuid, status) + } + + /// Takes a snapshot for the given store and optionally clear it. + /// + /// # Arguments + /// + /// * `store_name` - The store to snapshot. + /// * `clear_store` - Whether to clear the store after snapshotting. + /// + /// # Returns + /// + /// The snapshot in a string encoded as JSON. If the snapshot is empty, returns an empty string. + pub fn snapshot(&mut self, store_name: &str, clear_store: bool) -> String { + StorageManager + .snapshot(self.storage(), store_name, clear_store) + .unwrap_or_else(|| String::from("")) + } + + pub(crate) fn make_path(&self, ping_name: &str, doc_id: &str) -> String { + format!( + "/submit/{}/{}/{}/{}", + self.get_application_id(), + ping_name, + GLEAN_SCHEMA_VERSION, + doc_id + ) + } + + /// Collects and submits a ping by name for eventual uploading. + /// + /// The ping content is assembled as soon as possible, but upload is not + /// guaranteed to happen immediately, as that depends on the upload policies. + /// + /// If the ping currently contains no content, it will not be sent, + /// unless it is configured to be sent if empty. + /// + /// # Arguments + /// + /// * `ping_name` - The name of the ping to submit + /// * `reason` - A reason code to include in the ping + /// + /// # Returns + /// + /// Whether the ping was succesfully assembled and queued. + /// + /// # Errors + /// + /// If collecting or writing the ping to disk failed. + pub fn submit_ping_by_name(&self, ping_name: &str, reason: Option<&str>) -> bool { + match self.get_ping_by_name(ping_name) { + None => { + log::error!("Attempted to submit unknown ping '{}'", ping_name); + false + } + Some(ping) => ping.submit_sync(self, reason), + } + } + + /// Gets a [`PingType`] by name. + /// + /// # Returns + /// + /// The [`PingType`] of a ping if the given name was registered before, [`None`] + /// otherwise. + pub fn get_ping_by_name(&self, ping_name: &str) -> Option<&PingType> { + self.ping_registry.get(ping_name) + } + + /// Register a new [`PingType`](metrics/struct.PingType.html). + pub fn register_ping_type(&mut self, ping: &PingType) { + if self.ping_registry.contains_key(ping.name()) { + log::debug!("Duplicate ping named '{}'", ping.name()) + } + + self.ping_registry + .insert(ping.name().to_string(), ping.clone()); + } + + /// Get create time of the Glean object. + pub(crate) fn start_time(&self) -> DateTime { + self.start_time + } + + /// Indicates that an experiment is running. + /// + /// Glean will then add an experiment annotation to the environment + /// which is sent with pings. This information is not persisted between runs. + /// + /// # Arguments + /// + /// * `experiment_id` - The id of the active experiment (maximum 30 bytes). + /// * `branch` - The experiment branch (maximum 30 bytes). + /// * `extra` - Optional metadata to output with the ping. + pub fn set_experiment_active( + &self, + experiment_id: String, + branch: String, + extra: HashMap, + ) { + let metric = ExperimentMetric::new(self, experiment_id); + metric.set_active_sync(self, branch, extra); + } + + /// Indicates that an experiment is no longer running. + /// + /// # Arguments + /// + /// * `experiment_id` - The id of the active experiment to deactivate (maximum 30 bytes). + pub fn set_experiment_inactive(&self, experiment_id: String) { + let metric = ExperimentMetric::new(self, experiment_id); + metric.set_inactive_sync(self); + } + + /// **Test-only API (exported for FFI purposes).** + /// + /// Gets stored data for the requested experiment. + /// + /// # Arguments + /// + /// * `experiment_id` - The id of the active experiment (maximum 30 bytes). + pub fn test_get_experiment_data(&self, experiment_id: String) -> Option { + let metric = ExperimentMetric::new(self, experiment_id); + metric.test_get_value(self) + } + + /// Set configuration to override the default metric enabled/disabled state, typically from a + /// remote_settings experiment or rollout + /// + /// # Arguments + /// + /// * `json` - The stringified JSON representation of a `MetricsEnabledConfig` object + pub fn set_metrics_enabled_config(&self, cfg: MetricsEnabledConfig) { + // Set the current MetricsEnabledConfig, keeping the lock until the epoch is + // updated to prevent against reading a "new" config but an "old" epoch + let mut lock = self.remote_settings_metrics_config.lock().unwrap(); + *lock = cfg; + + // Update remote_settings epoch + self.remote_settings_epoch.fetch_add(1, Ordering::SeqCst); + } + + /// Persists [`Lifetime::Ping`] data that might be in memory in case + /// [`delay_ping_lifetime_io`](InternalConfiguration::delay_ping_lifetime_io) is set + /// or was set at a previous time. + /// + /// If there is no data to persist, this function does nothing. + pub fn persist_ping_lifetime_data(&self) -> Result<()> { + if let Some(data) = self.data_store.as_ref() { + return data.persist_ping_lifetime_data(); + } + + Ok(()) + } + + /// Sets internally-handled application lifetime metrics. + fn set_application_lifetime_core_metrics(&self) { + self.core_metrics.os.set_sync(self, system::OS); + } + + /// **This is not meant to be used directly.** + /// + /// Clears all the metrics that have [`Lifetime::Application`]. + pub fn clear_application_lifetime_metrics(&self) { + log::trace!("Clearing Lifetime::Application metrics"); + if let Some(data) = self.data_store.as_ref() { + data.clear_lifetime(Lifetime::Application); + } + + // Set internally handled app lifetime metrics again. + self.set_application_lifetime_core_metrics(); + } + + /// Whether or not this is the first run on this profile. + pub fn is_first_run(&self) -> bool { + self.is_first_run + } + + /// Sets a debug view tag. + /// + /// This will return `false` in case `value` is not a valid tag. + /// + /// When the debug view tag is set, pings are sent with a `X-Debug-ID` header with the value of the tag + /// and are sent to the ["Ping Debug Viewer"](https://mozilla.github.io/glean/book/dev/core/internal/debug-pings.html). + /// + /// # Arguments + /// + /// * `value` - A valid HTTP header value. Must match the regex: "[a-zA-Z0-9-]{1,20}". + pub fn set_debug_view_tag(&mut self, value: &str) -> bool { + self.debug.debug_view_tag.set(value.into()) + } + + /// Return the value for the debug view tag or [`None`] if it hasn't been set. + /// + /// The `debug_view_tag` may be set from an environment variable + /// (`GLEAN_DEBUG_VIEW_TAG`) or through the [`set_debug_view_tag`] function. + pub(crate) fn debug_view_tag(&self) -> Option<&String> { + self.debug.debug_view_tag.get() + } + + /// Sets source tags. + /// + /// This will return `false` in case `value` contains invalid tags. + /// + /// Ping tags will show in the destination datasets, after ingestion. + /// + /// **Note** If one or more tags are invalid, all tags are ignored. + /// + /// # Arguments + /// + /// * `value` - A vector of at most 5 valid HTTP header values. Individual tags must match the regex: "[a-zA-Z0-9-]{1,20}". + pub fn set_source_tags(&mut self, value: Vec) -> bool { + self.debug.source_tags.set(value) + } + + /// Return the value for the source tags or [`None`] if it hasn't been set. + /// + /// The `source_tags` may be set from an environment variable (`GLEAN_SOURCE_TAGS`) + /// or through the [`set_source_tags`] function. + pub(crate) fn source_tags(&self) -> Option<&Vec> { + self.debug.source_tags.get() + } + + /// Sets the log pings debug option. + /// + /// This will return `false` in case we are unable to set the option. + /// + /// When the log pings debug option is `true`, + /// we log the payload of all succesfully assembled pings. + /// + /// # Arguments + /// + /// * `value` - The value of the log pings option + pub fn set_log_pings(&mut self, value: bool) -> bool { + self.debug.log_pings.set(value) + } + + /// Return the value for the log pings debug option or [`None`] if it hasn't been set. + /// + /// The `log_pings` option may be set from an environment variable (`GLEAN_LOG_PINGS`) + /// or through the [`set_log_pings`] function. + pub(crate) fn log_pings(&self) -> bool { + self.debug.log_pings.get().copied().unwrap_or(false) + } + + fn get_dirty_bit_metric(&self) -> metrics::BooleanMetric { + metrics::BooleanMetric::new(CommonMetricData { + name: "dirtybit".into(), + // We don't need a category, the name is already unique + category: "".into(), + send_in_pings: vec![INTERNAL_STORAGE.into()], + lifetime: Lifetime::User, + ..Default::default() + }) + } + + /// **This is not meant to be used directly.** + /// + /// Sets the value of a "dirty flag" in the permanent storage. + /// + /// The "dirty flag" is meant to have the following behaviour, implemented + /// by the consumers of the FFI layer: + /// + /// - on mobile: set to `false` when going to background or shutting down, + /// set to `true` at startup and when going to foreground. + /// - on non-mobile platforms: set to `true` at startup and `false` at + /// shutdown. + /// + /// At startup, before setting its new value, if the "dirty flag" value is + /// `true`, then Glean knows it did not exit cleanly and can implement + /// coping mechanisms (e.g. sending a `baseline` ping). + pub fn set_dirty_flag(&self, new_value: bool) { + self.get_dirty_bit_metric().set_sync(self, new_value); + } + + /// **This is not meant to be used directly.** + /// + /// Checks the stored value of the "dirty flag". + pub fn is_dirty_flag_set(&self) -> bool { + let dirty_bit_metric = self.get_dirty_bit_metric(); + match StorageManager.snapshot_metric( + self.storage(), + INTERNAL_STORAGE, + &dirty_bit_metric.meta().identifier(self), + dirty_bit_metric.meta().inner.lifetime, + ) { + Some(Metric::Boolean(b)) => b, + _ => false, + } + } + + /// Performs the collection/cleanup operations required by becoming active. + /// + /// This functions generates a baseline ping with reason `active` + /// and then sets the dirty bit. + pub fn handle_client_active(&mut self) { + if !self + .internal_pings + .baseline + .submit_sync(self, Some("active")) + { + log::info!("baseline ping not submitted on active"); + } + + self.set_dirty_flag(true); + } + + /// Performs the collection/cleanup operations required by becoming inactive. + /// + /// This functions generates a baseline and an events ping with reason + /// `inactive` and then clears the dirty bit. + pub fn handle_client_inactive(&mut self) { + if !self + .internal_pings + .baseline + .submit_sync(self, Some("inactive")) + { + log::info!("baseline ping not submitted on inactive"); + } + + if !self + .internal_pings + .events + .submit_sync(self, Some("inactive")) + { + log::info!("events ping not submitted on inactive"); + } + + self.set_dirty_flag(false); + } + + /// **Test-only API (exported for FFI purposes).** + /// + /// Deletes all stored metrics. + /// + /// Note that this also includes the ping sequence numbers, so it has + /// the effect of resetting those to their initial values. + pub fn test_clear_all_stores(&self) { + if let Some(data) = self.data_store.as_ref() { + data.clear_all() + } + // We don't care about this failing, maybe the data does just not exist. + let _ = self.event_data_store.clear_all(); + } + + /// Instructs the Metrics Ping Scheduler's thread to exit cleanly. + /// If Glean was configured with `use_core_mps: false`, this has no effect. + pub fn cancel_metrics_ping_scheduler(&self) { + if self.schedule_metrics_pings { + scheduler::cancel(); + } + } + + /// Instructs the Metrics Ping Scheduler to being scheduling metrics pings. + /// If Glean wsa configured with `use_core_mps: false`, this has no effect. + pub fn start_metrics_ping_scheduler(&self) { + if self.schedule_metrics_pings { + scheduler::schedule(self); + } + } +} diff --git a/third_party/rust/glean-core/src/core_metrics.rs b/third_party/rust/glean-core/src/core_metrics.rs new file mode 100644 index 0000000000..baa2b8515b --- /dev/null +++ b/third_party/rust/glean-core/src/core_metrics.rs @@ -0,0 +1,206 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::metrics::{ + Datetime, DatetimeMetric, QuantityMetric, StringMetric, TimeUnit, TimespanMetric, +}; +use crate::{CommonMetricData, Lifetime}; + +use once_cell::sync::Lazy; + +/// Metrics included in every ping as `client_info`. +#[derive(Debug, Default)] +pub struct ClientInfoMetrics { + /// The build identifier generated by the CI system (e.g. "1234/A"). + pub app_build: String, + /// The user visible version string (e.g. "1.0.3"). + pub app_display_version: String, + /// The app's build date + pub app_build_date: Datetime, + + /// The architecture of the device (e.g. "arm", "x86"). + pub architecture: String, + /// The name of the operating system (e.g. "Linux", "Android", "iOS"). + pub os_version: String, + + /// The product-provided release channel (e.g. "beta"). + pub channel: Option, + /// The Android specific SDK version of the software running on this hardware device (e.g. "23"). + pub android_sdk_version: Option, + /// The Windows specific OS build version (e.g. 19043) + pub windows_build_number: Option, + /// The manufacturer of the device the application is running on. + /// Not set if the device manufacturer can't be determined (e.g. on Desktop). + pub device_manufacturer: Option, + /// The model of the device the application is running on. + /// On Android, this is Build.MODEL, the user-visible marketing name, like "Pixel 2 XL". + /// Not set if the device model can't be determined (e.g. on Desktop). + pub device_model: Option, + /// The locale of the application during initialization (e.g. "es-ES"). + /// If the locale can't be determined on the system, the value is "und", to indicate "undetermined". + pub locale: Option, +} + +/// Metrics included in every ping as `client_info`. +impl ClientInfoMetrics { + /// Creates the client info with dummy values for all. + pub fn unknown() -> Self { + ClientInfoMetrics { + app_build: "Unknown".to_string(), + app_display_version: "Unknown".to_string(), + app_build_date: Datetime::default(), + architecture: "Unknown".to_string(), + os_version: "Unknown".to_string(), + channel: Some("Unknown".to_string()), + android_sdk_version: None, + windows_build_number: None, + device_manufacturer: None, + device_model: None, + locale: None, + } + } +} + +#[allow(non_upper_case_globals)] +pub mod internal_metrics { + use super::*; + + pub static app_build: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "app_build".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static app_display_version: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "app_display_version".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static app_build_date: Lazy = Lazy::new(|| { + DatetimeMetric::new( + CommonMetricData { + name: "build_date".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }, + TimeUnit::Second, + ) + }); + + pub static app_channel: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "app_channel".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static os_version: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "os_version".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static architecture: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "architecture".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static android_sdk_version: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "android_sdk_version".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static windows_build_number: Lazy = Lazy::new(|| { + QuantityMetric::new(CommonMetricData { + name: "windows_build_number".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static device_manufacturer: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "device_manufacturer".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static device_model: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "device_model".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static locale: Lazy = Lazy::new(|| { + StringMetric::new(CommonMetricData { + name: "locale".into(), + category: "".into(), + send_in_pings: vec!["glean_client_info".into()], + lifetime: Lifetime::Application, + disabled: false, + ..Default::default() + }) + }); + + pub static baseline_duration: Lazy = Lazy::new(|| { + TimespanMetric::new( + CommonMetricData { + name: "duration".into(), + category: "glean.baseline".into(), + send_in_pings: vec!["baseline".into()], + lifetime: Lifetime::Ping, + disabled: false, + ..Default::default() + }, + TimeUnit::Second, + ) + }); +} diff --git a/third_party/rust/glean-core/src/coverage.rs b/third_party/rust/glean-core/src/coverage.rs new file mode 100644 index 0000000000..426e6295c8 --- /dev/null +++ b/third_party/rust/glean-core/src/coverage.rs @@ -0,0 +1,47 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Utilities for recording when testing APIs have been called on specific +//! metrics. +//! +//! Testing coverage is enabled by setting the GLEAN_TEST_COVERAGE environment +//! variable to the name of an output file. This output file must run through a +//! post-processor (in glean_parser's `coverage` command) to convert to a format +//! understood by third-party coverage reporting tools. +//! +//! While running a unit test suite, Glean records which database keys were +//! accessed by the testing APIs, with one entry per line. Database keys are +//! usually, but not always, the same as metric identifiers, but it is the +//! responsibility of the post-processor to resolve that difference. +//! +//! This functionality has no runtime overhead unless the testing API is used. + +use std::env; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::sync::Mutex; + +use once_cell::sync::Lazy; + +static COVERAGE_FILE: Lazy>> = Lazy::new(|| { + if let Some(filename) = env::var_os("GLEAN_TEST_COVERAGE") { + match OpenOptions::new().append(true).create(true).open(filename) { + Ok(file) => { + return Some(Mutex::new(file)); + } + Err(err) => { + log::error!("Couldn't open file for coverage results: {:?}", err); + } + } + } + None +}); + +pub(crate) fn record_coverage(metric_id: &str) { + if let Some(file_mutex) = &*COVERAGE_FILE { + let mut file = file_mutex.lock().unwrap(); + writeln!(&mut file, "{}", metric_id).ok(); + file.flush().ok(); + } +} diff --git a/third_party/rust/glean-core/src/database/mod.rs b/third_party/rust/glean-core/src/database/mod.rs new file mode 100644 index 0000000000..32a20b017a --- /dev/null +++ b/third_party/rust/glean-core/src/database/mod.rs @@ -0,0 +1,1766 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::fs; +use std::io; +use std::num::NonZeroU64; +use std::path::Path; +use std::str; +use std::sync::RwLock; + +use crate::ErrorKind; + +use rkv::migrator::Migrator; +use rkv::{MigrateError, StoreError, StoreOptions}; + +/// Unwrap a `Result`s `Ok` value or do the specified action. +/// +/// This is an alternative to the question-mark operator (`?`), +/// when the other action should not be to return the error. +macro_rules! unwrap_or { + ($expr:expr, $or:expr) => { + match $expr { + Ok(x) => x, + Err(_) => { + $or; + } + } + }; +} + +/// cbindgen:ignore +pub type Rkv = rkv::Rkv; +/// cbindgen:ignore +pub type SingleStore = rkv::SingleStore; +/// cbindgen:ignore +pub type Writer<'t> = rkv::Writer>; + +pub fn rkv_new(path: &Path) -> std::result::Result { + match Rkv::new::(path) { + // An invalid file can mean: + // 1. An empty file. + // 2. A corrupted file. + // + // In both instances there's not much we can do. + // Drop the data by removing the file, and start over. + Err(rkv::StoreError::FileInvalid) => { + let safebin = path.join("data.safe.bin"); + fs::remove_file(safebin).map_err(|_| rkv::StoreError::FileInvalid)?; + // Now try again, we only handle that error once. + Rkv::new::(path) + } + Err(rkv::StoreError::DatabaseCorrupted) => { + let safebin = path.join("data.safe.bin"); + fs::remove_file(safebin).map_err(|_| rkv::StoreError::DatabaseCorrupted)?; + // Try again, only allowing the error once. + Rkv::new::(path) + } + other => other, + } +} + +fn delete_and_log(path: &Path, msg: &str) { + if let Err(err) = fs::remove_file(path) { + match err.kind() { + std::io::ErrorKind::NotFound => { + // Silently drop this error, the file was already non-existing. + } + _ => log::warn!("{}", msg), + } + } +} + +fn delete_lmdb_database(path: &Path) { + let datamdb = path.join("data.mdb"); + delete_and_log(&datamdb, "Failed to delete old data."); + + let lockmdb = path.join("lock.mdb"); + delete_and_log(&lockmdb, "Failed to delete old lock."); +} + +/// Migrate from LMDB storage to safe-mode storage. +/// +/// This migrates the data once, then deletes the LMDB storage. +/// The safe-mode storage must be empty for it to work. +/// Existing data will not be overwritten. +/// If the destination database is not empty the LMDB database is deleted +/// without migrating data. +/// This is a no-op if no LMDB database file exists. +pub fn migrate(path: &Path, dst_env: &Rkv) { + log::debug!("Migrating files in {}", path.display()); + + // Shortcut if no data to migrate is around. + let datamdb = path.join("data.mdb"); + if !datamdb.exists() { + log::debug!("No data to migrate."); + return; + } + + // We're handling the same error cases as `easy_migrate_lmdb_to_safe_mode`, + // but annotate each why they don't cause problems for Glean. + // Additionally for known cases we delete the LMDB database regardless. + let should_delete = + match Migrator::open_and_migrate_lmdb_to_safe_mode(path, |builder| builder, dst_env) { + // Source environment is corrupted. + // We start fresh with the new database. + Err(MigrateError::StoreError(StoreError::FileInvalid)) => true, + Err(MigrateError::StoreError(StoreError::DatabaseCorrupted)) => true, + // Path not accessible. + // Somehow our directory vanished between us creating it and reading from it. + // Nothing we can do really. + Err(MigrateError::StoreError(StoreError::IoError(_))) => true, + // Path accessible but incompatible for configuration. + // This should not happen, we never used storages that safe-mode doesn't understand. + // If it does happen, let's start fresh and use the safe-mode from now on. + Err(MigrateError::StoreError(StoreError::UnsuitableEnvironmentPath(_))) => true, + // Nothing to migrate. + // Source database was empty. We just start fresh anyway. + Err(MigrateError::SourceEmpty) => true, + // Migrating would overwrite. + // Either a previous migration failed and we still started writing data, + // or someone placed back an old data file. + // In any case we better stay on the new data and delete the old one. + Err(MigrateError::DestinationNotEmpty) => { + log::warn!("Failed to migrate old data. Destination was not empty"); + true + } + // An internal lock was poisoned. + // This would only happen if multiple things run concurrently and one crashes. + Err(MigrateError::ManagerPoisonError) => false, + // Couldn't close source environment and delete files on disk (e.g. other stores still open). + // This could only happen if multiple instances are running, + // we leave files in place. + Err(MigrateError::CloseError(_)) => false, + // Other store errors are never returned from the migrator. + // We need to handle them to please rustc. + Err(MigrateError::StoreError(_)) => false, + // Other errors can't happen, so this leaves us with the Ok case. + // This already deleted the LMDB files. + Ok(()) => false, + }; + + if should_delete { + log::debug!("Need to delete remaining LMDB files."); + delete_lmdb_database(path); + } + + log::debug!("Migration ended. Safe-mode database in {}", path.display()); +} + +use crate::common_metric_data::CommonMetricDataInternal; +use crate::metrics::Metric; +use crate::Glean; +use crate::Lifetime; +use crate::Result; + +pub struct Database { + /// Handle to the database environment. + rkv: Rkv, + + /// Handles to the "lifetime" stores. + /// + /// A "store" is a handle to the underlying database. + /// We keep them open for fast and frequent access. + user_store: SingleStore, + ping_store: SingleStore, + application_store: SingleStore, + + /// If the `delay_ping_lifetime_io` Glean config option is `true`, + /// we will save metrics with 'ping' lifetime data in a map temporarily + /// so as to persist them to disk using rkv in bulk on demand. + ping_lifetime_data: Option>>, + + // Initial file size when opening the database. + file_size: Option, +} + +impl std::fmt::Debug for Database { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("Database") + .field("rkv", &self.rkv) + .field("user_store", &"SingleStore") + .field("ping_store", &"SingleStore") + .field("application_store", &"SingleStore") + .field("ping_lifetime_data", &self.ping_lifetime_data) + .finish() + } +} + +/// Calculate the database size from all the files in the directory. +/// +/// # Arguments +/// +/// *`path` - The path to the directory +/// +/// # Returns +/// +/// Returns the non-zero combined size of all files in a directory, +/// or `None` on error or if the size is `0`. +fn database_size(dir: &Path) -> Option { + let mut total_size = 0; + if let Ok(entries) = fs::read_dir(dir) { + for entry in entries.flatten() { + if let Ok(file_type) = entry.file_type() { + if file_type.is_file() { + let path = entry.path(); + if let Ok(metadata) = fs::metadata(path) { + total_size += metadata.len(); + } else { + continue; + } + } + } + } + } + + NonZeroU64::new(total_size) +} + +impl Database { + /// Initializes the data store. + /// + /// This opens the underlying rkv store and creates + /// the underlying directory structure. + /// + /// It also loads any Lifetime::Ping data that might be + /// persisted, in case `delay_ping_lifetime_io` is set. + pub fn new(data_path: &Path, delay_ping_lifetime_io: bool) -> Result { + let path = data_path.join("db"); + log::debug!("Database path: {:?}", path.display()); + let file_size = database_size(&path); + + let rkv = Self::open_rkv(&path)?; + let user_store = rkv.open_single(Lifetime::User.as_str(), StoreOptions::create())?; + let ping_store = rkv.open_single(Lifetime::Ping.as_str(), StoreOptions::create())?; + let application_store = + rkv.open_single(Lifetime::Application.as_str(), StoreOptions::create())?; + let ping_lifetime_data = if delay_ping_lifetime_io { + Some(RwLock::new(BTreeMap::new())) + } else { + None + }; + + let db = Self { + rkv, + user_store, + ping_store, + application_store, + ping_lifetime_data, + file_size, + }; + + db.load_ping_lifetime_data(); + + Ok(db) + } + + /// Get the initial database file size. + pub fn file_size(&self) -> Option { + self.file_size + } + + fn get_store(&self, lifetime: Lifetime) -> &SingleStore { + match lifetime { + Lifetime::User => &self.user_store, + Lifetime::Ping => &self.ping_store, + Lifetime::Application => &self.application_store, + } + } + + /// Creates the storage directories and inits rkv. + fn open_rkv(path: &Path) -> Result { + fs::create_dir_all(path)?; + + let rkv = rkv_new(path)?; + migrate(path, &rkv); + + log::info!("Database initialized"); + Ok(rkv) + } + + /// Build the key of the final location of the data in the database. + /// Such location is built using the storage name and the metric + /// key/name (if available). + /// + /// # Arguments + /// + /// * `storage_name` - the name of the storage to store/fetch data from. + /// * `metric_key` - the optional metric key/name. + /// + /// # Returns + /// + /// A string representing the location in the database. + fn get_storage_key(storage_name: &str, metric_key: Option<&str>) -> String { + match metric_key { + Some(k) => format!("{}#{}", storage_name, k), + None => format!("{}#", storage_name), + } + } + + /// Loads Lifetime::Ping data from rkv to memory, + /// if `delay_ping_lifetime_io` is set to true. + /// + /// Does nothing if it isn't or if there is not data to load. + fn load_ping_lifetime_data(&self) { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let mut data = ping_lifetime_data + .write() + .expect("Can't read ping lifetime data"); + + let reader = unwrap_or!(self.rkv.read(), return); + let store = self.get_store(Lifetime::Ping); + let mut iter = unwrap_or!(store.iter_start(&reader), return); + + while let Some(Ok((metric_id, value))) = iter.next() { + let metric_id = match str::from_utf8(metric_id) { + Ok(metric_id) => metric_id.to_string(), + _ => continue, + }; + let metric: Metric = match value { + rkv::Value::Blob(blob) => unwrap_or!(bincode::deserialize(blob), continue), + _ => continue, + }; + + data.insert(metric_id, metric); + } + } + } + + /// Iterates with the provided transaction function + /// over the requested data from the given storage. + /// + /// * If the storage is unavailable, the transaction function is never invoked. + /// * If the read data cannot be deserialized it will be silently skipped. + /// + /// # Arguments + /// + /// * `lifetime` - The metric lifetime to iterate over. + /// * `storage_name` - The storage name to iterate over. + /// * `metric_key` - The metric key to iterate over. All metrics iterated over + /// will have this prefix. For example, if `metric_key` is of the form `{category}.`, + /// it will iterate over all metrics in the given category. If the `metric_key` is of the + /// form `{category}.{name}/`, the iterator will iterate over all specific metrics for + /// a given labeled metric. If not provided, the entire storage for the given lifetime + /// will be iterated over. + /// * `transaction_fn` - Called for each entry being iterated over. It is + /// passed two arguments: `(metric_id: &[u8], metric: &Metric)`. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + pub fn iter_store_from( + &self, + lifetime: Lifetime, + storage_name: &str, + metric_key: Option<&str>, + mut transaction_fn: F, + ) where + F: FnMut(&[u8], &Metric), + { + let iter_start = Self::get_storage_key(storage_name, metric_key); + let len = iter_start.len(); + + // Lifetime::Ping data is not immediately persisted to disk if + // Glean has `delay_ping_lifetime_io` set to true + if lifetime == Lifetime::Ping { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let data = ping_lifetime_data + .read() + .expect("Can't read ping lifetime data"); + for (key, value) in data.iter() { + if key.starts_with(&iter_start) { + let key = &key[len..]; + transaction_fn(key.as_bytes(), value); + } + } + return; + } + } + + let reader = unwrap_or!(self.rkv.read(), return); + let mut iter = unwrap_or!( + self.get_store(lifetime).iter_from(&reader, &iter_start), + return + ); + + while let Some(Ok((metric_id, value))) = iter.next() { + if !metric_id.starts_with(iter_start.as_bytes()) { + break; + } + + let metric_id = &metric_id[len..]; + let metric: Metric = match value { + rkv::Value::Blob(blob) => unwrap_or!(bincode::deserialize(blob), continue), + _ => continue, + }; + transaction_fn(metric_id, &metric); + } + } + + /// Determines if the storage has the given metric. + /// + /// If data cannot be read it is assumed that the storage does not have the metric. + /// + /// # Arguments + /// + /// * `lifetime` - The lifetime of the metric. + /// * `storage_name` - The storage name to look in. + /// * `metric_identifier` - The metric identifier. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + pub fn has_metric( + &self, + lifetime: Lifetime, + storage_name: &str, + metric_identifier: &str, + ) -> bool { + let key = Self::get_storage_key(storage_name, Some(metric_identifier)); + + // Lifetime::Ping data is not persisted to disk if + // Glean has `delay_ping_lifetime_io` set to true + if lifetime == Lifetime::Ping { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + return ping_lifetime_data + .read() + .map(|data| data.contains_key(&key)) + .unwrap_or(false); + } + } + + let reader = unwrap_or!(self.rkv.read(), return false); + self.get_store(lifetime) + .get(&reader, &key) + .unwrap_or(None) + .is_some() + } + + /// Writes to the specified storage with the provided transaction function. + /// + /// If the storage is unavailable, it will return an error. + /// + /// # Panics + /// + /// * This function will **not** panic on database errors. + fn write_with_store(&self, store_name: Lifetime, mut transaction_fn: F) -> Result<()> + where + F: FnMut(Writer, &SingleStore) -> Result<()>, + { + let writer = self.rkv.write().unwrap(); + let store = self.get_store(store_name); + transaction_fn(writer, store) + } + + /// Records a metric in the underlying storage system. + pub fn record(&self, glean: &Glean, data: &CommonMetricDataInternal, value: &Metric) { + // If upload is disabled we don't want to record. + if !glean.is_upload_enabled() { + return; + } + + let name = data.identifier(glean); + + for ping_name in data.storage_names() { + if let Err(e) = self.record_per_lifetime(data.inner.lifetime, ping_name, &name, value) { + log::error!( + "Failed to record metric '{}' into {}: {:?}", + data.base_identifier(), + ping_name, + e + ); + } + } + } + + /// Records a metric in the underlying storage system, for a single lifetime. + /// + /// # Returns + /// + /// If the storage is unavailable or the write fails, no data will be stored and an error will be returned. + /// + /// Otherwise `Ok(())` is returned. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + fn record_per_lifetime( + &self, + lifetime: Lifetime, + storage_name: &str, + key: &str, + metric: &Metric, + ) -> Result<()> { + let final_key = Self::get_storage_key(storage_name, Some(key)); + + // Lifetime::Ping data is not immediately persisted to disk if + // Glean has `delay_ping_lifetime_io` set to true + if lifetime == Lifetime::Ping { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let mut data = ping_lifetime_data + .write() + .expect("Can't read ping lifetime data"); + data.insert(final_key, metric.clone()); + return Ok(()); + } + } + + let encoded = bincode::serialize(&metric).expect("IMPOSSIBLE: Serializing metric failed"); + let value = rkv::Value::Blob(&encoded); + + let mut writer = self.rkv.write()?; + self.get_store(lifetime) + .put(&mut writer, final_key, &value)?; + writer.commit()?; + Ok(()) + } + + /// Records the provided value, with the given lifetime, + /// after applying a transformation function. + pub fn record_with(&self, glean: &Glean, data: &CommonMetricDataInternal, mut transform: F) + where + F: FnMut(Option) -> Metric, + { + // If upload is disabled we don't want to record. + if !glean.is_upload_enabled() { + return; + } + + let name = data.identifier(glean); + for ping_name in data.storage_names() { + if let Err(e) = + self.record_per_lifetime_with(data.inner.lifetime, ping_name, &name, &mut transform) + { + log::error!( + "Failed to record metric '{}' into {}: {:?}", + data.base_identifier(), + ping_name, + e + ); + } + } + } + + /// Records a metric in the underlying storage system, + /// after applying the given transformation function, for a single lifetime. + /// + /// # Returns + /// + /// If the storage is unavailable or the write fails, no data will be stored and an error will be returned. + /// + /// Otherwise `Ok(())` is returned. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + fn record_per_lifetime_with( + &self, + lifetime: Lifetime, + storage_name: &str, + key: &str, + mut transform: F, + ) -> Result<()> + where + F: FnMut(Option) -> Metric, + { + let final_key = Self::get_storage_key(storage_name, Some(key)); + + // Lifetime::Ping data is not persisted to disk if + // Glean has `delay_ping_lifetime_io` set to true + if lifetime == Lifetime::Ping { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let mut data = ping_lifetime_data + .write() + .expect("Can't access ping lifetime data as writable"); + let entry = data.entry(final_key); + match entry { + Entry::Vacant(entry) => { + entry.insert(transform(None)); + } + Entry::Occupied(mut entry) => { + let old_value = entry.get().clone(); + entry.insert(transform(Some(old_value))); + } + } + return Ok(()); + } + } + + let mut writer = self.rkv.write()?; + let store = self.get_store(lifetime); + let new_value: Metric = { + let old_value = store.get(&writer, &final_key)?; + + match old_value { + Some(rkv::Value::Blob(blob)) => { + let old_value = bincode::deserialize(blob).ok(); + transform(old_value) + } + _ => transform(None), + } + }; + + let encoded = + bincode::serialize(&new_value).expect("IMPOSSIBLE: Serializing metric failed"); + let value = rkv::Value::Blob(&encoded); + store.put(&mut writer, final_key, &value)?; + writer.commit()?; + Ok(()) + } + + /// Clears a storage (only Ping Lifetime). + /// + /// # Returns + /// + /// * If the storage is unavailable an error is returned. + /// * If any individual delete fails, an error is returned, but other deletions might have + /// happened. + /// + /// Otherwise `Ok(())` is returned. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + pub fn clear_ping_lifetime_storage(&self, storage_name: &str) -> Result<()> { + // Lifetime::Ping data will be saved to `ping_lifetime_data` + // in case `delay_ping_lifetime_io` is set to true + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + ping_lifetime_data + .write() + .expect("Can't access ping lifetime data as writable") + .retain(|metric_id, _| !metric_id.starts_with(storage_name)); + } + + self.write_with_store(Lifetime::Ping, |mut writer, store| { + let mut metrics = Vec::new(); + { + let mut iter = store.iter_from(&writer, storage_name)?; + while let Some(Ok((metric_id, _))) = iter.next() { + if let Ok(metric_id) = std::str::from_utf8(metric_id) { + if !metric_id.starts_with(storage_name) { + break; + } + metrics.push(metric_id.to_owned()); + } + } + } + + let mut res = Ok(()); + for to_delete in metrics { + if let Err(e) = store.delete(&mut writer, to_delete) { + log::warn!("Can't delete from store: {:?}", e); + res = Err(e); + } + } + + writer.commit()?; + Ok(res?) + }) + } + + /// Removes a single metric from the storage. + /// + /// # Arguments + /// + /// * `lifetime` - the lifetime of the storage in which to look for the metric. + /// * `storage_name` - the name of the storage to store/fetch data from. + /// * `metric_id` - the metric category + name. + /// + /// # Returns + /// + /// * If the storage is unavailable an error is returned. + /// * If the metric could not be deleted, an error is returned. + /// + /// Otherwise `Ok(())` is returned. + /// + /// # Panics + /// + /// This function will **not** panic on database errors. + pub fn remove_single_metric( + &self, + lifetime: Lifetime, + storage_name: &str, + metric_id: &str, + ) -> Result<()> { + let final_key = Self::get_storage_key(storage_name, Some(metric_id)); + + // Lifetime::Ping data is not persisted to disk if + // Glean has `delay_ping_lifetime_io` set to true + if lifetime == Lifetime::Ping { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let mut data = ping_lifetime_data + .write() + .expect("Can't access app lifetime data as writable"); + data.remove(&final_key); + } + } + + self.write_with_store(lifetime, |mut writer, store| { + if let Err(e) = store.delete(&mut writer, final_key.clone()) { + if self.ping_lifetime_data.is_some() { + // If ping_lifetime_data exists, it might be + // that data is in memory, but not yet in rkv. + return Ok(()); + } + return Err(e.into()); + } + writer.commit()?; + Ok(()) + }) + } + + /// Clears all the metrics in the database, for the provided lifetime. + /// + /// Errors are logged. + /// + /// # Panics + /// + /// * This function will **not** panic on database errors. + pub fn clear_lifetime(&self, lifetime: Lifetime) { + let res = self.write_with_store(lifetime, |mut writer, store| { + store.clear(&mut writer)?; + writer.commit()?; + Ok(()) + }); + + if let Err(e) = res { + // We try to clear everything. + // If there was no data to begin with we encounter a `NotFound` error. + // There's no point in logging that. + if let ErrorKind::Rkv(StoreError::IoError(ioerr)) = e.kind() { + if let io::ErrorKind::NotFound = ioerr.kind() { + log::debug!( + "Could not clear store for lifetime {:?}: {:?}", + lifetime, + ioerr + ); + return; + } + } + + log::warn!("Could not clear store for lifetime {:?}: {:?}", lifetime, e); + } + } + + /// Clears all metrics in the database. + /// + /// Errors are logged. + /// + /// # Panics + /// + /// * This function will **not** panic on database errors. + pub fn clear_all(&self) { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + ping_lifetime_data + .write() + .expect("Can't access ping lifetime data as writable") + .clear(); + } + + for lifetime in [Lifetime::User, Lifetime::Ping, Lifetime::Application].iter() { + self.clear_lifetime(*lifetime); + } + } + + /// Persists ping_lifetime_data to disk. + /// + /// Does nothing in case there is nothing to persist. + /// + /// # Panics + /// + /// * This function will **not** panic on database errors. + pub fn persist_ping_lifetime_data(&self) -> Result<()> { + if let Some(ping_lifetime_data) = &self.ping_lifetime_data { + let data = ping_lifetime_data + .read() + .expect("Can't read ping lifetime data"); + + self.write_with_store(Lifetime::Ping, |mut writer, store| { + for (key, value) in data.iter() { + let encoded = + bincode::serialize(&value).expect("IMPOSSIBLE: Serializing metric failed"); + // There is no need for `get_storage_key` here because + // the key is already formatted from when it was saved + // to ping_lifetime_data. + store.put(&mut writer, key, &rkv::Value::Blob(&encoded))?; + } + writer.commit()?; + Ok(()) + })?; + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::tests::new_glean; + use std::collections::HashMap; + use std::path::Path; + use tempfile::tempdir; + + #[test] + fn test_panicks_if_fails_dir_creation() { + let path = Path::new("/!#\"'@#°ç"); + assert!(Database::new(path, false).is_err()); + } + + #[test] + #[cfg(windows)] + fn windows_invalid_utf16_panicfree() { + use std::ffi::OsString; + use std::os::windows::prelude::*; + + // Here the values 0x0066 and 0x006f correspond to 'f' and 'o' + // respectively. The value 0xD800 is a lone surrogate half, invalid + // in a UTF-16 sequence. + let source = [0x0066, 0x006f, 0xD800, 0x006f]; + let os_string = OsString::from_wide(&source[..]); + let os_str = os_string.as_os_str(); + let dir = tempdir().unwrap(); + let path = dir.path().join(os_str); + + let res = Database::new(&path, false); + + assert!( + res.is_ok(), + "Database should succeed at {}: {:?}", + path.display(), + res + ); + } + + #[test] + #[cfg(target_os = "linux")] + fn linux_invalid_utf8_panicfree() { + use std::ffi::OsStr; + use std::os::unix::ffi::OsStrExt; + + // Here, the values 0x66 and 0x6f correspond to 'f' and 'o' + // respectively. The value 0x80 is a lone continuation byte, invalid + // in a UTF-8 sequence. + let source = [0x66, 0x6f, 0x80, 0x6f]; + let os_str = OsStr::from_bytes(&source[..]); + let dir = tempdir().unwrap(); + let path = dir.path().join(os_str); + + let res = Database::new(&path, false); + assert!( + res.is_ok(), + "Database should not fail at {}: {:?}", + path.display(), + res + ); + } + + #[test] + #[cfg(target_os = "macos")] + fn macos_invalid_utf8_panicfree() { + use std::ffi::OsStr; + use std::os::unix::ffi::OsStrExt; + + // Here, the values 0x66 and 0x6f correspond to 'f' and 'o' + // respectively. The value 0x80 is a lone continuation byte, invalid + // in a UTF-8 sequence. + let source = [0x66, 0x6f, 0x80, 0x6f]; + let os_str = OsStr::from_bytes(&source[..]); + let dir = tempdir().unwrap(); + let path = dir.path().join(os_str); + + let res = Database::new(&path, false); + assert!( + res.is_err(), + "Database should not fail at {}: {:?}", + path.display(), + res + ); + } + + #[test] + fn test_data_dir_rkv_inits() { + let dir = tempdir().unwrap(); + Database::new(dir.path(), false).unwrap(); + + assert!(dir.path().exists()); + } + + #[test] + fn test_ping_lifetime_metric_recorded() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), false).unwrap(); + + assert!(db.ping_lifetime_data.is_none()); + + // Attempt to record a known value. + let test_value = "test-value"; + let test_storage = "test-storage"; + let test_metric_id = "telemetry_test.test_name"; + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + test_metric_id, + &Metric::String(test_value.to_string()), + ) + .unwrap(); + + // Verify that the data is correctly recorded. + let mut found_metrics = 0; + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + found_metrics += 1; + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + assert_eq!(test_metric_id, metric_id); + match metric { + Metric::String(s) => assert_eq!(test_value, s), + _ => panic!("Unexpected data found"), + } + }; + + db.iter_store_from(Lifetime::Ping, test_storage, None, &mut snapshotter); + assert_eq!(1, found_metrics, "We only expect 1 Lifetime.Ping metric."); + } + + #[test] + fn test_application_lifetime_metric_recorded() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), false).unwrap(); + + // Attempt to record a known value. + let test_value = "test-value"; + let test_storage = "test-storage1"; + let test_metric_id = "telemetry_test.test_name"; + db.record_per_lifetime( + Lifetime::Application, + test_storage, + test_metric_id, + &Metric::String(test_value.to_string()), + ) + .unwrap(); + + // Verify that the data is correctly recorded. + let mut found_metrics = 0; + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + found_metrics += 1; + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + assert_eq!(test_metric_id, metric_id); + match metric { + Metric::String(s) => assert_eq!(test_value, s), + _ => panic!("Unexpected data found"), + } + }; + + db.iter_store_from(Lifetime::Application, test_storage, None, &mut snapshotter); + assert_eq!( + 1, found_metrics, + "We only expect 1 Lifetime.Application metric." + ); + } + + #[test] + fn test_user_lifetime_metric_recorded() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), false).unwrap(); + + // Attempt to record a known value. + let test_value = "test-value"; + let test_storage = "test-storage2"; + let test_metric_id = "telemetry_test.test_name"; + db.record_per_lifetime( + Lifetime::User, + test_storage, + test_metric_id, + &Metric::String(test_value.to_string()), + ) + .unwrap(); + + // Verify that the data is correctly recorded. + let mut found_metrics = 0; + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + found_metrics += 1; + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + assert_eq!(test_metric_id, metric_id); + match metric { + Metric::String(s) => assert_eq!(test_value, s), + _ => panic!("Unexpected data found"), + } + }; + + db.iter_store_from(Lifetime::User, test_storage, None, &mut snapshotter); + assert_eq!(1, found_metrics, "We only expect 1 Lifetime.User metric."); + } + + #[test] + fn test_clear_ping_storage() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), false).unwrap(); + + // Attempt to record a known value for every single lifetime. + let test_storage = "test-storage"; + db.record_per_lifetime( + Lifetime::User, + test_storage, + "telemetry_test.test_name_user", + &Metric::String("test-value-user".to_string()), + ) + .unwrap(); + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + "telemetry_test.test_name_ping", + &Metric::String("test-value-ping".to_string()), + ) + .unwrap(); + db.record_per_lifetime( + Lifetime::Application, + test_storage, + "telemetry_test.test_name_application", + &Metric::String("test-value-application".to_string()), + ) + .unwrap(); + + // Take a snapshot for the data, all the lifetimes. + { + let mut snapshot: HashMap = HashMap::new(); + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + match metric { + Metric::String(s) => snapshot.insert(metric_id, s.to_string()), + _ => panic!("Unexpected data found"), + }; + }; + + db.iter_store_from(Lifetime::User, test_storage, None, &mut snapshotter); + db.iter_store_from(Lifetime::Ping, test_storage, None, &mut snapshotter); + db.iter_store_from(Lifetime::Application, test_storage, None, &mut snapshotter); + + assert_eq!(3, snapshot.len(), "We expect all lifetimes to be present."); + assert!(snapshot.contains_key("telemetry_test.test_name_user")); + assert!(snapshot.contains_key("telemetry_test.test_name_ping")); + assert!(snapshot.contains_key("telemetry_test.test_name_application")); + } + + // Clear the Ping lifetime. + db.clear_ping_lifetime_storage(test_storage).unwrap(); + + // Take a snapshot again and check that we're only clearing the Ping lifetime. + { + let mut snapshot: HashMap = HashMap::new(); + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + match metric { + Metric::String(s) => snapshot.insert(metric_id, s.to_string()), + _ => panic!("Unexpected data found"), + }; + }; + + db.iter_store_from(Lifetime::User, test_storage, None, &mut snapshotter); + db.iter_store_from(Lifetime::Ping, test_storage, None, &mut snapshotter); + db.iter_store_from(Lifetime::Application, test_storage, None, &mut snapshotter); + + assert_eq!(2, snapshot.len(), "We only expect 2 metrics to be left."); + assert!(snapshot.contains_key("telemetry_test.test_name_user")); + assert!(snapshot.contains_key("telemetry_test.test_name_application")); + } + } + + #[test] + fn test_remove_single_metric() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), false).unwrap(); + + let test_storage = "test-storage-single-lifetime"; + let metric_id_pattern = "telemetry_test.single_metric"; + + // Write sample metrics to the database. + let lifetimes = vec![Lifetime::User, Lifetime::Ping, Lifetime::Application]; + + for lifetime in lifetimes.iter() { + for value in &["retain", "delete"] { + db.record_per_lifetime( + *lifetime, + test_storage, + &format!("{}_{}", metric_id_pattern, value), + &Metric::String((*value).to_string()), + ) + .unwrap(); + } + } + + // Remove "telemetry_test.single_metric_delete" from each lifetime. + for lifetime in lifetimes.iter() { + db.remove_single_metric( + *lifetime, + test_storage, + &format!("{}_delete", metric_id_pattern), + ) + .unwrap(); + } + + // Verify that "telemetry_test.single_metric_retain" is still around for all lifetimes. + for lifetime in lifetimes.iter() { + let mut found_metrics = 0; + let mut snapshotter = |metric_id: &[u8], metric: &Metric| { + found_metrics += 1; + let metric_id = String::from_utf8_lossy(metric_id).into_owned(); + assert_eq!(format!("{}_retain", metric_id_pattern), metric_id); + match metric { + Metric::String(s) => assert_eq!("retain", s), + _ => panic!("Unexpected data found"), + } + }; + + // Check the User lifetime. + db.iter_store_from(*lifetime, test_storage, None, &mut snapshotter); + assert_eq!( + 1, found_metrics, + "We only expect 1 metric for this lifetime." + ); + } + } + + #[test] + fn test_delayed_ping_lifetime_persistence() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), true).unwrap(); + let test_storage = "test-storage"; + + assert!(db.ping_lifetime_data.is_some()); + + // Attempt to record a known value. + let test_value1 = "test-value1"; + let test_metric_id1 = "telemetry_test.test_name1"; + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + test_metric_id1, + &Metric::String(test_value1.to_string()), + ) + .unwrap(); + + // Attempt to persist data. + db.persist_ping_lifetime_data().unwrap(); + + // Attempt to record another known value. + let test_value2 = "test-value2"; + let test_metric_id2 = "telemetry_test.test_name2"; + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + test_metric_id2, + &Metric::String(test_value2.to_string()), + ) + .unwrap(); + + { + // At this stage we expect `test_value1` to be persisted and in memory, + // since it was recorded before calling `persist_ping_lifetime_data`, + // and `test_value2` to be only in memory, since it was recorded after. + let store: SingleStore = db + .rkv + .open_single(Lifetime::Ping.as_str(), StoreOptions::create()) + .unwrap(); + let reader = db.rkv.read().unwrap(); + + // Verify that test_value1 is in rkv. + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id1)) + .unwrap_or(None) + .is_some()); + // Verifiy that test_value2 is **not** in rkv. + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id2)) + .unwrap_or(None) + .is_none()); + + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + // Verify that test_value1 is also in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id1)) + .is_some()); + // Verify that test_value2 is in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id2)) + .is_some()); + } + + // Attempt to persist data again. + db.persist_ping_lifetime_data().unwrap(); + + { + // At this stage we expect `test_value1` and `test_value2` to + // be persisted, since both were created before a call to + // `persist_ping_lifetime_data`. + let store: SingleStore = db + .rkv + .open_single(Lifetime::Ping.as_str(), StoreOptions::create()) + .unwrap(); + let reader = db.rkv.read().unwrap(); + + // Verify that test_value1 is in rkv. + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id1)) + .unwrap_or(None) + .is_some()); + // Verifiy that test_value2 is also in rkv. + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id2)) + .unwrap_or(None) + .is_some()); + + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + // Verify that test_value1 is also in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id1)) + .is_some()); + // Verify that test_value2 is also in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id2)) + .is_some()); + } + } + + #[test] + fn test_load_ping_lifetime_data_from_memory() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + + let test_storage = "test-storage"; + let test_value = "test-value"; + let test_metric_id = "telemetry_test.test_name"; + + { + let db = Database::new(dir.path(), true).unwrap(); + + // Attempt to record a known value. + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + test_metric_id, + &Metric::String(test_value.to_string()), + ) + .unwrap(); + + // Verify that test_value is in memory. + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id)) + .is_some()); + + // Attempt to persist data. + db.persist_ping_lifetime_data().unwrap(); + + // Verify that test_value is now in rkv. + let store: SingleStore = db + .rkv + .open_single(Lifetime::Ping.as_str(), StoreOptions::create()) + .unwrap(); + let reader = db.rkv.read().unwrap(); + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id)) + .unwrap_or(None) + .is_some()); + } + + // Now create a new instace of the db and check if data was + // correctly loaded from rkv to memory. + { + let db = Database::new(dir.path(), true).unwrap(); + + // Verify that test_value is in memory. + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id)) + .is_some()); + + // Verify that test_value is also in rkv. + let store: SingleStore = db + .rkv + .open_single(Lifetime::Ping.as_str(), StoreOptions::create()) + .unwrap(); + let reader = db.rkv.read().unwrap(); + assert!(store + .get(&reader, format!("{}#{}", test_storage, test_metric_id)) + .unwrap_or(None) + .is_some()); + } + } + + #[test] + fn test_delayed_ping_lifetime_clear() { + // Init the database in a temporary directory. + let dir = tempdir().unwrap(); + let db = Database::new(dir.path(), true).unwrap(); + let test_storage = "test-storage"; + + assert!(db.ping_lifetime_data.is_some()); + + // Attempt to record a known value. + let test_value1 = "test-value1"; + let test_metric_id1 = "telemetry_test.test_name1"; + db.record_per_lifetime( + Lifetime::Ping, + test_storage, + test_metric_id1, + &Metric::String(test_value1.to_string()), + ) + .unwrap(); + + { + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + // Verify that test_value1 is in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id1)) + .is_some()); + } + + // Clear ping lifetime storage for a storage that isn't test_storage. + // Doesn't matter what it's called, just that it isn't test_storage. + db.clear_ping_lifetime_storage(&(test_storage.to_owned() + "x")) + .unwrap(); + + { + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + // Verify that test_value1 is still in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id1)) + .is_some()); + } + + // Clear test_storage's ping lifetime storage. + db.clear_ping_lifetime_storage(test_storage).unwrap(); + + { + let data = match &db.ping_lifetime_data { + Some(ping_lifetime_data) => ping_lifetime_data, + None => panic!("Expected `ping_lifetime_data` to exist here!"), + }; + let data = data.read().unwrap(); + // Verify that test_value1 is no longer in memory. + assert!(data + .get(&format!("{}#{}", test_storage, test_metric_id1)) + .is_none()); + } + } + + #[test] + fn doesnt_record_when_upload_is_disabled() { + let (mut glean, dir) = new_glean(None); + + // Init the database in a temporary directory. + + let test_storage = "test-storage"; + let test_data = CommonMetricDataInternal::new("category", "name", test_storage); + let test_metric_id = test_data.identifier(&glean); + + // Attempt to record metric with the record and record_with functions, + // this should work since upload is enabled. + let db = Database::new(dir.path(), true).unwrap(); + db.record(&glean, &test_data, &Metric::String("record".to_owned())); + db.iter_store_from( + Lifetime::Ping, + test_storage, + None, + &mut |metric_id: &[u8], metric: &Metric| { + assert_eq!( + String::from_utf8_lossy(metric_id).into_owned(), + test_metric_id + ); + match metric { + Metric::String(v) => assert_eq!("record", *v), + _ => panic!("Unexpected data found"), + } + }, + ); + + db.record_with(&glean, &test_data, |_| { + Metric::String("record_with".to_owned()) + }); + db.iter_store_from( + Lifetime::Ping, + test_storage, + None, + &mut |metric_id: &[u8], metric: &Metric| { + assert_eq!( + String::from_utf8_lossy(metric_id).into_owned(), + test_metric_id + ); + match metric { + Metric::String(v) => assert_eq!("record_with", *v), + _ => panic!("Unexpected data found"), + } + }, + ); + + // Disable upload + glean.set_upload_enabled(false); + + // Attempt to record metric with the record and record_with functions, + // this should work since upload is now **disabled**. + db.record(&glean, &test_data, &Metric::String("record_nop".to_owned())); + db.iter_store_from( + Lifetime::Ping, + test_storage, + None, + &mut |metric_id: &[u8], metric: &Metric| { + assert_eq!( + String::from_utf8_lossy(metric_id).into_owned(), + test_metric_id + ); + match metric { + Metric::String(v) => assert_eq!("record_with", *v), + _ => panic!("Unexpected data found"), + } + }, + ); + db.record_with(&glean, &test_data, |_| { + Metric::String("record_with_nop".to_owned()) + }); + db.iter_store_from( + Lifetime::Ping, + test_storage, + None, + &mut |metric_id: &[u8], metric: &Metric| { + assert_eq!( + String::from_utf8_lossy(metric_id).into_owned(), + test_metric_id + ); + match metric { + Metric::String(v) => assert_eq!("record_with", *v), + _ => panic!("Unexpected data found"), + } + }, + ); + } + + mod safe_mode { + use std::fs::File; + + use super::*; + use rkv::Value; + + #[test] + fn empty_data_file() { + let dir = tempdir().unwrap(); + + // Create database directory structure. + let database_dir = dir.path().join("db"); + fs::create_dir_all(&database_dir).expect("create database dir"); + + // Create empty database file. + let safebin = database_dir.join("data.safe.bin"); + let f = File::create(safebin).expect("create database file"); + drop(f); + + Database::new(dir.path(), false).unwrap(); + + assert!(dir.path().exists()); + } + + #[test] + fn corrupted_data_file() { + let dir = tempdir().unwrap(); + + // Create database directory structure. + let database_dir = dir.path().join("db"); + fs::create_dir_all(&database_dir).expect("create database dir"); + + // Create empty database file. + let safebin = database_dir.join("data.safe.bin"); + fs::write(safebin, "").expect("write to database file"); + + Database::new(dir.path(), false).unwrap(); + + assert!(dir.path().exists()); + } + + #[test] + fn migration_works_on_startup() { + let dir = tempdir().unwrap(); + + let database_dir = dir.path().join("db"); + let datamdb = database_dir.join("data.mdb"); + let lockmdb = database_dir.join("lock.mdb"); + let safebin = database_dir.join("data.safe.bin"); + + assert!(!safebin.exists()); + assert!(!datamdb.exists()); + assert!(!lockmdb.exists()); + + let store_name = "store1"; + let metric_name = "bool"; + let key = Database::get_storage_key(store_name, Some(metric_name)); + + // Ensure some old data in the LMDB format exists. + { + fs::create_dir_all(&database_dir).expect("create dir"); + let rkv_db = rkv::Rkv::new::(&database_dir).expect("rkv env"); + + let store = rkv_db + .open_single("ping", StoreOptions::create()) + .expect("opened"); + let mut writer = rkv_db.write().expect("writer"); + let metric = Metric::Boolean(true); + let value = bincode::serialize(&metric).expect("serialized"); + store + .put(&mut writer, &key, &Value::Blob(&value)) + .expect("wrote"); + writer.commit().expect("committed"); + + assert!(datamdb.exists()); + assert!(lockmdb.exists()); + assert!(!safebin.exists()); + } + + // First open should migrate the data. + { + let db = Database::new(dir.path(), false).unwrap(); + let safebin = database_dir.join("data.safe.bin"); + assert!(safebin.exists(), "safe-mode file should exist"); + assert!(!datamdb.exists(), "LMDB data should be deleted"); + assert!(!lockmdb.exists(), "LMDB lock should be deleted"); + + let mut stored_metrics = vec![]; + let mut snapshotter = |name: &[u8], metric: &Metric| { + let name = str::from_utf8(name).unwrap().to_string(); + stored_metrics.push((name, metric.clone())) + }; + db.iter_store_from(Lifetime::Ping, "store1", None, &mut snapshotter); + + assert_eq!(1, stored_metrics.len()); + assert_eq!(metric_name, stored_metrics[0].0); + assert_eq!(&Metric::Boolean(true), &stored_metrics[0].1); + } + + // Next open should not re-create the LMDB files. + { + let db = Database::new(dir.path(), false).unwrap(); + let safebin = database_dir.join("data.safe.bin"); + assert!(safebin.exists(), "safe-mode file exists"); + assert!(!datamdb.exists(), "LMDB data should not be recreated"); + assert!(!lockmdb.exists(), "LMDB lock should not be recreated"); + + let mut stored_metrics = vec![]; + let mut snapshotter = |name: &[u8], metric: &Metric| { + let name = str::from_utf8(name).unwrap().to_string(); + stored_metrics.push((name, metric.clone())) + }; + db.iter_store_from(Lifetime::Ping, "store1", None, &mut snapshotter); + + assert_eq!(1, stored_metrics.len()); + assert_eq!(metric_name, stored_metrics[0].0); + assert_eq!(&Metric::Boolean(true), &stored_metrics[0].1); + } + } + + #[test] + fn migration_doesnt_overwrite() { + let dir = tempdir().unwrap(); + + let database_dir = dir.path().join("db"); + let datamdb = database_dir.join("data.mdb"); + let lockmdb = database_dir.join("lock.mdb"); + let safebin = database_dir.join("data.safe.bin"); + + assert!(!safebin.exists()); + assert!(!datamdb.exists()); + assert!(!lockmdb.exists()); + + let store_name = "store1"; + let metric_name = "counter"; + let key = Database::get_storage_key(store_name, Some(metric_name)); + + // Ensure some old data in the LMDB format exists. + { + fs::create_dir_all(&database_dir).expect("create dir"); + let rkv_db = rkv::Rkv::new::(&database_dir).expect("rkv env"); + + let store = rkv_db + .open_single("ping", StoreOptions::create()) + .expect("opened"); + let mut writer = rkv_db.write().expect("writer"); + let metric = Metric::Counter(734); // this value will be ignored + let value = bincode::serialize(&metric).expect("serialized"); + store + .put(&mut writer, &key, &Value::Blob(&value)) + .expect("wrote"); + writer.commit().expect("committed"); + + assert!(datamdb.exists()); + assert!(lockmdb.exists()); + } + + // Ensure some data exists in the new database. + { + fs::create_dir_all(&database_dir).expect("create dir"); + let rkv_db = + rkv::Rkv::new::(&database_dir).expect("rkv env"); + + let store = rkv_db + .open_single("ping", StoreOptions::create()) + .expect("opened"); + let mut writer = rkv_db.write().expect("writer"); + let metric = Metric::Counter(2); + let value = bincode::serialize(&metric).expect("serialized"); + store + .put(&mut writer, &key, &Value::Blob(&value)) + .expect("wrote"); + writer.commit().expect("committed"); + + assert!(safebin.exists()); + } + + // First open should try migration and ignore it, because destination is not empty. + // It also deletes the leftover LMDB database. + { + let db = Database::new(dir.path(), false).unwrap(); + let safebin = database_dir.join("data.safe.bin"); + assert!(safebin.exists(), "safe-mode file should exist"); + assert!(!datamdb.exists(), "LMDB data should be deleted"); + assert!(!lockmdb.exists(), "LMDB lock should be deleted"); + + let mut stored_metrics = vec![]; + let mut snapshotter = |name: &[u8], metric: &Metric| { + let name = str::from_utf8(name).unwrap().to_string(); + stored_metrics.push((name, metric.clone())) + }; + db.iter_store_from(Lifetime::Ping, "store1", None, &mut snapshotter); + + assert_eq!(1, stored_metrics.len()); + assert_eq!(metric_name, stored_metrics[0].0); + assert_eq!(&Metric::Counter(2), &stored_metrics[0].1); + } + } + + #[test] + fn migration_ignores_broken_database() { + let dir = tempdir().unwrap(); + + let database_dir = dir.path().join("db"); + let datamdb = database_dir.join("data.mdb"); + let lockmdb = database_dir.join("lock.mdb"); + let safebin = database_dir.join("data.safe.bin"); + + assert!(!safebin.exists()); + assert!(!datamdb.exists()); + assert!(!lockmdb.exists()); + + let store_name = "store1"; + let metric_name = "counter"; + let key = Database::get_storage_key(store_name, Some(metric_name)); + + // Ensure some old data in the LMDB format exists. + { + fs::create_dir_all(&database_dir).expect("create dir"); + fs::write(&datamdb, "bogus").expect("dbfile created"); + + assert!(datamdb.exists()); + } + + // Ensure some data exists in the new database. + { + fs::create_dir_all(&database_dir).expect("create dir"); + let rkv_db = + rkv::Rkv::new::(&database_dir).expect("rkv env"); + + let store = rkv_db + .open_single("ping", StoreOptions::create()) + .expect("opened"); + let mut writer = rkv_db.write().expect("writer"); + let metric = Metric::Counter(2); + let value = bincode::serialize(&metric).expect("serialized"); + store + .put(&mut writer, &key, &Value::Blob(&value)) + .expect("wrote"); + writer.commit().expect("committed"); + } + + // First open should try migration and ignore it, because destination is not empty. + // It also deletes the leftover LMDB database. + { + let db = Database::new(dir.path(), false).unwrap(); + let safebin = database_dir.join("data.safe.bin"); + assert!(safebin.exists(), "safe-mode file should exist"); + assert!(!datamdb.exists(), "LMDB data should be deleted"); + assert!(!lockmdb.exists(), "LMDB lock should be deleted"); + + let mut stored_metrics = vec![]; + let mut snapshotter = |name: &[u8], metric: &Metric| { + let name = str::from_utf8(name).unwrap().to_string(); + stored_metrics.push((name, metric.clone())) + }; + db.iter_store_from(Lifetime::Ping, "store1", None, &mut snapshotter); + + assert_eq!(1, stored_metrics.len()); + assert_eq!(metric_name, stored_metrics[0].0); + assert_eq!(&Metric::Counter(2), &stored_metrics[0].1); + } + } + + #[test] + fn migration_ignores_empty_database() { + let dir = tempdir().unwrap(); + + let database_dir = dir.path().join("db"); + let datamdb = database_dir.join("data.mdb"); + let lockmdb = database_dir.join("lock.mdb"); + let safebin = database_dir.join("data.safe.bin"); + + assert!(!safebin.exists()); + assert!(!datamdb.exists()); + assert!(!lockmdb.exists()); + + // Ensure old LMDB database exists, but is empty. + { + fs::create_dir_all(&database_dir).expect("create dir"); + let rkv_db = rkv::Rkv::new::(&database_dir).expect("rkv env"); + drop(rkv_db); + assert!(datamdb.exists()); + assert!(lockmdb.exists()); + } + + // First open should try migration, but find no data. + // safe-mode does not write an empty database to disk. + // It also deletes the leftover LMDB database. + { + let _db = Database::new(dir.path(), false).unwrap(); + let safebin = database_dir.join("data.safe.bin"); + assert!(!safebin.exists(), "safe-mode file should exist"); + assert!(!datamdb.exists(), "LMDB data should be deleted"); + assert!(!lockmdb.exists(), "LMDB lock should be deleted"); + } + } + } +} diff --git a/third_party/rust/glean-core/src/debug.rs b/third_party/rust/glean-core/src/debug.rs new file mode 100644 index 0000000000..a572a02b8f --- /dev/null +++ b/third_party/rust/glean-core/src/debug.rs @@ -0,0 +1,319 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! # Debug options +//! +//! The debug options for Glean may be set by calling one of the `set_*` functions +//! or by setting specific environment variables. +//! +//! The environment variables will be read only once when the options are initialized. +//! +//! The possible debugging features available out of the box are: +//! +//! * **Ping logging** - logging the contents of ping requests that are correctly assembled; +//! This may be set by calling glean.set_log_pings(value: bool) +//! or by setting the environment variable GLEAN_LOG_PINGS="true"; +//! * **Debug tagging** - Adding the X-Debug-ID header to every ping request, +//! allowing these tagged pings to be sent to the ["Ping Debug Viewer"](https://mozilla.github.io/glean/book/dev/core/internal/debug-pings.html). +//! This may be set by calling glean.set_debug_view_tag(value: &str) +//! or by setting the environment variable GLEAN_DEBUG_VIEW_TAG=; +//! * **Source tagging** - Adding the X-Source-Tags header to every ping request, +//! allowing pings to be tagged with custom labels. +//! This may be set by calling glean.set_source_tags(value: Vec) +//! or by setting the environment variable GLEAN_SOURCE_TAGS=; +//! +//! Bindings may implement other debugging features, e.g. sending pings on demand. + +use std::env; + +const GLEAN_LOG_PINGS: &str = "GLEAN_LOG_PINGS"; +const GLEAN_DEBUG_VIEW_TAG: &str = "GLEAN_DEBUG_VIEW_TAG"; +const GLEAN_SOURCE_TAGS: &str = "GLEAN_SOURCE_TAGS"; +const GLEAN_MAX_SOURCE_TAGS: usize = 5; + +/// A representation of all of Glean's debug options. +pub struct DebugOptions { + /// Option to log the payload of pings that are successfully assembled into a ping request. + pub log_pings: DebugOption, + /// Option to add the X-Debug-ID header to every ping request. + pub debug_view_tag: DebugOption, + /// Option to add the X-Source-Tags header to ping requests. This will allow the data + /// consumers to classify data depending on the applied tags. + pub source_tags: DebugOption>, +} + +impl std::fmt::Debug for DebugOptions { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("DebugOptions") + .field("log_pings", &self.log_pings.get()) + .field("debug_view_tag", &self.debug_view_tag.get()) + .field("source_tags", &self.source_tags.get()) + .finish() + } +} + +impl DebugOptions { + pub fn new() -> Self { + Self { + log_pings: DebugOption::new(GLEAN_LOG_PINGS, get_bool_from_str, None), + debug_view_tag: DebugOption::new(GLEAN_DEBUG_VIEW_TAG, Some, Some(validate_tag)), + source_tags: DebugOption::new( + GLEAN_SOURCE_TAGS, + tokenize_string, + Some(validate_source_tags), + ), + } + } +} + +/// A representation of a debug option, +/// where the value can be set programmatically or come from an environment variable. +#[derive(Debug)] +pub struct DebugOption Option, V = fn(&T) -> bool> { + /// The name of the environment variable related to this debug option. + env: String, + /// The actual value of this option. + value: Option, + /// Function to extract the data of type `T` from a `String`, used when + /// extracting data from the environment. + extraction: E, + /// Optional function to validate the value parsed from the environment + /// or passed to the `set` function. + validation: Option, +} + +impl DebugOption +where + T: Clone, + E: Fn(String) -> Option, + V: Fn(&T) -> bool, +{ + /// Creates a new debug option. + /// + /// Tries to get the initial value of the option from the environment. + pub fn new(env: &str, extraction: E, validation: Option) -> Self { + let mut option = Self { + env: env.into(), + value: None, + extraction, + validation, + }; + + option.set_from_env(); + option + } + + fn validate(&self, value: &T) -> bool { + if let Some(f) = self.validation.as_ref() { + f(value) + } else { + true + } + } + + fn set_from_env(&mut self) { + let extract = &self.extraction; + match env::var(&self.env) { + Ok(env_value) => match extract(env_value.clone()) { + Some(v) => { + self.set(v); + } + None => { + log::error!( + "Unable to parse debug option {}={} into {}. Ignoring.", + self.env, + env_value, + std::any::type_name::() + ); + } + }, + Err(env::VarError::NotUnicode(_)) => { + log::error!("The value of {} is not valid unicode. Ignoring.", self.env) + } + // The other possible error is that the env var is not set, + // which is not an error for us and can safely be ignored. + Err(_) => {} + } + } + + /// Tries to set a value for this debug option. + /// + /// Validates the value in case a validation function is available. + /// + /// # Returns + /// + /// Whether the option passed validation and was succesfully set. + pub fn set(&mut self, value: T) -> bool { + let validated = self.validate(&value); + if validated { + log::info!("Setting the debug option {}.", self.env); + self.value = Some(value); + return true; + } + log::error!("Invalid value for debug option {}.", self.env); + false + } + + /// Gets the value of this debug option. + pub fn get(&self) -> Option<&T> { + self.value.as_ref() + } +} + +fn get_bool_from_str(value: String) -> Option { + std::str::FromStr::from_str(&value).ok() +} + +fn tokenize_string(value: String) -> Option> { + let trimmed = value.trim(); + if trimmed.is_empty() { + return None; + } + + Some(trimmed.split(',').map(|s| s.trim().to_string()).collect()) +} + +/// A tag is the value used in both the `X-Debug-ID` and `X-Source-Tags` headers +/// of tagged ping requests, thus is it must be a valid header value. +/// +/// In other words, it must match the regex: "[a-zA-Z0-9-]{1,20}" +/// +/// The regex crate isn't used here because it adds to the binary size, +/// and the Glean SDK doesn't use regular expressions anywhere else. +#[allow(clippy::ptr_arg)] +fn validate_tag(value: &String) -> bool { + if value.is_empty() { + log::error!("A tag must have at least one character."); + return false; + } + + let mut iter = value.chars(); + let mut count = 0; + + loop { + match iter.next() { + // We are done, so the whole expression is valid. + None => return true, + // Valid characters. + Some('-') | Some('a'..='z') | Some('A'..='Z') | Some('0'..='9') => (), + // An invalid character + Some(c) => { + log::error!("Invalid character '{}' in the tag.", c); + return false; + } + } + count += 1; + if count == 20 { + log::error!("A tag cannot exceed 20 characters."); + return false; + } + } +} + +/// Validate the list of source tags. +/// +/// This builds upon the existing `validate_tag` function, since all the +/// tags should respect the same rules to make the pipeline happy. +#[allow(clippy::ptr_arg)] +fn validate_source_tags(tags: &Vec) -> bool { + if tags.is_empty() { + return false; + } + + if tags.len() > GLEAN_MAX_SOURCE_TAGS { + log::error!( + "A list of tags cannot contain more than {} elements.", + GLEAN_MAX_SOURCE_TAGS + ); + return false; + } + + if tags.iter().any(|s| s.starts_with("glean")) { + log::error!("Tags starting with `glean` are reserved and must not be used."); + return false; + } + + tags.iter().all(validate_tag) +} + +#[cfg(test)] +mod test { + use super::*; + use std::env; + + #[test] + fn debug_option_is_correctly_loaded_from_env() { + env::set_var("GLEAN_TEST_1", "test"); + let option: DebugOption = DebugOption::new("GLEAN_TEST_1", Some, None); + assert_eq!(option.get().unwrap(), "test"); + } + + #[test] + fn debug_option_is_correctly_validated_when_necessary() { + #[allow(clippy::ptr_arg)] + fn validate(value: &String) -> bool { + value == "test" + } + + // Invalid values from the env are not set + env::set_var("GLEAN_TEST_2", "invalid"); + let mut option: DebugOption = + DebugOption::new("GLEAN_TEST_2", Some, Some(validate)); + assert!(option.get().is_none()); + + // Valid values are set using the `set` function + assert!(option.set("test".into())); + assert_eq!(option.get().unwrap(), "test"); + + // Invalid values are not set using the `set` function + assert!(!option.set("invalid".into())); + assert_eq!(option.get().unwrap(), "test"); + } + + #[test] + fn tokenize_string_splits_correctly() { + // Valid list is properly tokenized and spaces are trimmed. + assert_eq!( + Some(vec!["test1".to_string(), "test2".to_string()]), + tokenize_string(" test1, test2 ".to_string()) + ); + + // Empty strings return no item. + assert_eq!(None, tokenize_string("".to_string())); + } + + #[test] + fn validates_tag_correctly() { + assert!(validate_tag(&"valid-value".to_string())); + assert!(validate_tag(&"-also-valid-value".to_string())); + assert!(!validate_tag(&"invalid_value".to_string())); + assert!(!validate_tag(&"invalid value".to_string())); + assert!(!validate_tag(&"!nv@lid-val*e".to_string())); + assert!(!validate_tag( + &"invalid-value-because-way-too-long".to_string() + )); + assert!(!validate_tag(&"".to_string())); + } + + #[test] + fn validates_source_tags_correctly() { + // Empty tags. + assert!(!validate_source_tags(&vec!["".to_string()])); + // Too many tags. + assert!(!validate_source_tags(&vec![ + "1".to_string(), + "2".to_string(), + "3".to_string(), + "4".to_string(), + "5".to_string(), + "6".to_string() + ])); + // Invalid tags. + assert!(!validate_source_tags(&vec!["!nv@lid-val*e".to_string()])); + assert!(!validate_source_tags(&vec![ + "glean-test1".to_string(), + "test2".to_string() + ])); + } +} diff --git a/third_party/rust/glean-core/src/dispatcher/global.rs b/third_party/rust/glean-core/src/dispatcher/global.rs new file mode 100644 index 0000000000..f90a681a5e --- /dev/null +++ b/third_party/rust/glean-core/src/dispatcher/global.rs @@ -0,0 +1,232 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use once_cell::sync::Lazy; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::RwLock; +use std::thread; +use std::time::Duration; + +use super::{DispatchError, DispatchGuard, Dispatcher}; +use crossbeam_channel::RecvTimeoutError; + +#[cfg(feature = "preinit_million_queue")] +pub const GLOBAL_DISPATCHER_LIMIT: usize = 1000000; +#[cfg(not(feature = "preinit_million_queue"))] +pub const GLOBAL_DISPATCHER_LIMIT: usize = 1000; + +static GLOBAL_DISPATCHER: Lazy>> = + Lazy::new(|| RwLock::new(Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT)))); +pub static TESTING_MODE: AtomicBool = AtomicBool::new(false); +pub static QUEUE_TASKS: AtomicBool = AtomicBool::new(true); + +pub fn is_test_mode() -> bool { + TESTING_MODE.load(Ordering::SeqCst) +} + +/// Get a dispatcher for the global queue. +/// +/// A dispatcher is cheap to create, so we create one on every access instead of caching it. +/// This avoids troubles for tests where the global dispatcher _can_ change. +fn guard() -> DispatchGuard { + GLOBAL_DISPATCHER + .read() + .unwrap() + .as_ref() + .map(|dispatcher| dispatcher.guard()) + .unwrap() +} + +/// Launches a new task on the global dispatch queue. +/// +/// The new task will be enqueued immediately. +/// If the pre-init queue was already flushed, +/// the background thread will process tasks in the queue (see [`flush_init`]). +/// +/// This will not block. +/// +/// [`flush_init`]: fn.flush_init.html +pub fn launch(task: impl FnOnce() + Send + 'static) { + let current_thread = thread::current(); + if let Some("glean.shutdown") = current_thread.name() { + log::error!("Tried to launch a task from the shutdown thread. That is forbidden."); + } + + let guard = guard(); + match guard.launch(task) { + Ok(_) => {} + Err(DispatchError::QueueFull) => { + log::info!("Exceeded maximum queue size, discarding task"); + // TODO: Record this as an error. + } + Err(_) => { + log::info!("Failed to launch a task on the queue. Discarding task."); + } + } + + // In test mode wait for the execution, unless we're still queueing tasks. + let is_queueing = QUEUE_TASKS.load(Ordering::SeqCst); + let is_test = TESTING_MODE.load(Ordering::SeqCst); + if !is_queueing && is_test { + guard.block_on_queue(); + } +} + +/// Block until all tasks prior to this call are processed. +pub fn block_on_queue() { + guard().block_on_queue(); +} + +/// Block until all tasks prior to this call are processed, with a timeout. +pub fn block_on_queue_timeout(timeout: Duration) -> Result<(), RecvTimeoutError> { + guard().block_on_queue_timeout(timeout) +} + +/// Starts processing queued tasks in the global dispatch queue. +/// +/// This function blocks until queued tasks prior to this call are finished. +/// Once the initial queue is empty the dispatcher will wait for new tasks to be launched. +/// +/// # Returns +/// +/// Returns the total number of items that were added to the queue before being flushed, +/// or an error if the queue couldn't be flushed. +pub fn flush_init() -> Result { + guard().flush_init() +} + +fn join_dispatcher_thread() -> Result<(), DispatchError> { + // After we issue the shutdown command, make sure to wait for the + // worker thread to join. + let mut lock = GLOBAL_DISPATCHER.write().unwrap(); + let dispatcher = lock.as_mut().expect("Global dispatcher has gone missing"); + + if let Some(worker) = dispatcher.worker.take() { + return worker.join().map_err(|_| DispatchError::WorkerPanic); + } + + Ok(()) +} + +/// Kill the blocked dispatcher without processing the queue. +/// +/// This will immediately shutdown the worker thread +/// and no other tasks will be processed. +/// This only has an effect when the queue is still blocked. +pub fn kill() -> Result<(), DispatchError> { + guard().kill()?; + join_dispatcher_thread() +} + +/// Shuts down the dispatch queue. +/// +/// This will initiate a shutdown of the worker thread +/// and no new tasks will be processed after this. +pub fn shutdown() -> Result<(), DispatchError> { + guard().shutdown()?; + join_dispatcher_thread() +} + +/// TEST ONLY FUNCTION. +/// Resets the Glean state and triggers init again. +pub(crate) fn reset_dispatcher() { + // We don't care about shutdown errors, since they will + // definitely happen if this is run concurrently. + // We will still replace the global dispatcher. + let _ = shutdown(); + + // New dispatcher = we're queuing again. + QUEUE_TASKS.store(true, Ordering::SeqCst); + + // Now that the dispatcher is shut down, replace it. + // For that we + // 1. Create a new + // 2. Replace the global one + // 3. Only then return (and thus release the lock) + let mut lock = GLOBAL_DISPATCHER.write().unwrap(); + let new_dispatcher = Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT)); + *lock = new_dispatcher; +} + +#[cfg(test)] +mod test { + use std::sync::{Arc, Mutex}; + + use super::*; + + #[test] + #[ignore] // We can't reset the queue at the moment, so filling it up breaks other tests. + fn global_fills_up_in_order_and_works() { + let _ = env_logger::builder().is_test(true).try_init(); + + let result = Arc::new(Mutex::new(vec![])); + + for i in 1..=GLOBAL_DISPATCHER_LIMIT { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(i); + }); + } + + { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(150); + }); + } + + flush_init().unwrap(); + + { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(200); + }); + } + + block_on_queue(); + + let mut expected = (1..=GLOBAL_DISPATCHER_LIMIT).collect::>(); + expected.push(200); + assert_eq!(&*result.lock().unwrap(), &expected); + } + + #[test] + #[ignore] // We can't reset the queue at the moment, so flushing it breaks other tests. + fn global_nested_calls() { + let _ = env_logger::builder().is_test(true).try_init(); + + let result = Arc::new(Mutex::new(vec![])); + + { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(1); + }); + } + + flush_init().unwrap(); + + { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(21); + + { + let result = Arc::clone(&result); + launch(move || { + result.lock().unwrap().push(3); + }); + } + + result.lock().unwrap().push(22); + }); + } + + block_on_queue(); + + let expected = vec![1, 21, 22, 3]; + assert_eq!(&*result.lock().unwrap(), &expected); + } +} diff --git a/third_party/rust/glean-core/src/dispatcher/mod.rs b/third_party/rust/glean-core/src/dispatcher/mod.rs new file mode 100644 index 0000000000..257695c34e --- /dev/null +++ b/third_party/rust/glean-core/src/dispatcher/mod.rs @@ -0,0 +1,589 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A global dispatcher queue. +//! +//! # Example - Global Dispatch queue +//! +//! The global dispatch queue is pre-configured with a maximum queue size of 100 tasks. +//! +//! ```rust,ignore +//! // Ensure the dispatcher queue is being worked on. +//! dispatcher::flush_init(); +//! +//! dispatcher::launch(|| { +//! println!("Executing expensive task"); +//! // Run your expensive task in a separate thread. +//! }); +//! +//! dispatcher::launch(|| { +//! println!("A second task that's executed sequentially, but off the main thread."); +//! }); +//! ``` + +use std::{ + mem, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + thread::{self, JoinHandle}, + time::Duration, +}; + +use crossbeam_channel::{bounded, unbounded, RecvTimeoutError, SendError, Sender}; +use thiserror::Error; + +pub use global::*; + +pub(crate) mod global; + +/// Command received while blocked from further work. +enum Blocked { + /// Shutdown immediately without processing the queue. + Shutdown, + /// Unblock and continue with work as normal. + Continue, +} + +/// The command a worker should execute. +enum Command { + /// A task is a user-defined function to run. + Task(Box), + + /// Swap the channel + Swap(Sender<()>), + + /// Signal the worker to finish work and shut down. + Shutdown, +} + +/// The error returned from operations on the dispatcher +#[derive(Error, Debug, PartialEq, Eq)] +pub enum DispatchError { + /// The worker panicked while running a task + #[error("The worker panicked while running a task")] + WorkerPanic, + + /// Maximum queue size reached + #[error("Maximum queue size reached")] + QueueFull, + + /// Pre-init buffer was already flushed + #[error("Pre-init buffer was already flushed")] + AlreadyFlushed, + + /// Failed to send command to worker thread + #[error("Failed to send command to worker thread")] + SendError, + + /// Failed to receive from channel + #[error("Failed to receive from channel")] + RecvError(#[from] crossbeam_channel::RecvError), +} + +impl From> for DispatchError { + fn from(_: SendError) -> Self { + DispatchError::SendError + } +} + +/// A clonable guard for a dispatch queue. +#[derive(Clone)] +struct DispatchGuard { + /// Whether to queue on the preinit buffer or on the unbounded queue + queue_preinit: Arc, + + /// The number of items that were added to the queue after it filled up. + overflow_count: Arc, + + /// The maximum pre-init queue size + max_queue_size: usize, + + /// Used to unblock the worker thread initially. + block_sender: Sender, + + /// Sender for the preinit queue. + preinit_sender: Sender, + + /// Sender for the unbounded queue. + sender: Sender, +} + +impl DispatchGuard { + pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> { + let task = Command::Task(Box::new(task)); + self.send(task) + } + + pub fn shutdown(&mut self) -> Result<(), DispatchError> { + // Need to flush in order for the thread to actually process anything, + // including the shutdown command. + self.flush_init().ok(); + self.send(Command::Shutdown) + } + + fn send(&self, task: Command) -> Result<(), DispatchError> { + if self.queue_preinit.load(Ordering::SeqCst) { + if self.preinit_sender.len() < self.max_queue_size { + self.preinit_sender.send(task)?; + Ok(()) + } else { + self.overflow_count.fetch_add(1, Ordering::SeqCst); + // Instead of using a bounded queue, we are handling the bounds + // checking ourselves. If a bounded queue were full, we would return + // a QueueFull DispatchError, so we do the same here. + Err(DispatchError::QueueFull) + } + } else { + self.sender.send(task)?; + Ok(()) + } + } + + fn block_on_queue(&self) { + let (tx, rx) = crossbeam_channel::bounded(0); + + // We explicitly don't use `self.launch` here. + // We always put this task on the unbounded queue. + // The pre-init queue might be full before its flushed, in which case this would panic. + // Blocking on the queue can only work if it is eventually flushed anyway. + + let task = Command::Task(Box::new(move || { + tx.send(()) + .expect("(worker) Can't send message on single-use channel"); + })); + self.sender + .send(task) + .expect("Failed to launch the blocking task"); + + rx.recv() + .expect("Failed to receive message on single-use channel"); + } + + /// Block on the task queue emptying, with a timeout. + fn block_on_queue_timeout(&self, timeout: Duration) -> Result<(), RecvTimeoutError> { + let (tx, rx) = crossbeam_channel::bounded(0); + + // We explicitly don't use `self.launch` here. + // We always put this task on the unbounded queue. + // The pre-init queue might be full before its flushed, in which case this would panic. + // Blocking on the queue can only work if it is eventually flushed anyway. + + let task = Command::Task(Box::new(move || { + tx.send(()) + .expect("(worker) Can't send message on single-use channel"); + })); + self.sender + .send(task) + .expect("Failed to launch the blocking task"); + + rx.recv_timeout(timeout) + } + + fn kill(&mut self) -> Result<(), DispatchError> { + // We immediately stop queueing in the pre-init buffer. + let old_val = self.queue_preinit.swap(false, Ordering::SeqCst); + if !old_val { + return Err(DispatchError::AlreadyFlushed); + } + + // Unblock the worker thread exactly once. + self.block_sender.send(Blocked::Shutdown)?; + Ok(()) + } + + /// Flushes the pre-init buffer. + /// + /// This function blocks until tasks queued prior to this call are finished. + /// Once the initial queue is empty the dispatcher will wait for new tasks to be launched. + /// + /// Returns an error if called multiple times. + fn flush_init(&mut self) -> Result { + // We immediately stop queueing in the pre-init buffer. + let old_val = self.queue_preinit.swap(false, Ordering::SeqCst); + if !old_val { + return Err(DispatchError::AlreadyFlushed); + } + + // Unblock the worker thread exactly once. + self.block_sender.send(Blocked::Continue)?; + + // Single-use channel to communicate with the worker thread. + let (swap_sender, swap_receiver) = bounded(0); + + // Send final command and block until it is sent. + self.preinit_sender + .send(Command::Swap(swap_sender)) + .map_err(|_| DispatchError::SendError)?; + + // Now wait for the worker thread to do the swap and inform us. + // This blocks until all tasks in the preinit buffer have been processed. + swap_receiver.recv()?; + + // We're not queueing anymore. + global::QUEUE_TASKS.store(false, Ordering::SeqCst); + + let overflow_count = self.overflow_count.load(Ordering::SeqCst); + if overflow_count > 0 { + Ok(overflow_count) + } else { + Ok(0) + } + } +} + +/// A dispatcher. +/// +/// Run expensive processing tasks sequentially off the main thread. +/// Tasks are processed in a single separate thread in the order they are submitted. +/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size. +/// Processing will start after flushing once, processing already enqueued tasks first, then +/// waiting for further tasks to be enqueued. +pub struct Dispatcher { + /// Guard used for communication with the worker thread. + guard: DispatchGuard, + + /// Handle to the worker thread, allows to wait for it to finish. + worker: Option>, +} + +impl Dispatcher { + /// Creates a new dispatcher with a maximum queue size. + /// + /// Launched tasks won't run until [`flush_init`] is called. + /// + /// [`flush_init`]: #method.flush_init + pub fn new(max_queue_size: usize) -> Self { + let (block_sender, block_receiver) = bounded(1); + let (preinit_sender, preinit_receiver) = unbounded(); + let (sender, mut unbounded_receiver) = unbounded(); + + let queue_preinit = Arc::new(AtomicBool::new(true)); + let overflow_count = Arc::new(AtomicUsize::new(0)); + + let worker = thread::Builder::new() + .name("glean.dispatcher".into()) + .spawn(move || { + match block_receiver.recv() { + Err(_) => { + // The other side was disconnected. + // There's nothing the worker thread can do. + log::error!("The task producer was disconnected. Worker thread will exit."); + return; + } + Ok(Blocked::Shutdown) => { + // The other side wants us to stop immediately + return; + } + Ok(Blocked::Continue) => { + // Queue is unblocked, processing continues as normal. + } + } + + let mut receiver = preinit_receiver; + loop { + use Command::*; + + match receiver.recv() { + Ok(Shutdown) => { + break; + } + + Ok(Task(f)) => { + (f)(); + } + + Ok(Swap(swap_done)) => { + // A swap should only occur exactly once. + // This is upheld by `flush_init`, which errors out if the preinit buffer + // was already flushed. + + // We swap the channels we listen on for new tasks. + // The next iteration will continue with the unbounded queue. + mem::swap(&mut receiver, &mut unbounded_receiver); + + // The swap command MUST be the last one received on the preinit buffer, + // so by the time we run this we know all preinit tasks were processed. + // We can notify the other side. + swap_done + .send(()) + .expect("The caller of `flush_init` has gone missing"); + } + + // Other side was disconnected. + Err(_) => { + log::error!( + "The task producer was disconnected. Worker thread will exit." + ); + return; + } + } + } + }) + .expect("Failed to spawn Glean's dispatcher thread"); + + let guard = DispatchGuard { + queue_preinit, + overflow_count, + max_queue_size, + block_sender, + preinit_sender, + sender, + }; + + Dispatcher { + guard, + worker: Some(worker), + } + } + + fn guard(&self) -> DispatchGuard { + self.guard.clone() + } + + /// Waits for the worker thread to finish and finishes the dispatch queue. + /// + /// You need to call `shutdown` to initiate a shutdown of the queue. + #[cfg(test)] + fn join(mut self) -> Result<(), DispatchError> { + if let Some(worker) = self.worker.take() { + worker.join().map_err(|_| DispatchError::WorkerPanic)?; + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; + use std::sync::{Arc, Mutex}; + use std::{thread, time::Duration}; + + fn enable_test_logging() { + // When testing we want all logs to go to stdout/stderr by default, + // without requiring each individual test to activate it. + let _ = env_logger::builder().is_test(true).try_init(); + } + + #[test] + fn tasks_run_off_the_main_thread() { + enable_test_logging(); + + let main_thread_id = thread::current().id(); + let thread_canary = Arc::new(AtomicBool::new(false)); + + let dispatcher = Dispatcher::new(100); + + // Force the Dispatcher out of the pre-init queue mode. + dispatcher + .guard() + .flush_init() + .expect("Failed to get out of preinit queue mode"); + + let canary_clone = thread_canary.clone(); + dispatcher + .guard() + .launch(move || { + assert!(thread::current().id() != main_thread_id); + // Use the canary bool to make sure this is getting called before + // the test completes. + assert!(!canary_clone.load(Ordering::SeqCst)); + canary_clone.store(true, Ordering::SeqCst); + }) + .expect("Failed to dispatch the test task"); + + dispatcher.guard().block_on_queue(); + assert!(thread_canary.load(Ordering::SeqCst)); + assert_eq!(main_thread_id, thread::current().id()); + } + + #[test] + fn launch_correctly_adds_tasks_to_preinit_queue() { + enable_test_logging(); + + let main_thread_id = thread::current().id(); + let thread_canary = Arc::new(AtomicU8::new(0)); + + let dispatcher = Dispatcher::new(100); + + // Add 3 tasks to queue each one increasing thread_canary by 1 to + // signal that the tasks ran. + for _ in 0..3 { + let canary_clone = thread_canary.clone(); + dispatcher + .guard() + .launch(move || { + // Make sure the task is flushed off-the-main thread. + assert!(thread::current().id() != main_thread_id); + canary_clone.fetch_add(1, Ordering::SeqCst); + }) + .expect("Failed to dispatch the test task"); + } + + // Ensure that no task ran. + assert_eq!(0, thread_canary.load(Ordering::SeqCst)); + + // Flush the queue and wait for the tasks to complete. + dispatcher + .guard() + .flush_init() + .expect("Failed to get out of preinit queue mode"); + // Validate that we have the expected canary value. + assert_eq!(3, thread_canary.load(Ordering::SeqCst)); + } + + #[test] + fn preinit_tasks_are_processed_after_flush() { + enable_test_logging(); + + let dispatcher = Dispatcher::new(10); + + let result = Arc::new(Mutex::new(vec![])); + for i in 1..=5 { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + result.lock().unwrap().push(i); + }) + .unwrap(); + } + + result.lock().unwrap().push(0); + dispatcher.guard().flush_init().unwrap(); + for i in 6..=10 { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + result.lock().unwrap().push(i); + }) + .unwrap(); + } + + dispatcher.guard().block_on_queue(); + + // This additionally checks that tasks were executed in order. + assert_eq!( + &*result.lock().unwrap(), + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + ); + } + + #[test] + fn tasks_after_shutdown_are_not_processed() { + enable_test_logging(); + + let dispatcher = Dispatcher::new(10); + + let result = Arc::new(Mutex::new(vec![])); + + dispatcher.guard().flush_init().unwrap(); + + dispatcher.guard().shutdown().unwrap(); + { + let result = Arc::clone(&result); + // This might fail because the shutdown is quick enough, + // or it might succeed and still send the task. + // In any case that task should not be executed. + let _ = dispatcher.guard().launch(move || { + result.lock().unwrap().push(0); + }); + } + + dispatcher.join().unwrap(); + + assert!(result.lock().unwrap().is_empty()); + } + + #[test] + fn preinit_buffer_fills_up() { + enable_test_logging(); + + let dispatcher = Dispatcher::new(5); + + let result = Arc::new(Mutex::new(vec![])); + + for i in 1..=5 { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + result.lock().unwrap().push(i); + }) + .unwrap(); + } + + { + let result = Arc::clone(&result); + let err = dispatcher.guard().launch(move || { + result.lock().unwrap().push(10); + }); + assert_eq!(Err(DispatchError::QueueFull), err); + } + + dispatcher.guard().flush_init().unwrap(); + + { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + result.lock().unwrap().push(20); + }) + .unwrap(); + } + + dispatcher.guard().block_on_queue(); + + assert_eq!(&*result.lock().unwrap(), &[1, 2, 3, 4, 5, 20]); + } + + #[test] + fn normal_queue_is_unbounded() { + enable_test_logging(); + + // Note: We can't actually test that it's fully unbounded, + // but we can quickly queue more slow tasks than the pre-init buffer holds + // and then guarantuee they all run. + + let dispatcher = Dispatcher::new(5); + + let result = Arc::new(Mutex::new(vec![])); + + for i in 1..=5 { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + result.lock().unwrap().push(i); + }) + .unwrap(); + } + + dispatcher.guard().flush_init().unwrap(); + + // Queue more than 5 tasks, + // Each one is slow to process, so we should be faster in queueing + // them up than they are processed. + for i in 6..=20 { + let result = Arc::clone(&result); + dispatcher + .guard() + .launch(move || { + thread::sleep(Duration::from_millis(50)); + result.lock().unwrap().push(i); + }) + .unwrap(); + } + + dispatcher.guard().shutdown().unwrap(); + dispatcher.join().unwrap(); + + let expected = (1..=20).collect::>(); + assert_eq!(&*result.lock().unwrap(), &expected); + } +} diff --git a/third_party/rust/glean-core/src/error.rs b/third_party/rust/glean-core/src/error.rs new file mode 100644 index 0000000000..9c2c445a4c --- /dev/null +++ b/third_party/rust/glean-core/src/error.rs @@ -0,0 +1,169 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::ffi::OsString; +use std::fmt::{self, Display}; +use std::io; +use std::result; + +use rkv::StoreError; + +/// A specialized [`Result`] type for this crate's operations. +/// +/// This is generally used to avoid writing out [`Error`] directly and +/// is otherwise a direct mapping to [`Result`]. +/// +/// [`Result`]: https://doc.rust-lang.org/stable/std/result/enum.Result.html +/// [`Error`]: std.struct.Error.html +pub type Result = result::Result; + +/// A list enumerating the categories of errors in this crate. +/// +/// [`Error`]: https://doc.rust-lang.org/stable/std/error/trait.Error.html +/// +/// This list is intended to grow over time and it is not recommended to +/// exhaustively match against it. +#[derive(Debug)] +#[non_exhaustive] +pub enum ErrorKind { + /// Lifetime conversion failed + Lifetime(i32), + + /// IO error + IoError(io::Error), + + /// IO error + Rkv(StoreError), + + /// JSON error + Json(serde_json::error::Error), + + /// TimeUnit conversion failed + TimeUnit(i32), + + /// MemoryUnit conversion failed + MemoryUnit(i32), + + /// HistogramType conversion failed + HistogramType(i32), + + /// [`OsString`] conversion failed + OsString(OsString), + + /// Unknown error + Utf8Error, + + /// Glean initialization was attempted with an invalid configuration + InvalidConfig, + + /// Glean not initialized + NotInitialized, + + /// Ping request body size overflowed + PingBodyOverflow(usize), +} + +/// A specialized [`Error`] type for this crate's operations. +/// +/// [`Error`]: https://doc.rust-lang.org/stable/std/error/trait.Error.html +#[derive(Debug)] +pub struct Error { + kind: ErrorKind, +} + +impl Error { + /// Returns a new UTF-8 error + /// + /// This is exposed in order to expose conversion errors on the FFI layer. + pub fn utf8_error() -> Error { + Error { + kind: ErrorKind::Utf8Error, + } + } + + /// Indicates an error that no requested global object is initialized + pub fn not_initialized() -> Error { + Error { + kind: ErrorKind::NotInitialized, + } + } + + /// Returns the kind of the current error instance. + pub fn kind(&self) -> &ErrorKind { + &self.kind + } +} + +impl std::error::Error for Error {} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use ErrorKind::*; + match self.kind() { + Lifetime(l) => write!(f, "Lifetime conversion from {} failed", l), + IoError(e) => write!(f, "An I/O error occurred: {}", e), + Rkv(e) => write!(f, "An Rkv error occurred: {}", e), + Json(e) => write!(f, "A JSON error occurred: {}", e), + TimeUnit(t) => write!(f, "TimeUnit conversion from {} failed", t), + MemoryUnit(m) => write!(f, "MemoryUnit conversion from {} failed", m), + HistogramType(h) => write!(f, "HistogramType conversion from {} failed", h), + OsString(s) => write!(f, "OsString conversion from {:?} failed", s), + Utf8Error => write!(f, "Invalid UTF-8 byte sequence in string"), + InvalidConfig => write!(f, "Invalid Glean configuration provided"), + NotInitialized => write!(f, "Global Glean object missing"), + PingBodyOverflow(s) => write!( + f, + "Ping request body size exceeded maximum size allowed: {}kB.", + s / 1024 + ), + } + } +} + +impl From for Error { + fn from(kind: ErrorKind) -> Error { + Error { kind } + } +} + +impl From for Error { + fn from(error: io::Error) -> Error { + Error { + kind: ErrorKind::IoError(error), + } + } +} + +impl From for Error { + fn from(error: StoreError) -> Error { + Error { + kind: ErrorKind::Rkv(error), + } + } +} + +impl From for Error { + fn from(error: serde_json::error::Error) -> Error { + Error { + kind: ErrorKind::Json(error), + } + } +} + +impl From for Error { + fn from(error: OsString) -> Error { + Error { + kind: ErrorKind::OsString(error), + } + } +} + +/// To satisfy integer conversion done by the macros on the FFI side, we need to be able to turn +/// something infallible into an error. +/// This will never actually be reached, as an integer-to-integer conversion is infallible. +impl From for Error { + fn from(_: std::convert::Infallible) -> Error { + unreachable!() + } +} diff --git a/third_party/rust/glean-core/src/error_recording.rs b/third_party/rust/glean-core/src/error_recording.rs new file mode 100644 index 0000000000..aaf850d019 --- /dev/null +++ b/third_party/rust/glean-core/src/error_recording.rs @@ -0,0 +1,239 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! # Error Recording +//! +//! Glean keeps track of errors that occured due to invalid labels or invalid values when recording +//! other metrics. +//! +//! Error counts are stored in labeled counters in the `glean.error` category. +//! The labeled counter metrics that store the errors are defined in the `metrics.yaml` for documentation purposes, +//! but are not actually used directly, since the `send_in_pings` value needs to match the pings of the metric that is erroring (plus the "metrics" ping), +//! not some constant value that we could define in `metrics.yaml`. + +use std::convert::TryFrom; +use std::fmt::Display; + +use crate::common_metric_data::CommonMetricDataInternal; +use crate::error::{Error, ErrorKind}; +use crate::metrics::labeled::{combine_base_identifier_and_label, strip_label}; +use crate::metrics::CounterMetric; +use crate::CommonMetricData; +use crate::Glean; +use crate::Lifetime; + +/// The possible error types for metric recording. +/// Note: the cases in this enum must be kept in sync with the ones +/// in the platform-specific code (e.g. `ErrorType.kt`) and with the +/// metrics in the registry files. +// When adding a new error type ensure it's also added to `ErrorType::iter()` below. +#[repr(C)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ErrorType { + /// For when the value to be recorded does not match the metric-specific restrictions + InvalidValue, + /// For when the label of a labeled metric does not match the restrictions + InvalidLabel, + /// For when the metric caught an invalid state while recording + InvalidState, + /// For when the value to be recorded overflows the metric-specific upper range + InvalidOverflow, +} + +impl ErrorType { + /// The error type's metric id + pub fn as_str(&self) -> &'static str { + match self { + ErrorType::InvalidValue => "invalid_value", + ErrorType::InvalidLabel => "invalid_label", + ErrorType::InvalidState => "invalid_state", + ErrorType::InvalidOverflow => "invalid_overflow", + } + } + + /// Return an iterator over all possible error types. + /// + /// ``` + /// # use glean_core::ErrorType; + /// let errors = ErrorType::iter(); + /// let all_errors = errors.collect::>(); + /// assert_eq!(4, all_errors.len()); + /// ``` + pub fn iter() -> impl Iterator { + // N.B.: This has no compile-time guarantees that it is complete. + // New `ErrorType` variants will need to be added manually. + [ + ErrorType::InvalidValue, + ErrorType::InvalidLabel, + ErrorType::InvalidState, + ErrorType::InvalidOverflow, + ] + .iter() + .copied() + } +} + +impl TryFrom for ErrorType { + type Error = Error; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(ErrorType::InvalidValue), + 1 => Ok(ErrorType::InvalidLabel), + 2 => Ok(ErrorType::InvalidState), + 3 => Ok(ErrorType::InvalidOverflow), + e => Err(ErrorKind::Lifetime(e).into()), + } + } +} + +/// For a given metric, get the metric in which to record errors +fn get_error_metric_for_metric(meta: &CommonMetricDataInternal, error: ErrorType) -> CounterMetric { + // Can't use meta.identifier here, since that might cause infinite recursion + // if the label on this metric needs to report an error. + let identifier = meta.base_identifier(); + let name = strip_label(&identifier); + + // Record errors in the pings the metric is in, as well as the metrics ping. + let mut send_in_pings = meta.inner.send_in_pings.clone(); + let ping_name = "metrics".to_string(); + if !send_in_pings.contains(&ping_name) { + send_in_pings.push(ping_name); + } + + CounterMetric::new(CommonMetricData { + name: combine_base_identifier_and_label(error.as_str(), name), + category: "glean.error".into(), + lifetime: Lifetime::Ping, + send_in_pings, + ..Default::default() + }) +} + +/// Records an error into Glean. +/// +/// Errors are recorded as labeled counters in the `glean.error` category. +/// +/// *Note*: We do make assumptions here how labeled metrics are encoded, namely by having the name +/// `/