diff options
Diffstat (limited to 'third_party/rust/sync15-traits')
-rw-r--r-- | third_party/rust/sync15-traits/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/Cargo.toml | 18 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/README.md | 4 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/bridged_engine.rs | 208 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/changeset.rs | 33 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/client.rs | 58 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/lib.rs | 26 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/payload.rs | 154 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/request.rs | 175 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/server_timestamp.rs | 118 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/store.rs | 97 | ||||
-rw-r--r-- | third_party/rust/sync15-traits/src/telemetry.rs | 777 |
12 files changed, 1669 insertions, 0 deletions
diff --git a/third_party/rust/sync15-traits/.cargo-checksum.json b/third_party/rust/sync15-traits/.cargo-checksum.json new file mode 100644 index 0000000000..2887ffaca2 --- /dev/null +++ b/third_party/rust/sync15-traits/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"f12d51072211a0986cb114404242360cafd1c2635d735b3047927bdb2f89bd43","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"49339b8a4471ff1fba05f3fc59b41b4da99db6cd40bbf4f6fabc999860da7a7b","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"104c35522d31b064b4315bfbd86ef8bef54c80ea7226de7c6ed00d7eb28bcaac","src/lib.rs":"a64802fb56b1fd066c4cfdf18874347e80fc9ef4a1975bdbbd76541b0fa1744c","src/payload.rs":"b82a10fac59daeedf4ae14f3b78def528c0bd7efc85c7a65b1a01164acf5af79","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"71b40eb2f9424c91b8a9b4f05cd0c349b9e821ec00d8d2fd7b92f1e7e3d65862","src/store.rs":"4cd1fa788dcdfed00a485483ca43e92ab5c307f79907be5a54a5ffb7687fbc8c","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/sync15-traits/Cargo.toml b/third_party/rust/sync15-traits/Cargo.toml new file mode 100644 index 0000000000..f9a656b3af --- /dev/null +++ b/third_party/rust/sync15-traits/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "sync15-traits" +version = "0.1.0" +authors = ["Thom Chiovoloni <tchiovoloni@mozilla.com>"] +license = "MPL-2.0" +edition = "2018" + +[features] +random-guid = ["sync-guid/random"] + +[dependencies] +sync-guid = { path = "../guid" } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +log = "0.4" +ffi-support = "0.4" +url = "2.1" +anyhow = "1.0" diff --git a/third_party/rust/sync15-traits/README.md b/third_party/rust/sync15-traits/README.md new file mode 100644 index 0000000000..893abcf587 --- /dev/null +++ b/third_party/rust/sync15-traits/README.md @@ -0,0 +1,4 @@ +# sync15-traits + +Extracted types and traits from sync15. Usable for cases where depending on the +sync15 crate is impossible (like in remerge). diff --git a/third_party/rust/sync15-traits/src/bridged_engine.rs b/third_party/rust/sync15-traits/src/bridged_engine.rs new file mode 100644 index 0000000000..a6690ac9fe --- /dev/null +++ b/third_party/rust/sync15-traits/src/bridged_engine.rs @@ -0,0 +1,208 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::{error::Error, fmt}; + +use serde::{Deserialize, Serialize}; + +use super::{Guid, Payload, ServerTimestamp}; + +/// A bridged Sync engine implements all the methods needed to support +/// Desktop Sync. +pub trait BridgedEngine { + /// The type returned for errors. + type Error; + + /// Returns the last sync time, in milliseconds, for this engine's + /// collection. This is called before each sync, to determine the lower + /// bound for new records to fetch from the server. + fn last_sync(&self) -> Result<i64, Self::Error>; + + /// Sets the last sync time, in milliseconds. This is called throughout + /// the sync, to fast-forward the stored last sync time to match the + /// timestamp on the uploaded records. + fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>; + + /// Returns the sync ID for this engine's collection. This is only used in + /// tests. + fn sync_id(&self) -> Result<Option<String>, Self::Error>; + + /// Resets the sync ID for this engine's collection, returning the new ID. + /// As a side effect, implementations should reset all local Sync state, + /// as in `reset`. + fn reset_sync_id(&self) -> Result<String, Self::Error>; + + /// Ensures that the locally stored sync ID for this engine's collection + /// matches the `new_sync_id` from the server. If the two don't match, + /// implementations should reset all local Sync state, as in `reset`. + /// This method returns the assigned sync ID, which can be either the + /// `new_sync_id`, or a different one if the engine wants to force other + /// devices to reset their Sync state for this collection the next time they + /// sync. + fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>; + + /// Indicates that the engine is about to start syncing. This is called + /// once per sync, and always before `store_incoming`. + fn sync_started(&self) -> Result<(), Self::Error>; + + /// Stages a batch of incoming Sync records. This is called multiple + /// times per sync, once for each batch. Implementations can use the + /// signal to check if the operation was aborted, and cancel any + /// pending work. + fn store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>; + + /// Applies all staged records, reconciling changes on both sides and + /// resolving conflicts. Returns a list of records to upload. + fn apply(&self) -> Result<ApplyResults, Self::Error>; + + /// Indicates that the given record IDs were uploaded successfully to the + /// server. This is called multiple times per sync, once for each batch + /// upload. + fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>; + + /// Indicates that all records have been uploaded. At this point, any record + /// IDs marked for upload that haven't been passed to `set_uploaded`, can be + /// assumed to have failed: for example, because the server rejected a record + /// with an invalid TTL or sort index. + fn sync_finished(&self) -> Result<(), Self::Error>; + + /// Resets all local Sync state, including any change flags, mirrors, and + /// the last sync time, such that the next sync is treated as a first sync + /// with all new local data. Does not erase any local user data. + fn reset(&self) -> Result<(), Self::Error>; + + /// Erases all local user data for this collection, and any Sync metadata. + /// This method is destructive, and unused for most collections. + fn wipe(&self) -> Result<(), Self::Error>; +} + +#[derive(Clone, Debug, Default)] +pub struct ApplyResults { + /// List of records + pub envelopes: Vec<OutgoingEnvelope>, + /// The number of incoming records whose contents were merged because they + /// changed on both sides. None indicates we aren't reporting this + /// information. + pub num_reconciled: Option<usize>, +} + +impl ApplyResults { + pub fn new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self { + Self { + envelopes, + num_reconciled: num_reconciled.into(), + } + } +} + +// Shorthand for engines that don't care. +impl From<Vec<OutgoingEnvelope>> for ApplyResults { + fn from(envelopes: Vec<OutgoingEnvelope>) -> Self { + Self { + envelopes, + num_reconciled: None, + } + } +} + +/// An envelope for an incoming item, passed to `BridgedEngine::store_incoming`. +/// Envelopes are a halfway point between BSOs, the format used for all items on +/// the Sync server, and records, which are specific to each engine. +/// +/// A BSO is a JSON object with metadata fields (`id`, `modifed`, `sortindex`), +/// and a BSO payload that is itself a JSON string. For encrypted records, the +/// BSO payload has a ciphertext, which must be decrypted to yield a cleartext. +/// The cleartext is a JSON string (that's three levels of JSON wrapping, if +/// you're keeping score: the BSO itself, BSO payload, and cleartext) with the +/// actual record payload. +/// +/// An envelope combines the metadata fields from the BSO, and the cleartext +/// from the encrypted BSO payload. +#[derive(Clone, Debug, Deserialize)] +pub struct IncomingEnvelope { + pub id: Guid, + pub modified: ServerTimestamp, + #[serde(default)] + pub sortindex: Option<i32>, + #[serde(default)] + pub ttl: Option<u32>, + // Don't provide access to the cleartext directly. We want all callers to + // use `IncomingEnvelope::payload`, so that we can validate the cleartext. + cleartext: String, +} + +impl IncomingEnvelope { + /// Parses and returns the record payload from this envelope. Returns an + /// error if the envelope's cleartext isn't valid JSON, or the payload is + /// invalid. + pub fn payload(&self) -> Result<Payload, PayloadError> { + let payload: Payload = serde_json::from_str(&self.cleartext)?; + if payload.id != self.id { + return Err(PayloadError::MismatchedId { + envelope: self.id.clone(), + payload: payload.id, + }); + } + // Remove auto field data from payload and replace with real data + Ok(payload + .with_auto_field("ttl", self.ttl) + .with_auto_field("sortindex", self.sortindex)) + } +} + +/// An envelope for an outgoing item, returned from `BridgedEngine::apply`. This +/// is similar to `IncomingEnvelope`, but omits fields that are only set by the +/// server, like `modified`. +#[derive(Clone, Debug, Serialize)] +pub struct OutgoingEnvelope { + id: Guid, + cleartext: String, + sortindex: Option<i32>, + ttl: Option<u32>, +} + +impl From<Payload> for OutgoingEnvelope { + fn from(mut payload: Payload) -> Self { + let id = payload.id.clone(); + // Remove auto field data from OutgoingEnvelope payload + let ttl = payload.take_auto_field("ttl"); + let sortindex = payload.take_auto_field("sortindex"); + OutgoingEnvelope { + id, + cleartext: payload.into_json_string(), + sortindex, + ttl, + } + } +} + +/// An error that indicates a payload is invalid. +#[derive(Debug)] +pub enum PayloadError { + /// The payload contains invalid JSON. + Invalid(serde_json::Error), + /// The ID of the BSO in the envelope doesn't match the ID in the payload. + MismatchedId { envelope: Guid, payload: Guid }, +} + +impl Error for PayloadError {} + +impl From<serde_json::Error> for PayloadError { + fn from(err: serde_json::Error) -> PayloadError { + PayloadError::Invalid(err) + } +} + +impl fmt::Display for PayloadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PayloadError::Invalid(err) => err.fmt(f), + PayloadError::MismatchedId { envelope, payload } => write!( + f, + "ID `{}` in envelope doesn't match `{}` in payload", + envelope, payload + ), + } + } +} diff --git a/third_party/rust/sync15-traits/src/changeset.rs b/third_party/rust/sync15-traits/src/changeset.rs new file mode 100644 index 0000000000..36d8f5b833 --- /dev/null +++ b/third_party/rust/sync15-traits/src/changeset.rs @@ -0,0 +1,33 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::{Payload, ServerTimestamp}; + +#[derive(Debug, Clone)] +pub struct RecordChangeset<P> { + pub changes: Vec<P>, + /// For GETs, the last sync timestamp that should be persisted after + /// applying the records. + /// For POSTs, this is the XIUS timestamp. + pub timestamp: ServerTimestamp, + pub collection: std::borrow::Cow<'static, str>, +} + +pub type IncomingChangeset = RecordChangeset<(Payload, ServerTimestamp)>; +pub type OutgoingChangeset = RecordChangeset<Payload>; + +// TODO: use a trait to unify this with the non-json versions +impl<T> RecordChangeset<T> { + #[inline] + pub fn new( + collection: impl Into<std::borrow::Cow<'static, str>>, + timestamp: ServerTimestamp, + ) -> RecordChangeset<T> { + RecordChangeset { + changes: vec![], + timestamp, + collection: collection.into(), + } + } +} diff --git a/third_party/rust/sync15-traits/src/client.rs b/third_party/rust/sync15-traits/src/client.rs new file mode 100644 index 0000000000..5349b68f63 --- /dev/null +++ b/third_party/rust/sync15-traits/src/client.rs @@ -0,0 +1,58 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! This module has to be here because of some hard-to-avoid hacks done for the +//! tabs engine... See issue #2590 + +use std::collections::HashMap; + +/// Argument to Store::prepare_for_sync. See comment there for more info. Only +/// really intended to be used by tabs engine. +#[derive(Clone, Debug)] +pub struct ClientData { + pub local_client_id: String, + pub recent_clients: HashMap<String, RemoteClient>, +} + +/// Information about a remote client in the clients collection. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct RemoteClient { + pub fxa_device_id: Option<String>, + pub device_name: String, + pub device_type: Option<DeviceType>, +} + +/// The type of a client. Please keep these variants in sync with the device +/// types in the FxA client and sync manager. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum DeviceType { + Desktop, + Mobile, + Tablet, + VR, + TV, +} + +impl DeviceType { + pub fn try_from_str(d: impl AsRef<str>) -> Option<DeviceType> { + match d.as_ref() { + "desktop" => Some(DeviceType::Desktop), + "mobile" => Some(DeviceType::Mobile), + "tablet" => Some(DeviceType::Tablet), + "vr" => Some(DeviceType::VR), + "tv" => Some(DeviceType::TV), + _ => None, + } + } + + pub fn as_str(self) -> &'static str { + match self { + DeviceType::Desktop => "desktop", + DeviceType::Mobile => "mobile", + DeviceType::Tablet => "tablet", + DeviceType::VR => "vr", + DeviceType::TV => "tv", + } + } +} diff --git a/third_party/rust/sync15-traits/src/lib.rs b/third_party/rust/sync15-traits/src/lib.rs new file mode 100644 index 0000000000..fc31d84e5e --- /dev/null +++ b/third_party/rust/sync15-traits/src/lib.rs @@ -0,0 +1,26 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![warn(rust_2018_idioms)] +pub mod bridged_engine; +mod changeset; +pub mod client; +mod payload; +pub mod request; +mod server_timestamp; +mod store; +pub mod telemetry; + +pub use bridged_engine::{ApplyResults, BridgedEngine, IncomingEnvelope, OutgoingEnvelope}; +pub use changeset::{IncomingChangeset, OutgoingChangeset, RecordChangeset}; +pub use payload::Payload; +pub use request::{CollectionRequest, RequestOrder}; +pub use server_timestamp::ServerTimestamp; +pub use store::{CollSyncIds, Store, StoreSyncAssociation}; +pub use sync_guid::Guid; + +// For skip_serializing_if +pub(crate) fn skip_if_default<T: PartialEq + Default>(v: &T) -> bool { + *v == T::default() +} diff --git a/third_party/rust/sync15-traits/src/payload.rs b/third_party/rust/sync15-traits/src/payload.rs new file mode 100644 index 0000000000..2e241774fe --- /dev/null +++ b/third_party/rust/sync15-traits/src/payload.rs @@ -0,0 +1,154 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use super::Guid; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value as JsonValue}; + +/// Represents the decrypted payload in a Bso. Provides a minimal layer of type +/// safety to avoid double-encrypting. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Payload { + pub id: Guid, + + #[serde(default)] + #[serde(skip_serializing_if = "crate::skip_if_default")] + pub deleted: bool, + + #[serde(flatten)] + pub data: Map<String, JsonValue>, +} + +impl Payload { + pub fn new_tombstone(id: impl Into<Guid>) -> Payload { + Payload { + id: id.into(), + deleted: true, + data: Map::new(), + } + } + + pub fn new_tombstone_with_ttl(id: impl Into<Guid>, ttl: u32) -> Payload { + let mut result = Payload::new_tombstone(id); + result.data.insert("ttl".into(), ttl.into()); + result + } + + #[inline] + pub fn with_sortindex(mut self, index: i32) -> Payload { + self.data.insert("sortindex".into(), index.into()); + self + } + + /// "Auto" fields are fields like 'sortindex' and 'ttl', which are: + /// + /// - Added to the payload automatically when deserializing if present on + /// the incoming BSO or envelope. + /// - Removed from the payload automatically and attached to the BSO or + /// envelope if present on the outgoing payload. + pub fn with_auto_field<T: Into<JsonValue>>(mut self, name: &str, v: Option<T>) -> Payload { + let old_value: Option<JsonValue> = if let Some(value) = v { + self.data.insert(name.into(), value.into()) + } else { + self.data.remove(name) + }; + + // This is a little dubious, but it seems like if we have a e.g. `sortindex` field on the payload + // it's going to be a bug if we use it instead of the "real" sort index. + if let Some(old_value) = old_value { + log::warn!( + "Payload for record {} already contains 'automatic' field \"{}\"? \ + Overwriting auto value: {} with 'real' value", + self.id, + name, + old_value, + ); + } + self + } + + pub fn take_auto_field<V>(&mut self, name: &str) -> Option<V> + where + for<'a> V: Deserialize<'a>, + { + let v = self.data.remove(name)?; + match serde_json::from_value(v) { + Ok(v) => Some(v), + Err(e) => { + log::error!( + "Automatic field {} exists on payload, but cannot be deserialized: {}", + name, + e + ); + None + } + } + } + + #[inline] + pub fn id(&self) -> &str { + &self.id[..] + } + + #[inline] + pub fn is_tombstone(&self) -> bool { + self.deleted + } + + pub fn from_json(value: JsonValue) -> Result<Payload, serde_json::Error> { + serde_json::from_value(value) + } + + /// Deserializes the BSO payload into a specific record type `T`. + /// + /// BSO payloads are unstructured JSON objects, with string keys and + /// dynamically-typed values. `into_record` makes it more convenient to + /// work with payloads by converting them into data type-specific structs. + /// Your record type only needs to derive or implement `serde::Deserialize`; + /// Serde will take care of the rest. + /// + /// # Errors + /// + /// `into_record` returns errors for type mismatches. As an example, trying + /// to deserialize a string value from the payload into an integer field in + /// `T` will fail. + /// + /// If there's a chance that a field contains invalid or mistyped data, + /// you'll want to extract it from `payload.data` manually, instead of using + /// `into_record`. This has been seen in the wild: for example, `dateAdded` + /// for bookmarks can be either an integer or a string. + pub fn into_record<T>(self) -> Result<T, serde_json::Error> + where + for<'a> T: Deserialize<'a>, + { + serde_json::from_value(JsonValue::from(self)) + } + + pub fn from_record<T: Serialize>(v: T) -> Result<Payload, serde_json::Error> { + // TODO(issue #2588): This is kind of dumb, we do to_value and then + // from_value. In general a more strongly typed API would help us avoid + // this sort of thing... But also concretely this could probably be + // avoided? At least in some cases. + Ok(Payload::from_json(serde_json::to_value(v)?)?) + } + + pub fn into_json_string(self) -> String { + serde_json::to_string(&JsonValue::from(self)) + .expect("JSON.stringify failed, which shouldn't be possible") + } +} + +impl From<Payload> for JsonValue { + fn from(cleartext: Payload) -> Self { + let Payload { + mut data, + id, + deleted, + } = cleartext; + data.insert("id".to_string(), JsonValue::String(id.into_string())); + if deleted { + data.insert("deleted".to_string(), JsonValue::Bool(true)); + } + JsonValue::Object(data) + } +} diff --git a/third_party/rust/sync15-traits/src/request.rs b/third_party/rust/sync15-traits/src/request.rs new file mode 100644 index 0000000000..1e51cf1e62 --- /dev/null +++ b/third_party/rust/sync15-traits/src/request.rs @@ -0,0 +1,175 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use crate::{Guid, ServerTimestamp}; +use std::borrow::Cow; +use url::{form_urlencoded as form, Url, UrlQuery}; +#[derive(Debug, Clone, PartialEq)] +pub struct CollectionRequest { + pub collection: Cow<'static, str>, + pub full: bool, + pub ids: Option<Vec<Guid>>, + pub limit: usize, + pub older: Option<ServerTimestamp>, + pub newer: Option<ServerTimestamp>, + pub order: Option<RequestOrder>, + pub commit: bool, + pub batch: Option<String>, +} + +impl CollectionRequest { + #[inline] + pub fn new<S>(collection: S) -> CollectionRequest + where + S: Into<Cow<'static, str>>, + { + CollectionRequest { + collection: collection.into(), + full: false, + ids: None, + limit: 0, + older: None, + newer: None, + order: None, + commit: false, + batch: None, + } + } + + #[inline] + pub fn ids<V>(mut self, v: V) -> CollectionRequest + where + V: IntoIterator, + V::Item: Into<Guid>, + { + self.ids = Some(v.into_iter().map(|id| id.into()).collect()); + self + } + + #[inline] + pub fn full(mut self) -> CollectionRequest { + self.full = true; + self + } + + #[inline] + pub fn older_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.older = Some(ts); + self + } + + #[inline] + pub fn newer_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.newer = Some(ts); + self + } + + #[inline] + pub fn sort_by(mut self, order: RequestOrder) -> CollectionRequest { + self.order = Some(order); + self + } + + #[inline] + pub fn limit(mut self, num: usize) -> CollectionRequest { + self.limit = num; + self + } + + #[inline] + pub fn batch(mut self, batch: Option<String>) -> CollectionRequest { + self.batch = batch; + self + } + + #[inline] + pub fn commit(mut self, v: bool) -> CollectionRequest { + self.commit = v; + self + } + + fn build_query(&self, pairs: &mut form::Serializer<'_, UrlQuery<'_>>) { + if self.full { + pairs.append_pair("full", "1"); + } + if self.limit > 0 { + pairs.append_pair("limit", &self.limit.to_string()); + } + if let Some(ids) = &self.ids { + // Most ids are 12 characters, and we comma separate them, so 13. + let mut buf = String::with_capacity(ids.len() * 13); + for (i, id) in ids.iter().enumerate() { + if i > 0 { + buf.push(','); + } + buf.push_str(id.as_str()); + } + pairs.append_pair("ids", &buf); + } + if let Some(batch) = &self.batch { + pairs.append_pair("batch", &batch); + } + if self.commit { + pairs.append_pair("commit", "true"); + } + if let Some(ts) = self.older { + pairs.append_pair("older", &ts.to_string()); + } + if let Some(ts) = self.newer { + pairs.append_pair("newer", &ts.to_string()); + } + if let Some(o) = self.order { + pairs.append_pair("sort", o.as_str()); + } + pairs.finish(); + } + + pub fn build_url(&self, mut base_url: Url) -> Result<Url, UnacceptableBaseUrl> { + base_url + .path_segments_mut() + .map_err(|_| UnacceptableBaseUrl(()))? + .extend(&["storage", &self.collection]); + self.build_query(&mut base_url.query_pairs_mut()); + // This is strange but just accessing query_pairs_mut makes you have + // a trailing question mark on your url. I don't think anything bad + // would happen here, but I don't know, and also, it looks dumb so + // I'd rather not have it. + if base_url.query() == Some("") { + base_url.set_query(None); + } + Ok(base_url) + } +} +#[derive(Debug)] +pub struct UnacceptableBaseUrl(()); + +impl std::fmt::Display for UnacceptableBaseUrl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Storage server URL is not a base") + } +} +impl std::error::Error for UnacceptableBaseUrl {} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum RequestOrder { + Oldest, + Newest, + Index, +} + +impl RequestOrder { + #[inline] + pub fn as_str(self) -> &'static str { + match self { + RequestOrder::Oldest => "oldest", + RequestOrder::Newest => "newest", + RequestOrder::Index => "index", + } + } +} + +impl std::fmt::Display for RequestOrder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} diff --git a/third_party/rust/sync15-traits/src/server_timestamp.rs b/third_party/rust/sync15-traits/src/server_timestamp.rs new file mode 100644 index 0000000000..8a361dbdba --- /dev/null +++ b/third_party/rust/sync15-traits/src/server_timestamp.rs @@ -0,0 +1,118 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use std::time::Duration; + +/// Typesafe way to manage server timestamps without accidentally mixing them up with +/// local ones. +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Default)] +pub struct ServerTimestamp(pub i64); + +impl ServerTimestamp { + pub fn from_float_seconds(ts: f64) -> Self { + let rf = (ts * 1000.0).round(); + if !rf.is_finite() || rf < 0.0 || rf >= i64::max_value() as f64 { + log::error!("Illegal timestamp: {}", ts); + ServerTimestamp(0) + } else { + ServerTimestamp(rf as i64) + } + } + + pub fn from_millis(ts: i64) -> Self { + // Catch it in tests, but just complain and replace with 0 otherwise. + debug_assert!(ts >= 0, "Bad timestamp: {}", ts); + if ts >= 0 { + Self(ts) + } else { + log::error!("Illegal timestamp, substituting 0: {}", ts); + Self(0) + } + } +} + +// This lets us use these in hyper header! blocks. +impl std::str::FromStr for ServerTimestamp { + type Err = std::num::ParseFloatError; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let val = f64::from_str(s)?; + Ok(Self::from_float_seconds(val)) + } +} + +impl std::fmt::Display for ServerTimestamp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0 as f64 / 1000.0) + } +} + +impl ServerTimestamp { + pub const EPOCH: ServerTimestamp = ServerTimestamp(0); + + /// Returns None if `other` is later than `self` (Duration may not represent + /// negative timespans in rust). + #[inline] + pub fn duration_since(self, other: ServerTimestamp) -> Option<Duration> { + let delta = self.0 - other.0; + if delta < 0 { + None + } else { + Some(Duration::from_millis(delta as u64)) + } + } + + /// Get the milliseconds for the timestamp. + #[inline] + pub fn as_millis(self) -> i64 { + self.0 + } +} + +impl serde::ser::Serialize for ServerTimestamp { + fn serialize<S: serde::ser::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> { + serializer.serialize_f64(self.0 as f64 / 1000.0) + } +} + +impl<'de> serde::de::Deserialize<'de> for ServerTimestamp { + fn deserialize<D: serde::de::Deserializer<'de>>(d: D) -> Result<Self, D::Error> { + f64::deserialize(d).map(Self::from_float_seconds) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_server_timestamp() { + let t0 = ServerTimestamp(10_300_150); + let t1 = ServerTimestamp(10_100_050); + assert!(t1.duration_since(t0).is_none()); + assert!(t0.duration_since(t1).is_some()); + let dur = t0.duration_since(t1).unwrap(); + assert_eq!(dur.as_secs(), 200); + assert_eq!(dur.subsec_nanos(), 100_000_000); + } + + #[test] + fn test_serde() { + let ts = ServerTimestamp(123_456); + + // test serialize + let ser = serde_json::to_string(&ts).unwrap(); + assert_eq!("123.456".to_string(), ser); + + // test deserialize of float + let ts: ServerTimestamp = serde_json::from_str(&ser).unwrap(); + assert_eq!(ServerTimestamp(123_456), ts); + + // test deserialize of whole number + let ts: ServerTimestamp = serde_json::from_str("123").unwrap(); + assert_eq!(ServerTimestamp(123_000), ts); + + // test deserialize of negative number + let ts: ServerTimestamp = serde_json::from_str("-123").unwrap(); + assert_eq!(ServerTimestamp(0), ts); + } +} diff --git a/third_party/rust/sync15-traits/src/store.rs b/third_party/rust/sync15-traits/src/store.rs new file mode 100644 index 0000000000..51a595473d --- /dev/null +++ b/third_party/rust/sync15-traits/src/store.rs @@ -0,0 +1,97 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::{ + client::ClientData, telemetry, CollectionRequest, Guid, IncomingChangeset, OutgoingChangeset, + ServerTimestamp, +}; +use anyhow::Result; + +#[derive(Debug, Clone, PartialEq)] +pub struct CollSyncIds { + pub global: Guid, + pub coll: Guid, +} + +/// Defines how a store is associated with Sync. +#[derive(Debug, Clone, PartialEq)] +pub enum StoreSyncAssociation { + /// This store is disconnected (although it may be connected in the future). + Disconnected, + /// Sync is connected, and has the following sync IDs. + Connected(CollSyncIds), +} + +/// Low-level store functionality. Stores that need custom reconciliation logic +/// should use this. +/// +/// Different stores will produce errors of different types. To accommodate +/// this, we force them all to return failure::Error. +pub trait Store { + fn collection_name(&self) -> std::borrow::Cow<'static, str>; + + /// Prepares the store for syncing. The tabs store currently uses this to + /// store the current list of clients, which it uses to look up device names + /// and types. + /// + /// Note that this method is only called by `sync_multiple`, and only if a + /// command processor is registered. In particular, `prepare_for_sync` will + /// not be called if the store is synced using `sync::synchronize` or + /// `sync_multiple::sync_multiple`. It _will_ be called if the store is + /// synced via the Sync Manager. + /// + /// TODO(issue #2590): This is pretty cludgey and will be hard to extend for + /// any case other than the tabs case. We should find another way to support + /// tabs... + fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> { + Ok(()) + } + + /// `inbound` is a vector to support the case where + /// `get_collection_requests` returned multiple requests. The changesets are + /// in the same order as the requests were -- e.g. if `vec![req_a, req_b]` + /// was returned from `get_collection_requests`, `inbound` will have the + /// results from `req_a` as its first index, and those from `req_b` as it's + /// second. + fn apply_incoming( + &self, + inbound: Vec<IncomingChangeset>, + telem: &mut telemetry::Engine, + ) -> Result<OutgoingChangeset>; + + fn sync_finished( + &self, + new_timestamp: ServerTimestamp, + records_synced: Vec<Guid>, + ) -> Result<()>; + + /// The store is responsible for building the collection request. Engines + /// typically will store a lastModified timestamp and use that to build a + /// request saying "give me full records since that date" - however, other + /// engines might do something fancier. This could even later be extended to + /// handle "backfills" etc + /// + /// To support more advanced use cases (e.g. remerge), multiple requests can + /// be returned here. The vast majority of engines will just want to return + /// zero or one item in their vector (zero is a valid optimization when the + /// server timestamp is the same as the engine last saw, one when it is not) + /// + /// Important: In the case when more than one collection is requested, it's + /// assumed the last one is the "canonical" one. (That is, it must be for + /// "this" collection, its timestamp is used to represent the sync, etc). + fn get_collection_requests( + &self, + server_timestamp: ServerTimestamp, + ) -> Result<Vec<CollectionRequest>>; + + /// Get persisted sync IDs. If they don't match the global state we'll be + /// `reset()` with the new IDs. + fn get_sync_assoc(&self) -> Result<StoreSyncAssociation>; + + /// Reset the store without wiping local data, ready for a "first sync". + /// `assoc` defines how this store is to be associated with sync. + fn reset(&self, assoc: &StoreSyncAssociation) -> Result<()>; + + fn wipe(&self) -> Result<()>; +} diff --git a/third_party/rust/sync15-traits/src/telemetry.rs b/third_party/rust/sync15-traits/src/telemetry.rs new file mode 100644 index 0000000000..a3609e3d6d --- /dev/null +++ b/third_party/rust/sync15-traits/src/telemetry.rs @@ -0,0 +1,777 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Manage recording sync telemetry. Assumes some external telemetry +//! library/code which manages submitting. + +use std::collections::HashMap; +use std::time; + +use serde::{ser, Serialize, Serializer}; + +// A test helper, used by the many test modules below. +#[cfg(test)] +fn assert_json<T: ?Sized>(v: &T, expected: serde_json::Value) +where + T: serde::Serialize, +{ + assert_eq!( + serde_json::to_value(&v).expect("should get a value"), + expected + ); +} + +/// What we record for 'when' and 'took' in a telemetry record. +#[derive(Debug, Serialize)] +struct WhenTook { + when: f64, + #[serde(skip_serializing_if = "crate::skip_if_default")] + took: u64, +} + +/// What we track while recording 'when' and 'took. It serializes as a WhenTook, +/// except when .finished() hasn't been called, in which case it panics. +#[derive(Debug)] +enum Stopwatch { + Started(time::SystemTime, time::Instant), + Finished(WhenTook), +} + +impl Default for Stopwatch { + fn default() -> Self { + Stopwatch::new() + } +} + +impl Stopwatch { + fn new() -> Self { + Stopwatch::Started(time::SystemTime::now(), time::Instant::now()) + } + + // For tests we don't want real timestamps because we test against literals. + #[cfg(test)] + fn finished(&self) -> Self { + Stopwatch::Finished(WhenTook { when: 0.0, took: 0 }) + } + + #[cfg(not(test))] + fn finished(&self) -> Self { + match self { + Stopwatch::Started(st, si) => { + let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default(); + let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float? + + let sid = si.elapsed(); + let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000); + Stopwatch::Finished(WhenTook { when, took }) + } + _ => { + unreachable!("can't finish twice"); + } + } + } +} + +impl Serialize for Stopwatch { + fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> + where + S: Serializer, + { + match self { + Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")), + Stopwatch::Finished(c) => c.serialize(serializer), + } + } +} + +#[cfg(test)] +mod stopwatch_tests { + use super::*; + + // A wrapper struct because we flatten - this struct should serialize with + // 'when' and 'took' keys (but with no 'sw'.) + #[derive(Debug, Serialize)] + struct WT { + #[serde(flatten)] + sw: Stopwatch, + } + + #[test] + fn test_not_finished() { + let wt = WT { + sw: Stopwatch::new(), + }; + serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail"); + } + + #[test] + fn test() { + assert_json( + &WT { + sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }), + }, + serde_json::json!({"when": 1.0, "took": 1}), + ); + assert_json( + &WT { + sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }), + }, + serde_json::json!({"when": 1.0}), + ); + } +} + +/// A generic "Event" - suitable for all kinds of pings (although this module +/// only cares about the sync ping) +#[derive(Debug, Serialize)] +pub struct Event { + // We use static str references as we expect values to be literals. + object: &'static str, + + method: &'static str, + + // Maybe "value" should be a string? + #[serde(skip_serializing_if = "Option::is_none")] + value: Option<&'static str>, + + // we expect the keys to be literals but values are real strings. + #[serde(skip_serializing_if = "Option::is_none")] + extra: Option<HashMap<&'static str, String>>, +} + +impl Event { + pub fn new(object: &'static str, method: &'static str) -> Self { + assert!(object.len() <= 20); + assert!(method.len() <= 20); + Self { + object, + method, + value: None, + extra: None, + } + } + + pub fn value(mut self, v: &'static str) -> Self { + assert!(v.len() <= 80); + self.value = Some(v); + self + } + + pub fn extra(mut self, key: &'static str, val: String) -> Self { + assert!(key.len() <= 15); + assert!(val.len() <= 85); + match self.extra { + None => self.extra = Some(HashMap::new()), + Some(ref e) => assert!(e.len() < 10), + } + self.extra.as_mut().unwrap().insert(key, val); + self + } +} + +#[cfg(test)] +mod test_events { + use super::*; + + #[test] + #[should_panic] + fn test_invalid_length_ctor() { + Event::new("A very long object value", "Method"); + } + + #[test] + #[should_panic] + fn test_invalid_length_extra_key() { + Event::new("O", "M").extra("A very long key value", "v".to_string()); + } + + #[test] + #[should_panic] + fn test_invalid_length_extra_val() { + let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ + abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Event::new("O", "M").extra("k", l.to_string()); + } + + #[test] + #[should_panic] + fn test_too_many_extras() { + let l = "abcdefghijk"; + let mut e = Event::new("Object", "Method"); + for i in 0..l.len() { + e = e.extra(&l[i..=i], "v".to_string()); + } + } + + #[test] + fn test_json() { + assert_json( + &Event::new("Object", "Method").value("Value"), + serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}), + ); + + assert_json( + &Event::new("Object", "Method").extra("one", "one".to_string()), + serde_json::json!({"object": "Object", + "method": "Method", + "extra": {"one": "one"} + }), + ) + } +} + +/// A Sync failure. +#[derive(Debug, Serialize)] +#[serde(tag = "name")] +pub enum SyncFailure { + #[serde(rename = "shutdownerror")] + Shutdown, + + #[serde(rename = "othererror")] + Other { error: String }, + + #[serde(rename = "unexpectederror")] + Unexpected { error: String }, + + #[serde(rename = "autherror")] + Auth { from: &'static str }, + + #[serde(rename = "httperror")] + Http { code: u16 }, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn reprs() { + assert_json( + &SyncFailure::Shutdown, + serde_json::json!({"name": "shutdownerror"}), + ); + + assert_json( + &SyncFailure::Other { + error: "dunno".to_string(), + }, + serde_json::json!({"name": "othererror", "error": "dunno"}), + ); + + assert_json( + &SyncFailure::Unexpected { + error: "dunno".to_string(), + }, + serde_json::json!({"name": "unexpectederror", "error": "dunno"}), + ); + + assert_json( + &SyncFailure::Auth { from: "FxA" }, + serde_json::json!({"name": "autherror", "from": "FxA"}), + ); + + assert_json( + &SyncFailure::Http { code: 500 }, + serde_json::json!({"name": "httperror", "code": 500}), + ); + } +} + +/// Incoming record for an engine's sync +#[derive(Debug, Default, Serialize)] +pub struct EngineIncoming { + #[serde(skip_serializing_if = "crate::skip_if_default")] + applied: u32, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + failed: u32, + + #[serde(rename = "newFailed")] + #[serde(skip_serializing_if = "crate::skip_if_default")] + new_failed: u32, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + reconciled: u32, +} + +impl EngineIncoming { + pub fn new() -> Self { + Self { + ..Default::default() + } + } + + // A helper used via skip_serializing_if + fn is_empty(inc: &Option<Self>) -> bool { + match inc { + Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0, + None => true, + } + } + + /// Increment the value of `applied` by `n`. + #[inline] + pub fn applied(&mut self, n: u32) { + self.applied += n; + } + + /// Increment the value of `failed` by `n`. + #[inline] + pub fn failed(&mut self, n: u32) { + self.failed += n; + } + + /// Increment the value of `new_failed` by `n`. + #[inline] + pub fn new_failed(&mut self, n: u32) { + self.new_failed += n; + } + + /// Increment the value of `reconciled` by `n`. + #[inline] + pub fn reconciled(&mut self, n: u32) { + self.reconciled += n; + } + + /// Get the value of `applied`. Mostly useful for testing. + #[inline] + pub fn get_applied(&self) -> u32 { + self.applied + } + + /// Get the value of `failed`. Mostly useful for testing. + #[inline] + pub fn get_failed(&self) -> u32 { + self.failed + } + + /// Get the value of `new_failed`. Mostly useful for testing. + #[inline] + pub fn get_new_failed(&self) -> u32 { + self.new_failed + } + + /// Get the value of `reconciled`. Mostly useful for testing. + #[inline] + pub fn get_reconciled(&self) -> u32 { + self.reconciled + } +} + +/// Outgoing record for an engine's sync +#[derive(Debug, Default, Serialize)] +pub struct EngineOutgoing { + #[serde(skip_serializing_if = "crate::skip_if_default")] + sent: usize, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + failed: usize, +} + +impl EngineOutgoing { + pub fn new() -> Self { + EngineOutgoing { + ..Default::default() + } + } + + #[inline] + pub fn sent(&mut self, n: usize) { + self.sent += n; + } + + #[inline] + pub fn failed(&mut self, n: usize) { + self.failed += n; + } +} + +/// One engine's sync. +#[derive(Debug, Serialize)] +pub struct Engine { + name: String, + + #[serde(flatten)] + when_took: Stopwatch, + + #[serde(skip_serializing_if = "EngineIncoming::is_empty")] + incoming: Option<EngineIncoming>, + + #[serde(skip_serializing_if = "Vec::is_empty")] + outgoing: Vec<EngineOutgoing>, // one for each batch posted. + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option<SyncFailure>, + + #[serde(skip_serializing_if = "Option::is_none")] + validation: Option<Validation>, +} + +impl Engine { + pub fn new(name: impl Into<String>) -> Self { + Self { + name: name.into(), + when_took: Stopwatch::new(), + incoming: None, + outgoing: Vec::new(), + failure: None, + validation: None, + } + } + + pub fn incoming(&mut self, inc: EngineIncoming) { + assert!(self.incoming.is_none()); + self.incoming = Some(inc); + } + + pub fn outgoing(&mut self, out: EngineOutgoing) { + self.outgoing.push(out); + } + + pub fn failure(&mut self, err: impl Into<SyncFailure>) { + // Currently we take the first error, under the assumption that the + // first is the most important and all others stem from that. + let failure = err.into(); + if self.failure.is_none() { + self.failure = Some(failure); + } else { + log::warn!( + "engine already has recorded a failure of {:?} - ignoring {:?}", + &self.failure, + &failure + ); + } + } + + pub fn validation(&mut self, v: Validation) { + assert!(self.validation.is_none()); + self.validation = Some(v); + } + + fn finished(&mut self) { + self.when_took = self.when_took.finished(); + } +} + +#[derive(Debug, Default, Serialize)] +pub struct Validation { + version: u32, + + #[serde(skip_serializing_if = "Vec::is_empty")] + problems: Vec<Problem>, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option<SyncFailure>, +} + +impl Validation { + pub fn with_version(version: u32) -> Validation { + Validation { + version, + ..Validation::default() + } + } + + pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self { + if count > 0 { + self.problems.push(Problem { name, count }); + } + self + } +} + +#[derive(Debug, Default, Serialize)] +pub struct Problem { + name: &'static str, + #[serde(skip_serializing_if = "crate::skip_if_default")] + count: usize, +} + +#[cfg(test)] +mod engine_tests { + use super::*; + + #[test] + fn test_engine() { + let mut e = Engine::new("test_engine"); + e.finished(); + assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0})); + } + + #[test] + fn test_engine_not_finished() { + let e = Engine::new("test_engine"); + serde_json::to_value(&e).expect_err("unfinished stopwatch should fail"); + } + + #[test] + fn test_incoming() { + let mut i = EngineIncoming::new(); + i.applied(1); + i.failed(2); + let mut e = Engine::new("TestEngine"); + e.incoming(i); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}), + ); + } + + #[test] + fn test_outgoing() { + let mut o = EngineOutgoing::new(); + o.sent(2); + o.failed(1); + let mut e = Engine::new("TestEngine"); + e.outgoing(o); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}), + ); + } + + #[test] + fn test_failure() { + let mut e = Engine::new("TestEngine"); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", + "when": 0.0, + "failureReason": {"name": "httperror", "code": 500} + }), + ); + } + + #[test] + fn test_raw() { + let mut e = Engine::new("TestEngine"); + let mut inc = EngineIncoming::new(); + inc.applied(10); + e.incoming(inc); + let mut out = EngineOutgoing::new(); + out.sent(1); + e.outgoing(out); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + + assert_eq!(e.outgoing.len(), 1); + assert_eq!(e.incoming.as_ref().unwrap().applied, 10); + assert_eq!(e.outgoing[0].sent, 1); + assert!(e.failure.is_some()); + serde_json::to_string(&e).expect("should get json"); + } +} + +/// A single sync. May have many engines, may have its own failure. +#[derive(Debug, Serialize, Default)] +pub struct SyncTelemetry { + #[serde(flatten)] + when_took: Stopwatch, + + #[serde(skip_serializing_if = "Vec::is_empty")] + engines: Vec<Engine>, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option<SyncFailure>, +} + +impl SyncTelemetry { + pub fn new() -> Self { + Default::default() + } + + pub fn engine(&mut self, mut e: Engine) { + e.finished(); + self.engines.push(e); + } + + pub fn failure(&mut self, failure: SyncFailure) { + assert!(self.failure.is_none()); + self.failure = Some(failure); + } + + // Note that unlike other 'finished' methods, this isn't private - someone + // needs to explicitly call this before handling the json payload to + // whatever ends up submitting it. + pub fn finished(&mut self) { + self.when_took = self.when_took.finished(); + } +} + +#[cfg(test)] +mod sync_tests { + use super::*; + + #[test] + fn test_accum() { + let mut s = SyncTelemetry::new(); + let mut inc = EngineIncoming::new(); + inc.applied(10); + let mut e = Engine::new("test_engine"); + e.incoming(inc); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + s.engine(e); + s.finished(); + + assert_json( + &s, + serde_json::json!({ + "when": 0.0, + "engines": [{ + "name":"test_engine", + "when":0.0, + "incoming": { + "applied": 10 + }, + "failureReason": { + "name": "httperror", + "code": 500 + } + }] + }), + ); + } + + #[test] + fn test_multi_engine() { + let mut inc_e1 = EngineIncoming::new(); + inc_e1.applied(1); + let mut e1 = Engine::new("test_engine"); + e1.incoming(inc_e1); + + let mut inc_e2 = EngineIncoming::new(); + inc_e2.failed(1); + let mut e2 = Engine::new("test_engine_2"); + e2.incoming(inc_e2); + let mut out_e2 = EngineOutgoing::new(); + out_e2.sent(1); + e2.outgoing(out_e2); + + let mut s = SyncTelemetry::new(); + s.engine(e1); + s.engine(e2); + s.failure(SyncFailure::Http { code: 500 }); + s.finished(); + assert_json( + &s, + serde_json::json!({ + "when": 0.0, + "engines": [{ + "name": "test_engine", + "when": 0.0, + "incoming": { + "applied": 1 + } + },{ + "name": "test_engine_2", + "when": 0.0, + "incoming": { + "failed": 1 + }, + "outgoing": [{ + "sent": 1 + }] + }], + "failureReason": { + "name": "httperror", + "code": 500 + } + }), + ); + } +} + +/// The Sync ping payload, as documented at +/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html. +/// May have many syncs, may have many events. However, due to the architecture +/// of apps which use these components, this payload is almost certainly not +/// suitable for submitting directly. For example, we will always return a +/// payload with exactly 1 sync, and it will not know certain other fields +/// in the payload, such as the *hashed* FxA device ID (see +/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185 +/// for an example of how the device ID is constructed). The intention is that +/// consumers of this will use this to create a "real" payload - eg, accumulating +/// until some threshold number of syncs is reached, and contributing +/// additional data which only the consumer knows. +#[derive(Debug, Serialize, Default)] +pub struct SyncTelemetryPing { + version: u32, + + uid: Option<String>, + + #[serde(skip_serializing_if = "Vec::is_empty")] + events: Vec<Event>, + + #[serde(skip_serializing_if = "Vec::is_empty")] + syncs: Vec<SyncTelemetry>, +} + +impl SyncTelemetryPing { + pub fn new() -> Self { + Self { + version: 1, + ..Default::default() + } + } + + pub fn uid(&mut self, uid: String) { + if let Some(ref existing) = self.uid { + if *existing != uid { + log::warn!("existing uid ${} being replaced by {}", existing, uid); + } + } + self.uid = Some(uid); + } + + pub fn sync(&mut self, mut s: SyncTelemetry) { + s.finished(); + self.syncs.push(s); + } + + pub fn event(&mut self, e: Event) { + self.events.push(e); + } +} + +ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing); + +#[cfg(test)] +mod ping_tests { + use super::*; + #[test] + fn test_ping() { + let engine = Engine::new("test"); + let mut s = SyncTelemetry::new(); + s.engine(engine); + let mut p = SyncTelemetryPing::new(); + p.uid("user-id".into()); + p.sync(s); + let event = Event::new("foo", "bar"); + p.event(event); + assert_json( + &p, + serde_json::json!({ + "events": [{ + "method": "bar", "object": "foo" + }], + "syncs": [{ + "engines": [{ + "name": "test", "when": 0.0 + }], + "when": 0.0 + }], + "uid": "user-id", + "version": 1 + }), + ); + } +} |