summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15-traits
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15-traits')
-rw-r--r--third_party/rust/sync15-traits/.cargo-checksum.json1
-rw-r--r--third_party/rust/sync15-traits/Cargo.toml18
-rw-r--r--third_party/rust/sync15-traits/README.md4
-rw-r--r--third_party/rust/sync15-traits/src/bridged_engine.rs208
-rw-r--r--third_party/rust/sync15-traits/src/changeset.rs33
-rw-r--r--third_party/rust/sync15-traits/src/client.rs58
-rw-r--r--third_party/rust/sync15-traits/src/lib.rs26
-rw-r--r--third_party/rust/sync15-traits/src/payload.rs154
-rw-r--r--third_party/rust/sync15-traits/src/request.rs175
-rw-r--r--third_party/rust/sync15-traits/src/server_timestamp.rs118
-rw-r--r--third_party/rust/sync15-traits/src/store.rs97
-rw-r--r--third_party/rust/sync15-traits/src/telemetry.rs777
12 files changed, 1669 insertions, 0 deletions
diff --git a/third_party/rust/sync15-traits/.cargo-checksum.json b/third_party/rust/sync15-traits/.cargo-checksum.json
new file mode 100644
index 0000000000..2887ffaca2
--- /dev/null
+++ b/third_party/rust/sync15-traits/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"Cargo.toml":"f12d51072211a0986cb114404242360cafd1c2635d735b3047927bdb2f89bd43","README.md":"396105211d8ce7f40b05d8062d7ab55d99674555f3ac81c061874ae26656ed7e","src/bridged_engine.rs":"49339b8a4471ff1fba05f3fc59b41b4da99db6cd40bbf4f6fabc999860da7a7b","src/changeset.rs":"442aa92b5130ec0f8f2b0054acb399c547380e0060015cbf4ca7a72027440d54","src/client.rs":"104c35522d31b064b4315bfbd86ef8bef54c80ea7226de7c6ed00d7eb28bcaac","src/lib.rs":"a64802fb56b1fd066c4cfdf18874347e80fc9ef4a1975bdbbd76541b0fa1744c","src/payload.rs":"b82a10fac59daeedf4ae14f3b78def528c0bd7efc85c7a65b1a01164acf5af79","src/request.rs":"9e656ec487e53c7485643687e605d73bb25e138056e920d6f4b7d63fc6a8c460","src/server_timestamp.rs":"71b40eb2f9424c91b8a9b4f05cd0c349b9e821ec00d8d2fd7b92f1e7e3d65862","src/store.rs":"4cd1fa788dcdfed00a485483ca43e92ab5c307f79907be5a54a5ffb7687fbc8c","src/telemetry.rs":"027befb099a6fcded3457f7e566296548a0898ff613267190621856b9ef288f6"},"package":null} \ No newline at end of file
diff --git a/third_party/rust/sync15-traits/Cargo.toml b/third_party/rust/sync15-traits/Cargo.toml
new file mode 100644
index 0000000000..f9a656b3af
--- /dev/null
+++ b/third_party/rust/sync15-traits/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "sync15-traits"
+version = "0.1.0"
+authors = ["Thom Chiovoloni <tchiovoloni@mozilla.com>"]
+license = "MPL-2.0"
+edition = "2018"
+
+[features]
+random-guid = ["sync-guid/random"]
+
+[dependencies]
+sync-guid = { path = "../guid" }
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+log = "0.4"
+ffi-support = "0.4"
+url = "2.1"
+anyhow = "1.0"
diff --git a/third_party/rust/sync15-traits/README.md b/third_party/rust/sync15-traits/README.md
new file mode 100644
index 0000000000..893abcf587
--- /dev/null
+++ b/third_party/rust/sync15-traits/README.md
@@ -0,0 +1,4 @@
+# sync15-traits
+
+Extracted types and traits from sync15. Usable for cases where depending on the
+sync15 crate is impossible (like in remerge).
diff --git a/third_party/rust/sync15-traits/src/bridged_engine.rs b/third_party/rust/sync15-traits/src/bridged_engine.rs
new file mode 100644
index 0000000000..a6690ac9fe
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/bridged_engine.rs
@@ -0,0 +1,208 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::{error::Error, fmt};
+
+use serde::{Deserialize, Serialize};
+
+use super::{Guid, Payload, ServerTimestamp};
+
+/// A bridged Sync engine implements all the methods needed to support
+/// Desktop Sync.
+pub trait BridgedEngine {
+ /// The type returned for errors.
+ type Error;
+
+ /// Returns the last sync time, in milliseconds, for this engine's
+ /// collection. This is called before each sync, to determine the lower
+ /// bound for new records to fetch from the server.
+ fn last_sync(&self) -> Result<i64, Self::Error>;
+
+ /// Sets the last sync time, in milliseconds. This is called throughout
+ /// the sync, to fast-forward the stored last sync time to match the
+ /// timestamp on the uploaded records.
+ fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>;
+
+ /// Returns the sync ID for this engine's collection. This is only used in
+ /// tests.
+ fn sync_id(&self) -> Result<Option<String>, Self::Error>;
+
+ /// Resets the sync ID for this engine's collection, returning the new ID.
+ /// As a side effect, implementations should reset all local Sync state,
+ /// as in `reset`.
+ fn reset_sync_id(&self) -> Result<String, Self::Error>;
+
+ /// Ensures that the locally stored sync ID for this engine's collection
+ /// matches the `new_sync_id` from the server. If the two don't match,
+ /// implementations should reset all local Sync state, as in `reset`.
+ /// This method returns the assigned sync ID, which can be either the
+ /// `new_sync_id`, or a different one if the engine wants to force other
+ /// devices to reset their Sync state for this collection the next time they
+ /// sync.
+ fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String, Self::Error>;
+
+ /// Indicates that the engine is about to start syncing. This is called
+ /// once per sync, and always before `store_incoming`.
+ fn sync_started(&self) -> Result<(), Self::Error>;
+
+ /// Stages a batch of incoming Sync records. This is called multiple
+ /// times per sync, once for each batch. Implementations can use the
+ /// signal to check if the operation was aborted, and cancel any
+ /// pending work.
+ fn store_incoming(&self, incoming_cleartexts: &[IncomingEnvelope]) -> Result<(), Self::Error>;
+
+ /// Applies all staged records, reconciling changes on both sides and
+ /// resolving conflicts. Returns a list of records to upload.
+ fn apply(&self) -> Result<ApplyResults, Self::Error>;
+
+ /// Indicates that the given record IDs were uploaded successfully to the
+ /// server. This is called multiple times per sync, once for each batch
+ /// upload.
+ fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>;
+
+ /// Indicates that all records have been uploaded. At this point, any record
+ /// IDs marked for upload that haven't been passed to `set_uploaded`, can be
+ /// assumed to have failed: for example, because the server rejected a record
+ /// with an invalid TTL or sort index.
+ fn sync_finished(&self) -> Result<(), Self::Error>;
+
+ /// Resets all local Sync state, including any change flags, mirrors, and
+ /// the last sync time, such that the next sync is treated as a first sync
+ /// with all new local data. Does not erase any local user data.
+ fn reset(&self) -> Result<(), Self::Error>;
+
+ /// Erases all local user data for this collection, and any Sync metadata.
+ /// This method is destructive, and unused for most collections.
+ fn wipe(&self) -> Result<(), Self::Error>;
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ApplyResults {
+ /// List of records
+ pub envelopes: Vec<OutgoingEnvelope>,
+ /// The number of incoming records whose contents were merged because they
+ /// changed on both sides. None indicates we aren't reporting this
+ /// information.
+ pub num_reconciled: Option<usize>,
+}
+
+impl ApplyResults {
+ pub fn new(envelopes: Vec<OutgoingEnvelope>, num_reconciled: impl Into<Option<usize>>) -> Self {
+ Self {
+ envelopes,
+ num_reconciled: num_reconciled.into(),
+ }
+ }
+}
+
+// Shorthand for engines that don't care.
+impl From<Vec<OutgoingEnvelope>> for ApplyResults {
+ fn from(envelopes: Vec<OutgoingEnvelope>) -> Self {
+ Self {
+ envelopes,
+ num_reconciled: None,
+ }
+ }
+}
+
+/// An envelope for an incoming item, passed to `BridgedEngine::store_incoming`.
+/// Envelopes are a halfway point between BSOs, the format used for all items on
+/// the Sync server, and records, which are specific to each engine.
+///
+/// A BSO is a JSON object with metadata fields (`id`, `modifed`, `sortindex`),
+/// and a BSO payload that is itself a JSON string. For encrypted records, the
+/// BSO payload has a ciphertext, which must be decrypted to yield a cleartext.
+/// The cleartext is a JSON string (that's three levels of JSON wrapping, if
+/// you're keeping score: the BSO itself, BSO payload, and cleartext) with the
+/// actual record payload.
+///
+/// An envelope combines the metadata fields from the BSO, and the cleartext
+/// from the encrypted BSO payload.
+#[derive(Clone, Debug, Deserialize)]
+pub struct IncomingEnvelope {
+ pub id: Guid,
+ pub modified: ServerTimestamp,
+ #[serde(default)]
+ pub sortindex: Option<i32>,
+ #[serde(default)]
+ pub ttl: Option<u32>,
+ // Don't provide access to the cleartext directly. We want all callers to
+ // use `IncomingEnvelope::payload`, so that we can validate the cleartext.
+ cleartext: String,
+}
+
+impl IncomingEnvelope {
+ /// Parses and returns the record payload from this envelope. Returns an
+ /// error if the envelope's cleartext isn't valid JSON, or the payload is
+ /// invalid.
+ pub fn payload(&self) -> Result<Payload, PayloadError> {
+ let payload: Payload = serde_json::from_str(&self.cleartext)?;
+ if payload.id != self.id {
+ return Err(PayloadError::MismatchedId {
+ envelope: self.id.clone(),
+ payload: payload.id,
+ });
+ }
+ // Remove auto field data from payload and replace with real data
+ Ok(payload
+ .with_auto_field("ttl", self.ttl)
+ .with_auto_field("sortindex", self.sortindex))
+ }
+}
+
+/// An envelope for an outgoing item, returned from `BridgedEngine::apply`. This
+/// is similar to `IncomingEnvelope`, but omits fields that are only set by the
+/// server, like `modified`.
+#[derive(Clone, Debug, Serialize)]
+pub struct OutgoingEnvelope {
+ id: Guid,
+ cleartext: String,
+ sortindex: Option<i32>,
+ ttl: Option<u32>,
+}
+
+impl From<Payload> for OutgoingEnvelope {
+ fn from(mut payload: Payload) -> Self {
+ let id = payload.id.clone();
+ // Remove auto field data from OutgoingEnvelope payload
+ let ttl = payload.take_auto_field("ttl");
+ let sortindex = payload.take_auto_field("sortindex");
+ OutgoingEnvelope {
+ id,
+ cleartext: payload.into_json_string(),
+ sortindex,
+ ttl,
+ }
+ }
+}
+
+/// An error that indicates a payload is invalid.
+#[derive(Debug)]
+pub enum PayloadError {
+ /// The payload contains invalid JSON.
+ Invalid(serde_json::Error),
+ /// The ID of the BSO in the envelope doesn't match the ID in the payload.
+ MismatchedId { envelope: Guid, payload: Guid },
+}
+
+impl Error for PayloadError {}
+
+impl From<serde_json::Error> for PayloadError {
+ fn from(err: serde_json::Error) -> PayloadError {
+ PayloadError::Invalid(err)
+ }
+}
+
+impl fmt::Display for PayloadError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ PayloadError::Invalid(err) => err.fmt(f),
+ PayloadError::MismatchedId { envelope, payload } => write!(
+ f,
+ "ID `{}` in envelope doesn't match `{}` in payload",
+ envelope, payload
+ ),
+ }
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/changeset.rs b/third_party/rust/sync15-traits/src/changeset.rs
new file mode 100644
index 0000000000..36d8f5b833
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/changeset.rs
@@ -0,0 +1,33 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::{Payload, ServerTimestamp};
+
+#[derive(Debug, Clone)]
+pub struct RecordChangeset<P> {
+ pub changes: Vec<P>,
+ /// For GETs, the last sync timestamp that should be persisted after
+ /// applying the records.
+ /// For POSTs, this is the XIUS timestamp.
+ pub timestamp: ServerTimestamp,
+ pub collection: std::borrow::Cow<'static, str>,
+}
+
+pub type IncomingChangeset = RecordChangeset<(Payload, ServerTimestamp)>;
+pub type OutgoingChangeset = RecordChangeset<Payload>;
+
+// TODO: use a trait to unify this with the non-json versions
+impl<T> RecordChangeset<T> {
+ #[inline]
+ pub fn new(
+ collection: impl Into<std::borrow::Cow<'static, str>>,
+ timestamp: ServerTimestamp,
+ ) -> RecordChangeset<T> {
+ RecordChangeset {
+ changes: vec![],
+ timestamp,
+ collection: collection.into(),
+ }
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/client.rs b/third_party/rust/sync15-traits/src/client.rs
new file mode 100644
index 0000000000..5349b68f63
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/client.rs
@@ -0,0 +1,58 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! This module has to be here because of some hard-to-avoid hacks done for the
+//! tabs engine... See issue #2590
+
+use std::collections::HashMap;
+
+/// Argument to Store::prepare_for_sync. See comment there for more info. Only
+/// really intended to be used by tabs engine.
+#[derive(Clone, Debug)]
+pub struct ClientData {
+ pub local_client_id: String,
+ pub recent_clients: HashMap<String, RemoteClient>,
+}
+
+/// Information about a remote client in the clients collection.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct RemoteClient {
+ pub fxa_device_id: Option<String>,
+ pub device_name: String,
+ pub device_type: Option<DeviceType>,
+}
+
+/// The type of a client. Please keep these variants in sync with the device
+/// types in the FxA client and sync manager.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, serde::Deserialize, serde::Serialize)]
+pub enum DeviceType {
+ Desktop,
+ Mobile,
+ Tablet,
+ VR,
+ TV,
+}
+
+impl DeviceType {
+ pub fn try_from_str(d: impl AsRef<str>) -> Option<DeviceType> {
+ match d.as_ref() {
+ "desktop" => Some(DeviceType::Desktop),
+ "mobile" => Some(DeviceType::Mobile),
+ "tablet" => Some(DeviceType::Tablet),
+ "vr" => Some(DeviceType::VR),
+ "tv" => Some(DeviceType::TV),
+ _ => None,
+ }
+ }
+
+ pub fn as_str(self) -> &'static str {
+ match self {
+ DeviceType::Desktop => "desktop",
+ DeviceType::Mobile => "mobile",
+ DeviceType::Tablet => "tablet",
+ DeviceType::VR => "vr",
+ DeviceType::TV => "tv",
+ }
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/lib.rs b/third_party/rust/sync15-traits/src/lib.rs
new file mode 100644
index 0000000000..fc31d84e5e
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/lib.rs
@@ -0,0 +1,26 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#![warn(rust_2018_idioms)]
+pub mod bridged_engine;
+mod changeset;
+pub mod client;
+mod payload;
+pub mod request;
+mod server_timestamp;
+mod store;
+pub mod telemetry;
+
+pub use bridged_engine::{ApplyResults, BridgedEngine, IncomingEnvelope, OutgoingEnvelope};
+pub use changeset::{IncomingChangeset, OutgoingChangeset, RecordChangeset};
+pub use payload::Payload;
+pub use request::{CollectionRequest, RequestOrder};
+pub use server_timestamp::ServerTimestamp;
+pub use store::{CollSyncIds, Store, StoreSyncAssociation};
+pub use sync_guid::Guid;
+
+// For skip_serializing_if
+pub(crate) fn skip_if_default<T: PartialEq + Default>(v: &T) -> bool {
+ *v == T::default()
+}
diff --git a/third_party/rust/sync15-traits/src/payload.rs b/third_party/rust/sync15-traits/src/payload.rs
new file mode 100644
index 0000000000..2e241774fe
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/payload.rs
@@ -0,0 +1,154 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use super::Guid;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value as JsonValue};
+
+/// Represents the decrypted payload in a Bso. Provides a minimal layer of type
+/// safety to avoid double-encrypting.
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct Payload {
+ pub id: Guid,
+
+ #[serde(default)]
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ pub deleted: bool,
+
+ #[serde(flatten)]
+ pub data: Map<String, JsonValue>,
+}
+
+impl Payload {
+ pub fn new_tombstone(id: impl Into<Guid>) -> Payload {
+ Payload {
+ id: id.into(),
+ deleted: true,
+ data: Map::new(),
+ }
+ }
+
+ pub fn new_tombstone_with_ttl(id: impl Into<Guid>, ttl: u32) -> Payload {
+ let mut result = Payload::new_tombstone(id);
+ result.data.insert("ttl".into(), ttl.into());
+ result
+ }
+
+ #[inline]
+ pub fn with_sortindex(mut self, index: i32) -> Payload {
+ self.data.insert("sortindex".into(), index.into());
+ self
+ }
+
+ /// "Auto" fields are fields like 'sortindex' and 'ttl', which are:
+ ///
+ /// - Added to the payload automatically when deserializing if present on
+ /// the incoming BSO or envelope.
+ /// - Removed from the payload automatically and attached to the BSO or
+ /// envelope if present on the outgoing payload.
+ pub fn with_auto_field<T: Into<JsonValue>>(mut self, name: &str, v: Option<T>) -> Payload {
+ let old_value: Option<JsonValue> = if let Some(value) = v {
+ self.data.insert(name.into(), value.into())
+ } else {
+ self.data.remove(name)
+ };
+
+ // This is a little dubious, but it seems like if we have a e.g. `sortindex` field on the payload
+ // it's going to be a bug if we use it instead of the "real" sort index.
+ if let Some(old_value) = old_value {
+ log::warn!(
+ "Payload for record {} already contains 'automatic' field \"{}\"? \
+ Overwriting auto value: {} with 'real' value",
+ self.id,
+ name,
+ old_value,
+ );
+ }
+ self
+ }
+
+ pub fn take_auto_field<V>(&mut self, name: &str) -> Option<V>
+ where
+ for<'a> V: Deserialize<'a>,
+ {
+ let v = self.data.remove(name)?;
+ match serde_json::from_value(v) {
+ Ok(v) => Some(v),
+ Err(e) => {
+ log::error!(
+ "Automatic field {} exists on payload, but cannot be deserialized: {}",
+ name,
+ e
+ );
+ None
+ }
+ }
+ }
+
+ #[inline]
+ pub fn id(&self) -> &str {
+ &self.id[..]
+ }
+
+ #[inline]
+ pub fn is_tombstone(&self) -> bool {
+ self.deleted
+ }
+
+ pub fn from_json(value: JsonValue) -> Result<Payload, serde_json::Error> {
+ serde_json::from_value(value)
+ }
+
+ /// Deserializes the BSO payload into a specific record type `T`.
+ ///
+ /// BSO payloads are unstructured JSON objects, with string keys and
+ /// dynamically-typed values. `into_record` makes it more convenient to
+ /// work with payloads by converting them into data type-specific structs.
+ /// Your record type only needs to derive or implement `serde::Deserialize`;
+ /// Serde will take care of the rest.
+ ///
+ /// # Errors
+ ///
+ /// `into_record` returns errors for type mismatches. As an example, trying
+ /// to deserialize a string value from the payload into an integer field in
+ /// `T` will fail.
+ ///
+ /// If there's a chance that a field contains invalid or mistyped data,
+ /// you'll want to extract it from `payload.data` manually, instead of using
+ /// `into_record`. This has been seen in the wild: for example, `dateAdded`
+ /// for bookmarks can be either an integer or a string.
+ pub fn into_record<T>(self) -> Result<T, serde_json::Error>
+ where
+ for<'a> T: Deserialize<'a>,
+ {
+ serde_json::from_value(JsonValue::from(self))
+ }
+
+ pub fn from_record<T: Serialize>(v: T) -> Result<Payload, serde_json::Error> {
+ // TODO(issue #2588): This is kind of dumb, we do to_value and then
+ // from_value. In general a more strongly typed API would help us avoid
+ // this sort of thing... But also concretely this could probably be
+ // avoided? At least in some cases.
+ Ok(Payload::from_json(serde_json::to_value(v)?)?)
+ }
+
+ pub fn into_json_string(self) -> String {
+ serde_json::to_string(&JsonValue::from(self))
+ .expect("JSON.stringify failed, which shouldn't be possible")
+ }
+}
+
+impl From<Payload> for JsonValue {
+ fn from(cleartext: Payload) -> Self {
+ let Payload {
+ mut data,
+ id,
+ deleted,
+ } = cleartext;
+ data.insert("id".to_string(), JsonValue::String(id.into_string()));
+ if deleted {
+ data.insert("deleted".to_string(), JsonValue::Bool(true));
+ }
+ JsonValue::Object(data)
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/request.rs b/third_party/rust/sync15-traits/src/request.rs
new file mode 100644
index 0000000000..1e51cf1e62
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/request.rs
@@ -0,0 +1,175 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use crate::{Guid, ServerTimestamp};
+use std::borrow::Cow;
+use url::{form_urlencoded as form, Url, UrlQuery};
+#[derive(Debug, Clone, PartialEq)]
+pub struct CollectionRequest {
+ pub collection: Cow<'static, str>,
+ pub full: bool,
+ pub ids: Option<Vec<Guid>>,
+ pub limit: usize,
+ pub older: Option<ServerTimestamp>,
+ pub newer: Option<ServerTimestamp>,
+ pub order: Option<RequestOrder>,
+ pub commit: bool,
+ pub batch: Option<String>,
+}
+
+impl CollectionRequest {
+ #[inline]
+ pub fn new<S>(collection: S) -> CollectionRequest
+ where
+ S: Into<Cow<'static, str>>,
+ {
+ CollectionRequest {
+ collection: collection.into(),
+ full: false,
+ ids: None,
+ limit: 0,
+ older: None,
+ newer: None,
+ order: None,
+ commit: false,
+ batch: None,
+ }
+ }
+
+ #[inline]
+ pub fn ids<V>(mut self, v: V) -> CollectionRequest
+ where
+ V: IntoIterator,
+ V::Item: Into<Guid>,
+ {
+ self.ids = Some(v.into_iter().map(|id| id.into()).collect());
+ self
+ }
+
+ #[inline]
+ pub fn full(mut self) -> CollectionRequest {
+ self.full = true;
+ self
+ }
+
+ #[inline]
+ pub fn older_than(mut self, ts: ServerTimestamp) -> CollectionRequest {
+ self.older = Some(ts);
+ self
+ }
+
+ #[inline]
+ pub fn newer_than(mut self, ts: ServerTimestamp) -> CollectionRequest {
+ self.newer = Some(ts);
+ self
+ }
+
+ #[inline]
+ pub fn sort_by(mut self, order: RequestOrder) -> CollectionRequest {
+ self.order = Some(order);
+ self
+ }
+
+ #[inline]
+ pub fn limit(mut self, num: usize) -> CollectionRequest {
+ self.limit = num;
+ self
+ }
+
+ #[inline]
+ pub fn batch(mut self, batch: Option<String>) -> CollectionRequest {
+ self.batch = batch;
+ self
+ }
+
+ #[inline]
+ pub fn commit(mut self, v: bool) -> CollectionRequest {
+ self.commit = v;
+ self
+ }
+
+ fn build_query(&self, pairs: &mut form::Serializer<'_, UrlQuery<'_>>) {
+ if self.full {
+ pairs.append_pair("full", "1");
+ }
+ if self.limit > 0 {
+ pairs.append_pair("limit", &self.limit.to_string());
+ }
+ if let Some(ids) = &self.ids {
+ // Most ids are 12 characters, and we comma separate them, so 13.
+ let mut buf = String::with_capacity(ids.len() * 13);
+ for (i, id) in ids.iter().enumerate() {
+ if i > 0 {
+ buf.push(',');
+ }
+ buf.push_str(id.as_str());
+ }
+ pairs.append_pair("ids", &buf);
+ }
+ if let Some(batch) = &self.batch {
+ pairs.append_pair("batch", &batch);
+ }
+ if self.commit {
+ pairs.append_pair("commit", "true");
+ }
+ if let Some(ts) = self.older {
+ pairs.append_pair("older", &ts.to_string());
+ }
+ if let Some(ts) = self.newer {
+ pairs.append_pair("newer", &ts.to_string());
+ }
+ if let Some(o) = self.order {
+ pairs.append_pair("sort", o.as_str());
+ }
+ pairs.finish();
+ }
+
+ pub fn build_url(&self, mut base_url: Url) -> Result<Url, UnacceptableBaseUrl> {
+ base_url
+ .path_segments_mut()
+ .map_err(|_| UnacceptableBaseUrl(()))?
+ .extend(&["storage", &self.collection]);
+ self.build_query(&mut base_url.query_pairs_mut());
+ // This is strange but just accessing query_pairs_mut makes you have
+ // a trailing question mark on your url. I don't think anything bad
+ // would happen here, but I don't know, and also, it looks dumb so
+ // I'd rather not have it.
+ if base_url.query() == Some("") {
+ base_url.set_query(None);
+ }
+ Ok(base_url)
+ }
+}
+#[derive(Debug)]
+pub struct UnacceptableBaseUrl(());
+
+impl std::fmt::Display for UnacceptableBaseUrl {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("Storage server URL is not a base")
+ }
+}
+impl std::error::Error for UnacceptableBaseUrl {}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub enum RequestOrder {
+ Oldest,
+ Newest,
+ Index,
+}
+
+impl RequestOrder {
+ #[inline]
+ pub fn as_str(self) -> &'static str {
+ match self {
+ RequestOrder::Oldest => "oldest",
+ RequestOrder::Newest => "newest",
+ RequestOrder::Index => "index",
+ }
+ }
+}
+
+impl std::fmt::Display for RequestOrder {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(self.as_str())
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/server_timestamp.rs b/third_party/rust/sync15-traits/src/server_timestamp.rs
new file mode 100644
index 0000000000..8a361dbdba
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/server_timestamp.rs
@@ -0,0 +1,118 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use std::time::Duration;
+
+/// Typesafe way to manage server timestamps without accidentally mixing them up with
+/// local ones.
+#[derive(Debug, Copy, Clone, PartialEq, PartialOrd, Default)]
+pub struct ServerTimestamp(pub i64);
+
+impl ServerTimestamp {
+ pub fn from_float_seconds(ts: f64) -> Self {
+ let rf = (ts * 1000.0).round();
+ if !rf.is_finite() || rf < 0.0 || rf >= i64::max_value() as f64 {
+ log::error!("Illegal timestamp: {}", ts);
+ ServerTimestamp(0)
+ } else {
+ ServerTimestamp(rf as i64)
+ }
+ }
+
+ pub fn from_millis(ts: i64) -> Self {
+ // Catch it in tests, but just complain and replace with 0 otherwise.
+ debug_assert!(ts >= 0, "Bad timestamp: {}", ts);
+ if ts >= 0 {
+ Self(ts)
+ } else {
+ log::error!("Illegal timestamp, substituting 0: {}", ts);
+ Self(0)
+ }
+ }
+}
+
+// This lets us use these in hyper header! blocks.
+impl std::str::FromStr for ServerTimestamp {
+ type Err = std::num::ParseFloatError;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let val = f64::from_str(s)?;
+ Ok(Self::from_float_seconds(val))
+ }
+}
+
+impl std::fmt::Display for ServerTimestamp {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.0 as f64 / 1000.0)
+ }
+}
+
+impl ServerTimestamp {
+ pub const EPOCH: ServerTimestamp = ServerTimestamp(0);
+
+ /// Returns None if `other` is later than `self` (Duration may not represent
+ /// negative timespans in rust).
+ #[inline]
+ pub fn duration_since(self, other: ServerTimestamp) -> Option<Duration> {
+ let delta = self.0 - other.0;
+ if delta < 0 {
+ None
+ } else {
+ Some(Duration::from_millis(delta as u64))
+ }
+ }
+
+ /// Get the milliseconds for the timestamp.
+ #[inline]
+ pub fn as_millis(self) -> i64 {
+ self.0
+ }
+}
+
+impl serde::ser::Serialize for ServerTimestamp {
+ fn serialize<S: serde::ser::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
+ serializer.serialize_f64(self.0 as f64 / 1000.0)
+ }
+}
+
+impl<'de> serde::de::Deserialize<'de> for ServerTimestamp {
+ fn deserialize<D: serde::de::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
+ f64::deserialize(d).map(Self::from_float_seconds)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_server_timestamp() {
+ let t0 = ServerTimestamp(10_300_150);
+ let t1 = ServerTimestamp(10_100_050);
+ assert!(t1.duration_since(t0).is_none());
+ assert!(t0.duration_since(t1).is_some());
+ let dur = t0.duration_since(t1).unwrap();
+ assert_eq!(dur.as_secs(), 200);
+ assert_eq!(dur.subsec_nanos(), 100_000_000);
+ }
+
+ #[test]
+ fn test_serde() {
+ let ts = ServerTimestamp(123_456);
+
+ // test serialize
+ let ser = serde_json::to_string(&ts).unwrap();
+ assert_eq!("123.456".to_string(), ser);
+
+ // test deserialize of float
+ let ts: ServerTimestamp = serde_json::from_str(&ser).unwrap();
+ assert_eq!(ServerTimestamp(123_456), ts);
+
+ // test deserialize of whole number
+ let ts: ServerTimestamp = serde_json::from_str("123").unwrap();
+ assert_eq!(ServerTimestamp(123_000), ts);
+
+ // test deserialize of negative number
+ let ts: ServerTimestamp = serde_json::from_str("-123").unwrap();
+ assert_eq!(ServerTimestamp(0), ts);
+ }
+}
diff --git a/third_party/rust/sync15-traits/src/store.rs b/third_party/rust/sync15-traits/src/store.rs
new file mode 100644
index 0000000000..51a595473d
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/store.rs
@@ -0,0 +1,97 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::{
+ client::ClientData, telemetry, CollectionRequest, Guid, IncomingChangeset, OutgoingChangeset,
+ ServerTimestamp,
+};
+use anyhow::Result;
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct CollSyncIds {
+ pub global: Guid,
+ pub coll: Guid,
+}
+
+/// Defines how a store is associated with Sync.
+#[derive(Debug, Clone, PartialEq)]
+pub enum StoreSyncAssociation {
+ /// This store is disconnected (although it may be connected in the future).
+ Disconnected,
+ /// Sync is connected, and has the following sync IDs.
+ Connected(CollSyncIds),
+}
+
+/// Low-level store functionality. Stores that need custom reconciliation logic
+/// should use this.
+///
+/// Different stores will produce errors of different types. To accommodate
+/// this, we force them all to return failure::Error.
+pub trait Store {
+ fn collection_name(&self) -> std::borrow::Cow<'static, str>;
+
+ /// Prepares the store for syncing. The tabs store currently uses this to
+ /// store the current list of clients, which it uses to look up device names
+ /// and types.
+ ///
+ /// Note that this method is only called by `sync_multiple`, and only if a
+ /// command processor is registered. In particular, `prepare_for_sync` will
+ /// not be called if the store is synced using `sync::synchronize` or
+ /// `sync_multiple::sync_multiple`. It _will_ be called if the store is
+ /// synced via the Sync Manager.
+ ///
+ /// TODO(issue #2590): This is pretty cludgey and will be hard to extend for
+ /// any case other than the tabs case. We should find another way to support
+ /// tabs...
+ fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
+ Ok(())
+ }
+
+ /// `inbound` is a vector to support the case where
+ /// `get_collection_requests` returned multiple requests. The changesets are
+ /// in the same order as the requests were -- e.g. if `vec![req_a, req_b]`
+ /// was returned from `get_collection_requests`, `inbound` will have the
+ /// results from `req_a` as its first index, and those from `req_b` as it's
+ /// second.
+ fn apply_incoming(
+ &self,
+ inbound: Vec<IncomingChangeset>,
+ telem: &mut telemetry::Engine,
+ ) -> Result<OutgoingChangeset>;
+
+ fn sync_finished(
+ &self,
+ new_timestamp: ServerTimestamp,
+ records_synced: Vec<Guid>,
+ ) -> Result<()>;
+
+ /// The store is responsible for building the collection request. Engines
+ /// typically will store a lastModified timestamp and use that to build a
+ /// request saying "give me full records since that date" - however, other
+ /// engines might do something fancier. This could even later be extended to
+ /// handle "backfills" etc
+ ///
+ /// To support more advanced use cases (e.g. remerge), multiple requests can
+ /// be returned here. The vast majority of engines will just want to return
+ /// zero or one item in their vector (zero is a valid optimization when the
+ /// server timestamp is the same as the engine last saw, one when it is not)
+ ///
+ /// Important: In the case when more than one collection is requested, it's
+ /// assumed the last one is the "canonical" one. (That is, it must be for
+ /// "this" collection, its timestamp is used to represent the sync, etc).
+ fn get_collection_requests(
+ &self,
+ server_timestamp: ServerTimestamp,
+ ) -> Result<Vec<CollectionRequest>>;
+
+ /// Get persisted sync IDs. If they don't match the global state we'll be
+ /// `reset()` with the new IDs.
+ fn get_sync_assoc(&self) -> Result<StoreSyncAssociation>;
+
+ /// Reset the store without wiping local data, ready for a "first sync".
+ /// `assoc` defines how this store is to be associated with sync.
+ fn reset(&self, assoc: &StoreSyncAssociation) -> Result<()>;
+
+ fn wipe(&self) -> Result<()>;
+}
diff --git a/third_party/rust/sync15-traits/src/telemetry.rs b/third_party/rust/sync15-traits/src/telemetry.rs
new file mode 100644
index 0000000000..a3609e3d6d
--- /dev/null
+++ b/third_party/rust/sync15-traits/src/telemetry.rs
@@ -0,0 +1,777 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! Manage recording sync telemetry. Assumes some external telemetry
+//! library/code which manages submitting.
+
+use std::collections::HashMap;
+use std::time;
+
+use serde::{ser, Serialize, Serializer};
+
+ // A test helper, used by the many test modules below.
+ #[cfg(test)]
+ fn assert_json<T: ?Sized>(v: &T, expected: serde_json::Value)
+ where
+ T: serde::Serialize,
+ {
+ // Compare as structured `serde_json::Value`s rather than strings, so the
+ // comparison is insensitive to key order and gives readable failures.
+ assert_eq!(
+ serde_json::to_value(&v).expect("should get a value"),
+ expected
+ );
+ }
+
+ /// What we record for 'when' and 'took' in a telemetry record.
+ #[derive(Debug, Serialize)]
+ struct WhenTook {
+ // Seconds since the unix epoch (sub-second precision deliberately dropped;
+ // see Stopwatch::finished).
+ when: f64,
+ // Elapsed milliseconds; omitted from the JSON when zero.
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ took: u64,
+ }
+
+ /// What we track while recording 'when' and 'took'. It serializes as a
+ /// `WhenTook`, except when `.finished()` hasn't been called, in which case
+ /// serialization fails with an error (see the `Serialize` impl).
+ #[derive(Debug)]
+ enum Stopwatch {
+ // Still running: wall-clock start (for 'when') and monotonic start (for 'took').
+ Started(time::SystemTime, time::Instant),
+ // Measurement complete; ready to serialize.
+ Finished(WhenTook),
+ }
+
+ impl Default for Stopwatch {
+ // A default stopwatch starts running immediately.
+ fn default() -> Self {
+ Stopwatch::new()
+ }
+ }
+
+ impl Stopwatch {
+ // Capture both clocks: SystemTime for the epoch-relative 'when', and a
+ // monotonic Instant for measuring 'took'.
+ fn new() -> Self {
+ Stopwatch::Started(time::SystemTime::now(), time::Instant::now())
+ }
+
+ // For tests we don't want real timestamps because we test against literals.
+ #[cfg(test)]
+ fn finished(&self) -> Self {
+ Stopwatch::Finished(WhenTook { when: 0.0, took: 0 })
+ }
+
+ // Convert a running stopwatch into its final readings. Panics (via
+ // `unreachable!`) if called on an already-finished stopwatch.
+ #[cfg(not(test))]
+ fn finished(&self) -> Self {
+ match self {
+ Stopwatch::Started(st, si) => {
+ let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default();
+ let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float?
+
+ let sid = si.elapsed();
+ // Whole milliseconds: seconds * 1000 plus truncated nanoseconds.
+ let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000);
+ Stopwatch::Finished(WhenTook { when, took })
+ }
+ _ => {
+ unreachable!("can't finish twice");
+ }
+ }
+ }
+ }
+
+ impl Serialize for Stopwatch {
+ // Serializing a still-running stopwatch is a logic error, so surface it
+ // as a serde error rather than emitting bogus timings.
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ match self {
+ Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")),
+ Stopwatch::Finished(c) => c.serialize(serializer),
+ }
+ }
+ }
+
+ #[cfg(test)]
+ mod stopwatch_tests {
+ use super::*;
+
+ // A wrapper struct because we flatten - this struct should serialize with
+ // 'when' and 'took' keys (but with no 'sw'.)
+ #[derive(Debug, Serialize)]
+ struct WT {
+ #[serde(flatten)]
+ sw: Stopwatch,
+ }
+
+ #[test]
+ fn test_not_finished() {
+ let wt = WT {
+ sw: Stopwatch::new(),
+ };
+ serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail");
+ }
+
+ // A zero 'took' is skipped from the JSON (via skip_serializing_if).
+ #[test]
+ fn test() {
+ assert_json(
+ &WT {
+ sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }),
+ },
+ serde_json::json!({"when": 1.0, "took": 1}),
+ );
+ assert_json(
+ &WT {
+ sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }),
+ },
+ serde_json::json!({"when": 1.0}),
+ );
+ }
+ }
+
+ /// A generic "Event" - suitable for all kinds of pings (although this module
+ /// only cares about the sync ping)
+ #[derive(Debug, Serialize)]
+ pub struct Event {
+ // We use static str references as we expect values to be literals.
+ object: &'static str,
+
+ method: &'static str,
+
+ // Maybe "value" should be a string?
+ #[serde(skip_serializing_if = "Option::is_none")]
+ value: Option<&'static str>,
+
+ // we expect the keys to be literals but values are real strings.
+ // Omitted from the JSON when no extras were recorded.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ extra: Option<HashMap<&'static str, String>>,
+ }
+
+ impl Event {
+ /// Create a new event. Panics if `object` or `method` exceed 20 bytes
+ /// (both are expected to be short literals).
+ pub fn new(object: &'static str, method: &'static str) -> Self {
+ assert!(object.len() <= 20);
+ assert!(method.len() <= 20);
+ Self {
+ object,
+ method,
+ value: None,
+ extra: None,
+ }
+ }
+
+ /// Builder-style setter for `value`. Panics if `v` exceeds 80 bytes.
+ pub fn value(mut self, v: &'static str) -> Self {
+ assert!(v.len() <= 80);
+ self.value = Some(v);
+ self
+ }
+
+ /// Builder-style: record one extra key/value pair. Panics if `key`
+ /// exceeds 15 bytes, `val` exceeds 85 bytes, or 10 extras are already
+ /// recorded.
+ pub fn extra(mut self, key: &'static str, val: String) -> Self {
+ assert!(key.len() <= 15);
+ assert!(val.len() <= 85);
+ match self.extra {
+ None => self.extra = Some(HashMap::new()),
+ Some(ref e) => assert!(e.len() < 10),
+ }
+ // `extra` is guaranteed Some by the match above.
+ self.extra.as_mut().unwrap().insert(key, val);
+ self
+ }
+ }
+
+ #[cfg(test)]
+ mod test_events {
+ use super::*;
+
+ // The should_panic tests below exercise the length/count asserts in
+ // Event::new / Event::extra.
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_ctor() {
+ Event::new("A very long object value", "Method");
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_extra_key() {
+ Event::new("O", "M").extra("A very long key value", "v".to_string());
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_invalid_length_extra_val() {
+ let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
+ abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ Event::new("O", "M").extra("k", l.to_string());
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_too_many_extras() {
+ let l = "abcdefghijk";
+ let mut e = Event::new("Object", "Method");
+ for i in 0..l.len() {
+ e = e.extra(&l[i..=i], "v".to_string());
+ }
+ }
+
+ #[test]
+ fn test_json() {
+ assert_json(
+ &Event::new("Object", "Method").value("Value"),
+ serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}),
+ );
+
+ assert_json(
+ &Event::new("Object", "Method").extra("one", "one".to_string()),
+ serde_json::json!({"object": "Object",
+ "method": "Method",
+ "extra": {"one": "one"}
+ }),
+ )
+ }
+ }
+
+ /// A Sync failure.
+ // The serde "name" tag means each variant serializes as e.g.
+ // {"name": "httperror", "code": 500} - see the `test` module below.
+ #[derive(Debug, Serialize)]
+ #[serde(tag = "name")]
+ pub enum SyncFailure {
+ #[serde(rename = "shutdownerror")]
+ Shutdown,
+
+ #[serde(rename = "othererror")]
+ Other { error: String },
+
+ #[serde(rename = "unexpectederror")]
+ Unexpected { error: String },
+
+ #[serde(rename = "autherror")]
+ Auth { from: &'static str },
+
+ #[serde(rename = "httperror")]
+ Http { code: u16 },
+ }
+
+ #[cfg(test)]
+ mod test {
+ use super::*;
+
+ // Check each SyncFailure variant's serialized "name" tag and payload.
+ #[test]
+ fn reprs() {
+ assert_json(
+ &SyncFailure::Shutdown,
+ serde_json::json!({"name": "shutdownerror"}),
+ );
+
+ assert_json(
+ &SyncFailure::Other {
+ error: "dunno".to_string(),
+ },
+ serde_json::json!({"name": "othererror", "error": "dunno"}),
+ );
+
+ assert_json(
+ &SyncFailure::Unexpected {
+ error: "dunno".to_string(),
+ },
+ serde_json::json!({"name": "unexpectederror", "error": "dunno"}),
+ );
+
+ assert_json(
+ &SyncFailure::Auth { from: "FxA" },
+ serde_json::json!({"name": "autherror", "from": "FxA"}),
+ );
+
+ assert_json(
+ &SyncFailure::Http { code: 500 },
+ serde_json::json!({"name": "httperror", "code": 500}),
+ );
+ }
+ }
+
+ /// Incoming record for an engine's sync
+ // All counters default to zero and zero-valued fields are omitted from the
+ // serialized JSON (see the `is_empty` helper used by `Engine`).
+ #[derive(Debug, Default, Serialize)]
+ pub struct EngineIncoming {
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ applied: u32,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ failed: u32,
+
+ #[serde(rename = "newFailed")]
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ new_failed: u32,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ reconciled: u32,
+ }
+
+ impl EngineIncoming {
+ /// Create a record with all counters at zero.
+ pub fn new() -> Self {
+ Self {
+ ..Default::default()
+ }
+ }
+
+ // A helper used via skip_serializing_if: `Engine.incoming` is omitted
+ // from the JSON when absent or when every counter is zero.
+ fn is_empty(inc: &Option<Self>) -> bool {
+ match inc {
+ Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0,
+ None => true,
+ }
+ }
+
+ /// Increment the value of `applied` by `n`.
+ #[inline]
+ pub fn applied(&mut self, n: u32) {
+ self.applied += n;
+ }
+
+ /// Increment the value of `failed` by `n`.
+ #[inline]
+ pub fn failed(&mut self, n: u32) {
+ self.failed += n;
+ }
+
+ /// Increment the value of `new_failed` by `n`.
+ #[inline]
+ pub fn new_failed(&mut self, n: u32) {
+ self.new_failed += n;
+ }
+
+ /// Increment the value of `reconciled` by `n`.
+ #[inline]
+ pub fn reconciled(&mut self, n: u32) {
+ self.reconciled += n;
+ }
+
+ /// Get the value of `applied`. Mostly useful for testing.
+ #[inline]
+ pub fn get_applied(&self) -> u32 {
+ self.applied
+ }
+
+ /// Get the value of `failed`. Mostly useful for testing.
+ #[inline]
+ pub fn get_failed(&self) -> u32 {
+ self.failed
+ }
+
+ /// Get the value of `new_failed`. Mostly useful for testing.
+ #[inline]
+ pub fn get_new_failed(&self) -> u32 {
+ self.new_failed
+ }
+
+ /// Get the value of `reconciled`. Mostly useful for testing.
+ #[inline]
+ pub fn get_reconciled(&self) -> u32 {
+ self.reconciled
+ }
+ }
+
+ /// Outgoing record for an engine's sync
+ // Zero-valued counters are omitted from the serialized JSON.
+ #[derive(Debug, Default, Serialize)]
+ pub struct EngineOutgoing {
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ sent: usize,
+
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ failed: usize,
+ }
+
+ impl EngineOutgoing {
+ /// Create a record with all counters at zero.
+ pub fn new() -> Self {
+ EngineOutgoing {
+ ..Default::default()
+ }
+ }
+
+ /// Increment the value of `sent` by `n`.
+ #[inline]
+ pub fn sent(&mut self, n: usize) {
+ self.sent += n;
+ }
+
+ /// Increment the value of `failed` by `n`.
+ #[inline]
+ pub fn failed(&mut self, n: usize) {
+ self.failed += n;
+ }
+ }
+
+ /// One engine's sync.
+ #[derive(Debug, Serialize)]
+ pub struct Engine {
+ name: String,
+
+ // Flattened, so 'when'/'took' appear directly on the engine record.
+ #[serde(flatten)]
+ when_took: Stopwatch,
+
+ // Skipped from the JSON when absent or all-zero (see EngineIncoming::is_empty).
+ #[serde(skip_serializing_if = "EngineIncoming::is_empty")]
+ incoming: Option<EngineIncoming>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ outgoing: Vec<EngineOutgoing>, // one for each batch posted.
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ validation: Option<Validation>,
+ }
+
+ impl Engine {
+ /// Create a record for one engine's sync; the stopwatch starts running
+ /// immediately.
+ pub fn new(name: impl Into<String>) -> Self {
+ Self {
+ name: name.into(),
+ when_took: Stopwatch::new(),
+ incoming: None,
+ outgoing: Vec::new(),
+ failure: None,
+ validation: None,
+ }
+ }
+
+ /// Record the incoming counters. Panics if called more than once.
+ pub fn incoming(&mut self, inc: EngineIncoming) {
+ assert!(self.incoming.is_none());
+ self.incoming = Some(inc);
+ }
+
+ /// Record one outgoing batch; may be called once per batch posted.
+ pub fn outgoing(&mut self, out: EngineOutgoing) {
+ self.outgoing.push(out);
+ }
+
+ /// Record a failure; only the first failure is kept, later ones are
+ /// logged and dropped.
+ pub fn failure(&mut self, err: impl Into<SyncFailure>) {
+ // Currently we take the first error, under the assumption that the
+ // first is the most important and all others stem from that.
+ let failure = err.into();
+ if self.failure.is_none() {
+ self.failure = Some(failure);
+ } else {
+ log::warn!(
+ "engine already has recorded a failure of {:?} - ignoring {:?}",
+ &self.failure,
+ &failure
+ );
+ }
+ }
+
+ /// Record validation results. Panics if called more than once.
+ pub fn validation(&mut self, v: Validation) {
+ assert!(self.validation.is_none());
+ self.validation = Some(v);
+ }
+
+ // Stop the stopwatch; called by SyncTelemetry::engine().
+ fn finished(&mut self) {
+ self.when_took = self.when_took.finished();
+ }
+ }
+
+ // Validation results for one engine: a version, any problems found, and an
+ // optional failure hit while validating.
+ #[derive(Debug, Default, Serialize)]
+ pub struct Validation {
+ version: u32,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ problems: Vec<Problem>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+ }
+
+ impl Validation {
+ /// Create a validation record for the given validator version.
+ pub fn with_version(version: u32) -> Validation {
+ Validation {
+ version,
+ ..Validation::default()
+ }
+ }
+
+ /// Record a named problem; zero counts are ignored. Returns `&mut Self`
+ /// so calls can be chained.
+ pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self {
+ if count > 0 {
+ self.problems.push(Problem { name, count });
+ }
+ self
+ }
+ }
+
+ // A single named validation problem and how many times it was seen.
+ #[derive(Debug, Default, Serialize)]
+ pub struct Problem {
+ name: &'static str,
+ // Omitted from the JSON when zero (although Validation::problem never
+ // records a zero count).
+ #[serde(skip_serializing_if = "crate::skip_if_default")]
+ count: usize,
+ }
+
+ #[cfg(test)]
+ mod engine_tests {
+ use super::*;
+
+ // An engine with no incoming/outgoing/failure serializes to just its
+ // name plus the stopwatch fields.
+ #[test]
+ fn test_engine() {
+ let mut e = Engine::new("test_engine");
+ e.finished();
+ assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0}));
+ }
+
+ #[test]
+ fn test_engine_not_finished() {
+ let e = Engine::new("test_engine");
+ serde_json::to_value(&e).expect_err("unfinished stopwatch should fail");
+ }
+
+ #[test]
+ fn test_incoming() {
+ let mut i = EngineIncoming::new();
+ i.applied(1);
+ i.failed(2);
+ let mut e = Engine::new("TestEngine");
+ e.incoming(i);
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}),
+ );
+ }
+
+ #[test]
+ fn test_outgoing() {
+ let mut o = EngineOutgoing::new();
+ o.sent(2);
+ o.failed(1);
+ let mut e = Engine::new("TestEngine");
+ e.outgoing(o);
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}),
+ );
+ }
+
+ #[test]
+ fn test_failure() {
+ let mut e = Engine::new("TestEngine");
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+ assert_json(
+ &e,
+ serde_json::json!({"name": "TestEngine",
+ "when": 0.0,
+ "failureReason": {"name": "httperror", "code": 500}
+ }),
+ );
+ }
+
+ // Sanity-check the raw field values, not just the JSON form.
+ #[test]
+ fn test_raw() {
+ let mut e = Engine::new("TestEngine");
+ let mut inc = EngineIncoming::new();
+ inc.applied(10);
+ e.incoming(inc);
+ let mut out = EngineOutgoing::new();
+ out.sent(1);
+ e.outgoing(out);
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+
+ assert_eq!(e.outgoing.len(), 1);
+ assert_eq!(e.incoming.as_ref().unwrap().applied, 10);
+ assert_eq!(e.outgoing[0].sent, 1);
+ assert!(e.failure.is_some());
+ serde_json::to_string(&e).expect("should get json");
+ }
+ }
+
+ /// A single sync. May have many engines, may have its own failure.
+ #[derive(Debug, Serialize, Default)]
+ pub struct SyncTelemetry {
+ // Flattened, so 'when'/'took' appear directly on the sync record.
+ #[serde(flatten)]
+ when_took: Stopwatch,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ engines: Vec<Engine>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[serde(rename = "failureReason")]
+ failure: Option<SyncFailure>,
+ }
+
+ impl SyncTelemetry {
+ /// Create a record for one sync; the stopwatch starts running immediately.
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Add one engine's record; its stopwatch is stopped here.
+ pub fn engine(&mut self, mut e: Engine) {
+ e.finished();
+ self.engines.push(e);
+ }
+
+ /// Record a sync-level failure. Panics if one is already recorded.
+ pub fn failure(&mut self, failure: SyncFailure) {
+ assert!(self.failure.is_none());
+ self.failure = Some(failure);
+ }
+
+ // Note that unlike other 'finished' methods, this isn't private - someone
+ // needs to explicitly call this before handling the json payload to
+ // whatever ends up submitting it.
+ pub fn finished(&mut self) {
+ self.when_took = self.when_took.finished();
+ }
+ }
+
+ #[cfg(test)]
+ mod sync_tests {
+ use super::*;
+
+ // End-to-end: engine counters + failure accumulate into one sync record.
+ #[test]
+ fn test_accum() {
+ let mut s = SyncTelemetry::new();
+ let mut inc = EngineIncoming::new();
+ inc.applied(10);
+ let mut e = Engine::new("test_engine");
+ e.incoming(inc);
+ e.failure(SyncFailure::Http { code: 500 });
+ e.finished();
+ s.engine(e);
+ s.finished();
+
+ assert_json(
+ &s,
+ serde_json::json!({
+ "when": 0.0,
+ "engines": [{
+ "name":"test_engine",
+ "when":0.0,
+ "incoming": {
+ "applied": 10
+ },
+ "failureReason": {
+ "name": "httperror",
+ "code": 500
+ }
+ }]
+ }),
+ );
+ }
+
+ #[test]
+ fn test_multi_engine() {
+ let mut inc_e1 = EngineIncoming::new();
+ inc_e1.applied(1);
+ let mut e1 = Engine::new("test_engine");
+ e1.incoming(inc_e1);
+
+ let mut inc_e2 = EngineIncoming::new();
+ inc_e2.failed(1);
+ let mut e2 = Engine::new("test_engine_2");
+ e2.incoming(inc_e2);
+ let mut out_e2 = EngineOutgoing::new();
+ out_e2.sent(1);
+ e2.outgoing(out_e2);
+
+ let mut s = SyncTelemetry::new();
+ s.engine(e1);
+ s.engine(e2);
+ s.failure(SyncFailure::Http { code: 500 });
+ s.finished();
+ assert_json(
+ &s,
+ serde_json::json!({
+ "when": 0.0,
+ "engines": [{
+ "name": "test_engine",
+ "when": 0.0,
+ "incoming": {
+ "applied": 1
+ }
+ },{
+ "name": "test_engine_2",
+ "when": 0.0,
+ "incoming": {
+ "failed": 1
+ },
+ "outgoing": [{
+ "sent": 1
+ }]
+ }],
+ "failureReason": {
+ "name": "httperror",
+ "code": 500
+ }
+ }),
+ );
+ }
+ }
+
+ /// The Sync ping payload, as documented at
+ /// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html.
+ /// May have many syncs, may have many events. However, due to the architecture
+ /// of apps which use these components, this payload is almost certainly not
+ /// suitable for submitting directly. For example, we will always return a
+ /// payload with exactly 1 sync, and it will not know certain other fields
+ /// in the payload, such as the *hashed* FxA device ID (see
+ /// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185
+ /// for an example of how the device ID is constructed). The intention is that
+ /// consumers of this will use this to create a "real" payload - eg, accumulating
+ /// until some threshold number of syncs is reached, and contributing
+ /// additional data which only the consumer knows.
+ #[derive(Debug, Serialize, Default)]
+ pub struct SyncTelemetryPing {
+ version: u32,
+
+ uid: Option<String>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ events: Vec<Event>,
+
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ syncs: Vec<SyncTelemetry>,
+ }
+
+ impl SyncTelemetryPing {
+ /// Create a new ping at payload version 1.
+ pub fn new() -> Self {
+ Self {
+ version: 1,
+ ..Default::default()
+ }
+ }
+
+ /// Record the uid for this ping. Replacing an existing, different uid is
+ /// unexpected, so it is logged.
+ pub fn uid(&mut self, uid: String) {
+ if let Some(ref existing) = self.uid {
+ if *existing != uid {
+ // Fixed: the format string previously read "${}", a stray
+ // template-literal '$' which printed literally in the log.
+ log::warn!("existing uid {} being replaced by {}", existing, uid);
+ }
+ }
+ self.uid = Some(uid);
+ }
+
+ /// Add one sync's record; `finished()` is called on it here.
+ pub fn sync(&mut self, mut s: SyncTelemetry) {
+ s.finished();
+ self.syncs.push(s);
+ }
+
+ /// Add one event.
+ pub fn event(&mut self, e: Event) {
+ self.events.push(e);
+ }
+ }
+
+ // Expose SyncTelemetryPing over the FFI, serialized as JSON (per the
+ // ffi_support macro's name - see that crate's docs for the exact ABI).
+ ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing);
+
+ #[cfg(test)]
+ mod ping_tests {
+ use super::*;
+ // Full ping round-trip: uid, one sync with one engine, one event.
+ #[test]
+ fn test_ping() {
+ let engine = Engine::new("test");
+ let mut s = SyncTelemetry::new();
+ s.engine(engine);
+ let mut p = SyncTelemetryPing::new();
+ p.uid("user-id".into());
+ p.sync(s);
+ let event = Event::new("foo", "bar");
+ p.event(event);
+ assert_json(
+ &p,
+ serde_json::json!({
+ "events": [{
+ "method": "bar", "object": "foo"
+ }],
+ "syncs": [{
+ "engines": [{
+ "name": "test", "when": 0.0
+ }],
+ "when": 0.0
+ }],
+ "uid": "user-id",
+ "version": 1
+ }),
+ );
+ }
+ }