// This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. //! Ping collection, assembly & submission. use std::fs::{create_dir_all, File}; use std::io::Write; use std::path::{Path, PathBuf}; use log::info; use serde_json::{json, Value as JsonValue}; use crate::common_metric_data::{CommonMetricData, Lifetime}; use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit}; use crate::storage::{StorageManager, INTERNAL_STORAGE}; use crate::upload::HeaderMap; use crate::util::{get_iso_time_string, local_now_with_offset}; use crate::{Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY}; /// Holds everything you need to store or send a ping. pub struct Ping<'a> { /// The unique document id. pub doc_id: &'a str, /// The ping's name. pub name: &'a str, /// The path on the server to use when uplaoding this ping. pub url_path: &'a str, /// The payload, including `*_info` fields. pub content: JsonValue, /// The headers to upload with the payload. pub headers: HeaderMap, } /// Collect a ping's data, assemble it into its full payload and store it on disk. pub struct PingMaker; fn merge(a: &mut JsonValue, b: &JsonValue) { match (a, b) { (&mut JsonValue::Object(ref mut a), JsonValue::Object(b)) => { for (k, v) in b { merge(a.entry(k.clone()).or_insert(JsonValue::Null), v); } } (a, b) => { *a = b.clone(); } } } impl Default for PingMaker { fn default() -> Self { Self::new() } } impl PingMaker { /// Creates a new [`PingMaker`]. pub fn new() -> Self { Self } /// Gets, and then increments, the sequence number for a given ping. fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize { // Sequence numbers are stored as a counter under a name that includes the storage name let seq = CounterMetric::new(CommonMetricData { name: format!("{}#sequence", storage_name), // We don't need a category, the name is already unique category: "".into(), send_in_pings: vec![INTERNAL_STORAGE.into()], lifetime: Lifetime::User, ..Default::default() }); let current_seq = match StorageManager.snapshot_metric( glean.storage(), INTERNAL_STORAGE, &seq.meta().identifier(glean), seq.meta().inner.lifetime, ) { Some(Metric::Counter(i)) => i, _ => 0, }; // Increase to next sequence id seq.add_sync(glean, 1); current_seq as usize } /// Gets the formatted start and end times for this ping and update for the next ping. fn get_start_end_times( &self, glean: &Glean, storage_name: &str, time_unit: TimeUnit, ) -> (String, String) { let start_time = DatetimeMetric::new( CommonMetricData { name: format!("{}#start", storage_name), category: "".into(), send_in_pings: vec![INTERNAL_STORAGE.into()], lifetime: Lifetime::User, ..Default::default() }, time_unit, ); // "start_time" is the time the ping was generated the last time. // If not available, we use the date the Glean object was initialized. let start_time_data = start_time .get_value(glean, INTERNAL_STORAGE) .unwrap_or_else(|| glean.start_time()); let end_time_data = local_now_with_offset(); // Update the start time with the current time. start_time.set_sync_chrono(glean, end_time_data); // Format the times. let start_time_data = get_iso_time_string(start_time_data, time_unit); let end_time_data = get_iso_time_string(end_time_data, time_unit); (start_time_data, end_time_data) } fn get_ping_info( &self, glean: &Glean, storage_name: &str, reason: Option<&str>, precision: TimeUnit, ) -> JsonValue { let (start_time, end_time) = self.get_start_end_times(glean, storage_name, precision); let mut map = json!({ "seq": self.get_ping_seq(glean, storage_name), "start_time": start_time, "end_time": end_time, }); if let Some(reason) = reason { map.as_object_mut() .unwrap() // safe unwrap, we created the object above .insert("reason".to_string(), JsonValue::String(reason.to_string())); }; // Get the experiment data, if available. if let Some(experiment_data) = StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE) { map.as_object_mut() .unwrap() // safe unwrap, we created the object above .insert("experiments".to_string(), experiment_data); }; map } fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue { // Add the "telemetry_sdk_build", which is the glean-core version. let mut map = json!({ "telemetry_sdk_build": crate::GLEAN_VERSION, }); // Flatten the whole thing. if let Some(client_info) = StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true) { let client_info_obj = client_info.as_object().unwrap(); // safe unwrap, snapshot always returns an object. for (_key, value) in client_info_obj { merge(&mut map, value); } } else { log::warn!("Empty client info data."); } if !include_client_id { // safe unwrap, we created the object above map.as_object_mut().unwrap().remove("client_id"); } json!(map) } /// Build the headers to be persisted and sent with a ping. /// /// Currently the only headers we persist are `X-Debug-ID` and `X-Source-Tags`. /// /// # Arguments /// /// * `glean` - the [`Glean`] instance to collect headers from. /// /// # Returns /// /// A map of header names to header values. /// Might be empty if there are no extra headers to send. /// ``` fn get_headers(&self, glean: &Glean) -> HeaderMap { let mut headers_map = HeaderMap::new(); if let Some(debug_view_tag) = glean.debug_view_tag() { headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string()); } if let Some(source_tags) = glean.source_tags() { headers_map.insert("X-Source-Tags".to_string(), source_tags.join(",")); } headers_map } /// Collects a snapshot for the given ping from storage and attach required meta information. /// /// # Arguments /// /// * `glean` - the [`Glean`] instance to collect data from. /// * `ping` - the ping to collect for. /// * `reason` - an optional reason code to include in the ping. /// * `doc_id` - the ping's unique document identifier. /// * `url_path` - the path on the server to upload this ping to. /// /// # Returns /// /// A fully assembled representation of the ping payload and associated metadata. /// If there is no data stored for the ping, `None` is returned. pub fn collect<'a>( &self, glean: &Glean, ping: &'a PingType, reason: Option<&str>, doc_id: &'a str, url_path: &'a str, ) -> Option> { info!("Collecting {}", ping.name()); let mut metrics_data = StorageManager.snapshot_as_json(glean.storage(), ping.name(), true); let events_data = glean .event_storage() .snapshot_as_json(glean, ping.name(), true); // Due to the way the experimentation identifier could link datasets that are intentionally unlinked, // it will not be included in pings that specifically exclude the Glean client-id and those pings that // should not be sent if empty. if (!ping.include_client_id() || !ping.send_if_empty()) && glean.test_get_experimentation_id().is_some() && metrics_data.is_some() { // There is a lot of unwrapping here, but that's fine because the `if` conditions above mean that the // experimentation id is present in the metrics. let metrics = metrics_data.as_mut().unwrap().as_object_mut().unwrap(); let metrics_count = metrics.len(); let strings = metrics.get_mut("string").unwrap().as_object_mut().unwrap(); let string_count = strings.len(); // Handle the send_if_empty case by checking if the experimentation id is the only metric in the data. let empty_payload = events_data.is_none() && metrics_count == 1 && string_count == 1; if !ping.include_client_id() || (!ping.send_if_empty() && empty_payload) { strings.remove("glean.client.annotation.experimentation_id"); } if strings.is_empty() { metrics.remove("string"); } if metrics.is_empty() { metrics_data = None; } } let is_empty = metrics_data.is_none() && events_data.is_none(); if !ping.send_if_empty() && is_empty { info!("Storage for {} empty. Bailing out.", ping.name()); return None; } else if ping.name() == "events" && events_data.is_none() { info!("No events for 'events' ping. Bailing out."); return None; } else if is_empty { info!( "Storage for {} empty. Ping will still be sent.", ping.name() ); } let precision = if ping.precise_timestamps() { TimeUnit::Millisecond } else { TimeUnit::Minute }; let ping_info = self.get_ping_info(glean, ping.name(), reason, precision); let client_info = self.get_client_info(glean, ping.include_client_id()); let mut json = json!({ "ping_info": ping_info, "client_info": client_info }); let json_obj = json.as_object_mut()?; if let Some(metrics_data) = metrics_data { json_obj.insert("metrics".to_string(), metrics_data); } if let Some(events_data) = events_data { json_obj.insert("events".to_string(), events_data); } Some(Ping { content: json, name: ping.name(), doc_id, url_path, headers: self.get_headers(glean), }) } /// Gets the path to a directory for ping storage. /// /// The directory will be created inside the `data_path`. /// The `pings` directory (and its parents) is created if it does not exist. fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result { // Use a special directory for deletion-request pings let pings_dir = match ping_type { Some("deletion-request") => data_path.join(DELETION_REQUEST_PINGS_DIRECTORY), _ => data_path.join(PENDING_PINGS_DIRECTORY), }; create_dir_all(&pings_dir)?; Ok(pings_dir) } /// Gets path to a directory for temporary storage. /// /// The directory will be created inside the `data_path`. /// The `tmp` directory (and its parents) is created if it does not exist. fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result { let pings_dir = data_path.join("tmp"); create_dir_all(&pings_dir)?; Ok(pings_dir) } /// Stores a ping to disk in the pings directory. pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> { let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?; let temp_dir = self.get_tmp_dir(data_path)?; // Write to a temporary location and then move when done, // for transactional writes. let temp_ping_path = temp_dir.join(ping.doc_id); let ping_path = pings_dir.join(ping.doc_id); log::debug!( "Storing ping '{}' at '{}'", ping.doc_id, ping_path.display() ); { let mut file = File::create(&temp_ping_path)?; file.write_all(ping.url_path.as_bytes())?; file.write_all(b"\n")?; file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?; if !ping.headers.is_empty() { file.write_all(b"\n{\"headers\":")?; file.write_all(::serde_json::to_string(&ping.headers)?.as_bytes())?; file.write_all(b"}")?; } } if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) { log::warn!( "Unable to move '{}' to '{}", temp_ping_path.display(), ping_path.display() ); return Err(e); } Ok(()) } /// Clears any pending pings in the queue. pub fn clear_pending_pings(&self, data_path: &Path) -> Result<()> { let pings_dir = self.get_pings_dir(data_path, None)?; std::fs::remove_dir_all(&pings_dir)?; create_dir_all(&pings_dir)?; log::debug!("All pending pings deleted"); Ok(()) } } #[cfg(test)] mod test { use super::*; use crate::tests::new_glean; #[test] fn sequence_numbers_should_be_reset_when_toggling_uploading() { let (mut glean, _t) = new_glean(None); let ping_maker = PingMaker::new(); assert_eq!(0, ping_maker.get_ping_seq(&glean, "custom")); assert_eq!(1, ping_maker.get_ping_seq(&glean, "custom")); glean.set_upload_enabled(false); assert_eq!(0, ping_maker.get_ping_seq(&glean, "custom")); assert_eq!(0, ping_maker.get_ping_seq(&glean, "custom")); glean.set_upload_enabled(true); assert_eq!(0, ping_maker.get_ping_seq(&glean, "custom")); assert_eq!(1, ping_maker.get_ping_seq(&glean, "custom")); } }