summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/telemetry.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/telemetry.rs')
-rw-r--r--third_party/rust/sync15/src/telemetry.rs824
1 files changed, 824 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/telemetry.rs b/third_party/rust/sync15/src/telemetry.rs
new file mode 100644
index 0000000000..dac018f032
--- /dev/null
+++ b/third_party/rust/sync15/src/telemetry.rs
@@ -0,0 +1,824 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! Manage recording sync telemetry. Assumes some external telemetry
+//! library/code which manages submitting.
+
+use crate::error::Error;
+#[cfg(feature = "sync-client")]
+use crate::error::ErrorResponse;
+
+use std::collections::HashMap;
+use std::time;
+
+use serde::{ser, Serialize, Serializer};
+
+// A test helper, used by the many test modules below.
+#[cfg(test)]
+fn assert_json<T: ?Sized>(v: &T, expected: serde_json::Value)
+where
+ T: serde::Serialize,
+{
+ assert_eq!(
+ serde_json::to_value(&v).expect("should get a value"),
+ expected
+ );
+}
+
+/// What we record for 'when' and 'took' in a telemetry record.
+#[derive(Debug, Serialize)]
+struct WhenTook {
+ when: f64,
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ took: u64,
+}
+
+/// What we track while recording 'when' and 'took. It serializes as a WhenTook,
+/// except when .finished() hasn't been called, in which case it panics.
+#[derive(Debug)]
+enum Stopwatch {
+ Started(time::SystemTime, time::Instant),
+ Finished(WhenTook),
+}
+
+impl Default for Stopwatch {
+ fn default() -> Self {
+ Stopwatch::new()
+ }
+}
+
+impl Stopwatch {
+ fn new() -> Self {
+ Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
+ }
+
+ // For tests we don't want real timestamps because we test against literals.
+ #[cfg(test)]
+ fn finished(&self) -> Self {
+ Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
+ }
+
+ #[cfg(not(test))]
+ fn finished(&self) -> Self {
+ match self {
+ Stopwatch::Started(st, si) => {
+ let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
+ let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float?
+
+ let sid = si.elapsed();
+ let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000);
+ Stopwatch::Finished(WhenTook { when, took })
+ }
+ _ => {
+ unreachable!("can't finish twice");
+ }
+ }
+ }
+}
+
+impl Serialize for Stopwatch {
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ match self {
+ Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
+ Stopwatch::Finished(c) => c.serialize(serializer),
+ }
+ }
+}
+
+#[cfg(test)]
+mod stopwatch_tests {
+ use super::*;
+
+ // A wrapper struct because we flatten - this struct should serialize with
+ // 'when' and 'took' keys (but with no 'sw'.)
+ #[derive(Debug, Serialize)]
+ struct WT {
+ #[serde(flatten)]
+ sw: Stopwatch,
+ }
+
+ #[test]
+ fn test_not_finished() {
+ let wt = WT {
+ sw: Stopwatch::new(),
+ };
+ serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
+ }
+
+ #[test]
+ fn test() {
+ assert_json(
+ &WT {
+ sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
+ },
+ serde_json::json!({"when": 1.0, "took": 1}),
+ );
+ assert_json(
+ &WT {
+ sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
+ },
+ serde_json::json!({"when": 1.0}),
+ );
+ }
+}
+
+/// A generic "Event" - suitable for all kinds of pings (although this module
+/// only cares about the sync ping)
+#[derive(Debug, Serialize)]
+pub struct Event {
+ // We use static str references as we expect values to be literals.
+ object: &'static str,
+
+ method: &'static str,
+
+ // Maybe "value" should be a string?
+ #[serde(skip_serializing_if = "Option::is_none")]
+ value: Option<&'static str>,
+
+ // we expect the keys to be literals but values are real strings.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ extra: Option<HashMap<&'static str, String>>,
+}
+
+impl Event {
+ pub fn new(object: &'static str, method: &'static str) -> Self {
+ assert!(object.len() <= 20);
+ assert!(method.len() <= 20);
+ Self {
+ object,
+ method,
+ value: None,
+ extra: None,
+ }
+ }
+
+ pub fn value(mut self, v: &'static str) -> Self {
+ assert!(v.len() <= 80);
+ self.value = Some(v);
+ self
+ }
+
+ pub fn extra(mut self, key: &'static str, val: String) -> Self {
+ assert!(key.len() <= 15);
+ assert!(val.len() <= 85);
+ match self.extra {
+ None => self.extra = Some(HashMap::new()),
+ Some(ref e) => assert!(e.len() < 10),
+ }
+ self.extra.as_mut().unwrap().insert(key, val);
+ self
+ }
+}
+
+#[cfg(test)]
+mod test_events {
+ use super::*;
+
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_ctor() {
+ Event::new("A very long object value", "Method");
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_extra_key() {
+ Event::new("O", "M").extra("A very long key value", "v".to_string());
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_extra_val() {
+ let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ Event::new("O", "M").extra("k", l.to_string());
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_too_many_extras() {
+ let l = "abcdefghijk";
+ let mut e = Event::new("Object", "Method");
+ for i in 0..l.len() {
+ e = e.extra(&l[i..=i], "v".to_string());
+ }
+ }
+
+ #[test]
+ fn test_json() {
+ assert_json(
+ &Event::new("Object", "Method").value("Value"),
+ serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
+ );
+
+ assert_json(
+ &Event::new("Object", "Method").extra("one", "one".to_string()),
+ serde_json::json!({"object": "Object",
+ "method": "Method",
+ "extra": {"one": "one"}
+ }),
+ )
+ }
+}
+
+/// A Sync failure.
+#[derive(Debug, Serialize)]
+#[serde(tag = "name")]
+pub enum SyncFailure {
+ #[serde(rename = "shutdownerror")]
+ Shutdown,
+
+ #[serde(rename = "othererror")]
+ Other { error: String },
+
+ #[serde(rename = "unexpectederror")]
+ Unexpected { error: String },
+
+ #[serde(rename = "autherror")]
+ Auth { from: &'static str },
+
+ #[serde(rename = "httperror")]
+ Http { code: u16 },
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn reprs() {
+ assert_json(
+ &SyncFailure::Shutdown,
+ serde_json::json!({"name": "shutdownerror"}),
+ );
+
+ assert_json(
+ &SyncFailure::Other {
+ error: "dunno".to_string(),
+ },
+ serde_json::json!({"name": "othererror", "error": "dunno"}),
+ );
+
+ assert_json(
+ &SyncFailure::Unexpected {
+ error: "dunno".to_string(),
+ },
+ serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
+ );
+
+ assert_json(
+ &SyncFailure::Auth { from: "FxA" },
+ serde_json::json!({"name": "autherror", "from": "FxA"}),
+ );
+
+ assert_json(
+ &SyncFailure::Http { code: 500 },
+ serde_json::json!({"name": "httperror", "code": 500}),
+ );
+ }
+}
+
+/// Incoming record for an engine's sync
+#[derive(Debug, Default, Serialize)]
+pub struct EngineIncoming {
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ applied: u32,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ failed: u32,
+
+ #[serde(rename = "newFailed")]
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ new_failed: u32,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ reconciled: u32,
+}
+
+impl EngineIncoming {
+ pub fn new() -> Self {
+ Self {
+ ..Default::default()
+ }
+ }
+
+ // A helper used via skip_serializing_if
+ fn is_empty(inc: &Option<Self>) -> bool {
+ match inc {
+ Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
+ None => true,
+ }
+ }
+
+ /// Increment the value of `applied` by `n`.
+ #[inline]
+ pub fn applied(&mut self, n: u32) {
+ self.applied += n;
+ }
+
+ /// Increment the value of `failed` by `n`.
+ #[inline]
+ pub fn failed(&mut self, n: u32) {
+ self.failed += n;
+ }
+
+ /// Increment the value of `new_failed` by `n`.
+ #[inline]
+ pub fn new_failed(&mut self, n: u32) {
+ self.new_failed += n;
+ }
+
+ /// Increment the value of `reconciled` by `n`.
+ #[inline]
+ pub fn reconciled(&mut self, n: u32) {
+ self.reconciled += n;
+ }
+
+ /// Get the value of `applied`. Mostly useful for testing.
+ #[inline]
+ pub fn get_applied(&self) -> u32 {
+ self.applied
+ }
+
+ /// Get the value of `failed`. Mostly useful for testing.
+ #[inline]
+ pub fn get_failed(&self) -> u32 {
+ self.failed
+ }
+
+ /// Get the value of `new_failed`. Mostly useful for testing.
+ #[inline]
+ pub fn get_new_failed(&self) -> u32 {
+ self.new_failed
+ }
+
+ /// Get the value of `reconciled`. Mostly useful for testing.
+ #[inline]
+ pub fn get_reconciled(&self) -> u32 {
+ self.reconciled
+ }
+}
+
+/// Outgoing record for an engine's sync
+#[derive(Debug, Default, Serialize)]
+pub struct EngineOutgoing {
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ sent: usize,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ failed: usize,
+}
+
+impl EngineOutgoing {
+ pub fn new() -> Self {
+ EngineOutgoing {
+ ..Default::default()
+ }
+ }
+
+ #[inline]
+ pub fn sent(&mut self, n: usize) {
+ self.sent += n;
+ }
+
+ #[inline]
+ pub fn failed(&mut self, n: usize) {
+ self.failed += n;
+ }
+}
+
+/// One engine's sync.
+#[derive(Debug, Serialize)]
+pub struct Engine {
+ name: String,
+
+ #[serde(flatten)]
+ when_took: Stopwatch,
+
+ #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
+ incoming: Option<EngineIncoming>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ outgoing: Vec<EngineOutgoing>, // one for each batch posted.
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ validation: Option<Validation>,
+}
+
+impl Engine {
+ pub fn new(name: impl Into<String>) -> Self {
+ Self {
+ name: name.into(),
+ when_took: Stopwatch::new(),
+ incoming: None,
+ outgoing: Vec::new(),
+ failure: None,
+ validation: None,
+ }
+ }
+
+ pub fn incoming(&mut self, inc: EngineIncoming) {
+ assert!(self.incoming.is_none());
+ self.incoming = Some(inc);
+ }
+
+ pub fn outgoing(&mut self, out: EngineOutgoing) {
+ self.outgoing.push(out);
+ }
+
+ pub fn failure(&mut self, err: impl Into<SyncFailure>) {
+ // Currently we take the first error, under the assumption that the
+ // first is the most important and all others stem from that.
+ let failure = err.into();
+ if self.failure.is_none() {
+ self.failure = Some(failure);
+ } else {
+ log::warn!(
+ "engine already has recorded a failure of {:?} - ignoring {:?}",
+ &self.failure,
+ &failure
+ );
+ }
+ }
+
+ pub fn validation(&mut self, v: Validation) {
+ assert!(self.validation.is_none());
+ self.validation = Some(v);
+ }
+
+ fn finished(&mut self) {
+ self.when_took = self.when_took.finished();
+ }
+}
+
+#[derive(Debug, Default, Serialize)]
+pub struct Validation {
+ version: u32,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ problems: Vec<Problem>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+}
+
+impl Validation {
+ pub fn with_version(version: u32) -> Validation {
+ Validation {
+ version,
+ ..Validation::default()
+ }
+ }
+
+ pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
+ if count > 0 {
+ self.problems.push(Problem { name, count });
+ }
+ self
+ }
+}
+
+#[derive(Debug, Default, Serialize)]
+pub struct Problem {
+ name: &'static str,
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ count: usize,
+}
+
+#[cfg(test)]
+mod engine_tests {
+ use super::*;
+
+ #[test]
+ fn test_engine() {
+ let mut e = Engine::new("test_engine");
+ e.finished();
+ assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
+ }
+
+ #[test]
+ fn test_engine_not_finished() {
+ let e = Engine::new("test_engine");
+ serde_json::to_value(&e).expect_err("unfinished stopwatch should fail");
+ }
+
+ #[test]
+ fn test_incoming() {
+ let mut i = EngineIncoming::new();
+ i.applied(1);
+ i.failed(2);
+ let mut e = Engine::new("TestEngine");
+ e.incoming(i);
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
+ );
+ }
+
+ #[test]
+ fn test_outgoing() {
+ let mut o = EngineOutgoing::new();
+ o.sent(2);
+ o.failed(1);
+ let mut e = Engine::new("TestEngine");
+ e.outgoing(o);
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
+ );
+ }
+
+ #[test]
+ fn test_failure() {
+ let mut e = Engine::new("TestEngine");
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine",
+ "when": 0.0,
+ "failureReason": {"name": "httperror", "code": 500}
+ }),
+ );
+ }
+
+ #[test]
+ fn test_raw() {
+ let mut e = Engine::new("TestEngine");
+ let mut inc = EngineIncoming::new();
+ inc.applied(10);
+ e.incoming(inc);
+ let mut out = EngineOutgoing::new();
+ out.sent(1);
+ e.outgoing(out);
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+
+ assert_eq!(e.outgoing.len(), 1);
+ assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
+ assert_eq!(e.outgoing[0].sent, 1);
+ assert!(e.failure.is_some());
+ serde_json::to_string(&e).expect("should get json");
+ }
+}
+
+/// A single sync. May have many engines, may have its own failure.
+#[derive(Debug, Serialize, Default)]
+pub struct SyncTelemetry {
+ #[serde(flatten)]
+ when_took: Stopwatch,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ engines: Vec<Engine>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+}
+
+impl SyncTelemetry {
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ pub fn engine(&mut self, mut e: Engine) {
+ e.finished();
+ self.engines.push(e);
+ }
+
+ pub fn failure(&mut self, failure: SyncFailure) {
+ assert!(self.failure.is_none());
+ self.failure = Some(failure);
+ }
+
+ // Note that unlike other 'finished' methods, this isn't private - someone
+ // needs to explicitly call this before handling the json payload to
+ // whatever ends up submitting it.
+ pub fn finished(&mut self) {
+ self.when_took = self.when_took.finished();
+ }
+}
+
+#[cfg(test)]
+mod sync_tests {
+ use super::*;
+
+ #[test]
+ fn test_accum() {
+ let mut s = SyncTelemetry::new();
+ let mut inc = EngineIncoming::new();
+ inc.applied(10);
+ let mut e = Engine::new("test_engine");
+ e.incoming(inc);
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+ s.engine(e);
+ s.finished();
+
+ assert_json(
+ &s,
+ serde_json::json!({
+ "when": 0.0,
+ "engines": [{
+ "name":"test_engine",
+ "when":0.0,
+ "incoming": {
+ "applied": 10
+ },
+ "failureReason": {
+ "name": "httperror",
+ "code": 500
+ }
+ }]
+ }),
+ );
+ }
+
+ #[test]
+ fn test_multi_engine() {
+ let mut inc_e1 = EngineIncoming::new();
+ inc_e1.applied(1);
+ let mut e1 = Engine::new("test_engine");
+ e1.incoming(inc_e1);
+
+ let mut inc_e2 = EngineIncoming::new();
+ inc_e2.failed(1);
+ let mut e2 = Engine::new("test_engine_2");
+ e2.incoming(inc_e2);
+ let mut out_e2 = EngineOutgoing::new();
+ out_e2.sent(1);
+ e2.outgoing(out_e2);
+
+ let mut s = SyncTelemetry::new();
+ s.engine(e1);
+ s.engine(e2);
+ s.failure(SyncFailure::Http { code: 500 });
+ s.finished();
+ assert_json(
+ &s,
+ serde_json::json!({
+ "when": 0.0,
+ "engines": [{
+ "name": "test_engine",
+ "when": 0.0,
+ "incoming": {
+ "applied": 1
+ }
+ },{
+ "name": "test_engine_2",
+ "when": 0.0,
+ "incoming": {
+ "failed": 1
+ },
+ "outgoing": [{
+ "sent": 1
+ }]
+ }],
+ "failureReason": {
+ "name": "httperror",
+ "code": 500
+ }
+ }),
+ );
+ }
+}
+
+/// The Sync ping payload, as documented at
+/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
+/// May have many syncs, may have many events. However, due to the architecture
+/// of apps which use these components, this payload is almost certainly not
+/// suitable for submitting directly. For example, we will always return a
+/// payload with exactly 1 sync, and it will not know certain other fields
+/// in the payload, such as the *hashed* FxA device ID (see
+/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
+/// for an example of how the device ID is constructed). The intention is that
+/// consumers of this will use this to create a "real" payload - eg, accumulating
+/// until some threshold number of syncs is reached, and contributing
+/// additional data which only the consumer knows.
+#[derive(Debug, Serialize, Default)]
+pub struct SyncTelemetryPing {
+ version: u32,
+
+ uid: Option<String>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ events: Vec<Event>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ syncs: Vec<SyncTelemetry>,
+}
+
+impl SyncTelemetryPing {
+ pub fn new() -> Self {
+ Self {
+ version: 1,
+ ..Default::default()
+ }
+ }
+
+ pub fn uid(&mut self, uid: String) {
+ if let Some(ref existing) = self.uid {
+ if *existing != uid {
+ log::warn!("existing uid ${} being replaced by {}", existing, uid);
+ }
+ }
+ self.uid = Some(uid);
+ }
+
+ pub fn sync(&mut self, mut s: SyncTelemetry) {
+ s.finished();
+ self.syncs.push(s);
+ }
+
+ pub fn event(&mut self, e: Event) {
+ self.events.push(e);
+ }
+}
+
+ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing);
+
+#[cfg(test)]
+mod ping_tests {
+ use super::*;
+ #[test]
+ fn test_ping() {
+ let engine = Engine::new("test");
+ let mut s = SyncTelemetry::new();
+ s.engine(engine);
+ let mut p = SyncTelemetryPing::new();
+ p.uid("user-id".into());
+ p.sync(s);
+ let event = Event::new("foo", "bar");
+ p.event(event);
+ assert_json(
+ &p,
+ serde_json::json!({
+ "events": [{
+ "method": "bar", "object": "foo"
+ }],
+ "syncs": [{
+ "engines": [{
+ "name": "test", "when": 0.0
+ }],
+ "when": 0.0
+ }],
+ "uid": "user-id",
+ "version": 1
+ }),
+ );
+ }
+}
+
+impl<'a> From<&'a Error> for SyncFailure {
+ fn from(e: &Error) -> SyncFailure {
+ match e {
+ #[cfg(feature = "sync-client")]
+ Error::TokenserverHttpError(status) => {
+ if *status == 401 {
+ SyncFailure::Auth {
+ from: "tokenserver",
+ }
+ } else {
+ SyncFailure::Http { code: *status }
+ }
+ }
+ #[cfg(feature = "sync-client")]
+ Error::BackoffError(_) => SyncFailure::Http { code: 503 },
+ #[cfg(feature = "sync-client")]
+ Error::StorageHttpError(ref e) => match e {
+ ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 },
+ ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" },
+ ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 },
+ ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status },
+ ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status },
+ },
+ #[cfg(feature = "crypto")]
+ Error::CryptoError(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ #[cfg(feature = "sync-client")]
+ Error::RequestError(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ #[cfg(feature = "sync-client")]
+ Error::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status },
+ Error::Interrupted(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ e => SyncFailure::Other {
+ error: e.to_string(),
+ },
+ }
+ }
+}