From 43a97878ce14b72f0981164f87f2e35e14151312 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 11:22:09 +0200 Subject: Adding upstream version 110.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/sync15/.cargo-checksum.json | 1 + third_party/rust/sync15/Cargo.toml | 66 ++ third_party/rust/sync15/README.md | 128 +++ third_party/rust/sync15/src/bso/content.rs | 388 +++++++ third_party/rust/sync15/src/bso/crypto.rs | 197 ++++ third_party/rust/sync15/src/bso/mod.rs | 204 ++++ third_party/rust/sync15/src/bso/test_utils.rs | 61 + third_party/rust/sync15/src/client/coll_state.rs | 354 ++++++ third_party/rust/sync15/src/client/coll_update.rs | 137 +++ .../rust/sync15/src/client/collection_keys.rs | 61 + third_party/rust/sync15/src/client/mod.rs | 39 + third_party/rust/sync15/src/client/request.rs | 1199 ++++++++++++++++++++ third_party/rust/sync15/src/client/state.rs | 1089 ++++++++++++++++++ third_party/rust/sync15/src/client/status.rs | 106 ++ .../rust/sync15/src/client/storage_client.rs | 587 ++++++++++ third_party/rust/sync15/src/client/sync.rs | 105 ++ .../rust/sync15/src/client/sync_multiple.rs | 493 ++++++++ third_party/rust/sync15/src/client/token.rs | 602 ++++++++++ third_party/rust/sync15/src/client/util.rs | 102 ++ third_party/rust/sync15/src/client_types.rs | 101 ++ .../rust/sync15/src/clients_engine/engine.rs | 814 +++++++++++++ third_party/rust/sync15/src/clients_engine/mod.rs | 93 ++ .../rust/sync15/src/clients_engine/record.rs | 124 ++ third_party/rust/sync15/src/clients_engine/ser.rs | 125 ++ third_party/rust/sync15/src/enc_payload.rs | 110 ++ .../rust/sync15/src/engine/bridged_engine.rs | 121 ++ third_party/rust/sync15/src/engine/changeset.rs | 42 + third_party/rust/sync15/src/engine/mod.rs | 36 + third_party/rust/sync15/src/engine/request.rs | 113 ++ third_party/rust/sync15/src/engine/sync_engine.rs | 235 ++++ third_party/rust/sync15/src/error.rs | 138 +++ third_party/rust/sync15/src/key_bundle.rs | 224 ++++ third_party/rust/sync15/src/lib.rs | 40 + third_party/rust/sync15/src/record_types.rs | 51 + third_party/rust/sync15/src/server_timestamp.rs | 122 ++ third_party/rust/sync15/src/telemetry.rs | 824 ++++++++++++++ 36 files changed, 9232 insertions(+) create mode 100644 third_party/rust/sync15/.cargo-checksum.json create mode 100644 third_party/rust/sync15/Cargo.toml create mode 100644 third_party/rust/sync15/README.md create mode 100644 third_party/rust/sync15/src/bso/content.rs create mode 100644 third_party/rust/sync15/src/bso/crypto.rs create mode 100644 third_party/rust/sync15/src/bso/mod.rs create mode 100644 third_party/rust/sync15/src/bso/test_utils.rs create mode 100644 third_party/rust/sync15/src/client/coll_state.rs create mode 100644 third_party/rust/sync15/src/client/coll_update.rs create mode 100644 third_party/rust/sync15/src/client/collection_keys.rs create mode 100644 third_party/rust/sync15/src/client/mod.rs create mode 100644 third_party/rust/sync15/src/client/request.rs create mode 100644 third_party/rust/sync15/src/client/state.rs create mode 100644 third_party/rust/sync15/src/client/status.rs create mode 100644 third_party/rust/sync15/src/client/storage_client.rs create mode 100644 third_party/rust/sync15/src/client/sync.rs create mode 100644 third_party/rust/sync15/src/client/sync_multiple.rs create mode 100644 third_party/rust/sync15/src/client/token.rs create mode 100644 third_party/rust/sync15/src/client/util.rs create mode 100644 third_party/rust/sync15/src/client_types.rs create mode 100644 third_party/rust/sync15/src/clients_engine/engine.rs create mode 100644 third_party/rust/sync15/src/clients_engine/mod.rs create mode 100644 third_party/rust/sync15/src/clients_engine/record.rs create mode 100644 third_party/rust/sync15/src/clients_engine/ser.rs create mode 100644 third_party/rust/sync15/src/enc_payload.rs create mode 100644 third_party/rust/sync15/src/engine/bridged_engine.rs create mode 100644 third_party/rust/sync15/src/engine/changeset.rs create mode 100644 third_party/rust/sync15/src/engine/mod.rs create mode 100644 third_party/rust/sync15/src/engine/request.rs create mode 100644 third_party/rust/sync15/src/engine/sync_engine.rs create mode 100644 third_party/rust/sync15/src/error.rs create mode 100644 third_party/rust/sync15/src/key_bundle.rs create mode 100644 third_party/rust/sync15/src/lib.rs create mode 100644 third_party/rust/sync15/src/record_types.rs create mode 100644 third_party/rust/sync15/src/server_timestamp.rs create mode 100644 third_party/rust/sync15/src/telemetry.rs (limited to 'third_party/rust/sync15') diff --git a/third_party/rust/sync15/.cargo-checksum.json b/third_party/rust/sync15/.cargo-checksum.json new file mode 100644 index 0000000000..c9e16591e8 --- /dev/null +++ b/third_party/rust/sync15/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"1f11acaa90a112979205b4c7af9ba0c015afab5f3141dd082d58c862c84490e3","README.md":"6d4ff5b079ac5340d18fa127f583e7ad793c5a2328b8ecd12c3fc723939804f2","src/bso/content.rs":"d2d650f4932e9a7068a25dd7df0085b92cd8976a0635320e6ae306d5a425075c","src/bso/crypto.rs":"27602dcccb37d3a55620ee4e16b705da455d49af575de115c7c79c0178eb1d6d","src/bso/mod.rs":"09e723dc7e99295ecafdcadffaf604d66ea27cf2b7f1fd9ab3cac4f4698ff6a7","src/bso/test_utils.rs":"4ec5a2df5e1c0ec14dc770681e959bdcef6ef04f6fde435999197f46a8ae4831","src/client/coll_state.rs":"b0c47e44168ea2c7017cd8531f76bb230f9be66b119bb7416537b8693a1d0a0a","src/client/coll_update.rs":"cc12dfde0817eae68aa8e176497ed16e9e3307f72a33faa3fe329d7a3bfd1598","src/client/collection_keys.rs":"c27b2277a3a52033b58ab01490fc2ea7007494195dd5e6dc2c6931a4ca96795a","src/client/mod.rs":"9500b1d22a5064bbbd6a3d6bcc63fc4191e8ea4605ded359bc6c2dc2887626a3","src/client/request.rs":"8841524e37d8195867bdf6ba98c75f610cf47a4644adeebd6372cc6713f2260a","src/client/state.rs":"4e31193ef2471c1dfabf1c6a391bcb95e14ddb45855786a4194ff187d5c9347c","src/client/status.rs":"f445a8765dac9789444e23b5145148413407bb1d18a15ef56682243997f591bf","src/client/storage_client.rs":"3637b4522048353b06ad24031c150c66c13d9c27cef293e400db88807421633c","src/client/sync.rs":"ed7225c314df27793ed5de6da93cc4b75a98da1c14ac82e37a723a99821d4dc7","src/client/sync_multiple.rs":"3729d4afd90ab1bd9982a3506252c99d8f37619cc1792ef4feba352ad01a7192","src/client/token.rs":"b268759d31e0fe17e0e2a428694cd9a317fcfbdd52f023d5d8c7cc6f00f1a102","src/client/util.rs":"71cc70ee41f821f53078675e636e9fad9c6046fa1a989e37f5487e340a2277d6","src/client_types.rs":"c53e6fa8e9d5c7b56a87c6803ec3fc808d471b1d8c20c0fbb4ec0c02571b21ba","src/clients_engine/engine.rs":"ba9f8efc068392c3ecfc7241d6ddd96912036da3e497ea6920c6085ba9e537bb","src/clients_engine/mod.rs":"461729e6f89b66b2cbd89b041a03d4d6a8ba582284ed4f3015cb13e1a0c6da97","src/clients_engine/record.rs":"50bfa33610581dce97f6e6973e18dbcdbf7520f3d48f4d71b6ba04eb0a4ffa1e","src/clients_engine/ser.rs":"9796e44ed7daf04f22afbb51238ac25fd0de1438b72181351b4ca29fd70fd429","src/enc_payload.rs":"aa3eea7df49b24cd59831680a47c417b73a3e36e6b0f3f4baf14ca66bd68be6b","src/engine/bridged_engine.rs":"9c0d602b3553932e77a87caba9262d3a0fc146500c6d46f1770273be6636d064","src/engine/changeset.rs":"5e323aa07f0b18d22495a695b829326d18287ff75155b4818adf66b86e16ba00","src/engine/mod.rs":"f84a254642c1876fe56506703fb010a7866eb5d40af3fc238bf92b62a61cb6cc","src/engine/request.rs":"f40bac0b3f5286446a4056de885fd81e4fa77e4dc7d5bbb6aa644b93201046de","src/engine/sync_engine.rs":"5314d0163ccc93d78f5879d52cf2b60b9622e80722d84d3482cfa7c26df6bfdd","src/error.rs":"a45cfe02e6301f473c34678b694943c1a04308b8c292c6e0448bf495194c3b5e","src/key_bundle.rs":"56b67ef12d7cb2afca540cf3c29f1748418bbbb023f9b663344cf28fdc2e8766","src/lib.rs":"41c2171b0e1a96adfd56682ca90bd4ac59fe9390a6872f85128948bdb53a0d42","src/record_types.rs":"02bb3d352fb808131d298f9b90d9c95b7e9e0138b97c5401f3b9fdacc5562f44","src/server_timestamp.rs":"0020f31971ccbfc485894cabc3087459d42252b86d7de07f2136997864b0373b","src/telemetry.rs":"3471aaaaca275496ec6880723e076ce39b44fb351ca88e53fe63750a43255c33"},"package":null} \ No newline at end of file diff --git a/third_party/rust/sync15/Cargo.toml b/third_party/rust/sync15/Cargo.toml new file mode 100644 index 0000000000..6ef5a2d8a8 --- /dev/null +++ b/third_party/rust/sync15/Cargo.toml @@ -0,0 +1,66 @@ +[package] +name = "sync15" +edition = "2021" +version = "0.1.0" +authors = ["application-services@mozilla.com"] +license = "MPL-2.0" +exclude = ["/android", "/ios"] + +[features] +# The default feature is what mozilla-central wants (we are listed in a `[patch]` section in +# in m-c's top-level Cargo.toml to make updating easier, but you can't specify features there) +default = ["sync-engine"] + +random-guid = ["sync-guid/random"] + +# Some consumers of this just need our encrypted payloads and no other sync functionality. +crypto = ["rc_crypto", "base16", "base64"] + +# Some crates need to implement a "sync engine", but aren't a "sync client" (ie, their +# engine is used by a "sync client".) Engines don't interact directly with the storage servers, +# nor do they do their own crypto. +# See the rustdocs in `crate::engine` for more information about engines. +sync-engine = ["random-guid"] + +# Some crates are a "sync client" and do full management/initialization of server storage, +# keys, etc and sync one or more engines. This crate has an engine to manage the "clients" +# collection, so needs the sync-engine feature. +# See the rustdocs in `crate::client` for more information about clients. +sync-client = ["sync-engine", "crypto", "viaduct", "url"] + +# Some crates just do their own engine but need to pretend they are a client, +# eg, iOS pre-sync-manager. +# Consider places: +# * It always is going to need to supply a "sync-engine". +# * When used in iOS, due to the lack of sync_manager support, it also needs to +# supply a kind of "sync-client". It is *not* necessary to supply this for Android. +# In a perfect world: +# * places would also have a feature called, say, "sync-client" +# * The code needed for iOS would be behind the "sync-client" feature. +# * The ios megazord would enable the "sync-client" feature, but the android megazord would not. +# +# However, that's not yet the case. This "stand-alone" sync feature is used by crates +# such as places, and is really a marker to help identify the crates which should be +# upgraded to make the sync-client part truly optional. +standalone-sync = ["sync-client"] + +[dependencies] +anyhow = "1.0" +base16 = { version = "0.2", optional = true } +base64 = { version = "0.13", optional = true } +error-support = { path = "../support/error" } +ffi-support = "0.4" +interrupt-support = { path = "../support/interrupt" } +lazy_static = "1.4" +log = "0.4" +rc_crypto = { path = "../support/rc_crypto", features = ["hawk"], optional = true } +serde = { version = "1", features = ["derive"] } +serde_derive = "1" +serde_json = "1" +sync-guid = { path = "../support/guid", features = ["random"] } +thiserror = "1.0" +url = { version = "2.1", optional = true } # mozilla-central can't yet take 2.2 (see bug 1734538) +viaduct = { path = "../viaduct", optional = true } + +[dev-dependencies] +env_logger = { version = "0.7", default-features = false } diff --git a/third_party/rust/sync15/README.md b/third_party/rust/sync15/README.md new file mode 100644 index 0000000000..a638435908 --- /dev/null +++ b/third_party/rust/sync15/README.md @@ -0,0 +1,128 @@ +# Low-level sync-1.5 helper component + +This component contains utility code to be shared between different +data stores that want to sync against a Firefox Sync v1.5 sync server. +It handles things like encrypting/decrypting records, obtaining and +using storage node auth tokens, and so-on. + +There are 2 key concepts to understand here - the implementation itself, and +a rust trait for a "syncable store" where component-specific logic lives - but +before we dive into them, some preamble might help put things into context. + +## Nomenclature + +* The term "store" is generally used as the interface to the database - ie, the + thing that gets and saves items. It can also be seen as supplying the API + used by most consumers of the component. Note that the "places" component + is alone in using the term "api" for this object. + +* The term "engine" (or ideally, "sync engine") is used for the thing that + actually does the syncing for a store. Sync engines implement the SyncEngine + trait - the trait is either implemented directly by a store, or a new object + that has a reference to a store. + +## Introduction and History + +For many years Sync has worked exclusively against a "sync v1.5 server". This +[is a REST API described here](https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html). +The important part is that the API is conceptually quite simple - there are +arbitrary "collections" containing "records" indexed by a GUID, and lacking +traditonal database concepts like joins. Because the record is encrypted, +there's very little scope for the server to be much smarter. Thus it's +reasonably easy to create a fairly generic abstraction over the API that can be +easily reused. + +Back in the deep past, we found ourselves with 2 different components that +needed to sync against a sync v1.5 server. The apps using these components +didn't have schedulers or any UI for choosing what to sync - so these +components just looked at the existing state of the engines on the server and +synced if they were enabled. + +This was also pre-megazord - the idea was that apps could choose from a "menu" +of components to include - so we didn't really want to bind these components +together. Therefore, there was no concept of "sync all" - instead, each of the +components had to be synced individually. So this component started out as more +of a "library" than a "component" which individual components could reuse - and +each of these components was a "syncable store" (ie, a store which could supply + a "sync engine"). + +Fast forward to Fenix and we needed a UI for managing all the engines supported +there, and a single "sync now" experience etc - so we also have a sync_manager +component - [see its README for more](../components/sync_manager/README.md). +But even though it exists, there are still some parts of this component that +reflect these early days - for example, it's still possible to sync just a +single component using sync15 (ie, without going via the "sync manager"), +although this isn't used and should be removed - the "sync manager" allows you +to choose which engines to sync, so that should be used exclusively. + +## Metadata + +There's some metadata associated with a sync. Some of the metadata is "global" +to the app (eg, the enabled state of engines, information about what servers to +use, etc) and some is specific to an engine (eg, timestamp of the +server's collection for this engine, guids for the collections, etc). + +We made the decision early on that no storage should be done by this +component: + +* The "global" metadata should be stored by the application - but because it + doesn't need to interpret the data, we do this with an opaque string (that + is JSON, but the app should never assume or introspect that) + +* Each engine should store its own metadata, so we don't end up in the + situation where, say, a database is moved between profiles causing the + metadata to refer to a completely different data set. So each engine + stores its metadata in the same database as the data itself, so if the + database is moved or copied, the metadata comes with it) + +## Sync Implementation + +The core implementation does all of the interaction with things like the +tokenserver, the `meta/global` and `info/collections` collections, etc. It +does all network interaction (ie, individual engines don't need to interact with +the network at all), tracks things like whether the server is asking us to +"backoff" due to operational concerns, manages encryption keys and the +encryption itself, etc. The general flow of a sync - which interacts with the +`SyncEngine` trait - is: + +* Does all pre-sync setup, such as checking `meta/global`, and whether the + sync IDs on the server match the sync IDs we last saw (ie, to check whether + something drastic has happened since we last synced) +* Asks the engine about how to formulate the URL query params to obtain the + records the engine cares about. In most cases, this will simply be "records + since the last modified timestamp of the last sync". +* Downloads and decrypts these records. +* Passes these records to the engine for processing, and obtains records that + should be uploaded to the server. +* Encrypts these outgoing records and uploads them. +* Tells the engine about the result of the upload (ie, the last-modified + timestamp of the POST so it can be saved as engine metadata) + +As above, the sync15 component really only deals with a single engine at a time. +See the "sync manager" for how multiple engine are managed (but the tl;dr is +that the "sync manager" leans on this very heavily, but knows about multiple +engine and manages shared state) + +## The `SyncEngine` trait + +The SyncEngine trait is where all logic specific to a collection lives. A "sync +engine" implements (or provides) this trait to implement actual syncing. + +For reasons, it actually lives in the +[sync-traits helper](https://github.com/mozilla/application-services/blob/main/components/support/sync15-traits/src/engine.rs) +but for the purposes of this document, you should consider it as owned by sync15. + +This is actually quite a simple trait - at a high level, it's really just +concerned with: + +* Get or set some metadata the sync15 component has decided should be saved or + fetched. + +* In a normal sync, take some "incoming" records, process them, and return + the "outgoing" records we should send to the server. + +* In some edge-cases, either "wipe" (ie, actually delete everything, which + almost never happens) or "reset" (ie, pretend this engine has never before + been synced) + +And that's it! diff --git a/third_party/rust/sync15/src/bso/content.rs b/third_party/rust/sync15/src/bso/content.rs new file mode 100644 index 0000000000..f7aa6f608b --- /dev/null +++ b/third_party/rust/sync15/src/bso/content.rs @@ -0,0 +1,388 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! This module enhances the IncomingBso and OutgoingBso records to deal with +//! arbitrary types, which we call "content" +//! It can: +//! * Parse JSON into some while handling tombstones and invalid json. +//! * Turn arbitrary objects with an `id` field into an OutgoingBso. + +use super::{IncomingBso, IncomingContent, IncomingKind, OutgoingBso, OutgoingEnvelope}; +use crate::Guid; +use error_support::report_error; +use serde::Serialize; + +// The only errors we return here are serde errors. +type Result = std::result::Result; + +impl IncomingContent { + /// Returns Some(content) if [self.kind] is [IncomingKind::Content], None otherwise. + pub fn content(self) -> Option { + match self.kind { + IncomingKind::Content(t) => Some(t), + _ => None, + } + } +} + +// We don't want to force our T to be Debug, but we can be Debug if T is. +impl std::fmt::Debug for IncomingKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IncomingKind::Content(r) => { + write!(f, "IncomingKind::Content<{:?}>", r) + } + IncomingKind::Tombstone => write!(f, "IncomingKind::Tombstone"), + IncomingKind::Malformed => write!(f, "IncomingKind::Malformed"), + } + } +} + +impl IncomingBso { + /// Convert an [IncomingBso] to an [IncomingContent] possibly holding a T. + pub fn into_content serde::Deserialize<'de>>(self) -> IncomingContent { + match serde_json::from_str(&self.payload) { + Ok(json) => { + // We got a good serde_json::Value, see if it's a . + let kind = json_to_kind(json, &self.envelope.id); + IncomingContent { + envelope: self.envelope, + kind, + } + } + Err(e) => { + // payload isn't valid json. + log::warn!("Invalid incoming cleartext {}: {}", self.envelope.id, e); + IncomingContent { + envelope: self.envelope, + kind: IncomingKind::Malformed, + } + } + } + } +} + +impl OutgoingBso { + /// Creates a new tombstone record. + /// Not all collections expect tombstones. + pub fn new_tombstone(envelope: OutgoingEnvelope) -> Self { + Self { + envelope, + payload: serde_json::json!({"deleted": true}).to_string(), + } + } + + /// Creates a outgoing record from some , which can be made into a JSON object + /// with a valid `id`. This is the most convenient way to create an outgoing + /// item from a when the default envelope is suitable. + /// Will panic if there's no good `id` in the json. + pub fn from_content_with_id(record: T) -> Result + where + T: Serialize, + { + let (json, id) = content_with_id_to_json(record)?; + Ok(Self { + envelope: id.into(), + payload: serde_json::to_string(&json)?, + }) + } + + /// Create an Outgoing record with an explicit envelope. Will panic if the + /// payload has an ID but it doesn't match the envelope. + pub fn from_content(envelope: OutgoingEnvelope, record: T) -> Result + where + T: Serialize, + { + let json = content_to_json(record, &envelope.id)?; + Ok(Self { + envelope, + payload: serde_json::to_string(&json)?, + }) + } +} + +// Helpers for packing and unpacking serde objects to and from a . In particular: +// * Helping deal complications around raw json payload not having 'id' (the envelope is +// canonical) but needing it to exist when dealing with serde locally. +// For example, a record on the server after being decrypted looks like: +// `{"id": "a-guid", payload: {"field": "value"}}` +// But the `T` for this typically looks like `struct T { id: Guid, field: String}` +// So before we try and deserialize this record into a T, we copy the `id` field +// from the envelope into the payload, and when serializing from a T we do the +// reverse (ie, ensure the `id` in the payload is removed and placed in the envelope) +// * Tombstones. + +// Deserializing json into a T +fn json_to_kind(mut json: serde_json::Value, id: &Guid) -> IncomingKind +where + T: for<'de> serde::Deserialize<'de>, +{ + // It's possible that the payload does not carry 'id', but always does - so grab it from the + // envelope and put it into the json before deserializing the record. + if let serde_json::Value::Object(ref mut map) = json { + if map.contains_key("deleted") { + return IncomingKind::Tombstone; + } + match map.get("id") { + Some(serde_json::Value::String(content_id)) => { + // It exists in the payload! We treat a mismatch as malformed. + if content_id != id { + log::trace!( + "malformed incoming record: envelope id: {} payload id: {}", + content_id, + id + ); + report_error!( + "incoming-invalid-mismatched-ids", + "Envelope and payload don't agree on the ID" + ); + return IncomingKind::Malformed; + } + if !id.is_valid_for_sync_server() { + log::trace!("malformed incoming record: id is not valid: {}", id); + report_error!( + "incoming-invalid-bad-payload-id", + "ID in the payload is invalid" + ); + return IncomingKind::Malformed; + } + } + Some(v) => { + // It exists in the payload but is not a string - they can't possibly be + // the same as the envelope uses a String, so must be malformed. + log::trace!("malformed incoming record: id is not a string: {}", v); + report_error!("incoming-invalid-wrong_type", "ID is not a string"); + return IncomingKind::Malformed; + } + None => { + // Doesn't exist in the payload - add it before trying to deser a T. + if !id.is_valid_for_sync_server() { + log::trace!("malformed incoming record: id is not valid: {}", id); + report_error!( + "incoming-invalid-bad-envelope-id", + "ID in envelope is not valid" + ); + return IncomingKind::Malformed; + } + map.insert("id".to_string(), id.to_string().into()); + } + } + }; + match serde_json::from_value(json) { + Ok(v) => IncomingKind::Content(v), + Err(e) => { + report_error!("invalid-incoming-content", "Invalid incoming T: {}", e); + IncomingKind::Malformed + } + } +} + +// Serializing into json with special handling of `id` (the `id` from the payload +// is used as the envelope ID) +fn content_with_id_to_json(record: T) -> Result<(serde_json::Value, Guid)> +where + T: Serialize, +{ + let mut json = serde_json::to_value(record)?; + let id = match json.as_object_mut() { + Some(ref mut map) => { + match map.get("id").as_ref().and_then(|v| v.as_str()) { + Some(id) => { + let id: Guid = id.into(); + assert!(id.is_valid_for_sync_server(), "record's ID is invalid"); + id + } + // In practice, this is a "static" error and not influenced by runtime behavior + None => panic!("record does not have an ID in the payload"), + } + } + None => panic!("record is not a json object"), + }; + Ok((json, id)) +} + +// Serializing into json with special handling of `id` (if `id` in serialized +// JSON already exists, we panic if it doesn't match the envelope. If the serialized +// content does not have an `id`, it is added from the envelope) +// is used as the envelope ID) +fn content_to_json(record: T, id: &Guid) -> Result +where + T: Serialize, +{ + let mut payload = serde_json::to_value(record)?; + if let Some(ref mut map) = payload.as_object_mut() { + if let Some(content_id) = map.get("id").as_ref().and_then(|v| v.as_str()) { + assert_eq!(content_id, id); + assert!(id.is_valid_for_sync_server(), "record's ID is invalid"); + } else { + map.insert("id".to_string(), serde_json::Value::String(id.to_string())); + } + }; + Ok(payload) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bso::IncomingBso; + use serde::{Deserialize, Serialize}; + use serde_json::json; + + #[derive(Default, Debug, PartialEq, Serialize, Deserialize)] + struct TestStruct { + id: Guid, + data: u32, + } + #[test] + fn test_content_deser() { + env_logger::try_init().ok(); + let json = json!({ + "id": "test", + "payload": json!({"data": 1}).to_string(), + }); + let incoming: IncomingBso = serde_json::from_value(json).unwrap(); + assert_eq!(incoming.envelope.id, "test"); + let record = incoming.into_content::().content().unwrap(); + let expected = TestStruct { + id: Guid::new("test"), + data: 1, + }; + assert_eq!(record, expected); + } + + #[test] + fn test_content_deser_empty_id() { + env_logger::try_init().ok(); + let json = json!({ + "id": "", + "payload": json!({"data": 1}).to_string(), + }); + let incoming: IncomingBso = serde_json::from_value(json).unwrap(); + // The envelope has an invalid ID, but it's not handled until we try and deserialize + // it into a T + assert_eq!(incoming.envelope.id, ""); + let content = incoming.into_content::(); + assert!(matches!(content.kind, IncomingKind::Malformed)); + } + + #[test] + fn test_content_deser_invalid() { + env_logger::try_init().ok(); + // And a non-empty but still invalid guid. + let json = json!({ + "id": "X".repeat(65), + "payload": json!({"data": 1}).to_string(), + }); + let incoming: IncomingBso = serde_json::from_value(json).unwrap(); + let content = incoming.into_content::(); + assert!(matches!(content.kind, IncomingKind::Malformed)); + } + + #[test] + fn test_content_deser_not_string() { + env_logger::try_init().ok(); + // A non-string id. + let json = json!({ + "id": "0", + "payload": json!({"id": 0, "data": 1}).to_string(), + }); + let incoming: IncomingBso = serde_json::from_value(json).unwrap(); + let content = incoming.into_content::(); + assert!(matches!(content.kind, IncomingKind::Malformed)); + } + + #[test] + fn test_content_ser_with_id() { + env_logger::try_init().ok(); + // When serializing, expect the ID to be in the top-level payload (ie, + // in the envelope) but should not appear in the cleartext `payload` part of + // the payload. + let val = TestStruct { + id: Guid::new("test"), + data: 1, + }; + let outgoing = OutgoingBso::from_content_with_id(val).unwrap(); + + // The envelope should have our ID. + assert_eq!(outgoing.envelope.id, Guid::new("test")); + + // and make sure `cleartext` part of the payload the data and the id. + let ct_value = serde_json::from_str::(&outgoing.payload).unwrap(); + assert_eq!(ct_value, json!({"data": 1, "id": "test"})); + } + + #[test] + fn test_content_ser_with_envelope() { + env_logger::try_init().ok(); + // When serializing, expect the ID to be in the top-level payload (ie, + // in the envelope) but should not appear in the cleartext `payload` + let val = TestStruct { + id: Guid::new("test"), + data: 1, + }; + let envelope: OutgoingEnvelope = Guid::new("test").into(); + let outgoing = OutgoingBso::from_content(envelope, val).unwrap(); + + // The envelope should have our ID. + assert_eq!(outgoing.envelope.id, Guid::new("test")); + + // and make sure `cleartext` part of the payload has data and the id. + let ct_value = serde_json::from_str::(&outgoing.payload).unwrap(); + assert_eq!(ct_value, json!({"data": 1, "id": "test"})); + } + + #[test] + #[should_panic] + fn test_content_ser_no_ids() { + env_logger::try_init().ok(); + #[derive(Serialize)] + struct StructWithNoId { + data: u32, + } + let val = StructWithNoId { data: 1 }; + let _ = OutgoingBso::from_content_with_id(val); + } + + #[test] + #[should_panic] + fn test_content_ser_not_object() { + env_logger::try_init().ok(); + let _ = OutgoingBso::from_content_with_id(json!("string")); + } + + #[test] + #[should_panic] + fn test_content_ser_mismatched_ids() { + env_logger::try_init().ok(); + let val = TestStruct { + id: Guid::new("test"), + data: 1, + }; + let envelope: OutgoingEnvelope = Guid::new("different").into(); + let _ = OutgoingBso::from_content(envelope, val); + } + + #[test] + #[should_panic] + fn test_content_empty_id() { + env_logger::try_init().ok(); + let val = TestStruct { + id: Guid::new(""), + data: 1, + }; + let _ = OutgoingBso::from_content_with_id(val); + } + + #[test] + #[should_panic] + fn test_content_invalid_id() { + env_logger::try_init().ok(); + let val = TestStruct { + id: Guid::new(&"X".repeat(65)), + data: 1, + }; + let _ = OutgoingBso::from_content_with_id(val); + } +} diff --git a/third_party/rust/sync15/src/bso/crypto.rs b/third_party/rust/sync15/src/bso/crypto.rs new file mode 100644 index 0000000000..d572c4692b --- /dev/null +++ b/third_party/rust/sync15/src/bso/crypto.rs @@ -0,0 +1,197 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Support for "encrypted bso"s, as received by the storage servers. +//! This module decrypts them into IncomingBso's suitable for use by the +//! engines. +use super::{IncomingBso, IncomingEnvelope, OutgoingBso, OutgoingEnvelope}; +use crate::error; +use crate::key_bundle::KeyBundle; +use crate::EncryptedPayload; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +// The BSO implementation we use for encrypted payloads. +// Note that this is almost identical to the IncomingBso implementations, except +// instead of a String payload we use an EncryptedPayload. Obviously we *could* +// just use a String payload and transform it into an EncryptedPayload - any maybe we +// should - but this is marginally optimal in terms of deserialization. +#[derive(Deserialize, Debug)] +pub struct IncomingEncryptedBso { + #[serde(flatten)] + pub envelope: IncomingEnvelope, + #[serde( + with = "as_json", + bound(deserialize = "EncryptedPayload: DeserializeOwned") + )] + pub(crate) payload: EncryptedPayload, +} + +impl IncomingEncryptedBso { + pub fn new(envelope: IncomingEnvelope, payload: EncryptedPayload) -> Self { + Self { envelope, payload } + } + /// Decrypt a BSO, consuming it into a clear-text version. + pub fn into_decrypted(self, key: &KeyBundle) -> error::Result { + Ok(IncomingBso::new(self.envelope, self.payload.decrypt(key)?)) + } +} + +#[derive(Serialize, Debug)] +pub struct OutgoingEncryptedBso { + #[serde(flatten)] + pub envelope: OutgoingEnvelope, + #[serde(with = "as_json", bound(serialize = "EncryptedPayload: Serialize"))] + payload: EncryptedPayload, +} + +impl OutgoingEncryptedBso { + pub fn new(envelope: OutgoingEnvelope, payload: EncryptedPayload) -> Self { + Self { envelope, payload } + } + + #[inline] + pub fn serialized_payload_len(&self) -> usize { + self.payload.serialized_len() + } +} + +impl OutgoingBso { + pub fn into_encrypted(self, key: &KeyBundle) -> error::Result { + Ok(OutgoingEncryptedBso { + envelope: self.envelope, + payload: EncryptedPayload::from_cleartext(key, self.payload)?, + }) + } +} + +// The BSOs we write to the servers expect a "payload" attribute which is a JSON serialized +// string, rather than the JSON representation of the object. +// ie, the serialized object is expected to look like: +// `{"id": "some-guid", "payload": "{\"IV\": ... }"}` <-- payload is a string. +// However, if we just serialize it directly, we end up with: +// `{"id": "some-guid", "payload": {"IV": ... }}` <-- payload is an object. +// The magic here means we can serialize and deserialize directly into/from the object, correctly +// working with the payload as a string, instead of needing to explicitly stringify/parse the +// payload as an extra step. +// +// This would work for any , but we only use it for EncryptedPayload - the way our cleartext +// BSOs work mean it's not necessary there as they define the payload as a String - ie, they do +// explicitly end up doing 2 JSON operations as an ergonomic design choice. +mod as_json { + use serde::de::{self, Deserialize, DeserializeOwned, Deserializer}; + use serde::ser::{self, Serialize, Serializer}; + + pub fn serialize(t: &T, serializer: S) -> Result + where + T: Serialize, + S: Serializer, + { + let j = serde_json::to_string(t).map_err(ser::Error::custom)?; + serializer.serialize_str(&j) + } + + pub fn deserialize<'de, T, D>(deserializer: D) -> Result + where + T: DeserializeOwned, + D: Deserializer<'de>, + { + let j = String::deserialize(deserializer)?; + serde_json::from_str(&j).map_err(de::Error::custom) + } +} + +// Lots of stuff for testing the sizes of encrypted records, because the servers have +// certain limits in terms of max-POST sizes, forcing us to chunk uploads, but +// we need to calculate based on encrypted record size rather than the raw size. +// +// This is a little cludgey but I couldn't think of another way to have easy deserialization +// without a bunch of wrapper types, while still only serializing a single time in the +// postqueue. +#[cfg(test)] +impl OutgoingEncryptedBso { + /// Return the length of the serialized payload. + pub fn payload_serialized_len(&self) -> usize { + self.payload.serialized_len() + } + + // self.payload is private, but tests want to create funky things. + // XXX - test only, but test in another crate :( + //#[cfg(test)] + pub fn make_test_bso(ciphertext: String) -> Self { + Self { + envelope: OutgoingEnvelope { + id: "".into(), + sortindex: None, + ttl: None, + }, + payload: EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext, + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::bso::OutgoingEnvelope; + + #[test] + fn test_deserialize_enc() { + let serialized = r#"{ + "id": "1234", + "collection": "passwords", + "modified": 12344321.0, + "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}" + }"#; + let record: IncomingEncryptedBso = serde_json::from_str(serialized).unwrap(); + assert_eq!(&record.envelope.id, "1234"); + assert_eq!((record.envelope.modified.0 - 12_344_321_000).abs(), 0); + assert_eq!(record.envelope.sortindex, None); + assert_eq!(&record.payload.iv, "aaaaa"); + assert_eq!(&record.payload.hmac, "bbbbb"); + assert_eq!(&record.payload.ciphertext, "ccccc"); + } + + #[test] + fn test_deserialize_autofields() { + let serialized = r#"{ + "id": "1234", + "collection": "passwords", + "modified": 12344321.0, + "sortindex": 100, + "ttl": 99, + "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}" + }"#; + let record: IncomingEncryptedBso = serde_json::from_str(serialized).unwrap(); + assert_eq!(record.envelope.sortindex, Some(100)); + assert_eq!(record.envelope.ttl, Some(99)); + } + + #[test] + fn test_serialize_enc() { + let goal = r#"{"id":"1234","payload":"{\"IV\":\"aaaaa\",\"hmac\":\"bbbbb\",\"ciphertext\":\"ccccc\"}"}"#; + let record = OutgoingEncryptedBso { + envelope: OutgoingEnvelope { + id: "1234".into(), + ..Default::default() + }, + payload: EncryptedPayload { + iv: "aaaaa".into(), + hmac: "bbbbb".into(), + ciphertext: "ccccc".into(), + }, + }; + let actual = serde_json::to_string(&record).unwrap(); + assert_eq!(actual, goal); + + let val_str_payload: serde_json::Value = serde_json::from_str(goal).unwrap(); + assert_eq!( + val_str_payload["payload"].as_str().unwrap().len(), + record.payload.serialized_len() + ) + } +} diff --git a/third_party/rust/sync15/src/bso/mod.rs b/third_party/rust/sync15/src/bso/mod.rs new file mode 100644 index 0000000000..251c11fb3b --- /dev/null +++ b/third_party/rust/sync15/src/bso/mod.rs @@ -0,0 +1,204 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/// This module defines our core "bso" abstractions. +/// In the terminology of this crate: +/// * "bso" is an acronym for "basic storage object" and used extensively in the sync server docs. +/// the record always has a well-defined "envelope" with metadata (eg, the ID of the record, +/// the server timestamp of the resource, etc) and a field called `payload`. +/// A bso is serialized to and from JSON. +/// * There's a "cleartext" bso: +/// ** The payload is a String, which itself is JSON encoded (ie, this string `payload` is +/// always double JSON encoded in a server record) +/// ** This supplies helper methods for working with the "content" (some arbitrary ) in the +/// payload. +/// * There's an "encrypted" bso +/// ** The payload is an [crate::enc_payload::EncryptedPayload] +/// ** Only clients use this; as soon as practical we decrypt and as late as practical we encrypt +/// to and from encrypted bsos. +/// ** The encrypted bsos etc are all in the [crypto] module and require the `crypto` feature. +/// +/// Let's look at some real-world examples: +/// # meta/global +/// A "bso" (ie, record with an "envelope" and a "payload" with a JSON string) - but the payload +/// is cleartext. +/// ```json +/// { +/// "id":"global", +/// "modified":1661564513.50, +/// "payload": "{\"syncID\":\"p1z5_oDdOfLF\",\"storageVersion\":5,\"engines\":{\"passwords\":{\"version\":1,\"syncID\":\"6Y6JJkB074cF\"} /* snip */},\"declined\":[]}" +/// }``` +/// +/// # encrypted bsos: +/// Encrypted BSOs are still a "bso" (ie, a record with a field names `payload` which is a string) +/// but the payload is in the form of an EncryptedPayload. +/// For example, crypto/keys: +/// ```json +/// { +/// "id":"keys", +/// "modified":1661564513.74, +/// "payload":"{\"IV\":\"snip-base-64==\",\"hmac\":\"snip-hex\",\"ciphertext\":\"snip-base64==\"}" +/// }``` +/// (Note that as described above, most code working with bsos *do not* use that `payload` +/// directly, but instead a decrypted cleartext bso. +/// +/// Note all collection responses are the same shape as `crypto/keys` - a `payload` field with a +/// JSON serialized EncryptedPayload, it's just that the final content differs for each +/// collection (eg, tabs and bookmarks have quite different s JSON-encoded in the +/// String payload.) +/// +/// For completeness, some other "non-BSO" records - no "id", "modified" or "payload" fields in +/// the response, just plain-old clear-text JSON. +/// # Example +/// ## `info/collections` +/// ```json +/// { +/// "bookmarks":1661564648.65, +/// "meta":1661564513.50, +/// "addons":1661564649.09, +/// "clients":1661564643.57, +/// ... +/// }``` +/// ## `info/configuration` +/// ```json +/// { +/// "max_post_bytes":2097152, +/// "max_post_records":100, +/// "max_record_payload_bytes":2097152, +/// ... +/// }``` +/// +/// Given our definitions above, these are not any kind of "bso", so are +/// not relevant to this module +use crate::{Guid, ServerTimestamp}; +use serde::{Deserialize, Serialize}; + +#[cfg(feature = "crypto")] +mod crypto; +#[cfg(feature = "crypto")] +pub use crypto::{IncomingEncryptedBso, OutgoingEncryptedBso}; + +mod content; + +// A feature for this would be ideal, but (a) the module is small and (b) it +// doesn't really fit the "features" model for sync15 to have a dev-dependency +// against itself but with a different feature set. +pub mod test_utils; + +/// An envelope for an incoming item. Envelopes carry all the metadata for +/// a Sync BSO record (`id`, `modified`, `sortindex`), *but not* the payload +/// itself. +#[derive(Debug, Clone, Deserialize)] +pub struct IncomingEnvelope { + /// The ID of the record. + pub id: Guid, + // If we don't give it a default, a small handful of tests fail. + // XXX - we should probably fix the tests and kill this? + #[serde(default = "ServerTimestamp::default")] + pub modified: ServerTimestamp, + pub sortindex: Option, + pub ttl: Option, +} + +/// An envelope for an outgoing item. This is conceptually identical to +/// [IncomingEnvelope], but omits fields that are only set by the server, +/// like `modified`. +#[derive(Debug, Default, Clone, Serialize)] +pub struct OutgoingEnvelope { + /// The ID of the record. + pub id: Guid, + #[serde(skip_serializing_if = "Option::is_none")] + pub sortindex: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ttl: Option, +} + +/// Allow an outgoing envelope to be constructed with just a guid when default +/// values for the other fields are OK. +impl From for OutgoingEnvelope { + fn from(id: Guid) -> Self { + OutgoingEnvelope { + id, + ..Default::default() + } + } +} + +/// IncomingBso's can come from: +/// * Directly from the server (ie, some records aren't encrypted, such as meta/global) +/// * From environments where the encryption is done externally (eg, Rust syncing in Desktop +/// Firefox has the encryption/decryption done by Firefox and the cleartext BSOs are passed in. +/// * Read from the server as an EncryptedBso; see EncryptedBso description above. +#[derive(Deserialize, Debug)] +pub struct IncomingBso { + #[serde(flatten)] + pub envelope: IncomingEnvelope, + // payload is public for some edge-cases in some components, but in general, + // you should use into_content<> to get a record out of it. + pub payload: String, +} + +impl IncomingBso { + pub fn new(envelope: IncomingEnvelope, payload: String) -> Self { + Self { envelope, payload } + } +} + +#[derive(Serialize, Debug)] +pub struct OutgoingBso { + #[serde(flatten)] + pub envelope: OutgoingEnvelope, + // payload is public for some edge-cases in some components, but in general, + // you should use into_content<> to get a record out of it. + pub payload: String, +} + +impl OutgoingBso { + /// Most consumers will use `self.from_content` and `self.from_content_with_id` + /// but this exists for the few consumers for whom that doesn't make sense. + pub fn new( + envelope: OutgoingEnvelope, + val: &T, + ) -> Result { + Ok(Self { + envelope, + payload: serde_json::to_string(&val)?, + }) + } +} + +/// We also have the concept of "content", which helps work with a `T` which +/// is represented inside the payload. Real-world examples of a `T` include +/// Bookmarks or Tabs. +/// See the content module for the implementations. +/// +/// So this all flows together in the following way: +/// * Incoming encrypted data: +/// EncryptedIncomingBso -> IncomingBso -> [specific engine] -> IncomingContent +/// * Incoming cleartext data: +/// IncomingBso -> IncomingContent +/// (Note that incoming cleartext only happens for a few collections managed by +/// the sync client and never by specific engines - engine BSOs are always encryted) +/// * Outgoing encrypted data: +/// OutgoingBso (created in the engine) -> [this crate] -> EncryptedOutgoingBso +/// * Outgoing cleartext data: just an OutgoingBso with no conversions needed. + +/// [IncomingContent] is the result of converting an [IncomingBso] into +/// some - it consumes the Bso, so you get the envelope, and the [IncomingKind] +/// which reflects the state of parsing the json. +#[derive(Debug)] +pub struct IncomingContent { + pub envelope: IncomingEnvelope, + pub kind: IncomingKind, +} + +/// The "kind" of incoming content after deserializing it. +pub enum IncomingKind { + /// A good, live T. + Content(T), + /// A record that used to be a T but has been replaced with a tombstone. + Tombstone, + /// Either not JSON, or can't be made into a T. + Malformed, +} diff --git a/third_party/rust/sync15/src/bso/test_utils.rs b/third_party/rust/sync15/src/bso/test_utils.rs new file mode 100644 index 0000000000..55735afda2 --- /dev/null +++ b/third_party/rust/sync15/src/bso/test_utils.rs @@ -0,0 +1,61 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Utilities for tests to make IncomingBsos and Content from test data. +use super::{IncomingBso, IncomingEnvelope, OutgoingBso}; +use crate::{Guid, ServerTimestamp}; + +/// Tests often want an IncomingBso to test, and the easiest way is often to +/// create an OutgoingBso convert it back to an incoming. +impl OutgoingBso { + // These functions would ideally consume `self` and avoid the clones, but + // this is more convenient for some tests and the extra overhead doesn't + // really matter for tests. + /// When a test has an [OutgoingBso] and wants it as an [IncomingBso] + pub fn to_test_incoming(&self) -> IncomingBso { + self.to_test_incoming_ts(ServerTimestamp::default()) + } + + /// When a test has an [OutgoingBso] and wants it as an [IncomingBso] with a specific timestamp. + pub fn to_test_incoming_ts(&self, ts: ServerTimestamp) -> IncomingBso { + IncomingBso { + envelope: IncomingEnvelope { + id: self.envelope.id.clone(), + modified: ts, + sortindex: self.envelope.sortindex, + ttl: self.envelope.ttl, + }, + payload: self.payload.clone(), + } + } + + /// When a test has an [OutgoingBso] and wants it as an [IncomingBso] with a specific T. + pub fn to_test_incoming_t serde::Deserialize<'de>>(&self) -> T { + self.to_test_incoming().into_content().content().unwrap() + } +} + +/// Helpers to create an IncomingBso from some T +impl IncomingBso { + /// When a test has an T and wants it as an [IncomingBso] + pub fn from_test_content(json: T) -> Self { + // Go via an OutgoingBso + OutgoingBso::from_content_with_id(json) + .unwrap() + .to_test_incoming() + } + + /// When a test has an T and wants it as an [IncomingBso] with a specific timestamp. + pub fn from_test_content_ts(json: T, ts: ServerTimestamp) -> Self { + // Go via an OutgoingBso + OutgoingBso::from_content_with_id(json) + .unwrap() + .to_test_incoming_ts(ts) + } + + /// When a test wants a new incoming tombstone. + pub fn new_test_tombstone(guid: Guid) -> Self { + OutgoingBso::new_tombstone(guid.into()).to_test_incoming() + } +} diff --git a/third_party/rust/sync15/src/client/coll_state.rs b/third_party/rust/sync15/src/client/coll_state.rs new file mode 100644 index 0000000000..df8be5f5b5 --- /dev/null +++ b/third_party/rust/sync15/src/client/coll_state.rs @@ -0,0 +1,354 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::request::InfoConfiguration; +use super::{CollectionKeys, GlobalState}; +use crate::engine::{CollSyncIds, EngineSyncAssociation, SyncEngine}; +use crate::error; +use crate::KeyBundle; +use crate::ServerTimestamp; + +/// Holds state for a collection. In general, only the CollState is +/// needed to sync a collection (but a valid GlobalState is needed to obtain +/// a CollState) +#[derive(Debug, Clone)] +pub struct CollState { + pub config: InfoConfiguration, + // initially from meta/global, updated after an xius POST/PUT. + pub last_modified: ServerTimestamp, + pub key: KeyBundle, +} + +#[derive(Debug)] +pub enum LocalCollState { + /// The state is unknown, with the EngineSyncAssociation the collection + /// reports. + Unknown { assoc: EngineSyncAssociation }, + + /// The engine has been declined. This is a "terminal" state. + Declined, + + /// There's no such collection in meta/global. We could possibly update + /// meta/global, but currently all known collections are there by default, + /// so this is, basically, an error condition. + NoSuchCollection, + + /// Either the global or collection sync ID has changed - we will reset the engine. + SyncIdChanged { ids: CollSyncIds }, + + /// The collection is ready to sync. + Ready { key: KeyBundle }, +} + +pub struct LocalCollStateMachine<'state> { + global_state: &'state GlobalState, + root_key: &'state KeyBundle, +} + +impl<'state> LocalCollStateMachine<'state> { + fn advance( + &self, + from: LocalCollState, + engine: &dyn SyncEngine, + ) -> error::Result { + let name = &engine.collection_name().to_string(); + let meta_global = &self.global_state.global; + match from { + LocalCollState::Unknown { assoc } => { + if meta_global.declined.contains(name) { + return Ok(LocalCollState::Declined); + } + match meta_global.engines.get(name) { + Some(engine_meta) => match assoc { + EngineSyncAssociation::Disconnected => Ok(LocalCollState::SyncIdChanged { + ids: CollSyncIds { + global: meta_global.sync_id.clone(), + coll: engine_meta.sync_id.clone(), + }, + }), + EngineSyncAssociation::Connected(ref ids) + if ids.global == meta_global.sync_id + && ids.coll == engine_meta.sync_id => + { + let coll_keys = CollectionKeys::from_encrypted_payload( + self.global_state.keys.clone(), + self.global_state.keys_timestamp, + self.root_key, + )?; + Ok(LocalCollState::Ready { + key: coll_keys.key_for_collection(name).clone(), + }) + } + _ => Ok(LocalCollState::SyncIdChanged { + ids: CollSyncIds { + global: meta_global.sync_id.clone(), + coll: engine_meta.sync_id.clone(), + }, + }), + }, + None => Ok(LocalCollState::NoSuchCollection), + } + } + + LocalCollState::Declined => unreachable!("can't advance from declined"), + + LocalCollState::NoSuchCollection => unreachable!("the collection is unknown"), + + LocalCollState::SyncIdChanged { ids } => { + let assoc = EngineSyncAssociation::Connected(ids); + log::info!("Resetting {} engine", engine.collection_name()); + engine.reset(&assoc)?; + Ok(LocalCollState::Unknown { assoc }) + } + + LocalCollState::Ready { .. } => unreachable!("can't advance from ready"), + } + } + + // A little whimsy - a portmanteau of far and fast + fn run_and_run_as_farst_as_you_can( + &mut self, + engine: &dyn SyncEngine, + ) -> error::Result> { + let mut s = LocalCollState::Unknown { + assoc: engine.get_sync_assoc()?, + }; + // This is a simple state machine and should never take more than + // 10 goes around. + let mut count = 0; + loop { + log::trace!("LocalCollState in {:?}", s); + match s { + LocalCollState::Ready { key } => { + let name = engine.collection_name(); + let config = self.global_state.config.clone(); + let last_modified = self + .global_state + .collections + .get(name.as_ref()) + .cloned() + .unwrap_or_default(); + return Ok(Some(CollState { + config, + last_modified, + key, + })); + } + LocalCollState::Declined | LocalCollState::NoSuchCollection => return Ok(None), + + _ => { + count += 1; + if count > 10 { + log::warn!("LocalCollStateMachine appears to be looping"); + return Ok(None); + } + // should we have better loop detection? Our limit of 10 + // goes is probably OK for now, but not really ideal. + s = self.advance(s, engine)?; + } + }; + } + } + + pub fn get_state( + engine: &dyn SyncEngine, + global_state: &'state GlobalState, + root_key: &'state KeyBundle, + ) -> error::Result> { + let mut gingerbread_man = Self { + global_state, + root_key, + }; + gingerbread_man.run_and_run_as_farst_as_you_can(engine) + } +} + +#[cfg(test)] +mod tests { + use super::super::request::{InfoCollections, InfoConfiguration}; + use super::super::CollectionKeys; + use super::*; + use crate::engine::CollectionRequest; + use crate::engine::{IncomingChangeset, OutgoingChangeset}; + use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord}; + use crate::telemetry; + use anyhow::Result; + use std::cell::{Cell, RefCell}; + use std::collections::HashMap; + use sync_guid::Guid; + + fn get_global_state(root_key: &KeyBundle) -> GlobalState { + let keys = CollectionKeys::new_random() + .unwrap() + .to_encrypted_payload(root_key) + .unwrap(); + GlobalState { + config: InfoConfiguration::default(), + collections: InfoCollections::new(HashMap::new()), + global: MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + declined: vec![], + }, + global_timestamp: ServerTimestamp::default(), + keys, + keys_timestamp: ServerTimestamp::default(), + } + } + + struct TestSyncEngine { + collection_name: &'static str, + assoc: Cell, + num_resets: RefCell, + } + + impl TestSyncEngine { + fn new(collection_name: &'static str, assoc: EngineSyncAssociation) -> Self { + Self { + collection_name, + assoc: Cell::new(assoc), + num_resets: RefCell::new(0), + } + } + fn get_num_resets(&self) -> usize { + *self.num_resets.borrow() + } + } + + impl SyncEngine for TestSyncEngine { + fn collection_name(&self) -> std::borrow::Cow<'static, str> { + self.collection_name.into() + } + + fn apply_incoming( + &self, + _inbound: Vec, + _telem: &mut telemetry::Engine, + ) -> Result { + unreachable!("these tests shouldn't call these"); + } + + fn sync_finished( + &self, + _new_timestamp: ServerTimestamp, + _records_synced: Vec, + ) -> Result<()> { + unreachable!("these tests shouldn't call these"); + } + + fn get_collection_requests( + &self, + _server_timestamp: ServerTimestamp, + ) -> Result> { + unreachable!("these tests shouldn't call these"); + } + + fn get_sync_assoc(&self) -> Result { + Ok(self.assoc.replace(EngineSyncAssociation::Disconnected)) + } + + fn reset(&self, new_assoc: &EngineSyncAssociation) -> Result<()> { + self.assoc.replace(new_assoc.clone()); + *self.num_resets.borrow_mut() += 1; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + unreachable!("these tests shouldn't call these"); + } + } + + #[test] + fn test_unknown() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let engine = TestSyncEngine::new("unknown", EngineSyncAssociation::Disconnected); + let cs = LocalCollStateMachine::get_state(&engine, &gs, &root_key).expect("should work"); + assert!(cs.is_none(), "unknown collection name can't sync"); + assert_eq!(engine.get_num_resets(), 0); + } + + #[test] + fn test_known_no_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let engine = TestSyncEngine::new("bookmarks", EngineSyncAssociation::Disconnected); + let cs = LocalCollStateMachine::get_state(&engine, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!( + engine.assoc.replace(EngineSyncAssociation::Disconnected), + EngineSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }) + ); + assert_eq!(engine.get_num_resets(), 1); + } + + #[test] + fn test_known_wrong_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let engine = TestSyncEngine::new( + "bookmarks", + EngineSyncAssociation::Connected(CollSyncIds { + global: "syncIDXXXXXX".into(), + coll: "syncIDYYYYYY".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&engine, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!( + engine.assoc.replace(EngineSyncAssociation::Disconnected), + EngineSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }) + ); + assert_eq!(engine.get_num_resets(), 1); + } + + #[test] + fn test_known_good_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let engine = TestSyncEngine::new( + "bookmarks", + EngineSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&engine, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!(engine.get_num_resets(), 0); + } + + #[test] + fn test_declined() { + let root_key = KeyBundle::new_random().expect("should work"); + let mut gs = get_global_state(&root_key); + gs.global.declined.push("bookmarks".to_string()); + let engine = TestSyncEngine::new( + "bookmarks", + EngineSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&engine, &gs, &root_key).expect("should work"); + assert!(cs.is_none(), "declined collection can sync"); + assert_eq!(engine.get_num_resets(), 0); + } +} diff --git a/third_party/rust/sync15/src/client/coll_update.rs b/third_party/rust/sync15/src/client/coll_update.rs new file mode 100644 index 0000000000..baa551c9a4 --- /dev/null +++ b/third_party/rust/sync15/src/client/coll_update.rs @@ -0,0 +1,137 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::{ + request::{NormalResponseHandler, UploadInfo}, + CollState, Sync15ClientResponse, Sync15StorageClient, +}; +use crate::bso::OutgoingEncryptedBso; +use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::error::{self, Error, ErrorResponse, Result}; +use crate::{KeyBundle, ServerTimestamp}; +use std::borrow::Cow; + +pub fn encrypt_outgoing( + o: OutgoingChangeset, + key: &KeyBundle, +) -> Result> { + o.changes + .into_iter() + .map(|change| change.into_encrypted(key)) + .collect() +} + +pub fn fetch_incoming( + client: &Sync15StorageClient, + state: &mut CollState, + collection_request: &CollectionRequest, +) -> Result { + let collection = collection_request.collection.clone(); + let (records, timestamp) = match client.get_encrypted_records(collection_request)? { + Sync15ClientResponse::Success { + record, + last_modified, + .. + } => (record, last_modified), + other => return Err(other.create_storage_error()), + }; + // xxx - duplication below of `timestamp` smells wrong + state.last_modified = timestamp; + let mut result = IncomingChangeset::new(collection, timestamp); + result.changes.reserve(records.len()); + for record in records { + // if we see a HMAC error, we've made an explicit decision to + // NOT handle it here, but restart the global state machine. + // That should cause us to re-read crypto/keys and things should + // work (although if for some reason crypto/keys was updated but + // not all storage was wiped we are probably screwed.) + result.changes.push(record.into_decrypted(&state.key)?); + } + Ok(result) +} + +pub struct CollectionUpdate<'a> { + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + to_update: Vec, + fully_atomic: bool, +} + +impl<'a> CollectionUpdate<'a> { + pub fn new( + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + records: Vec, + fully_atomic: bool, + ) -> CollectionUpdate<'a> { + CollectionUpdate { + client, + state, + collection, + xius, + to_update: records, + fully_atomic, + } + } + + pub fn new_from_changeset( + client: &'a Sync15StorageClient, + state: &'a CollState, + changeset: OutgoingChangeset, + fully_atomic: bool, + ) -> Result> { + let collection = changeset.collection.clone(); + let xius = changeset.timestamp; + if xius < state.last_modified { + // We know we are going to fail the XIUS check... + return Err(Error::StorageHttpError(ErrorResponse::PreconditionFailed { + route: collection.into_owned(), + })); + } + let to_update = encrypt_outgoing(changeset, &state.key)?; + Ok(CollectionUpdate::new( + client, + state, + collection, + xius, + to_update, + fully_atomic, + )) + } + + /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise + /// returns an empty vec. + pub fn upload(self) -> error::Result { + let mut failed = vec![]; + let mut q = self.client.new_post_queue( + &self.collection, + &self.state.config, + self.xius, + NormalResponseHandler::new(!self.fully_atomic), + )?; + + for record in self.to_update.into_iter() { + let enqueued = q.enqueue(&record)?; + if !enqueued && self.fully_atomic { + return Err(Error::RecordTooLargeError); + } + } + + q.flush(true)?; + let mut info = q.completed_upload_info(); + info.failed_ids.append(&mut failed); + if self.fully_atomic { + assert_eq!( + info.failed_ids.len(), + 0, + "Bug: Should have failed by now if we aren't allowing dropped records" + ); + } + Ok(info) + } +} diff --git a/third_party/rust/sync15/src/client/collection_keys.rs b/third_party/rust/sync15/src/client/collection_keys.rs new file mode 100644 index 0000000000..f51894f756 --- /dev/null +++ b/third_party/rust/sync15/src/client/collection_keys.rs @@ -0,0 +1,61 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::Result; +use crate::record_types::CryptoKeysRecord; +use crate::{EncryptedPayload, KeyBundle, ServerTimestamp}; +use std::collections::HashMap; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CollectionKeys { + pub timestamp: ServerTimestamp, + pub default: KeyBundle, + pub collections: HashMap, +} + +impl CollectionKeys { + pub fn new_random() -> Result { + let default = KeyBundle::new_random()?; + Ok(CollectionKeys { + timestamp: ServerTimestamp(0), + default, + collections: HashMap::new(), + }) + } + + pub fn from_encrypted_payload( + record: EncryptedPayload, + timestamp: ServerTimestamp, + root_key: &KeyBundle, + ) -> Result { + let keys: CryptoKeysRecord = record.decrypt_into(root_key)?; + Ok(CollectionKeys { + timestamp, + default: KeyBundle::from_base64(&keys.default[0], &keys.default[1])?, + collections: keys + .collections + .into_iter() + .map(|kv| Ok((kv.0, KeyBundle::from_base64(&kv.1[0], &kv.1[1])?))) + .collect::>>()?, + }) + } + + pub fn to_encrypted_payload(&self, root_key: &KeyBundle) -> Result { + let record = CryptoKeysRecord { + id: "keys".into(), + collection: "crypto".into(), + default: self.default.to_b64_array(), + collections: self + .collections + .iter() + .map(|kv| (kv.0.clone(), kv.1.to_b64_array())) + .collect(), + }; + EncryptedPayload::from_cleartext_payload(root_key, &record) + } + + pub fn key_for_collection<'a>(&'a self, collection: &str) -> &'a KeyBundle { + self.collections.get(collection).unwrap_or(&self.default) + } +} diff --git a/third_party/rust/sync15/src/client/mod.rs b/third_party/rust/sync15/src/client/mod.rs new file mode 100644 index 0000000000..84fe3678de --- /dev/null +++ b/third_party/rust/sync15/src/client/mod.rs @@ -0,0 +1,39 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! A module for everything needed to be a "sync client" - ie, a device which +//! can perform a full sync of any number of collections, including managing +//! the server state. +//! +//! In general, the client is responsible for all communication with the sync server, +//! including ensuring the state is correct, and encrypting/decrypting all records +//! to and from the server. However, the actual syncing of the collections is +//! delegated to an external [crate::engine](Sync Engine). +//! +//! One exception is that the "sync client" owns one sync engine - the +//! [crate::clients_engine], which is managed internally. +mod coll_state; +mod coll_update; +mod collection_keys; +mod request; +mod state; +mod status; +mod storage_client; +mod sync; +mod sync_multiple; +mod token; +mod util; + +pub(crate) use coll_state::CollState; +pub(crate) use coll_update::{fetch_incoming, CollectionUpdate}; +pub(crate) use collection_keys::CollectionKeys; +pub(crate) use request::InfoConfiguration; +pub(crate) use state::GlobalState; +pub use status::{ServiceStatus, SyncResult}; +pub use storage_client::{ + SetupStorageClient, Sync15ClientResponse, Sync15StorageClient, Sync15StorageClientInit, +}; +pub use sync_multiple::{ + sync_multiple, sync_multiple_with_command_processor, MemoryCachedState, SyncRequestInfo, +}; diff --git a/third_party/rust/sync15/src/client/request.rs b/third_party/rust/sync15/src/client/request.rs new file mode 100644 index 0000000000..c69b630c8d --- /dev/null +++ b/third_party/rust/sync15/src/client/request.rs @@ -0,0 +1,1199 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::storage_client::Sync15ClientResponse; +use crate::bso::OutgoingEncryptedBso; +use crate::error::{self, Error as ErrorKind, Result}; +use crate::ServerTimestamp; +use serde_derive::*; +use std::collections::HashMap; +use std::default::Default; +use std::ops::Deref; +use sync_guid::Guid; +use viaduct::status_codes; + +/// Manages a pair of (byte, count) limits for a PostQueue, such as +/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records). +#[derive(Debug, Clone)] +struct LimitTracker { + max_bytes: usize, + max_records: usize, + cur_bytes: usize, + cur_records: usize, +} + +impl LimitTracker { + pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker { + LimitTracker { + max_bytes, + max_records, + cur_bytes: 0, + cur_records: 0, + } + } + + pub fn clear(&mut self) { + self.cur_records = 0; + self.cur_bytes = 0; + } + + pub fn can_add_record(&self, payload_size: usize) -> bool { + // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that + // don't have https://github.com/mozilla-services/server-syncstorage/issues/73 + self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes + } + + pub fn can_never_add(&self, record_size: usize) -> bool { + record_size >= self.max_bytes + } + + pub fn record_added(&mut self, record_size: usize) { + assert!( + self.can_add_record(record_size), + "LimitTracker::record_added caller must check can_add_record" + ); + self.cur_records += 1; + self.cur_bytes += record_size; + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct InfoConfiguration { + /// The maximum size in bytes of the overall HTTP request body that will be accepted by the + /// server. + #[serde(default = "default_max_request_bytes")] + pub max_request_bytes: usize, + + /// The maximum number of records that can be uploaded to a collection in a single POST request. + #[serde(default = "usize::max_value")] + pub max_post_records: usize, + + /// The maximum combined size in bytes of the record payloads that can be uploaded to a + /// collection in a single POST request. + #[serde(default = "usize::max_value")] + pub max_post_bytes: usize, + + /// The maximum total number of records that can be uploaded to a collection as part of a + /// batched upload. + #[serde(default = "usize::max_value")] + pub max_total_records: usize, + + /// The maximum total combined size in bytes of the record payloads that can be uploaded to a + /// collection as part of a batched upload. + #[serde(default = "usize::max_value")] + pub max_total_bytes: usize, + + /// The maximum size of an individual BSO payload, in bytes. + #[serde(default = "default_max_record_payload_bytes")] + pub max_record_payload_bytes: usize, +} + +// This is annoying but seems to be the only way to do it... +fn default_max_request_bytes() -> usize { + 260 * 1024 +} +fn default_max_record_payload_bytes() -> usize { + 256 * 1024 +} + +impl Default for InfoConfiguration { + #[inline] + fn default() -> InfoConfiguration { + InfoConfiguration { + max_request_bytes: default_max_request_bytes(), + max_record_payload_bytes: default_max_record_payload_bytes(), + max_post_records: usize::max_value(), + max_post_bytes: usize::max_value(), + max_total_records: usize::max_value(), + max_total_bytes: usize::max_value(), + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct InfoCollections(pub(crate) HashMap); + +impl InfoCollections { + pub fn new(collections: HashMap) -> InfoCollections { + InfoCollections(collections) + } +} + +impl Deref for InfoCollections { + type Target = HashMap; + + fn deref(&self) -> &HashMap { + &self.0 + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct UploadResult { + batch: Option, + /// Maps record id => why failed + #[serde(default = "HashMap::new")] + pub failed: HashMap, + /// Vec of ids + #[serde(default = "Vec::new")] + pub success: Vec, +} + +pub type PostResponse = Sync15ClientResponse; + +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum BatchState { + Unsupported, + NoBatch, + InBatch(String), +} + +#[derive(Debug)] +pub struct PostQueue { + poster: Post, + on_response: OnResponse, + post_limits: LimitTracker, + batch_limits: LimitTracker, + max_payload_bytes: usize, + max_request_bytes: usize, + queued: Vec, + batch: BatchState, + last_modified: ServerTimestamp, +} + +pub trait BatchPoster { + /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing + /// Important: Poster should not report non-success HTTP statuses as errors!! + fn post( + &self, + body: Vec, + xius: ServerTimestamp, + batch: Option, + commit: bool, + queue: &PostQueue, + ) -> Result; +} + +// We don't just use a FnMut here since we want to override it in mocking for RefCell, +// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this +// is somewhat better for documentation. +pub trait PostResponseHandler { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>; +} + +#[derive(Debug, Clone)] +pub(crate) struct NormalResponseHandler { + pub failed_ids: Vec, + pub successful_ids: Vec, + pub allow_failed: bool, + pub pending_failed: Vec, + pub pending_success: Vec, +} + +impl NormalResponseHandler { + pub fn new(allow_failed: bool) -> NormalResponseHandler { + NormalResponseHandler { + failed_ids: vec![], + successful_ids: vec![], + pending_failed: vec![], + pending_success: vec![], + allow_failed, + } + } +} + +impl PostResponseHandler for NormalResponseHandler { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> { + match r { + Sync15ClientResponse::Success { record, .. } => { + if !record.failed.is_empty() && !self.allow_failed { + return Err(ErrorKind::RecordUploadFailed); + } + for id in record.success.iter() { + self.pending_success.push(id.clone()); + } + for kv in record.failed.iter() { + self.pending_failed.push(kv.0.clone()); + } + if !mid_batch { + self.successful_ids.append(&mut self.pending_success); + self.failed_ids.append(&mut self.pending_failed); + } + Ok(()) + } + _ => Err(r.create_storage_error()), + } + } +} + +impl PostQueue +where + Poster: BatchPoster, + OnResponse: PostResponseHandler, +{ + pub fn new( + config: &InfoConfiguration, + ts: ServerTimestamp, + poster: Poster, + on_response: OnResponse, + ) -> PostQueue { + PostQueue { + poster, + on_response, + last_modified: ts, + post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records), + batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records), + batch: BatchState::NoBatch, + max_payload_bytes: config.max_record_payload_bytes, + max_request_bytes: config.max_request_bytes, + queued: Vec::new(), + } + } + + #[inline] + fn in_batch(&self) -> bool { + !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch) + } + + pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result { + let payload_length = record.serialized_payload_len(); + + if self.post_limits.can_never_add(payload_length) + || self.batch_limits.can_never_add(payload_length) + || payload_length >= self.max_payload_bytes + { + log::warn!( + "Single record too large to submit to server ({} b)", + payload_length + ); + return Ok(false); + } + + // Write directly into `queued` but undo if necessary (the vast majority of the time + // it won't be necessary). If we hit a problem we need to undo that, but the only error + // case we have to worry about right now is in flush() + let item_start = self.queued.len(); + + // This is conservative but can't hurt. + self.queued.reserve(payload_length + 2); + + // Either the first character in an array, or a comma separating + // it from the previous item. + let c = if self.queued.is_empty() { b'[' } else { b',' }; + self.queued.push(c); + + // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object + // keys, which is impossible. If you decide to change this part, you *need* to call + // `self.queued.truncate(item_start)` here in the failure case! + serde_json::to_writer(&mut self.queued, &record).unwrap(); + + let item_end = self.queued.len(); + + debug_assert!( + item_end >= payload_length, + "EncryptedPayload::serialized_len is bugged" + ); + + // The + 1 is only relevant for the final record, which will have a trailing ']'. + let item_len = item_end - item_start + 1; + + if item_len >= self.max_request_bytes { + self.queued.truncate(item_start); + log::warn!( + "Single record too large to submit to server ({} b)", + item_len + ); + return Ok(false); + } + + let can_post_record = self.post_limits.can_add_record(payload_length); + let can_batch_record = self.batch_limits.can_add_record(payload_length); + let can_send_record = self.queued.len() < self.max_request_bytes; + + if !can_post_record || !can_send_record || !can_batch_record { + log::debug!( + "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})", + can_post_record, + can_send_record, + can_batch_record + ); + // "unwrite" the record. + self.queued.truncate(item_start); + // Flush whatever we have queued. + self.flush(!can_batch_record)?; + // And write it again. + let c = if self.queued.is_empty() { b'[' } else { b',' }; + self.queued.push(c); + serde_json::to_writer(&mut self.queued, &record).unwrap(); + } + + self.post_limits.record_added(payload_length); + self.batch_limits.record_added(payload_length); + + Ok(true) + } + + pub fn flush(&mut self, want_commit: bool) -> Result<()> { + if self.queued.is_empty() { + assert!( + !self.in_batch(), + "Bug: Somehow we're in a batch but have no queued records" + ); + // Nothing to do! + return Ok(()); + } + + self.queued.push(b']'); + let batch_id = match &self.batch { + // Not the first post and we know we have no batch semantics. + BatchState::Unsupported => None, + // First commit in possible batch + BatchState::NoBatch => Some("true".into()), + // In a batch and we have a batch id. + BatchState::InBatch(ref s) => Some(s.clone()), + }; + + log::info!( + "Posting {} records of {} bytes", + self.post_limits.cur_records, + self.queued.len() + ); + + let is_commit = want_commit && batch_id.is_some(); + // Weird syntax for calling a function object that is a property. + let resp_or_error = self.poster.post( + self.queued.clone(), + self.last_modified, + batch_id, + is_commit, + self, + ); + + self.queued.truncate(0); + + if want_commit || self.batch == BatchState::Unsupported { + self.batch_limits.clear(); + } + self.post_limits.clear(); + + let resp = resp_or_error?; + + let (status, last_modified, record) = match resp { + Sync15ClientResponse::Success { + status, + last_modified, + ref record, + .. + } => (status, last_modified, record), + _ => { + self.on_response.handle_response(resp, !want_commit)?; + // on_response() should always fail! + unreachable!(); + } + }; + + if want_commit || self.batch == BatchState::Unsupported { + self.last_modified = last_modified; + } + + if want_commit { + log::debug!("Committed batch {:?}", self.batch); + self.batch = BatchState::NoBatch; + self.on_response.handle_response(resp, false)?; + return Ok(()); + } + + if status != status_codes::ACCEPTED { + if self.in_batch() { + return Err(ErrorKind::ServerBatchProblem( + "Server responded non-202 success code while a batch was in progress", + )); + } + self.last_modified = last_modified; + self.batch = BatchState::Unsupported; + self.batch_limits.clear(); + self.on_response.handle_response(resp, false)?; + return Ok(()); + } + + let batch_id = record + .batch + .as_ref() + .ok_or({ + ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID") + })? + .clone(); + + match &self.batch { + BatchState::Unsupported => { + log::warn!("Server changed its mind about supporting batching mid-batch..."); + } + + BatchState::InBatch(ref cur_id) => { + if cur_id != &batch_id { + return Err(ErrorKind::ServerBatchProblem( + "Invalid server response: 202 without a batch ID", + )); + } + } + _ => {} + } + + // Can't change this in match arms without NLL + self.batch = BatchState::InBatch(batch_id); + self.last_modified = last_modified; + + self.on_response.handle_response(resp, true)?; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct UploadInfo { + pub successful_ids: Vec, + pub failed_ids: Vec, + pub modified_timestamp: ServerTimestamp, +} + +impl PostQueue { + // TODO: should take by move + pub fn completed_upload_info(&mut self) -> UploadInfo { + let mut result = UploadInfo { + successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()), + failed_ids: Vec::with_capacity( + self.on_response.failed_ids.len() + + self.on_response.pending_failed.len() + + self.on_response.pending_success.len(), + ), + modified_timestamp: self.last_modified, + }; + + result + .successful_ids + .append(&mut self.on_response.successful_ids); + + result.failed_ids.append(&mut self.on_response.failed_ids); + result + .failed_ids + .append(&mut self.on_response.pending_failed); + result + .failed_ids + .append(&mut self.on_response.pending_success); + + result + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope}; + use crate::EncryptedPayload; + use lazy_static::lazy_static; + use std::cell::RefCell; + use std::collections::VecDeque; + use std::rc::Rc; + + #[derive(Debug, Clone)] + struct PostedData { + body: String, + _xius: ServerTimestamp, + batch: Option, + commit: bool, + payload_bytes: usize, + records: usize, + } + + impl PostedData { + fn records_as_json(&self) -> Vec { + let values = + serde_json::from_str::(&self.body).expect("Posted invalid json"); + // Check that they actually deserialize as what we want + let records_or_err = + serde_json::from_value::>(values.clone()); + records_or_err.expect("Failed to deserialize data"); + serde_json::from_value(values).unwrap() + } + } + + #[derive(Debug, Clone)] + struct BatchInfo { + id: Option, + posts: Vec, + bytes: usize, + records: usize, + } + + #[derive(Debug, Clone)] + struct TestPoster { + all_posts: Vec, + responses: VecDeque, + batches: Vec, + cur_batch: Option, + cfg: InfoConfiguration, + } + + type TestPosterRef = Rc>; + impl TestPoster { + pub fn new(cfg: &InfoConfiguration, responses: T) -> TestPosterRef + where + T: Into>, + { + Rc::new(RefCell::new(TestPoster { + all_posts: vec![], + responses: responses.into(), + batches: vec![], + cur_batch: None, + cfg: cfg.clone(), + })) + } + // Adds &mut + fn do_post( + &mut self, + body: &[u8], + xius: ServerTimestamp, + batch: Option, + commit: bool, + queue: &PostQueue, + ) -> Sync15ClientResponse { + let mut post = PostedData { + body: String::from_utf8(body.into()).expect("Posted invalid utf8..."), + batch: batch.clone(), + _xius: xius, + commit, + payload_bytes: 0, + records: 0, + }; + + assert!(body.len() <= self.cfg.max_request_bytes); + + let (num_records, record_payload_bytes) = { + let recs = post.records_as_json(); + assert!(recs.len() <= self.cfg.max_post_records); + assert!(recs.len() <= self.cfg.max_total_records); + let payload_bytes: usize = recs + .iter() + .map(|r| { + let len = r["payload"] + .as_str() + .expect("Non string payload property") + .len(); + assert!(len <= self.cfg.max_record_payload_bytes); + len + }) + .sum(); + assert!(payload_bytes <= self.cfg.max_post_bytes); + assert!(payload_bytes <= self.cfg.max_total_bytes); + + assert_eq!(queue.post_limits.cur_bytes, payload_bytes); + assert_eq!(queue.post_limits.cur_records, recs.len()); + (recs.len(), payload_bytes) + }; + post.payload_bytes = record_payload_bytes; + post.records = num_records; + + self.all_posts.push(post.clone()); + let response = self.responses.pop_front().unwrap(); + + let record = match response { + Sync15ClientResponse::Success { ref record, .. } => record, + _ => { + panic!("only success codes are used in this test"); + } + }; + + if self.cur_batch.is_none() { + assert!( + batch.is_none() || batch == Some("true".into()), + "We shouldn't be in a batch now" + ); + self.cur_batch = Some(BatchInfo { + id: record.batch.clone(), + posts: vec![], + records: 0, + bytes: 0, + }); + } else { + assert_eq!( + batch, + self.cur_batch.as_ref().unwrap().id, + "We're in a batch but got the wrong batch id" + ); + } + + { + let batch = self.cur_batch.as_mut().unwrap(); + batch.posts.push(post); + batch.records += num_records; + batch.bytes += record_payload_bytes; + + assert!(batch.bytes <= self.cfg.max_total_bytes); + assert!(batch.records <= self.cfg.max_total_records); + + assert_eq!(batch.records, queue.batch_limits.cur_records); + assert_eq!(batch.bytes, queue.batch_limits.cur_bytes); + } + + if commit || record.batch.is_none() { + let batch = self.cur_batch.take().unwrap(); + self.batches.push(batch); + } + + response + } + + fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) { + assert_eq!(mid_batch, self.cur_batch.is_some()); + } + } + impl BatchPoster for TestPosterRef { + fn post( + &self, + body: Vec, + xius: ServerTimestamp, + batch: Option, + commit: bool, + queue: &PostQueue, + ) -> Result { + Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue)) + } + } + + impl PostResponseHandler for TestPosterRef { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> { + self.borrow_mut().do_handle_response(r, mid_batch); + Ok(()) + } + } + + type MockedPostQueue = PostQueue; + + fn pq_test_setup( + cfg: InfoConfiguration, + lm: i64, + resps: Vec, + ) -> (MockedPostQueue, TestPosterRef) { + let tester = TestPoster::new(&cfg, resps); + let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone()); + (pq, tester) + } + + fn fake_response<'a, T: Into>>(status: u16, lm: i64, batch: T) -> PostResponse { + assert!(status_codes::is_success_code(status)); + Sync15ClientResponse::Success { + status, + last_modified: ServerTimestamp(lm), + record: UploadResult { + batch: batch.into().map(Into::into), + failed: HashMap::new(), + success: vec![], + }, + route: "test/path".into(), + } + } + + lazy_static! { + // ~40b + static ref PAYLOAD_OVERHEAD: usize = { + let payload = EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "".into() + }; + serde_json::to_string(&payload).unwrap().len() + }; + // ~80b + static ref TOTAL_RECORD_OVERHEAD: usize = { + let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope { + id: "".into(), + sortindex: None, + ttl: None, + }, + EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "".into() + }, + )).unwrap(); + serde_json::to_string(&val).unwrap().len() + }; + // There's some subtlety in how we calulate this having to do with the fact that + // the quotes in the payload are escaped but the escape chars count to the request len + // and *not* to the payload len (the payload len check happens after json parsing the + // top level object). + static ref NON_PAYLOAD_OVERHEAD: usize = { + *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD + }; + } + + // Actual record size (for max_request_len) will be larger by some amount + fn make_record(payload_size: usize) -> OutgoingEncryptedBso { + assert!(payload_size > *PAYLOAD_OVERHEAD); + let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD; + OutgoingEncryptedBso::new( + OutgoingEnvelope { + id: "".into(), + sortindex: None, + ttl: None, + }, + EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "x".repeat(ciphertext_len), + }, + ) + } + + fn request_bytes_for_payloads(payloads: &[usize]) -> usize { + 1 + payloads + .iter() + .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD) + .sum::() + } + + #[test] + fn test_pq_basic() { + let cfg = InfoConfiguration { + max_request_bytes: 1000, + max_record_payload_bytes: 1000, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![fake_response(status_codes::OK, time + 100_000, None)], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 1); + assert_eq!(t.batches[0].bytes, 100); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + fn test_pq_max_request_bytes_no_batch() { + let cfg = InfoConfiguration { + max_request_bytes: 250, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::OK, time + 100_000, None), + fake_response(status_codes::OK, time + 200_000, None), + ], + ); + + // Note that the total record overhead is around 85 bytes + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r] + pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post. + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 2); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 2); + assert_eq!(t.batches[0].bytes, payload_size * 2); + assert_eq!(t.batches[0].posts[0].batch, Some("true".into())); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size]) + ); + + assert_eq!(t.batches[1].posts.len(), 1); + assert_eq!(t.batches[1].records, 1); + assert_eq!(t.batches[1].bytes, payload_size); + // We know at this point that the server does not support batching. + assert_eq!(t.batches[1].posts[0].batch, None); + assert!(!t.batches[1].posts[0].commit); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size]) + ); + } + + #[test] + fn test_pq_max_record_payload_bytes_no_batch() { + let cfg = InfoConfiguration { + max_record_payload_bytes: 150, + max_request_bytes: 350, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::OK, time + 100_000, None), + fake_response(status_codes::OK, time + 200_000, None), + ], + ); + + // Note that the total record overhead is around 85 bytes + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] + let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102 + assert!(!enqueued, "Should not have fit"); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 2); + assert_eq!(t.batches[0].bytes, payload_size * 2); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size]) + ); + } + + #[test] + fn test_pq_single_batch() { + let cfg = InfoConfiguration::default(); + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![fake_response( + status_codes::ACCEPTED, + time + 100_000, + Some("1234"), + )], + ); + + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 3); + assert_eq!(t.batches[0].bytes, payload_size * 3); + assert!(t.batches[0].posts[0].commit); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size, payload_size]) + ); + } + + #[test] + fn test_pq_multi_post_batch_bytes() { + let cfg = InfoConfiguration { + max_post_bytes: 200, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 2); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[0].records, 3); + assert_eq!(t.batches[0].bytes, 300); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 2); + assert_eq!(t.batches[0].posts[0].payload_bytes, 200); + assert!(!t.batches[0].posts[0].commit); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 1); + assert_eq!(t.batches[0].posts[1].payload_bytes, 100); + assert!(t.batches[0].posts[1].commit); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + fn test_pq_multi_post_batch_records() { + let cfg = InfoConfiguration { + max_post_records: 3, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 3); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 3); + assert_eq!(t.batches[0].records, 7); + assert_eq!(t.batches[0].bytes, 700); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert!(!t.batches[0].posts[0].commit); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 3); + assert_eq!(t.batches[0].posts[1].payload_bytes, 300); + assert!(!t.batches[0].posts[1].commit); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[2].records, 1); + assert_eq!(t.batches[0].posts[2].payload_bytes, 100); + assert!(t.batches[0].posts[2].commit); + assert_eq!( + t.batches[0].posts[2].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_pq_multi_post_multi_batch_records() { + let cfg = InfoConfiguration { + max_post_records: 3, + max_total_records: 5, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), + fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + COMMIT + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 4); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[1].posts.len(), 2); + + assert_eq!(t.batches[0].records, 5); + assert_eq!(t.batches[1].records, 4); + + assert_eq!(t.batches[0].bytes, 500); + assert_eq!(t.batches[1].bytes, 400); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert!(!t.batches[0].posts[0].commit); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 2); + assert_eq!(t.batches[0].posts[1].payload_bytes, 200); + assert!(t.batches[0].posts[1].commit); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[1].posts[0].records, 3); + assert_eq!(t.batches[1].posts[0].payload_bytes, 300); + assert!(!t.batches[1].posts[0].commit); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); + assert_eq!(t.batches[1].posts[1].records, 1); + assert_eq!(t.batches[1].posts[1].payload_bytes, 100); + assert!(t.batches[1].posts[1].commit); + assert_eq!( + t.batches[1].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_pq_multi_post_multi_batch_bytes() { + let cfg = InfoConfiguration { + max_post_bytes: 300, + max_total_bytes: 500, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit + fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), + fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + COMMIT + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time + 100_000); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + + // POST + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time + 100_000); + pq.flush(true).unwrap(); // COMMIT + + assert_eq!(pq.last_modified.0, time + 200_000); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 4); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[1].posts.len(), 2); + + assert_eq!(t.batches[0].records, 5); + assert_eq!(t.batches[1].records, 4); + + assert_eq!(t.batches[0].bytes, 500); + assert_eq!(t.batches[1].bytes, 400); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert!(!t.batches[0].posts[0].commit); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 2); + assert_eq!(t.batches[0].posts[1].payload_bytes, 200); + assert!(t.batches[0].posts[1].commit); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[1].posts[0].records, 3); + assert_eq!(t.batches[1].posts[0].payload_bytes, 300); + assert!(!t.batches[1].posts[0].commit); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); + assert_eq!(t.batches[1].posts[1].records, 1); + assert_eq!(t.batches[1].posts[1].payload_bytes, 100); + assert!(t.batches[1].posts[1].commit); + assert_eq!( + t.batches[1].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + // TODO: Test + // + // - error cases!!! We don't test our handling of server errors at all! + // - mixed bytes/record limits + // + // A lot of these have good examples in test_postqueue.js on deskftop sync +} diff --git a/third_party/rust/sync15/src/client/state.rs b/third_party/rust/sync15/src/client/state.rs new file mode 100644 index 0000000000..78e9a6a718 --- /dev/null +++ b/third_party/rust/sync15/src/client/state.rs @@ -0,0 +1,1089 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; + +use super::request::{InfoCollections, InfoConfiguration}; +use super::storage_client::{SetupStorageClient, Sync15ClientResponse}; +use super::CollectionKeys; +use crate::bso::OutgoingEncryptedBso; +use crate::error::{self, Error as ErrorKind, ErrorResponse}; +use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord}; +use crate::EncryptedPayload; +use crate::{Guid, KeyBundle, ServerTimestamp}; +use interrupt_support::Interruptee; +use serde_derive::*; + +use self::SetupState::*; + +const STORAGE_VERSION: usize = 5; + +/// Maps names to storage versions for engines to include in a fresh +/// `meta/global` record. We include engines that we don't implement +/// because they'll be disabled on other clients if we omit them +/// (bug 1479929). +const DEFAULT_ENGINES: &[(&str, usize)] = &[ + ("passwords", 1), + ("clients", 1), + ("addons", 1), + ("addresses", 1), + ("bookmarks", 2), + ("creditcards", 1), + ("forms", 1), + ("history", 1), + ("prefs", 2), + ("tabs", 1), +]; + +// Declined engines to include in a fresh `meta/global` record. +const DEFAULT_DECLINED: &[&str] = &[]; + +/// State that we require the app to persist to storage for us. +/// It's a little unfortunate we need this, because it's only tracking +/// "declined engines", and even then, only needed in practice when there's +/// no meta/global so we need to create one. It's extra unfortunate because we +/// want to move away from "globally declined" engines anyway, moving towards +/// allowing engines to be enabled or disabled per client rather than globally. +/// +/// Apps are expected to treat this as opaque, so we support serializing it. +/// Note that this structure is *not* used to *change* the declined engines +/// list - that will be done in the future, but the API exposed for that +/// purpose will also take a mutable PersistedGlobalState. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "schema_version")] +pub enum PersistedGlobalState { + /// V1 was when we persisted the entire GlobalState, keys and all! + + /// V2 is just tracking the globally declined list. + /// None means "I've no idea" and theoretically should only happen on the + /// very first sync for an app. + V2 { declined: Option> }, +} + +impl Default for PersistedGlobalState { + #[inline] + fn default() -> PersistedGlobalState { + PersistedGlobalState::V2 { declined: None } + } +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub(crate) struct EngineChangesNeeded { + pub local_resets: HashSet, + pub remote_wipes: HashSet, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct RemoteEngineState { + info_collections: HashSet, + declined: HashSet, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct EngineStateInput { + local_declined: HashSet, + remote: Option, + user_changes: HashMap, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct EngineStateOutput { + // The new declined. + declined: HashSet, + // Which engines need resets or wipes. + changes_needed: EngineChangesNeeded, +} + +fn compute_engine_states(input: EngineStateInput) -> EngineStateOutput { + use super::util::*; + log::debug!("compute_engine_states: input {:?}", input); + let (must_enable, must_disable) = partition_by_value(&input.user_changes); + let have_remote = input.remote.is_some(); + let RemoteEngineState { + info_collections, + declined: remote_declined, + } = input.remote.clone().unwrap_or_default(); + + let both_declined_and_remote = set_intersection(&info_collections, &remote_declined); + if !both_declined_and_remote.is_empty() { + // Should we wipe these too? + log::warn!( + "Remote state contains engines which are in both info/collections and meta/global's declined: {:?}", + both_declined_and_remote, + ); + } + + let most_recent_declined_list = if have_remote { + &remote_declined + } else { + &input.local_declined + }; + + let result_declined = set_difference( + &set_union(most_recent_declined_list, &must_disable), + &must_enable, + ); + + let output = EngineStateOutput { + changes_needed: EngineChangesNeeded { + // Anything now declined which wasn't in our declined list before gets a reset. + local_resets: set_difference(&result_declined, &input.local_declined), + // Anything remote that we just declined gets a wipe. In the future + // we might want to consider wiping things in both remote declined + // and info/collections, but we'll let other clients pick up their + // own mess for now. + remote_wipes: set_intersection(&info_collections, &must_disable), + }, + declined: result_declined, + }; + // No PII here and this helps debug problems. + log::debug!("compute_engine_states: output {:?}", output); + output +} + +impl PersistedGlobalState { + fn set_declined(&mut self, new_declined: Vec) { + match self { + Self::V2 { ref mut declined } => *declined = Some(new_declined), + } + } + pub(crate) fn get_declined(&self) -> &[String] { + match self { + Self::V2 { declined: Some(d) } => d, + Self::V2 { declined: None } => &[], + } + } +} + +/// Holds global Sync state, including server upload limits, the +/// last-fetched collection modified times, `meta/global` record, and +/// an encrypted copy of the crypto/keys resource (avoids keeping them +/// in memory longer than necessary; avoids key mismatches by ensuring the same KeyBundle +/// is used for both the keys and encrypted payloads.) +#[derive(Debug, Clone)] +pub struct GlobalState { + pub config: InfoConfiguration, + pub collections: InfoCollections, + pub global: MetaGlobalRecord, + pub global_timestamp: ServerTimestamp, + pub keys: EncryptedPayload, + pub keys_timestamp: ServerTimestamp, +} + +/// Creates a fresh `meta/global` record, using the default engine selections, +/// and declined engines from our PersistedGlobalState. +fn new_global(pgs: &PersistedGlobalState) -> MetaGlobalRecord { + let sync_id = Guid::random(); + let mut engines: HashMap = HashMap::new(); + for (name, version) in DEFAULT_ENGINES.iter() { + let sync_id = Guid::random(); + engines.insert( + (*name).to_string(), + MetaGlobalEngine { + version: *version, + sync_id, + }, + ); + } + // We only need our PersistedGlobalState to fill out a new meta/global - if + // we previously saw a meta/global then we would have updated it with what + // it was at the time. + let declined = match pgs { + PersistedGlobalState::V2 { declined: Some(d) } => d.clone(), + _ => DEFAULT_DECLINED.iter().map(ToString::to_string).collect(), + }; + + MetaGlobalRecord { + sync_id, + storage_version: STORAGE_VERSION, + engines, + declined, + } +} + +fn fixup_meta_global(global: &mut MetaGlobalRecord) -> bool { + let mut changed_any = false; + for &(name, version) in DEFAULT_ENGINES.iter() { + let had_engine = global.engines.contains_key(name); + let should_have_engine = !global.declined.iter().any(|c| c == name); + if had_engine != should_have_engine { + if should_have_engine { + log::debug!("SyncID for engine {:?} was missing", name); + global.engines.insert( + name.to_string(), + MetaGlobalEngine { + version, + sync_id: Guid::random(), + }, + ); + } else { + log::debug!("SyncID for engine {:?} was present, but shouldn't be", name); + global.engines.remove(name); + } + changed_any = true; + } + } + changed_any +} + +pub struct SetupStateMachine<'a> { + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + // `allowed_states` is designed so that we can arrange for the concept of + // a "fast" sync - so we decline to advance if we need to setup from scratch. + // The idea is that if we need to sync before going to sleep we should do + // it as fast as possible. However, in practice this isn't going to do + // what we expect - a "fast sync" that finds lots to do is almost certainly + // going to take longer than a "full sync" that finds nothing to do. + // We should almost certainly remove this and instead allow for a "time + // budget", after which we get interrupted. Later... + allowed_states: Vec<&'static str>, + sequence: Vec<&'static str>, + engine_updates: Option<&'a HashMap>, + interruptee: &'a dyn Interruptee, + pub(crate) changes_needed: Option, +} + +impl<'a> SetupStateMachine<'a> { + /// Creates a state machine for a "classic" Sync 1.5 client that supports + /// all states, including uploading a fresh `meta/global` and `crypto/keys` + /// after a node reassignment. + pub fn for_full_sync( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + engine_updates: Option<&'a HashMap>, + interruptee: &'a dyn Interruptee, + ) -> SetupStateMachine<'a> { + SetupStateMachine::with_allowed_states( + client, + root_key, + pgs, + interruptee, + engine_updates, + vec![ + "Initial", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + "FreshStartRequired", + "WithPreviousState", + ], + ) + } + + fn with_allowed_states( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + interruptee: &'a dyn Interruptee, + engine_updates: Option<&'a HashMap>, + allowed_states: Vec<&'static str>, + ) -> SetupStateMachine<'a> { + SetupStateMachine { + client, + root_key, + pgs, + sequence: Vec::new(), + allowed_states, + engine_updates, + interruptee, + changes_needed: None, + } + } + + fn advance(&mut self, from: SetupState) -> error::Result { + match from { + // Fetch `info/configuration` with current server limits, and + // `info/collections` with collection last modified times. + Initial => { + let config = match self.client.fetch_info_configuration()? { + Sync15ClientResponse::Success { record, .. } => record, + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + InfoConfiguration::default() + } + other => return Err(other.create_storage_error()), + }; + Ok(InitialWithConfig { config }) + } + + // XXX - we could consider combining these Initial* states, because we don't + // attempt to support filling in "missing" global state - *any* 404 in them + // means `FreshStart`. + // IOW, in all cases, they either `Err()`, move to `FreshStartRequired`, or + // advance to a specific next state. + InitialWithConfig { config } => { + match self.client.fetch_info_collections()? { + Sync15ClientResponse::Success { + record: collections, + .. + } => Ok(InitialWithInfo { + config, + collections, + }), + // If the server doesn't have a `crypto/keys`, start over + // and reupload our `meta/global` and `crypto/keys`. + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error()), + } + } + + InitialWithInfo { + config, + collections, + } => { + match self.client.fetch_meta_global()? { + Sync15ClientResponse::Success { + record: mut global, + last_modified: mut global_timestamp, + .. + } => { + // If the server has a newer storage version, we can't + // sync until our client is updated. + if global.storage_version > STORAGE_VERSION { + return Err(ErrorKind::ClientUpgradeRequired); + } + + // If the server has an older storage version, wipe and + // reupload. + if global.storage_version < STORAGE_VERSION { + Ok(FreshStartRequired { config }) + } else { + log::info!("Have info/collections and meta/global. Computing new engine states"); + let initial_global_declined: HashSet = + global.declined.iter().cloned().collect(); + let result = compute_engine_states(EngineStateInput { + local_declined: self.pgs.get_declined().iter().cloned().collect(), + user_changes: self.engine_updates.cloned().unwrap_or_default(), + remote: Some(RemoteEngineState { + declined: initial_global_declined.clone(), + info_collections: collections.keys().cloned().collect(), + }), + }); + // Persist the new declined. + self.pgs + .set_declined(result.declined.iter().cloned().collect()); + // If the declined engines differ from remote, fix that. + let fixed_declined = if result.declined != initial_global_declined { + global.declined = result.declined.iter().cloned().collect(); + log::info!( + "Uploading new declined {:?} to meta/global with timestamp {:?}", + global.declined, + global_timestamp, + ); + true + } else { + false + }; + // If there are missing syncIds, we need to fix those as well + let fixed_ids = if fixup_meta_global(&mut global) { + log::info!( + "Uploading corrected meta/global with timestamp {:?}", + global_timestamp, + ); + true + } else { + false + }; + + if fixed_declined || fixed_ids { + global_timestamp = + self.client.put_meta_global(global_timestamp, &global)?; + log::debug!("new global_timestamp: {:?}", global_timestamp); + } + // Update the set of changes needed. + if self.changes_needed.is_some() { + // Should never happen (we prevent state machine + // loops elsewhere) but if it did, the info is stale + // anyway. + log::warn!("Already have a set of changes needed, Overwriting..."); + } + self.changes_needed = Some(result.changes_needed); + Ok(InitialWithMetaGlobal { + config, + collections, + global, + global_timestamp, + }) + } + } + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error()), + } + } + + InitialWithMetaGlobal { + config, + collections, + global, + global_timestamp, + } => { + // Now try and get keys etc - if we fresh-start we'll re-use declined. + match self.client.fetch_crypto_keys()? { + Sync15ClientResponse::Success { + record, + last_modified, + .. + } => { + // Note that collection/keys is itself a bso, so the + // json body also carries the timestamp. If they aren't + // identical something has screwed up and we should die. + assert_eq!(last_modified, record.envelope.modified); + let state = GlobalState { + config, + collections, + global, + global_timestamp, + keys: record.payload, + keys_timestamp: last_modified, + }; + Ok(Ready { state }) + } + // If the server doesn't have a `crypto/keys`, start over + // and reupload our `meta/global` and `crypto/keys`. + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error()), + } + } + + // We've got old state that's likely to be OK. + // We keep things simple here - if there's evidence of a new/missing + // meta/global or new/missing keys we just restart from scratch. + WithPreviousState { old_state } => match self.client.fetch_info_collections()? { + Sync15ClientResponse::Success { + record: collections, + .. + } => Ok( + if self.engine_updates.is_none() + && is_same_timestamp(old_state.global_timestamp, &collections, "meta") + && is_same_timestamp(old_state.keys_timestamp, &collections, "crypto") + { + Ready { + state: GlobalState { + collections, + ..old_state + }, + } + } else { + InitialWithConfig { + config: old_state.config, + } + }, + ), + _ => Ok(InitialWithConfig { + config: old_state.config, + }), + }, + + Ready { state } => Ok(Ready { state }), + + FreshStartRequired { config } => { + // Wipe the server. + log::info!("Fresh start: wiping remote"); + self.client.wipe_all_remote()?; + + // Upload a fresh `meta/global`... + log::info!("Uploading meta/global"); + let computed = compute_engine_states(EngineStateInput { + local_declined: self.pgs.get_declined().iter().cloned().collect(), + user_changes: self.engine_updates.cloned().unwrap_or_default(), + remote: None, + }); + self.pgs + .set_declined(computed.declined.iter().cloned().collect()); + + self.changes_needed = Some(computed.changes_needed); + + let new_global = new_global(self.pgs); + + self.client + .put_meta_global(ServerTimestamp::default(), &new_global)?; + + // ...And a fresh `crypto/keys`. + let new_keys = CollectionKeys::new_random()?.to_encrypted_payload(self.root_key)?; + let bso = OutgoingEncryptedBso::new(Guid::new("keys").into(), new_keys); + self.client + .put_crypto_keys(ServerTimestamp::default(), &bso)?; + + // TODO(lina): Can we pass along server timestamps from the PUTs + // above, and avoid re-fetching the `m/g` and `c/k` we just + // uploaded? + // OTOH(mark): restarting the state machine keeps life simple and rare. + Ok(InitialWithConfig { config }) + } + } + } + + /// Runs through the state machine to the ready state. + pub fn run_to_ready(&mut self, state: Option) -> error::Result { + let mut s = match state { + Some(old_state) => WithPreviousState { old_state }, + None => Initial, + }; + loop { + self.interruptee.err_if_interrupted()?; + let label = &s.label(); + log::trace!("global state: {:?}", label); + match s { + Ready { state } => { + self.sequence.push(label); + return Ok(state); + } + // If we already started over once before, we're likely in a + // cycle, and should try again later. Intermediate states + // aren't a problem, just the initial ones. + FreshStartRequired { .. } | WithPreviousState { .. } | Initial => { + if self.sequence.contains(label) { + // Is this really the correct error? + return Err(ErrorKind::SetupRace); + } + } + _ => { + if !self.allowed_states.contains(label) { + return Err(ErrorKind::SetupRequired); + } + } + }; + self.sequence.push(label); + s = self.advance(s)?; + } + } +} + +/// States in the remote setup process. +/// TODO(lina): Add link once #56 is merged. +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum SetupState { + // These "Initial" states are only ever used when starting from scratch. + Initial, + InitialWithConfig { + config: InfoConfiguration, + }, + InitialWithInfo { + config: InfoConfiguration, + collections: InfoCollections, + }, + InitialWithMetaGlobal { + config: InfoConfiguration, + collections: InfoCollections, + global: MetaGlobalRecord, + global_timestamp: ServerTimestamp, + }, + WithPreviousState { + old_state: GlobalState, + }, + Ready { + state: GlobalState, + }, + FreshStartRequired { + config: InfoConfiguration, + }, +} + +impl SetupState { + fn label(&self) -> &'static str { + match self { + Initial { .. } => "Initial", + InitialWithConfig { .. } => "InitialWithConfig", + InitialWithInfo { .. } => "InitialWithInfo", + InitialWithMetaGlobal { .. } => "InitialWithMetaGlobal", + Ready { .. } => "Ready", + WithPreviousState { .. } => "WithPreviousState", + FreshStartRequired { .. } => "FreshStartRequired", + } + } +} + +/// Whether we should skip fetching an item. Used when we already have timestamps +/// and want to check if we should reuse our existing state. The state's fairly +/// cheap to recreate and very bad to use if it is wrong, so we insist on the +/// *exact* timestamp matching and not a simple "later than" check. +fn is_same_timestamp(local: ServerTimestamp, collections: &InfoCollections, key: &str) -> bool { + collections.get(key).map_or(false, |ts| local == *ts) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::bso::{IncomingEncryptedBso, IncomingEnvelope}; + use interrupt_support::NeverInterrupts; + + struct InMemoryClient { + info_configuration: error::Result>, + info_collections: error::Result>, + meta_global: error::Result>, + crypto_keys: error::Result>, + } + + impl SetupStorageClient for InMemoryClient { + fn fetch_info_configuration( + &self, + ) -> error::Result> { + match &self.info_configuration { + Ok(client_response) => Ok(client_response.clone()), + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn fetch_info_collections(&self) -> error::Result> { + match &self.info_collections { + Ok(collections) => Ok(collections.clone()), + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn fetch_meta_global(&self) -> error::Result> { + match &self.meta_global { + Ok(global) => Ok(global.clone()), + // TODO(lina): Special handling for 404s, we want to ensure we + // handle missing keys and other server errors correctly. + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result { + // Ensure that the meta/global record we uploaded is "fixed up" + assert!(DEFAULT_ENGINES + .iter() + .filter(|e| e.0 != "logins") + .all(|&(k, _v)| global.engines.contains_key(k))); + assert!(!global.engines.contains_key("logins")); + assert_eq!(global.declined, vec!["logins".to_string()]); + // return a different timestamp. + Ok(ServerTimestamp(xius.0 + 1)) + } + + fn fetch_crypto_keys(&self) -> error::Result> { + match &self.crypto_keys { + Ok(Sync15ClientResponse::Success { + status, + record, + last_modified, + route, + }) => Ok(Sync15ClientResponse::Success { + status: *status, + record: IncomingEncryptedBso::new( + record.envelope.clone(), + record.payload.clone(), + ), + last_modified: *last_modified, + route: route.clone(), + }), + // TODO(lina): Same as above, for 404s. + _ => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn put_crypto_keys( + &self, + xius: ServerTimestamp, + _keys: &OutgoingEncryptedBso, + ) -> error::Result<()> { + assert_eq!(xius, ServerTimestamp(888_800)); + Err(ErrorKind::StorageHttpError(ErrorResponse::ServerError { + status: 500, + route: "crypto/keys".to_string(), + })) + } + + fn wipe_all_remote(&self) -> error::Result<()> { + Ok(()) + } + } + + #[allow(clippy::unnecessary_wraps)] + fn mocked_success_ts(t: T, ts: i64) -> error::Result> { + Ok(Sync15ClientResponse::Success { + status: 200, + record: t, + last_modified: ServerTimestamp(ts), + route: "test/path".into(), + }) + } + + fn mocked_success(t: T) -> error::Result> { + mocked_success_ts(t, 0) + } + + fn mocked_success_keys( + keys: CollectionKeys, + root_key: &KeyBundle, + ) -> error::Result> { + let timestamp = keys.timestamp; + let payload = keys.to_encrypted_payload(root_key).unwrap(); + let bso = IncomingEncryptedBso::new( + IncomingEnvelope { + id: Guid::new("keys"), + modified: timestamp, + sortindex: None, + ttl: None, + }, + payload, + ); + Ok(Sync15ClientResponse::Success { + status: 200, + record: bso, + last_modified: timestamp, + route: "test/path".into(), + }) + } + + #[test] + fn test_state_machine_ready_from_empty() { + let _ = env_logger::try_init(); + let root_key = KeyBundle::new_random().unwrap(); + let keys = CollectionKeys { + timestamp: ServerTimestamp(123_400), + default: KeyBundle::new_random().unwrap(), + collections: HashMap::new(), + }; + let mg = MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + // We ensure that the record we upload doesn't have a logins record. + declined: vec!["logins".to_string()], + }; + let client = InMemoryClient { + info_configuration: mocked_success(InfoConfiguration::default()), + info_collections: mocked_success(InfoCollections::new( + vec![("meta", 123_456), ("crypto", 145_000)] + .into_iter() + .map(|(key, value)| (key.to_owned(), ServerTimestamp(value))) + .collect(), + )), + meta_global: mocked_success_ts(mg, 999_000), + crypto_keys: mocked_success_keys(keys, &root_key), + }; + let mut pgs = PersistedGlobalState::V2 { declined: None }; + + let mut state_machine = + SetupStateMachine::for_full_sync(&client, &root_key, &mut pgs, None, &NeverInterrupts); + assert!( + state_machine.run_to_ready(None).is_ok(), + "Should drive state machine to ready" + ); + assert_eq!( + state_machine.sequence, + vec![ + "Initial", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + ], + "Should cycle through all states" + ); + } + + #[test] + fn test_from_previous_state_declined() { + let _ = env_logger::try_init(); + // The state-machine sequence where we didn't use the previous state + // (ie, where the state machine restarted) + let sm_seq_restarted = vec![ + "WithPreviousState", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + ]; + // The state-machine sequence where we used the previous state. + let sm_seq_used_previous = vec!["WithPreviousState", "Ready"]; + + // do the actual test. + fn do_test( + client: &dyn SetupStorageClient, + root_key: &KeyBundle, + pgs: &mut PersistedGlobalState, + engine_updates: Option<&HashMap>, + old_state: GlobalState, + expected_states: &[&str], + ) { + let mut state_machine = SetupStateMachine::for_full_sync( + client, + root_key, + pgs, + engine_updates, + &NeverInterrupts, + ); + assert!( + state_machine.run_to_ready(Some(old_state)).is_ok(), + "Should drive state machine to ready" + ); + assert_eq!(state_machine.sequence, expected_states); + } + + // and all the complicated setup... + let ts_metaglobal = 123_456; + let ts_keys = 145_000; + let root_key = KeyBundle::new_random().unwrap(); + let keys = CollectionKeys { + timestamp: ServerTimestamp(ts_keys + 1), + default: KeyBundle::new_random().unwrap(), + collections: HashMap::new(), + }; + let mg = MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + // We ensure that the record we upload doesn't have a logins record. + declined: vec!["logins".to_string()], + }; + let collections = InfoCollections::new( + vec![("meta", ts_metaglobal), ("crypto", ts_keys)] + .into_iter() + .map(|(key, value)| (key.to_owned(), ServerTimestamp(value))) + .collect(), + ); + let client = InMemoryClient { + info_configuration: mocked_success(InfoConfiguration::default()), + info_collections: mocked_success(collections.clone()), + meta_global: mocked_success_ts(mg.clone(), ts_metaglobal), + crypto_keys: mocked_success_keys(keys.clone(), &root_key), + }; + + // First a test where the "previous" global state is OK to reuse. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_payload(&root_key) + .expect("should always work in this test"), + keys_timestamp: ServerTimestamp(ts_keys), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_used_previous, + ); + } + + // Now where the meta/global record on the server is later. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(999_999), + keys: keys + .to_encrypted_payload(&root_key) + .expect("should always work in this test"), + keys_timestamp: ServerTimestamp(ts_keys), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_restarted, + ); + } + + // Where keys on the server is later. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_payload(&root_key) + .expect("should always work in this test"), + keys_timestamp: ServerTimestamp(999_999), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_restarted, + ); + } + + // Where there are engine-state changes. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections, + global: mg, + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_payload(&root_key) + .expect("should always work in this test"), + keys_timestamp: ServerTimestamp(ts_keys), + }; + let mut engine_updates = HashMap::::new(); + engine_updates.insert("logins".to_string(), false); + do_test( + &client, + &root_key, + &mut pgs, + Some(&engine_updates), + old_state, + &sm_seq_restarted, + ); + let declined = match pgs { + PersistedGlobalState::V2 { declined: d } => d, + }; + // and check we now consider logins as declined. + assert_eq!(declined, Some(vec!["logins".to_string()])); + } + } + + fn string_set(s: &[&str]) -> HashSet { + s.iter().map(ToString::to_string).collect() + } + fn string_map(s: &[(&str, T)]) -> HashMap { + s.iter().map(|v| (v.0.to_string(), v.1.clone())).collect() + } + #[test] + fn test_engine_states() { + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: None, + user_changes: Default::default(), + }), + EngineStateOutput { + declined: string_set(&["foo", "bar"]), + // No wipes, no resets + changes_needed: Default::default(), + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: Some(RemoteEngineState { + declined: string_set(&["foo"]), + info_collections: string_set(&["bar"]) + }), + user_changes: Default::default(), + }), + EngineStateOutput { + // Now we have `foo`. + declined: string_set(&["foo"]), + // No wipes, no resets, should just be a local update. + changes_needed: Default::default(), + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: Some(RemoteEngineState { + declined: string_set(&["foo", "bar", "quux"]), + info_collections: string_set(&[]) + }), + user_changes: Default::default(), + }), + EngineStateOutput { + // Now we have `foo`. + declined: string_set(&["foo", "bar", "quux"]), + changes_needed: EngineChangesNeeded { + // Should reset `quux`. + local_resets: string_set(&["quux"]), + // No wipes, though. + remote_wipes: string_set(&[]), + } + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["bar", "baz"]), + remote: Some(RemoteEngineState { + declined: string_set(&["bar", "baz",]), + info_collections: string_set(&["quux"]) + }), + // Change a declined engine to undeclined. + user_changes: string_map(&[("bar", true)]), + }), + EngineStateOutput { + declined: string_set(&["baz"]), + // No wipes, just undecline it. + changes_needed: Default::default() + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["bar", "baz"]), + remote: Some(RemoteEngineState { + declined: string_set(&["bar", "baz"]), + info_collections: string_set(&["foo"]) + }), + // Change an engine which exists remotely to declined. + user_changes: string_map(&[("foo", false)]), + }), + EngineStateOutput { + declined: string_set(&["baz", "bar", "foo"]), + // No wipes, just undecline it. + changes_needed: EngineChangesNeeded { + // Should reset our local foo + local_resets: string_set(&["foo"]), + // And wipe the server. + remote_wipes: string_set(&["foo"]), + } + } + ); + } +} diff --git a/third_party/rust/sync15/src/client/status.rs b/third_party/rust/sync15/src/client/status.rs new file mode 100644 index 0000000000..407efeec12 --- /dev/null +++ b/third_party/rust/sync15/src/client/status.rs @@ -0,0 +1,106 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{Error, ErrorResponse}; +use crate::telemetry::SyncTelemetryPing; +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; + +/// The general status of sync - should probably be moved to the "sync manager" +/// once we have one! +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ServiceStatus { + /// Everything is fine. + Ok, + /// Some general network issue. + NetworkError, + /// Some apparent issue with the servers. + ServiceError, + /// Some external FxA action needs to be taken. + AuthenticationError, + /// We declined to do anything for backoff or rate-limiting reasons. + BackedOff, + /// We were interrupted. + Interrupted, + /// Something else - you need to check the logs for more details. May + /// or may not be transient, we really don't know. + OtherError, +} + +impl ServiceStatus { + // This is a bit naive and probably will not survive in this form in the + // SyncManager - eg, we'll want to handle backoff etc. + pub fn from_err(err: &Error) -> ServiceStatus { + match err { + // HTTP based errors. + Error::TokenserverHttpError(status) => { + // bit of a shame the tokenserver is different to storage... + if *status == 401 { + ServiceStatus::AuthenticationError + } else { + ServiceStatus::ServiceError + } + } + // BackoffError is also from the tokenserver. + Error::BackoffError(_) => ServiceStatus::ServiceError, + Error::StorageHttpError(ref e) => match e { + ErrorResponse::Unauthorized { .. } => ServiceStatus::AuthenticationError, + _ => ServiceStatus::ServiceError, + }, + + // Network errors. + Error::RequestError(_) | Error::UnexpectedStatus(_) | Error::HawkError(_) => { + ServiceStatus::NetworkError + } + + Error::Interrupted(_) => ServiceStatus::Interrupted, + _ => ServiceStatus::OtherError, + } + } +} + +/// The result of a sync request. This too is from the "sync manager", but only +/// has a fraction of the things it will have when we actually build that. +#[derive(Debug)] +pub struct SyncResult { + /// The general health. + pub service_status: ServiceStatus, + + /// The set of declined engines, if we know them. + pub declined: Option>, + + /// The result of the sync. + pub result: Result<(), Error>, + + /// The result for each engine. + /// Note that we expect the `String` to be replaced with an enum later. + pub engine_results: HashMap>, + + pub telemetry: SyncTelemetryPing, + + pub next_sync_after: Option, +} + +// If `r` has a BackoffError, then returns the later backoff value. +fn advance_backoff(cur_best: SystemTime, r: &Result<(), Error>) -> SystemTime { + if let Err(e) = r { + if let Some(time) = e.get_backoff() { + return std::cmp::max(time, cur_best); + } + } + cur_best +} + +impl SyncResult { + pub(crate) fn set_sync_after(&mut self, backoff_duration: Duration) { + let now = SystemTime::now(); + let toplevel = advance_backoff(now + backoff_duration, &self.result); + let sync_after = self.engine_results.values().fold(toplevel, advance_backoff); + if sync_after <= now { + self.next_sync_after = None; + } else { + self.next_sync_after = Some(sync_after); + } + } +} diff --git a/third_party/rust/sync15/src/client/storage_client.rs b/third_party/rust/sync15/src/client/storage_client.rs new file mode 100644 index 0000000000..83dbbf294e --- /dev/null +++ b/third_party/rust/sync15/src/client/storage_client.rs @@ -0,0 +1,587 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::request::{ + BatchPoster, InfoCollections, InfoConfiguration, PostQueue, PostResponse, PostResponseHandler, +}; +use super::token; +use crate::bso::{IncomingBso, IncomingEncryptedBso, OutgoingBso, OutgoingEncryptedBso}; +use crate::engine::CollectionRequest; +use crate::error::{self, Error, ErrorResponse}; +use crate::record_types::MetaGlobalRecord; +use crate::{Guid, ServerTimestamp}; +use serde_json::Value; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use url::Url; +use viaduct::{ + header_names::{self, AUTHORIZATION}, + Method, Request, Response, +}; + +/// A response from a GET request on a Sync15StorageClient, encapsulating all +/// the variants users of this client needs to care about. +#[derive(Debug, Clone)] +pub enum Sync15ClientResponse { + Success { + status: u16, + record: T, + last_modified: ServerTimestamp, + route: String, + }, + Error(ErrorResponse), +} + +fn parse_seconds(seconds_str: &str) -> Option { + let secs = seconds_str.parse::().ok()?.ceil(); + // Note: u32 doesn't impl TryFrom :( + if !secs.is_finite() || secs < 0.0 || secs > f64::from(u32::max_value()) { + log::warn!("invalid backoff value: {}", secs); + None + } else { + Some(secs as u32) + } +} + +impl Sync15ClientResponse { + pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result + where + for<'a> T: serde::de::Deserialize<'a>, + { + let route: String = resp.url.path().into(); + // Android seems to respect retry_after even on success requests, so we + // will too if it's present. This also lets us handle both backoff-like + // properties in the same place. + let retry_after = resp + .headers + .get(header_names::RETRY_AFTER) + .and_then(parse_seconds); + + let backoff = resp + .headers + .get(header_names::X_WEAVE_BACKOFF) + .and_then(parse_seconds); + + if let Some(b) = backoff { + backoff_listener.note_backoff(b); + } + if let Some(ra) = retry_after { + backoff_listener.note_retry_after(ra); + } + + Ok(if resp.is_success() { + let record: T = resp.json()?; + let last_modified = resp + .headers + .get(header_names::X_LAST_MODIFIED) + .and_then(|s| ServerTimestamp::from_str(s).ok()) + .ok_or(Error::MissingServerTimestamp)?; + log::info!( + "Successful request to \"{}\", incoming x-last-modified={:?}", + route, + last_modified + ); + + Sync15ClientResponse::Success { + status: resp.status, + record, + last_modified, + route, + } + } else { + let status = resp.status; + log::info!("Request \"{}\" was an error (status={})", route, status); + match status { + 404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }), + 401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }), + 412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }), + 500..=600 => { + Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status }) + } + _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }), + } + }) + } + + pub fn create_storage_error(self) -> Error { + let inner = match self { + Sync15ClientResponse::Success { status, route, .. } => { + // This should never happen as callers are expected to have + // already special-cased this response, so warn if it does. + // (or maybe we could panic?) + log::warn!("Converting success response into an error"); + ErrorResponse::RequestFailed { status, route } + } + Sync15ClientResponse::Error(e) => e, + }; + Error::StorageHttpError(inner) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Sync15StorageClientInit { + pub key_id: String, + pub access_token: String, + pub tokenserver_url: Url, +} + +/// A trait containing the methods required to run through the setup state +/// machine. This is factored out into a separate trait to make mocking +/// easier. +pub trait SetupStorageClient { + fn fetch_info_configuration(&self) -> error::Result>; + fn fetch_info_collections(&self) -> error::Result>; + fn fetch_meta_global(&self) -> error::Result>; + fn fetch_crypto_keys(&self) -> error::Result>; + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result; + fn put_crypto_keys( + &self, + xius: ServerTimestamp, + keys: &OutgoingEncryptedBso, + ) -> error::Result<()>; + fn wipe_all_remote(&self) -> error::Result<()>; +} + +#[derive(Debug, Default)] +pub struct BackoffState { + pub backoff_secs: AtomicU32, + pub retry_after_secs: AtomicU32, +} + +pub(crate) type BackoffListener = std::sync::Arc; + +pub(crate) fn new_backoff_listener() -> BackoffListener { + std::sync::Arc::new(BackoffState::default()) +} + +impl BackoffState { + pub fn note_backoff(&self, noted: u32) { + super::util::atomic_update_max(&self.backoff_secs, noted) + } + + pub fn note_retry_after(&self, noted: u32) { + super::util::atomic_update_max(&self.retry_after_secs, noted) + } + + pub fn get_backoff_secs(&self) -> u32 { + self.backoff_secs.load(Ordering::SeqCst) + } + + pub fn get_retry_after_secs(&self) -> u32 { + self.retry_after_secs.load(Ordering::SeqCst) + } + + pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option { + let bo = self.get_backoff_secs(); + let ra = self.get_retry_after_secs(); + let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) }); + if secs > 0 { + Some(std::time::Duration::from_secs(secs)) + } else { + None + } + } + + pub fn reset(&self) { + self.backoff_secs.store(0, Ordering::SeqCst); + self.retry_after_secs.store(0, Ordering::SeqCst); + } +} + +// meta/global is a clear-text Bso (ie, there's a String `payload` which has a MetaGlobalRecord) +// We don't use the 'content' helpers here because we want json errors to be fatal here +// (ie, we don't need tombstones and can't just skip a malformed record) +type IncMetaGlobalBso = IncomingBso; +type OutMetaGlobalBso = OutgoingBso; + +#[derive(Debug)] +pub struct Sync15StorageClient { + tsc: token::TokenProvider, + pub(crate) backoff: BackoffListener, +} + +impl SetupStorageClient for Sync15StorageClient { + fn fetch_info_configuration(&self) -> error::Result> { + self.relative_storage_request(Method::Get, "info/configuration") + } + + fn fetch_info_collections(&self) -> error::Result> { + self.relative_storage_request(Method::Get, "info/collections") + } + + fn fetch_meta_global(&self) -> error::Result> { + let got: Sync15ClientResponse = + self.relative_storage_request(Method::Get, "storage/meta/global")?; + Ok(match got { + Sync15ClientResponse::Success { + record, + last_modified, + route, + status, + } => { + log::debug!( + "Got meta global with modified = {}; last-modified = {}", + record.envelope.modified, + last_modified + ); + Sync15ClientResponse::Success { + record: serde_json::from_str(&record.payload)?, + last_modified, + route, + status, + } + } + Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e), + }) + } + + fn fetch_crypto_keys(&self) -> error::Result> { + self.relative_storage_request(Method::Get, "storage/crypto/keys") + } + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result { + let bso = OutMetaGlobalBso::new(Guid::new("global").into(), global)?; + self.put("storage/meta/global", xius, &bso) + } + + fn put_crypto_keys( + &self, + xius: ServerTimestamp, + keys: &OutgoingEncryptedBso, + ) -> error::Result<()> { + self.put("storage/crypto/keys", xius, keys)?; + Ok(()) + } + + fn wipe_all_remote(&self) -> error::Result<()> { + let s = self.tsc.api_endpoint()?; + let url = Url::parse(&s)?; + + let req = self.build_request(Method::Delete, url)?; + match self.exec_request::(req, false) { + Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. })) + | Ok(Sync15ClientResponse::Success { .. }) => Ok(()), + Ok(resp) => Err(resp.create_storage_error()), + Err(e) => Err(e), + } + } +} + +impl Sync15StorageClient { + pub fn new(init_params: Sync15StorageClientInit) -> error::Result { + rc_crypto::ensure_initialized(); + let tsc = token::TokenProvider::new( + init_params.tokenserver_url, + init_params.access_token, + init_params.key_id, + )?; + Ok(Sync15StorageClient { + tsc, + backoff: new_backoff_listener(), + }) + } + + pub fn get_encrypted_records( + &self, + collection_request: &CollectionRequest, + ) -> error::Result>> { + self.collection_request(Method::Get, collection_request) + } + + #[inline] + fn authorized(&self, req: Request) -> error::Result { + let hawk_header_value = self.tsc.authorization(&req)?; + Ok(req.header(AUTHORIZATION, hawk_header_value)?) + } + + // TODO: probably want a builder-like API to do collection requests (e.g. something + // that occupies roughly the same conceptual role as the Collection class in desktop) + fn build_request(&self, method: Method, url: Url) -> error::Result { + self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?) + } + + fn relative_storage_request( + &self, + method: Method, + relative_path: P, + ) -> error::Result> + where + P: AsRef, + for<'a> T: serde::de::Deserialize<'a>, + { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(relative_path.as_ref())?; + self.exec_request(self.build_request(method, url)?, false) + } + + fn exec_request( + &self, + req: Request, + require_success: bool, + ) -> error::Result> + where + for<'a> T: serde::de::Deserialize<'a>, + { + log::trace!( + "request: {} {} ({:?})", + req.method, + req.url.path(), + req.url.query() + ); + let resp = req.send()?; + + let result = Sync15ClientResponse::from_response(resp, &self.backoff)?; + match result { + Sync15ClientResponse::Success { .. } => Ok(result), + _ => { + if require_success { + Err(result.create_storage_error()) + } else { + Ok(result) + } + } + } + } + + fn collection_request( + &self, + method: Method, + r: &CollectionRequest, + ) -> error::Result> + where + for<'a> T: serde::de::Deserialize<'a>, + { + let url = build_collection_request_url(Url::parse(&self.tsc.api_endpoint()?)?, r)?; + self.exec_request(self.build_request(method, url)?, false) + } + + pub fn new_post_queue<'a, F: PostResponseHandler>( + &'a self, + coll: &str, + config: &InfoConfiguration, + ts: ServerTimestamp, + on_response: F, + ) -> error::Result, F>> { + let pw = PostWrapper { + client: self, + coll: coll.into(), + }; + Ok(PostQueue::new(config, ts, pw, on_response)) + } + + fn put( + &self, + relative_path: P, + xius: ServerTimestamp, + body: &B, + ) -> error::Result + where + P: AsRef, + B: serde::ser::Serialize, + { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(relative_path.as_ref())?; + + let req = self + .build_request(Method::Put, url)? + .json(body) + .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?; + + let resp = self.exec_request::(req, true)?; + // Note: we pass `true` for require_success, so this panic never happens. + if let Sync15ClientResponse::Success { last_modified, .. } = resp { + Ok(last_modified) + } else { + unreachable!("Error returned exec_request when `require_success` was true"); + } + } + + pub fn hashed_uid(&self) -> error::Result { + self.tsc.hashed_uid() + } + + pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?; + log::debug!("Wiping: {:?}", url); + let req = self.build_request(Method::Delete, url)?; + match self.exec_request::(req, false) { + Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. })) + | Ok(Sync15ClientResponse::Success { .. }) => Ok(()), + Ok(resp) => Err(resp.create_storage_error()), + Err(e) => Err(e), + } + } +} + +pub struct PostWrapper<'a> { + client: &'a Sync15StorageClient, + coll: String, +} + +impl<'a> BatchPoster for PostWrapper<'a> { + fn post( + &self, + bytes: Vec, + xius: ServerTimestamp, + batch: Option, + commit: bool, + _: &PostQueue, + ) -> error::Result { + let r = CollectionRequest::new(self.coll.clone()) + .batch(batch) + .commit(commit); + let url = build_collection_request_url(Url::parse(&self.client.tsc.api_endpoint()?)?, &r)?; + + let req = self + .client + .build_request(Method::Post, url)? + .header(header_names::CONTENT_TYPE, "application/json")? + .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))? + .body(bytes); + self.client.exec_request(req, false) + } +} + +fn build_collection_request_url(mut base_url: Url, r: &CollectionRequest) -> error::Result { + base_url + .path_segments_mut() + .map_err(|_| Error::UnacceptableUrl("Storage server URL is not a base".to_string()))? + .extend(&["storage", &r.collection]); + + let mut pairs = base_url.query_pairs_mut(); + if r.full { + pairs.append_pair("full", "1"); + } + if r.limit > 0 { + pairs.append_pair("limit", &r.limit.to_string()); + } + if let Some(ids) = &r.ids { + // Most ids are 12 characters, and we comma separate them, so 13. + let mut buf = String::with_capacity(ids.len() * 13); + for (i, id) in ids.iter().enumerate() { + if i > 0 { + buf.push(','); + } + buf.push_str(id.as_str()); + } + pairs.append_pair("ids", &buf); + } + if let Some(batch) = &r.batch { + pairs.append_pair("batch", batch); + } + if r.commit { + pairs.append_pair("commit", "true"); + } + if let Some(ts) = r.older { + pairs.append_pair("older", &ts.to_string()); + } + if let Some(ts) = r.newer { + pairs.append_pair("newer", &ts.to_string()); + } + if let Some(o) = r.order { + pairs.append_pair("sort", o.as_str()); + } + pairs.finish(); + drop(pairs); + + // This is strange but just accessing query_pairs_mut makes you have + // a trailing question mark on your url. I don't think anything bad + // would happen here, but I don't know, and also, it looks dumb so + // I'd rather not have it. + if base_url.query() == Some("") { + base_url.set_query(None); + } + Ok(base_url) +} + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn test_send() { + fn ensure_send() {} + // Compile will fail if not send. + ensure_send::(); + } + + #[test] + fn test_parse_seconds() { + assert_eq!(parse_seconds("1"), Some(1)); + assert_eq!(parse_seconds("1.4"), Some(2)); + assert_eq!(parse_seconds("1.5"), Some(2)); + assert_eq!(parse_seconds("3600.0"), Some(3600)); + assert_eq!(parse_seconds("3600"), Some(3600)); + assert_eq!(parse_seconds("-1"), None); + assert_eq!(parse_seconds("inf"), None); + assert_eq!(parse_seconds("-inf"), None); + assert_eq!(parse_seconds("one-thousand"), None); + assert_eq!(parse_seconds("4294967295"), Some(4294967295)); + assert_eq!(parse_seconds("4294967296"), None); + } + + #[test] + fn test_query_building() { + use crate::engine::RequestOrder; + let base = Url::parse("https://example.com/sync").unwrap(); + + let empty = + build_collection_request_url(base.clone(), &CollectionRequest::new("foo")).unwrap(); + assert_eq!(empty.as_str(), "https://example.com/sync/storage/foo"); + let batch_start = build_collection_request_url( + base.clone(), + &CollectionRequest::new("bar") + .batch(Some("true".into())) + .commit(false), + ) + .unwrap(); + assert_eq!( + batch_start.as_str(), + "https://example.com/sync/storage/bar?batch=true" + ); + let batch_commit = build_collection_request_url( + base.clone(), + &CollectionRequest::new("asdf") + .batch(Some("1234abc".into())) + .commit(true), + ) + .unwrap(); + assert_eq!( + batch_commit.as_str(), + "https://example.com/sync/storage/asdf?batch=1234abc&commit=true" + ); + + let idreq = build_collection_request_url( + base.clone(), + &CollectionRequest::new("wutang").full().ids(&["rza", "gza"]), + ) + .unwrap(); + assert_eq!( + idreq.as_str(), + "https://example.com/sync/storage/wutang?full=1&ids=rza%2Cgza" + ); + + let complex = build_collection_request_url( + base, + &CollectionRequest::new("specific") + .full() + .limit(10) + .sort_by(RequestOrder::Oldest) + .older_than(ServerTimestamp(9_876_540)) + .newer_than(ServerTimestamp(1_234_560)), + ) + .unwrap(); + assert_eq!(complex.as_str(), + "https://example.com/sync/storage/specific?full=1&limit=10&older=9876.54&newer=1234.56&sort=oldest"); + } +} diff --git a/third_party/rust/sync15/src/client/sync.rs b/third_party/rust/sync15/src/client/sync.rs new file mode 100644 index 0000000000..808dae9c79 --- /dev/null +++ b/third_party/rust/sync15/src/client/sync.rs @@ -0,0 +1,105 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::coll_state::LocalCollStateMachine; +use super::coll_update::CollectionUpdate; +use super::state::GlobalState; +use super::storage_client::Sync15StorageClient; +use crate::clients_engine; +use crate::engine::{IncomingChangeset, SyncEngine}; +use crate::error::Error; +use crate::telemetry; +use crate::KeyBundle; +use interrupt_support::Interruptee; + +#[allow(clippy::too_many_arguments)] +pub fn synchronize_with_clients_engine( + client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + clients: Option<&clients_engine::Engine<'_>>, + engine: &dyn SyncEngine, + fully_atomic: bool, + telem_engine: &mut telemetry::Engine, + interruptee: &dyn Interruptee, +) -> Result<(), Error> { + let collection = engine.collection_name(); + log::info!("Syncing collection {}", collection); + + // our global state machine is ready - get the collection machine going. + let mut coll_state = + match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? { + Some(coll_state) => coll_state, + None => { + // XXX - this is either "error" or "declined". + log::warn!( + "can't setup for the {} collection - hopefully it works later", + collection + ); + return Ok(()); + } + }; + + if let Some(clients) = clients { + engine.prepare_for_sync(&|| clients.get_client_data())?; + } + + let collection_requests = engine.get_collection_requests(coll_state.last_modified)?; + let incoming = if collection_requests.is_empty() { + log::info!("skipping incoming for {} - not needed.", collection); + vec![IncomingChangeset::new(collection, coll_state.last_modified)] + } else { + assert_eq!(collection_requests.last().unwrap().collection, collection); + + let count = collection_requests.len(); + collection_requests + .into_iter() + .enumerate() + .map(|(idx, collection_request)| { + interruptee.err_if_interrupted()?; + let incoming_changes = + super::fetch_incoming(client, &mut coll_state, &collection_request)?; + + log::info!( + "Downloaded {} remote changes (request {} of {})", + incoming_changes.changes.len(), + idx, + count, + ); + Ok(incoming_changes) + }) + .collect::, Error>>()? + }; + + let new_timestamp = incoming.last().expect("must have >= 1").timestamp; + let mut outgoing = engine.apply_incoming(incoming, telem_engine)?; + + interruptee.err_if_interrupted()?; + // Bump the timestamps now just incase the upload fails. + // xxx - duplication below smells wrong + outgoing.timestamp = new_timestamp; + coll_state.last_modified = new_timestamp; + + log::info!("Uploading {} outgoing changes", outgoing.changes.len()); + let upload_info = + CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + // ideally we'd report this per-batch, but for now, let's just report it + // as a total. + let mut telem_outgoing = telemetry::EngineOutgoing::new(); + telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len()); + telem_outgoing.failed(upload_info.failed_ids.len()); + telem_engine.outgoing(telem_outgoing); + + engine.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?; + + log::info!("Sync finished!"); + Ok(()) +} diff --git a/third_party/rust/sync15/src/client/sync_multiple.rs b/third_party/rust/sync15/src/client/sync_multiple.rs new file mode 100644 index 0000000000..79ddceff3c --- /dev/null +++ b/third_party/rust/sync15/src/client/sync_multiple.rs @@ -0,0 +1,493 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// This helps you perform a sync of multiple engines and helps you manage +// global and local state between syncs. + +use super::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine}; +use super::status::{ServiceStatus, SyncResult}; +use super::storage_client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit}; +use crate::clients_engine::{self, CommandProcessor, CLIENTS_TTL_REFRESH}; +use crate::engine::{EngineSyncAssociation, SyncEngine}; +use crate::error::Error; +use crate::telemetry; +use crate::KeyBundle; +use interrupt_support::Interruptee; +use std::collections::HashMap; +use std::mem; +use std::result; +use std::time::{Duration, SystemTime}; + +/// Info about the client to use. We reuse the client unless +/// we discover the client_init has changed, in which case we re-create one. +#[derive(Debug)] +struct ClientInfo { + // the client_init used to create `client`. + client_init: Sync15StorageClientInit, + // the client (our tokenserver state machine state, and our http library's state) + client: Sync15StorageClient, +} + +impl ClientInfo { + fn new(ci: &Sync15StorageClientInit) -> Result { + Ok(Self { + client_init: ci.clone(), + client: Sync15StorageClient::new(ci.clone())?, + }) + } +} + +/// Info we want callers to engine *in memory* for us so that subsequent +/// syncs are faster. This should never be persisted to storage as it holds +/// sensitive information, such as the sync decryption keys. +#[derive(Debug, Default)] +pub struct MemoryCachedState { + last_client_info: Option, + last_global_state: Option, + // These are just engined in memory, as persisting an invalid value far in the + // future has the potential to break sync for good. + next_sync_after: Option, + next_client_refresh_after: Option, +} + +impl MemoryCachedState { + // Called we notice the cached state is stale. + pub fn clear_sensitive_info(&mut self) { + self.last_client_info = None; + self.last_global_state = None; + // Leave the backoff time, as there's no reason to think it's not still + // true. + } + pub fn get_next_sync_after(&self) -> Option { + self.next_sync_after + } + pub fn should_refresh_client(&self) -> bool { + match self.next_client_refresh_after { + Some(t) => SystemTime::now() > t, + None => true, + } + } + pub fn note_client_refresh(&mut self) { + self.next_client_refresh_after = + Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH)); + } +} + +/// Sync multiple engines +/// * `engines` - The engines to sync +/// * `persisted_global_state` - The global state to use, or None if never +/// before provided. At the end of the sync, and even when the sync fails, +/// the value in this cell should be persisted to permanent storage and +/// provided next time the sync is called. +/// * `last_client_info` - The client state to use, or None if never before +/// provided. At the end of the sync, the value should be persisted +/// *in memory only* - it should not be persisted to disk. +/// * `storage_init` - Information about how the sync http client should be +/// configured. +/// * `root_sync_key` - The KeyBundle used for encryption. +/// +/// Returns a map, keyed by name and holding an error value - if any engine +/// fails, the sync will continue on to other engines, but the error will be +/// places in this map. The absence of a name in the map implies the engine +/// succeeded. +pub fn sync_multiple( + engines: &[&dyn SyncEngine], + persisted_global_state: &mut Option, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option>, +) -> SyncResult { + sync_multiple_with_command_processor( + None, + engines, + persisted_global_state, + mem_cached_state, + storage_init, + root_sync_key, + interruptee, + req_info, + ) +} + +/// Like `sync_multiple`, but specifies an optional command processor to handle +/// commands from the clients collection. This function is called by the sync +/// manager, which provides its own processor. +#[allow(clippy::too_many_arguments)] +pub fn sync_multiple_with_command_processor( + command_processor: Option<&dyn CommandProcessor>, + engines: &[&dyn SyncEngine], + persisted_global_state: &mut Option, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option>, +) -> SyncResult { + log::info!("Syncing {} engines", engines.len()); + let mut sync_result = SyncResult { + service_status: ServiceStatus::OtherError, + result: Ok(()), + declined: None, + next_sync_after: None, + engine_results: HashMap::with_capacity(engines.len()), + telemetry: telemetry::SyncTelemetryPing::new(), + }; + let backoff = super::storage_client::new_backoff_listener(); + let req_info = req_info.unwrap_or_default(); + let driver = SyncMultipleDriver { + command_processor, + engines, + storage_init, + interruptee, + engines_to_state_change: req_info.engines_to_state_change, + backoff: backoff.clone(), + root_sync_key, + result: &mut sync_result, + persisted_global_state, + mem_cached_state, + saw_auth_error: false, + ignore_soft_backoff: req_info.is_user_action, + }; + match driver.sync() { + Ok(()) => { + log::debug!( + "sync was successful, final status={:?}", + sync_result.service_status + ); + } + Err(e) => { + log::warn!( + "sync failed: {}, final status={:?}", + e, + sync_result.service_status, + ); + sync_result.result = Err(e); + } + } + // Respect `backoff` value when computing the next sync time even if we were + // ignoring it during the sync + sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default()); + mem_cached_state.next_sync_after = sync_result.next_sync_after; + log::trace!("Sync result: {:?}", sync_result); + sync_result +} + +/// This is essentially a bag of information that the sync manager knows, but +/// otherwise we won't. It should probably be rethought if it gains many more +/// fields. +#[derive(Debug, Default)] +pub struct SyncRequestInfo<'a> { + pub engines_to_state_change: Option<&'a HashMap>, + pub is_user_action: bool, +} + +// The sync multiple driver +struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + command_processor: Option<&'info dyn CommandProcessor>, + engines: &'info [&'info dyn SyncEngine], + storage_init: &'info Sync15StorageClientInit, + root_sync_key: &'info KeyBundle, + interruptee: &'info dyn Interruptee, + backoff: BackoffListener, + engines_to_state_change: Option<&'info HashMap>, + result: &'res mut SyncResult, + persisted_global_state: &'pgs mut Option, + mem_cached_state: &'mcs mut MemoryCachedState, + ignore_soft_backoff: bool, + saw_auth_error: bool, +} + +impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + /// The actual worker for sync_multiple. + fn sync(mut self) -> result::Result<(), Error> { + log::info!("Loading/initializing persisted state"); + let mut pgs = self.prepare_persisted_state(); + + log::info!("Preparing client info"); + let client_info = self.prepare_client_info()?; + + if self.was_interrupted() { + return Ok(()); + } + + log::info!("Entering sync state machine"); + // Advance the state machine to the point where it can perform a full + // sync. This may involve uploading meta/global, crypto/keys etc. + let mut global_state = self.run_state_machine(&client_info, &mut pgs)?; + + if self.was_interrupted() { + return Ok(()); + } + + // Set the service status to OK here - we may adjust it based on an individual + // engine failing. + self.result.service_status = ServiceStatus::Ok; + + let clients_engine = if let Some(command_processor) = self.command_processor { + log::info!("Synchronizing clients engine"); + let should_refresh = self.mem_cached_state.should_refresh_client(); + let mut engine = clients_engine::Engine::new(command_processor, self.interruptee); + if let Err(e) = engine.sync( + &client_info.client, + &global_state, + self.root_sync_key, + should_refresh, + ) { + // Record telemetry with the error just in case... + let mut telem_sync = telemetry::SyncTelemetry::new(); + let mut telem_engine = telemetry::Engine::new("clients"); + telem_engine.failure(&e); + telem_sync.engine(telem_engine); + self.result.service_status = ServiceStatus::from_err(&e); + + // ...And bail, because a clients engine sync failure is fatal. + return Err(e); + } + // We don't record telemetry for successful clients engine + // syncs, since we only keep client records in memory, we + // expect the counts to be the same most times, and a + // failure aborts the entire sync. + if self.was_interrupted() { + return Ok(()); + } + self.mem_cached_state.note_client_refresh(); + Some(engine) + } else { + None + }; + + log::info!("Synchronizing engines"); + + let telem_sync = + self.sync_engines(&client_info, &mut global_state, clients_engine.as_ref()); + self.result.telemetry.sync(telem_sync); + + log::info!("Finished syncing engines."); + + if !self.saw_auth_error { + log::trace!("Updating persisted global state"); + self.mem_cached_state.last_client_info = Some(client_info); + self.mem_cached_state.last_global_state = Some(global_state); + } + + Ok(()) + } + + fn was_interrupted(&mut self) -> bool { + if self.interruptee.was_interrupted() { + log::info!("Interrupted, bailing out"); + self.result.service_status = ServiceStatus::Interrupted; + true + } else { + false + } + } + + fn sync_engines( + &mut self, + client_info: &ClientInfo, + global_state: &mut GlobalState, + clients: Option<&clients_engine::Engine<'_>>, + ) -> telemetry::SyncTelemetry { + let mut telem_sync = telemetry::SyncTelemetry::new(); + for engine in self.engines { + let name = engine.collection_name(); + if self + .backoff + .get_required_wait(self.ignore_soft_backoff) + .is_some() + { + log::warn!("Got backoff, bailing out of sync early"); + break; + } + if global_state.global.declined.iter().any(|e| e == &*name) { + log::info!("The {} engine is declined. Skipping", name); + continue; + } + log::info!("Syncing {} engine!", name); + + let mut telem_engine = telemetry::Engine::new(&*name); + let result = super::sync::synchronize_with_clients_engine( + &client_info.client, + global_state, + self.root_sync_key, + clients, + *engine, + true, + &mut telem_engine, + self.interruptee, + ); + + match result { + Ok(()) => log::info!("Sync of {} was successful!", name), + Err(ref e) => { + log::warn!("Sync of {} failed! {:?}", name, e); + let this_status = ServiceStatus::from_err(e); + // The only error which forces us to discard our state is an + // auth error. + self.saw_auth_error = + self.saw_auth_error || this_status == ServiceStatus::AuthenticationError; + telem_engine.failure(e); + // If the failure from the engine looks like anything other than + // a "engine error" we don't bother trying the others. + if this_status != ServiceStatus::OtherError { + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + self.result.service_status = this_status; + break; + } + } + } + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + if self.was_interrupted() { + break; + } + } + telem_sync + } + + fn run_state_machine( + &mut self, + client_info: &ClientInfo, + pgs: &mut PersistedGlobalState, + ) -> result::Result { + let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None); + + let mut state_machine = SetupStateMachine::for_full_sync( + &client_info.client, + self.root_sync_key, + pgs, + self.engines_to_state_change, + self.interruptee, + ); + + log::info!("Advancing state machine to ready (full)"); + let res = state_machine.run_to_ready(last_state); + // Grab this now even though we don't need it until later to avoid a + // lifetime issue + let changes = state_machine.changes_needed.take(); + // The state machine might have updated our persisted_global_state, so + // update the caller's repr of it. + *self.persisted_global_state = Some(serde_json::to_string(&pgs)?); + + // Now that we've gone through the state machine, engine the declined list in + // the sync_result + self.result.declined = Some(pgs.get_declined().to_vec()); + log::debug!( + "Declined engines list after state machine set to: {:?}", + self.result.declined, + ); + + if let Some(c) = changes { + self.wipe_or_reset_engines(c, &client_info.client)?; + } + let state = match res { + Err(e) => { + self.result.service_status = ServiceStatus::from_err(&e); + return Err(e); + } + Ok(state) => state, + }; + self.result.telemetry.uid(client_info.client.hashed_uid()?); + // As for client_info, put None back now so we start from scratch on error. + self.mem_cached_state.last_global_state = None; + Ok(state) + } + + fn wipe_or_reset_engines( + &mut self, + changes: EngineChangesNeeded, + client: &Sync15StorageClient, + ) -> result::Result<(), Error> { + if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() { + return Ok(()); + } + for e in &changes.remote_wipes { + log::info!("Engine {:?} just got disabled locally, wiping server", e); + client.wipe_remote_engine(e)?; + } + + for s in self.engines { + let name = s.collection_name(); + if changes.local_resets.contains(&*name) { + log::info!("Resetting engine {}, as it was declined remotely", name); + s.reset(&EngineSyncAssociation::Disconnected)?; + } + } + + Ok(()) + } + + fn prepare_client_info(&mut self) -> result::Result { + let mut client_info = match mem::replace(&mut self.mem_cached_state.last_client_info, None) + { + Some(client_info) => { + // if our storage_init has changed it probably means the user has + // changed, courtesy of the 'kid' in the structure. Thus, we can't + // reuse the client or the memory cached state. We do keep the disk + // state as currently that's only the declined list. + if client_info.client_init != *self.storage_init { + log::info!("Discarding all state as the account might have changed"); + *self.mem_cached_state = MemoryCachedState::default(); + ClientInfo::new(self.storage_init)? + } else { + log::debug!("Reusing memory-cached client_info"); + // we can reuse it (which should be the common path) + client_info + } + } + None => { + log::debug!("mem_cached_state was stale or missing, need setup"); + // We almost certainly have no other state here, but to be safe, we + // throw away any memory state we do have. + self.mem_cached_state.clear_sensitive_info(); + ClientInfo::new(self.storage_init)? + } + }; + // Ensure we use the correct listener here rather than on all the branches + // above, since it seems less error prone. + client_info.client.backoff = self.backoff.clone(); + Ok(client_info) + } + + fn prepare_persisted_state(&mut self) -> PersistedGlobalState { + // Note that any failure to use a persisted state means we also decline + // to use our memory cached state, so that we fully rebuild that + // persisted state for next time. + match self.persisted_global_state { + Some(persisted_string) if !persisted_string.is_empty() => { + match serde_json::from_str::(persisted_string) { + Ok(state) => { + log::trace!("Read persisted state: {:?}", state); + // Note that we don't set `result.declined` from the + // data in state - it remains None, which explicitly + // indicates "we don't have updated info". + state + } + _ => { + // Don't log the error since it might contain sensitive + // info (although currently it only contains the declined engines list) + error_support::report_error!( + "sync15-prepare-persisted-state", + "Failed to parse PersistedGlobalState from JSON! Falling back to default" + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } + _ => { + log::info!( + "The application didn't give us persisted state - \ + this is only expected on the very first run for a given user." + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } +} diff --git a/third_party/rust/sync15/src/client/token.rs b/third_party/rust/sync15/src/client/token.rs new file mode 100644 index 0000000000..b416c0c12a --- /dev/null +++ b/third_party/rust/sync15/src/client/token.rs @@ -0,0 +1,602 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{self, Error as ErrorKind, Result}; +use crate::ServerTimestamp; +use rc_crypto::hawk; +use serde_derive::*; +use std::borrow::{Borrow, Cow}; +use std::cell::RefCell; +use std::fmt; +use std::time::{Duration, SystemTime}; +use url::Url; +use viaduct::{header_names, Request}; + +const RETRY_AFTER_DEFAULT_MS: u64 = 10000; + +// The TokenserverToken is the token as received directly from the token server +// and deserialized from JSON. +#[derive(Deserialize, Clone, PartialEq, Eq)] +struct TokenserverToken { + id: String, + key: String, + api_endpoint: String, + uid: u64, + duration: u64, + hashed_fxa_uid: String, +} + +impl std::fmt::Debug for TokenserverToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TokenserverToken") + .field("api_endpoint", &self.api_endpoint) + .field("uid", &self.uid) + .field("duration", &self.duration) + .field("hashed_fxa_uid", &self.hashed_fxa_uid) + .finish() + } +} + +// The struct returned by the TokenFetcher - the token itself and the +// server timestamp. +struct TokenFetchResult { + token: TokenserverToken, + server_timestamp: ServerTimestamp, +} + +// The trait for fetching tokens - we'll provide a "real" implementation but +// tests will re-implement it. +trait TokenFetcher { + fn fetch_token(&self) -> crate::Result; + // We allow the trait to tell us what the time is so tests can get funky. + fn now(&self) -> SystemTime; +} + +// Our "real" token fetcher, implementing the TokenFetcher trait, which hits +// the token server +#[derive(Debug)] +struct TokenServerFetcher { + // The stuff needed to fetch a token. + server_url: Url, + access_token: String, + key_id: String, +} + +fn fixup_server_url(mut url: Url) -> url::Url { + // The given `url` is the end-point as returned by .well-known/fxa-client-configuration, + // or as directly specified by self-hosters. As a result, it may or may not have + // the sync 1.5 suffix of "/1.0/sync/1.5", so add it on here if it does not. + if url.as_str().ends_with("1.0/sync/1.5") { + // ok! + } else if url.as_str().ends_with("1.0/sync/1.5/") { + // Shouldn't ever be Err() here, but the result is `Result` + // and I don't want to unwrap or add a new error type just for PathSegmentsMut failing. + if let Ok(mut path) = url.path_segments_mut() { + path.pop(); + } + } else { + // We deliberately don't use `.join()` here in order to preserve all path components. + // For example, "http://example.com/token" should produce "http://example.com/token/1.0/sync/1.5" + // but using `.join()` would produce "http://example.com/1.0/sync/1.5". + if let Ok(mut path) = url.path_segments_mut() { + path.pop_if_empty(); + path.extend(&["1.0", "sync", "1.5"]); + } + }; + url +} + +impl TokenServerFetcher { + fn new(base_url: Url, access_token: String, key_id: String) -> TokenServerFetcher { + TokenServerFetcher { + server_url: fixup_server_url(base_url), + access_token, + key_id, + } + } +} + +impl TokenFetcher for TokenServerFetcher { + fn fetch_token(&self) -> Result { + log::debug!("Fetching token from {}", self.server_url); + let resp = Request::get(self.server_url.clone()) + .header( + header_names::AUTHORIZATION, + format!("Bearer {}", self.access_token), + )? + .header(header_names::X_KEYID, self.key_id.clone())? + .send()?; + + if !resp.is_success() { + log::warn!("Non-success status when fetching token: {}", resp.status); + // TODO: the body should be JSON and contain a status parameter we might need? + log::trace!(" Response body {}", resp.text()); + // XXX - shouldn't we "chain" these errors - ie, a BackoffError could + // have a TokenserverHttpError as its cause? + if let Some(res) = resp.headers.get_as::(header_names::RETRY_AFTER) { + let ms = res + .ok() + .map_or(RETRY_AFTER_DEFAULT_MS, |f| (f * 1000f64) as u64); + let when = self.now() + Duration::from_millis(ms); + return Err(ErrorKind::BackoffError(when)); + } + let status = resp.status; + return Err(ErrorKind::TokenserverHttpError(status)); + } + + let token: TokenserverToken = resp.json()?; + let server_timestamp = resp + .headers + .try_get::(header_names::X_TIMESTAMP) + .ok_or(ErrorKind::MissingServerTimestamp)?; + Ok(TokenFetchResult { + token, + server_timestamp, + }) + } + + fn now(&self) -> SystemTime { + SystemTime::now() + } +} + +// The context stored by our TokenProvider when it has a TokenState::Token +// state. +struct TokenContext { + token: TokenserverToken, + credentials: hawk::Credentials, + server_timestamp: ServerTimestamp, + valid_until: SystemTime, +} + +// hawk::Credentials doesn't implement debug -_- +impl fmt::Debug for TokenContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> ::std::result::Result<(), fmt::Error> { + f.debug_struct("TokenContext") + .field("token", &self.token) + .field("credentials", &"(omitted)") + .field("server_timestamp", &self.server_timestamp) + .field("valid_until", &self.valid_until) + .finish() + } +} + +impl TokenContext { + fn new( + token: TokenserverToken, + credentials: hawk::Credentials, + server_timestamp: ServerTimestamp, + valid_until: SystemTime, + ) -> Self { + Self { + token, + credentials, + server_timestamp, + valid_until, + } + } + + fn is_valid(&self, now: SystemTime) -> bool { + // We could consider making the duration a little shorter - if it + // only has 1 second validity there seems a reasonable chance it will + // have expired by the time it gets presented to the remote that wants + // it. + // Either way though, we will eventually need to handle a token being + // rejected as a non-fatal error and recover, so maybe we don't care? + now < self.valid_until + } + + fn authorization(&self, req: &Request) -> Result { + let url = &req.url; + + let path_and_query = match url.query() { + None => Cow::from(url.path()), + Some(qs) => Cow::from(format!("{}?{}", url.path(), qs)), + }; + + let host = url + .host_str() + .ok_or_else(|| ErrorKind::UnacceptableUrl("Storage URL has no host".into()))?; + + // Known defaults exist for https? (among others), so this should be impossible + let port = url.port_or_known_default().ok_or_else(|| { + ErrorKind::UnacceptableUrl( + "Storage URL has no port and no default port is known for the protocol".into(), + ) + })?; + + let header = + hawk::RequestBuilder::new(req.method.as_str(), host, port, path_and_query.borrow()) + .request() + .make_header(&self.credentials)?; + + Ok(format!("Hawk {}", header)) + } +} + +// The state our TokenProvider holds to reflect the state of the token. +#[derive(Debug)] +enum TokenState { + // We've never fetched a token. + NoToken, + // Have a token and last we checked it remained valid. + Token(TokenContext), + // We failed to fetch a token. First elt is the error, second elt is + // the api_endpoint we had before we failed to fetch a new token (or + // None if the very first attempt at fetching a token failed) + Failed(Option, Option), + // Previously failed and told to back-off for SystemTime duration. Second + // elt is the api_endpoint we had before we hit the backoff error. + // XXX - should we roll Backoff and Failed together? + Backoff(SystemTime, Option), + // api_endpoint changed - we are never going to get a token nor move out + // of this state. + NodeReassigned, +} + +/// The generic TokenProvider implementation - long lived and fetches tokens +/// on demand (eg, when first needed, or when an existing one expires.) +#[derive(Debug)] +struct TokenProviderImpl { + fetcher: TF, + // Our token state (ie, whether we have a token, and if not, why not) + current_state: RefCell, +} + +impl TokenProviderImpl { + fn new(fetcher: TF) -> Self { + // We check this at the real entrypoint of the application, but tests + // can/do bypass that, so check this here too. + rc_crypto::ensure_initialized(); + TokenProviderImpl { + fetcher, + current_state: RefCell::new(TokenState::NoToken), + } + } + + // Uses our fetcher to grab a new token and if successfull, derives other + // info from that token into a usable TokenContext. + fn fetch_context(&self) -> Result { + let result = self.fetcher.fetch_token()?; + let token = result.token; + let valid_until = SystemTime::now() + Duration::from_secs(token.duration); + + let credentials = hawk::Credentials { + id: token.id.clone(), + key: hawk::Key::new(token.key.as_bytes(), hawk::SHA256)?, + }; + + Ok(TokenContext::new( + token, + credentials, + result.server_timestamp, + valid_until, + )) + } + + // Attempt to fetch a new token and return a new state reflecting that + // operation. If it worked a TokenState will be returned, but errors may + // cause other states. + fn fetch_token(&self, previous_endpoint: Option<&str>) -> TokenState { + match self.fetch_context() { + Ok(tc) => { + // We got a new token - check that the endpoint is the same + // as a previous endpoint we saw (if any) + match previous_endpoint { + Some(prev) => { + if prev == tc.token.api_endpoint { + TokenState::Token(tc) + } else { + log::warn!( + "api_endpoint changed from {} to {}", + prev, + tc.token.api_endpoint + ); + TokenState::NodeReassigned + } + } + None => { + // Never had an api_endpoint in the past, so this is OK. + TokenState::Token(tc) + } + } + } + Err(e) => { + // Early to avoid nll issues... + if let ErrorKind::BackoffError(be) = e { + return TokenState::Backoff(be, previous_endpoint.map(ToString::to_string)); + } + TokenState::Failed(Some(e), previous_endpoint.map(ToString::to_string)) + } + } + } + + // Given the state we are currently in, return a new current state. + // Returns None if the current state should be used (eg, if we are + // holding a token that remains valid) or Some() if the state has changed + // (which may have changed to a state with a token or an error state) + fn advance_state(&self, state: &TokenState) -> Option { + match state { + TokenState::NoToken => Some(self.fetch_token(None)), + TokenState::Failed(_, existing_endpoint) => { + Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str))) + } + TokenState::Token(existing_context) => { + if existing_context.is_valid(self.fetcher.now()) { + None + } else { + Some(self.fetch_token(Some(existing_context.token.api_endpoint.as_str()))) + } + } + TokenState::Backoff(ref until, ref existing_endpoint) => { + if let Ok(remaining) = until.duration_since(self.fetcher.now()) { + log::debug!("enforcing existing backoff - {:?} remains", remaining); + None + } else { + // backoff period is over + Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str))) + } + } + TokenState::NodeReassigned => { + // We never leave this state. + None + } + } + } + + fn with_token(&self, func: F) -> Result + where + F: FnOnce(&TokenContext) -> Result, + { + // first get a mutable ref to our existing state, advance to the + // state we will use, then re-stash that state for next time. + let state: &mut TokenState = &mut self.current_state.borrow_mut(); + if let Some(new_state) = self.advance_state(state) { + *state = new_state; + } + + // Now re-fetch the state we should use for this call - if it's + // anything other than TokenState::Token we will fail. + match state { + TokenState::NoToken => { + // it should be impossible to get here. + panic!("Can't be in NoToken state after advancing"); + } + TokenState::Token(ref token_context) => { + // make the call. + func(token_context) + } + TokenState::Failed(e, _) => { + // We swap the error out of the state enum and return it. + Err(e.take().unwrap()) + } + TokenState::NodeReassigned => { + // this is unrecoverable. + Err(ErrorKind::StorageResetError) + } + TokenState::Backoff(ref remaining, _) => Err(ErrorKind::BackoffError(*remaining)), + } + } + + fn hashed_uid(&self) -> Result { + self.with_token(|ctx| Ok(ctx.token.hashed_fxa_uid.clone())) + } + + fn authorization(&self, req: &Request) -> Result { + self.with_token(|ctx| ctx.authorization(req)) + } + + fn api_endpoint(&self) -> Result { + self.with_token(|ctx| Ok(ctx.token.api_endpoint.clone())) + } + // TODO: we probably want a "drop_token/context" type method so that when + // using a token with some validity fails the caller can force a new one + // (in which case the new token request will probably fail with a 401) +} + +// The public concrete object exposed by this module +#[derive(Debug)] +pub struct TokenProvider { + imp: TokenProviderImpl, +} + +impl TokenProvider { + pub fn new(url: Url, access_token: String, key_id: String) -> Result { + let fetcher = TokenServerFetcher::new(url, access_token, key_id); + Ok(Self { + imp: TokenProviderImpl::new(fetcher), + }) + } + + pub fn hashed_uid(&self) -> Result { + self.imp.hashed_uid() + } + + pub fn authorization(&self, req: &Request) -> Result { + self.imp.authorization(req) + } + + pub fn api_endpoint(&self) -> Result { + self.imp.api_endpoint() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::Cell; + + struct TestFetcher + where + FF: Fn() -> Result, + FN: Fn() -> SystemTime, + { + fetch: FF, + now: FN, + } + impl TokenFetcher for TestFetcher + where + FF: Fn() -> Result, + FN: Fn() -> SystemTime, + { + fn fetch_token(&self) -> Result { + (self.fetch)() + } + fn now(&self) -> SystemTime { + (self.now)() + } + } + + fn make_tsc(fetch: FF, now: FN) -> TokenProviderImpl> + where + FF: Fn() -> Result, + FN: Fn() -> SystemTime, + { + let fetcher: TestFetcher = TestFetcher { fetch, now }; + TokenProviderImpl::new(fetcher) + } + + #[test] + fn test_endpoint() { + // Use a cell to avoid the closure having a mutable ref to this scope. + let counter: Cell = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + Ok(TokenFetchResult { + token: TokenserverToken { + id: "id".to_string(), + key: "key".to_string(), + api_endpoint: "api_endpoint".to_string(), + uid: 1, + duration: 1000, + hashed_fxa_uid: "hash".to_string(), + }, + server_timestamp: ServerTimestamp(0i64), + }) + }; + + let tsc = make_tsc(fetch, SystemTime::now); + + let e = tsc.api_endpoint().expect("should work"); + assert_eq!(e, "api_endpoint".to_string()); + assert_eq!(counter.get(), 1); + + let e2 = tsc.api_endpoint().expect("should work"); + assert_eq!(e2, "api_endpoint".to_string()); + // should not have re-fetched. + assert_eq!(counter.get(), 1); + } + + #[test] + fn test_backoff() { + let counter: Cell = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + let when = SystemTime::now() + Duration::from_millis(10000); + Err(ErrorKind::BackoffError(when)) + }; + let now: Cell = Cell::new(SystemTime::now()); + let tsc = make_tsc(fetch, || now.get()); + + tsc.api_endpoint().expect_err("should bail"); + // XXX - check error type. + assert_eq!(counter.get(), 1); + // try and get another token - should not re-fetch as backoff is still + // in progress. + tsc.api_endpoint().expect_err("should bail"); + assert_eq!(counter.get(), 1); + + // Advance the clock. + now.set(now.get() + Duration::new(20, 0)); + + // Our token fetch mock is still returning a backoff error, so we + // still fail, but should have re-hit the fetch function. + tsc.api_endpoint().expect_err("should bail"); + assert_eq!(counter.get(), 2); + } + + #[test] + fn test_validity() { + let counter: Cell = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + Ok(TokenFetchResult { + token: TokenserverToken { + id: "id".to_string(), + key: "key".to_string(), + api_endpoint: "api_endpoint".to_string(), + uid: 1, + duration: 10, + hashed_fxa_uid: "hash".to_string(), + }, + server_timestamp: ServerTimestamp(0i64), + }) + }; + let now: Cell = Cell::new(SystemTime::now()); + let tsc = make_tsc(fetch, || now.get()); + + tsc.api_endpoint().expect("should get a valid token"); + assert_eq!(counter.get(), 1); + + // try and get another token - should not re-fetch as the old one + // remains valid. + tsc.api_endpoint().expect("should reuse existing token"); + assert_eq!(counter.get(), 1); + + // Advance the clock. + now.set(now.get() + Duration::new(20, 0)); + + // We should discard our token and fetch a new one. + tsc.api_endpoint().expect("should re-fetch"); + assert_eq!(counter.get(), 2); + } + + #[test] + fn test_server_url() { + assert_eq!( + fixup_server_url( + Url::parse("https://token.services.mozilla.com/1.0/sync/1.5").unwrap() + ) + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://token.services.mozilla.com/1.0/sync/1.5/").unwrap() + ) + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://token.services.mozilla.com").unwrap()).as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://token.services.mozilla.com/").unwrap()).as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5").unwrap() + ) + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5/").unwrap() + ) + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://selfhosted.example.com/token/").unwrap()).as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://selfhosted.example.com/token").unwrap()).as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + } +} diff --git a/third_party/rust/sync15/src/client/util.rs b/third_party/rust/sync15/src/client/util.rs new file mode 100644 index 0000000000..01fff77afa --- /dev/null +++ b/third_party/rust/sync15/src/client/util.rs @@ -0,0 +1,102 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU32, Ordering}; + +/// Finds the maximum of the current value and the argument `val`, and sets the +/// new value to the result. +/// +/// Note: `AtomicFoo::fetch_max` is unstable, and can't really be implemented as +/// a single atomic operation from outside the stdlib ;-; +pub(crate) fn atomic_update_max(v: &AtomicU32, new: u32) { + // For loads (and the compare_exchange_weak second ordering argument) this + // is too strong, we could probably get away with Acquire (or maybe Relaxed + // because we don't need the result?). In either case, this fn isn't called + // from a hot spot so whatever. + let mut cur = v.load(Ordering::SeqCst); + while cur < new { + // we're already handling the failure case so there's no reason not to + // use _weak here. + match v.compare_exchange_weak(cur, new, Ordering::SeqCst, Ordering::SeqCst) { + Ok(_) => { + // Success. + break; + } + Err(new_cur) => { + // Interrupted, keep trying. + cur = new_cur + } + } + } +} + +// Slight wrappers around the builtin methods for doing this. +pub(crate) fn set_union(a: &HashSet, b: &HashSet) -> HashSet { + a.union(b).cloned().collect() +} + +pub(crate) fn set_difference(a: &HashSet, b: &HashSet) -> HashSet { + a.difference(b).cloned().collect() +} + +pub(crate) fn set_intersection(a: &HashSet, b: &HashSet) -> HashSet { + a.intersection(b).cloned().collect() +} + +pub(crate) fn partition_by_value(v: &HashMap) -> (HashSet, HashSet) { + let mut true_: HashSet = HashSet::new(); + let mut false_: HashSet = HashSet::new(); + for (s, val) in v { + if *val { + true_.insert(s.clone()); + } else { + false_.insert(s.clone()); + } + } + (true_, false_) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_set_ops() { + fn hash_set(s: &[&str]) -> HashSet { + s.iter() + .copied() + .map(ToOwned::to_owned) + .collect::>() + } + + assert_eq!( + set_union(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["a", "b", "c", "d"]), + ); + + assert_eq!( + set_difference(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["a", "c"]), + ); + assert_eq!( + set_intersection(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["b"]), + ); + let m: HashMap = [ + ("foo", true), + ("bar", true), + ("baz", false), + ("quux", false), + ] + .iter() + .copied() + .map(|(a, b)| (a.to_owned(), b)) + .collect(); + assert_eq!( + partition_by_value(&m), + (hash_set(&["foo", "bar"]), hash_set(&["baz", "quux"])), + ); + } +} diff --git a/third_party/rust/sync15/src/client_types.rs b/third_party/rust/sync15/src/client_types.rs new file mode 100644 index 0000000000..14c220c1db --- /dev/null +++ b/third_party/rust/sync15/src/client_types.rs @@ -0,0 +1,101 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! This module has to be here because of some hard-to-avoid hacks done for the +//! tabs engine... See issue #2590 + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Argument to Store::prepare_for_sync. See comment there for more info. Only +/// really intended to be used by tabs engine. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ClientData { + pub local_client_id: String, + pub recent_clients: HashMap, +} + +/// Information about a remote client in the clients collection. +#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] +pub struct RemoteClient { + pub fxa_device_id: Option, + pub device_name: String, + pub device_type: Option, +} + +/// Enumeration for the different types of device. +/// +/// Firefox Accounts and the broader Sync universe separates devices into broad categories for +/// display purposes, such as distinguishing a desktop PC from a mobile phone. + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum DeviceType { + #[serde(rename = "desktop")] + Desktop, + #[serde(rename = "mobile")] + Mobile, + #[serde(rename = "tablet")] + Tablet, + #[serde(rename = "vr")] + VR, + #[serde(rename = "tv")] + TV, + // Unknown is a bit odd - it should never be set (ie, it's never serialized) + // and exists really just so we can avoid using an Option<>. + #[serde(other)] + #[serde(skip_serializing)] // Don't you dare trying. + Unknown, +} + +#[cfg(test)] +mod device_type_tests { + use super::*; + + #[test] + fn test_serde_ser() { + assert_eq!( + serde_json::to_string(&DeviceType::Desktop).unwrap(), + "\"desktop\"" + ); + assert_eq!( + serde_json::to_string(&DeviceType::Mobile).unwrap(), + "\"mobile\"" + ); + assert_eq!( + serde_json::to_string(&DeviceType::Tablet).unwrap(), + "\"tablet\"" + ); + assert_eq!(serde_json::to_string(&DeviceType::VR).unwrap(), "\"vr\""); + assert_eq!(serde_json::to_string(&DeviceType::TV).unwrap(), "\"tv\""); + assert!(serde_json::to_string(&DeviceType::Unknown).is_err()); + } + + #[test] + fn test_serde_de() { + assert!(matches!( + serde_json::from_str::("\"desktop\"").unwrap(), + DeviceType::Desktop + )); + assert!(matches!( + serde_json::from_str::("\"mobile\"").unwrap(), + DeviceType::Mobile + )); + assert!(matches!( + serde_json::from_str::("\"tablet\"").unwrap(), + DeviceType::Tablet + )); + assert!(matches!( + serde_json::from_str::("\"vr\"").unwrap(), + DeviceType::VR + )); + assert!(matches!( + serde_json::from_str::("\"tv\"").unwrap(), + DeviceType::TV + )); + assert!(matches!( + serde_json::from_str::("\"something-else\"").unwrap(), + DeviceType::Unknown, + )); + } +} diff --git a/third_party/rust/sync15/src/clients_engine/engine.rs b/third_party/rust/sync15/src/clients_engine/engine.rs new file mode 100644 index 0000000000..f3d6242126 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/engine.rs @@ -0,0 +1,814 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; + +use crate::bso::{IncomingKind, OutgoingBso, OutgoingEnvelope}; +use crate::client::{ + CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration, + Sync15StorageClient, +}; +use crate::client_types::{ClientData, RemoteClient}; +use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::{error::Result, Guid, KeyBundle}; +use interrupt_support::Interruptee; + +use super::{ + record::{ClientRecord, CommandRecord}, + ser::shrink_to_fit, + Command, CommandProcessor, CommandStatus, CLIENTS_TTL, +}; + +const COLLECTION_NAME: &str = "clients"; + +/// The driver for the clients engine. Internal; split out from the `Engine` +/// struct to make testing easier. +struct Driver<'a> { + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + recent_clients: HashMap, +} + +impl<'a> Driver<'a> { + fn new( + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + ) -> Driver<'a> { + Driver { + command_processor, + interruptee, + config, + recent_clients: HashMap::new(), + } + } + + fn note_recent_client(&mut self, client: &ClientRecord) { + self.recent_clients.insert(client.id.clone(), client.into()); + } + + fn sync( + &mut self, + inbound: IncomingChangeset, + should_refresh_client: bool, + ) -> Result { + let mut outgoing = OutgoingChangeset::new(COLLECTION_NAME, inbound.timestamp); + outgoing.timestamp = inbound.timestamp; + + self.interruptee.err_if_interrupted()?; + let outgoing_commands = self.command_processor.fetch_outgoing_commands()?; + + let mut has_own_client_record = false; + + for bso in inbound.changes { + self.interruptee.err_if_interrupted()?; + + let content = bso.into_content(); + + let client: ClientRecord = match content.kind { + IncomingKind::Malformed => { + log::debug!("Error unpacking record"); + continue; + } + IncomingKind::Tombstone => { + log::debug!("Record has been deleted; skipping..."); + continue; + } + IncomingKind::Content(client) => client, + }; + + if client.id == self.command_processor.settings().fxa_device_id { + log::debug!("Found my record on the server"); + // If we see our own client record, apply any incoming commands, + // remove them from the list, and reupload the record. Any + // commands that we don't understand also go back in the list. + // https://github.com/mozilla/application-services/issues/1800 + // tracks if that's the right thing to do. + has_own_client_record = true; + let mut current_client_record = self.current_client_record(); + for c in &client.commands { + let status = match c.as_command() { + Some(command) => self.command_processor.apply_incoming_command(command)?, + None => CommandStatus::Unsupported, + }; + match status { + CommandStatus::Applied => {} + CommandStatus::Ignored => { + log::debug!("Ignored command {:?}", c); + } + CommandStatus::Unsupported => { + log::warn!("Don't know how to apply command {:?}", c); + current_client_record.commands.push(c.clone()); + } + } + } + + // The clients collection has a hard limit on the payload size, + // after which the server starts rejecting our records. Large + // command lists can cause us to exceed this, so we truncate + // the list. + shrink_to_fit( + &mut current_client_record.commands, + self.memcache_max_record_payload_size(), + )?; + + // Add the new client record to our map of recently synced + // clients, so that downstream consumers like synced tabs can + // access them. + self.note_recent_client(¤t_client_record); + + // We periodically upload our own client record, even if it + // doesn't change, to keep it fresh. + if should_refresh_client || client != current_client_record { + log::debug!("Will update our client record on the server"); + let envelope = OutgoingEnvelope { + id: content.envelope.id, + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, current_client_record)?); + } + } else { + // Add the other client to our map of recently synced clients. + self.note_recent_client(&client); + + // Bail if we don't have any outgoing commands to write into + // the other client's record. + if outgoing_commands.is_empty() { + continue; + } + + // Determine if we have new commands, that aren't already in the + // client's command list. + let current_commands: HashSet = client + .commands + .iter() + .filter_map(|c| c.as_command()) + .collect(); + let mut new_outgoing_commands = outgoing_commands + .difference(¤t_commands) + .cloned() + .collect::>(); + // Sort, to ensure deterministic ordering for tests. + new_outgoing_commands.sort(); + let mut new_client = client.clone(); + new_client + .commands + .extend(new_outgoing_commands.into_iter().map(CommandRecord::from)); + if new_client.commands.len() == client.commands.len() { + continue; + } + + // Hooray, we added new commands! Make sure the record still + // fits in the maximum record size, or the server will reject + // our upload. + shrink_to_fit( + &mut new_client.commands, + self.memcache_max_record_payload_size(), + )?; + + let envelope = OutgoingEnvelope { + id: content.envelope.id, + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, new_client)?); + } + } + + // Upload a record for our own client, if we didn't replace it already. + if !has_own_client_record { + let current_client_record = self.current_client_record(); + self.note_recent_client(¤t_client_record); + let envelope = OutgoingEnvelope { + id: Guid::new(¤t_client_record.id), + ttl: Some(CLIENTS_TTL), + ..Default::default() + }; + outgoing + .changes + .push(OutgoingBso::from_content(envelope, current_client_record)?); + } + + Ok(outgoing) + } + + /// Builds a fresh client record for this device. + fn current_client_record(&self) -> ClientRecord { + let settings = self.command_processor.settings(); + ClientRecord { + id: settings.fxa_device_id.clone(), + name: settings.device_name.clone(), + typ: settings.device_type.into(), + commands: Vec::new(), + fxa_device_id: Some(settings.fxa_device_id.clone()), + version: None, + protocols: vec!["1.5".into()], + form_factor: None, + os: None, + app_package: None, + application: None, + device: None, + } + } + + fn max_record_payload_size(&self) -> usize { + let payload_max = self.config.max_record_payload_bytes; + if payload_max <= self.config.max_post_bytes { + self.config.max_post_bytes.saturating_sub(4096) + } else { + payload_max + } + } + + /// Collections stored in memcached ("tabs", "clients" or "meta") have a + /// different max size than ones stored in the normal storage server db. + /// In practice, the real limit here is 1M (bug 1300451 comment 40), but + /// there's overhead involved that is hard to calculate on the client, so we + /// use 512k to be safe (at the recommendation of the server team). Note + /// that if the server reports a lower limit (via info/configuration), we + /// respect that limit instead. See also bug 1403052. + /// XXX - the above comment is stale and refers to the world before the + /// move to spanner and the rust sync server. + fn memcache_max_record_payload_size(&self) -> usize { + self.max_record_payload_size().min(512 * 1024) + } +} + +pub struct Engine<'a> { + pub command_processor: &'a dyn CommandProcessor, + pub interruptee: &'a dyn Interruptee, + pub recent_clients: HashMap, +} + +impl<'a> Engine<'a> { + /// Creates a new clients engine that delegates to the given command + /// processor to apply incoming commands. + pub fn new<'b>( + command_processor: &'b dyn CommandProcessor, + interruptee: &'b dyn Interruptee, + ) -> Engine<'b> { + Engine { + command_processor, + interruptee, + recent_clients: HashMap::new(), + } + } + + /// Syncs the clients collection. This works a little differently than + /// other collections: + /// + /// 1. It can't be disabled or declined. + /// 2. The sync ID and last sync time aren't meaningful, since we always + /// fetch all client records on every sync. As such, the + /// `LocalCollStateMachine` that we use for other engines doesn't + /// apply to it. + /// 3. It doesn't persist state directly, but relies on the sync manager + /// to persist device settings, and process commands. + /// 4. Failing to sync the clients collection is fatal, and aborts the + /// sync. + /// + /// For these reasons, we implement this engine directly in the `sync15` + /// crate, and provide a specialized `sync` method instead of implementing + /// `sync15::Store`. + pub fn sync( + &mut self, + storage_client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + should_refresh_client: bool, + ) -> Result<()> { + log::info!("Syncing collection clients"); + + let coll_keys = CollectionKeys::from_encrypted_payload( + global_state.keys.clone(), + global_state.keys_timestamp, + root_sync_key, + )?; + let mut coll_state = CollState { + config: global_state.config.clone(), + last_modified: global_state + .collections + .get(COLLECTION_NAME) + .cloned() + .unwrap_or_default(), + key: coll_keys.key_for_collection(COLLECTION_NAME).clone(), + }; + + let inbound = self.fetch_incoming(storage_client, &mut coll_state)?; + + let mut driver = Driver::new( + self.command_processor, + self.interruptee, + &global_state.config, + ); + + let outgoing = driver.sync(inbound, should_refresh_client)?; + self.recent_clients = driver.recent_clients; + + coll_state.last_modified = outgoing.timestamp; + + self.interruptee.err_if_interrupted()?; + let upload_info = + CollectionUpdate::new_from_changeset(storage_client, &coll_state, outgoing, true)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + + log::info!("Finished syncing clients"); + Ok(()) + } + + fn fetch_incoming( + &self, + storage_client: &Sync15StorageClient, + coll_state: &mut CollState, + ) -> Result { + // Note that, unlike other stores, we always fetch the full collection + // on every sync, so `inbound` will return all clients, not just the + // ones that changed since the last sync. + let coll_request = CollectionRequest::new(COLLECTION_NAME).full(); + + self.interruptee.err_if_interrupted()?; + let inbound = crate::client::fetch_incoming(storage_client, coll_state, &coll_request)?; + + Ok(inbound) + } + + pub fn local_client_id(&self) -> String { + // Bit dirty but it's the easiest way to reach to our own + // device ID without refactoring the whole sync manager crate. + self.command_processor.settings().fxa_device_id.clone() + } + + pub fn get_client_data(&self) -> ClientData { + ClientData { + local_client_id: self.local_client_id(), + recent_clients: self.recent_clients.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::super::{CommandStatus, DeviceType, Settings}; + use super::*; + use crate::bso::IncomingBso; + use crate::ServerTimestamp; + use anyhow::Result; + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + use std::iter::zip; + + struct TestProcessor { + settings: Settings, + outgoing_commands: HashSet, + } + + impl CommandProcessor for TestProcessor { + fn settings(&self) -> &Settings { + &self.settings + } + + fn apply_incoming_command(&self, command: Command) -> Result { + Ok(if let Command::Reset(name) = command { + if name == "forms" { + CommandStatus::Unsupported + } else { + CommandStatus::Applied + } + } else { + CommandStatus::Ignored + }) + } + + fn fetch_outgoing_commands(&self) -> Result> { + Ok(self.outgoing_commands.clone()) + } + } + + fn inbound_from_clients(clients: Value) -> IncomingChangeset { + if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(IncomingBso::from_test_content) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [ + Command::Wipe("bookmarks".into()), + Command::Reset("history".into()), + ] + .iter() + .cloned() + .collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [], + "fxaDeviceId": "deviceCCCCCC", + }, { + "id": "deviceAAAAAA", + "name": "Laptop with a different name", + "type": "desktop", + "commands": [{ + "command": "wipeEngine", + "args": ["logins"] + }, { + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + }])); + + // Passing false for `should_refresh_client` - it should be ignored + // because we've changed the commands. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing + .changes + .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"]; + let mut actual_ids = driver.recent_clients.keys().collect::>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + RemoteClient { + fxa_device_id: Some("deviceCCCCCC".to_string()), + device_name: "Fenix".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [{ + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }, { + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }, { + "command": "wipeEngine", + "args": ["bookmarks"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [{ + "command": "wipeEngine", + "args": ["bookmarks"], + }, { + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "deviceCCCCCC", + }]); + // turn outgoing into an incoming payload. + let incoming = IncomingChangeset { + changes: outgoing + .changes + .into_iter() + .map(|c| OutgoingBso::to_test_incoming(&c)) + .collect(), + timestamp: outgoing.timestamp, + collection: outgoing.collection, + }; + if let Value::Array(expected) = expected { + for (incoming_cleartext, exp_client) in zip(incoming.changes, expected) { + let incoming_client: ClientRecord = + incoming_cleartext.into_content().content().unwrap(); + assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync_bad_incoming_record_skipped() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [].iter().cloned().collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "garbage", + "garbage": "value", + }, { + "id": "deviceCCCCCC", + "deleted": true, + "name": "Fenix", + "type": "mobile", + "commands": [], + "fxaDeviceId": "deviceCCCCCC", + }])); + + driver.sync(inbound, false).expect("Should sync clients"); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + } + + #[test] + fn test_clients_sync_explicit_refresh() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [].iter().cloned().collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let test_clients = json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }]); + + let outgoing = driver + .sync(inbound_from_clients(test_clients.clone()), false) + .expect("Should sync clients"); + // should be no outgoing changes. + assert_eq!(outgoing.changes.len(), 0); + + // Make sure the list of recently synced remote clients is correct and + // still includes our record we didn't update. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + // Do it again - still no changes, but force a refresh. + let outgoing = driver + .sync(inbound_from_clients(test_clients), true) + .expect("Should sync clients"); + assert_eq!(outgoing.changes.len(), 1); + + // Do it again - but this time with our own client record needing + // some change. + let inbound = inbound_from_clients(json!([{ + "id": "deviceAAAAAA", + "name": "Laptop with New Name", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }])); + let outgoing = driver.sync(inbound, false).expect("Should sync clients"); + // should still be outgoing because the name changed. + assert_eq!(outgoing.changes.len(), 1); + } + + #[test] + fn test_fresh_client_record() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: HashSet::new(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let clients = json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }]); + + let inbound = if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(IncomingBso::from_test_content) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + }; + + // Passing false here for should_refresh_client, but it should be + // ignored as we don't have an existing record yet. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing + .changes + .sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + "ttl": CLIENTS_TTL, + }]); + if let Value::Array(expected) = expected { + // turn outgoing into an incoming payload. + let incoming = IncomingChangeset { + changes: outgoing + .changes + .into_iter() + .map(|c| OutgoingBso::to_test_incoming(&c)) + .collect(), + timestamp: outgoing.timestamp, + collection: outgoing.collection, + }; + for (incoming_cleartext, record) in zip(incoming.changes, expected) { + let incoming_client: ClientRecord = + incoming_cleartext.into_content().content().unwrap(); + assert_eq!(incoming_client, serde_json::from_value(record).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } +} diff --git a/third_party/rust/sync15/src/clients_engine/mod.rs b/third_party/rust/sync15/src/clients_engine/mod.rs new file mode 100644 index 0000000000..7346712dc7 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/mod.rs @@ -0,0 +1,93 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! The client engine is a [crate::engine](Sync Engine) used to manage the +//! "clients" collection. The clients engine manages the client record for +//! "this device, and also manages "commands". +//! In short, commands target one or more engines and instruct them to +//! perform various operations - such as wiping all local data. +//! These commands are used very rarely - currently the only command used +//! in practice is for bookmarks to wipe all their data, which is sent when +//! a desktop device restores all bookmarks from a backup. In this scenario, +//! desktop will delete all local bookmarks then replace them with the backed +//! up set, which without a "wipe" command would almost certainly cause other +//! connected devices to "resurrect" the deleted bookmarks. +use std::collections::HashSet; + +mod engine; +mod record; +mod ser; + +use crate::DeviceType; +use anyhow::Result; +pub use engine::Engine; + +// These are what desktop uses. +const CLIENTS_TTL: u32 = 15_552_000; // 180 days +pub(crate) const CLIENTS_TTL_REFRESH: u64 = 604_800; // 7 days + +/// A command processor applies incoming commands like wipes and resets for all +/// stores, and returns commands to send to other clients. It also manages +/// settings like the device name and type, which is stored in the special +/// `clients` collection. +/// +/// In practice, this trait only has one implementation, in the sync manager. +/// It's split this way because the clients engine depends on internal `sync15` +/// structures, and can't be implemented as a syncable store...but `sync15` +/// doesn't know anything about multiple engines. This lets the sync manager +/// provide its own implementation for handling wipe and reset commands for all +/// the engines that it manages. +pub trait CommandProcessor { + fn settings(&self) -> &Settings; + + /// Fetches commands to send to other clients. An error return value means + /// commands couldn't be fetched, and halts the sync. + fn fetch_outgoing_commands(&self) -> Result>; + + /// Applies a command sent to this client from another client. This method + /// should return a `CommandStatus` indicating whether the command was + /// processed. + /// + /// An error return value means the sync manager encountered an error + /// applying the command, and halts the sync to prevent unexpected behavior + /// (for example, merging local and remote bookmarks, when we were told to + /// wipe our local bookmarks). + fn apply_incoming_command(&self, command: Command) -> Result; +} + +/// Indicates if a command was applied successfully, ignored, or not supported. +/// Applied and ignored commands are removed from our client record, and never +/// retried. Unsupported commands are put back into our record, and retried on +/// subsequent syncs. This is to handle clients adding support for new data +/// types. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum CommandStatus { + Applied, + Ignored, + Unsupported, +} + +/// Information about this device to include in its client record. This should +/// be persisted across syncs, as part of the sync manager state. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct Settings { + /// The FxA device ID of this client, also used as this client's record ID + /// in the clients collection. + pub fxa_device_id: String, + /// The name of this client. This should match the client's name in the + /// FxA device manager. + pub device_name: String, + /// The type of this client: mobile, tablet, desktop, or other. + pub device_type: DeviceType, +} + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum Command { + /// Erases all local data for a specific engine. + Wipe(String), + /// Resets local sync state for all engines. + ResetAll, + /// Resets local sync state for a specific engine. + Reset(String), +} diff --git a/third_party/rust/sync15/src/clients_engine/record.rs b/third_party/rust/sync15/src/clients_engine/record.rs new file mode 100644 index 0000000000..12de36d82f --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/record.rs @@ -0,0 +1,124 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::*; + +use super::Command; + +/// The serialized form of a client record. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientRecord { + #[serde(rename = "id")] + pub id: String, + + pub name: String, + + #[serde(default, rename = "type")] + pub typ: Option, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub commands: Vec, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub fxa_device_id: Option, + + /// `version`, `protocols`, `formfactor`, `os`, `appPackage`, `application`, + /// and `device` are unused and optional in all implementations (Desktop, + /// iOS, and Fennec), but we round-trip them. + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub version: Option, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub protocols: Vec, + + #[serde( + default, + rename = "formfactor", + skip_serializing_if = "Option::is_none" + )] + pub form_factor: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub os: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub app_package: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub application: Option, + + /// The model of the device, like "iPhone" or "iPod touch" on iOS. Note + /// that this is _not_ the client ID (`id`) or the FxA device ID + /// (`fxa_device_id`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub device: Option, +} + +impl From<&ClientRecord> for crate::RemoteClient { + fn from(record: &ClientRecord) -> crate::RemoteClient { + crate::RemoteClient { + fxa_device_id: record.fxa_device_id.clone(), + device_name: record.name.clone(), + device_type: record.typ, + } + } +} + +/// The serialized form of a client command. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CommandRecord { + /// The command name. This is a string, not an enum, because we want to + /// round-trip commands that we don't support yet. + #[serde(rename = "command")] + pub name: String, + + /// Extra, command-specific arguments. Note that we must send an empty + /// array if the command expects no arguments. + #[serde(default)] + pub args: Vec, + + /// Some commands, like repair, send a "flow ID" that other cliennts can + /// record in their telemetry. We don't currently send commands with + /// flow IDs, but we round-trip them. + #[serde(default, rename = "flowID", skip_serializing_if = "Option::is_none")] + pub flow_id: Option, +} + +impl CommandRecord { + /// Converts a serialized command into one that we can apply. Returns `None` + /// if we don't support the command. + pub fn as_command(&self) -> Option { + match self.name.as_str() { + "wipeEngine" => self.args.get(0).map(|e| Command::Wipe(e.into())), + "resetEngine" => self.args.get(0).map(|e| Command::Reset(e.into())), + "resetAll" => Some(Command::ResetAll), + _ => None, + } + } +} + +impl From for CommandRecord { + fn from(command: Command) -> CommandRecord { + match command { + Command::Wipe(engine) => CommandRecord { + name: "wipeEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::Reset(engine) => CommandRecord { + name: "resetEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::ResetAll => CommandRecord { + name: "resetAll".into(), + args: Vec::new(), + flow_id: None, + }, + } + } +} diff --git a/third_party/rust/sync15/src/clients_engine/ser.rs b/third_party/rust/sync15/src/clients_engine/ser.rs new file mode 100644 index 0000000000..2e7b0817b8 --- /dev/null +++ b/third_party/rust/sync15/src/clients_engine/ser.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::Result; +use serde::Serialize; +use std::io::{self, Write}; + +/// A writer that counts the number of bytes it's asked to write, and discards +/// the data. Used to calculate the serialized size of the commands list. +#[derive(Clone, Copy, Default)] +pub struct WriteCount(usize); + +impl WriteCount { + #[inline] + pub fn len(self) -> usize { + self.0 + } +} + +impl Write for WriteCount { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +/// Returns the size of the given value, in bytes, when serialized to JSON. +fn compute_serialized_size(value: &T) -> Result { + let mut w = WriteCount::default(); + serde_json::to_writer(&mut w, value)?; + Ok(w.len()) +} + +/// Truncates `list` to fit within `payload_size_max_bytes` when serialized to +/// JSON. +pub fn shrink_to_fit(list: &mut Vec, payload_size_max_bytes: usize) -> Result<()> { + let size = compute_serialized_size(&list)?; + // See bug 535326 comment 8 for an explanation of the estimation + match ((payload_size_max_bytes / 4) * 3).checked_sub(1500) { + Some(max_serialized_size) => { + if size > max_serialized_size { + // Estimate a little more than the direct fraction to maximize packing + let cutoff = (list.len() * max_serialized_size - 1) / size + 1; + list.truncate(cutoff + 1); + // Keep dropping off the last entry until the data fits. + while compute_serialized_size(&list)? > max_serialized_size { + if list.pop().is_none() { + break; + } + } + } + Ok(()) + } + None => { + list.clear(); + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::super::record::CommandRecord; + use super::*; + + #[test] + fn test_compute_serialized_size() { + assert_eq!(compute_serialized_size(&1).unwrap(), 1); + assert_eq!(compute_serialized_size(&"hi").unwrap(), 4); + assert_eq!( + compute_serialized_size(&["hi", "hello", "bye"]).unwrap(), + 20 + ); + } + + #[test] + fn test_shrink_to_fit() { + let mut commands = vec![ + CommandRecord { + name: "wipeEngine".into(), + args: vec!["bookmarks".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "resetEngine".into(), + args: vec!["history".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "logout".into(), + args: Vec::new(), + flow_id: None, + }, + ]; + + // 4096 bytes is enough to fit all three commands. + shrink_to_fit(&mut commands, 4096).unwrap(); + assert_eq!(commands.len(), 3); + + let sizes = commands + .iter() + .map(|c| compute_serialized_size(c).unwrap()) + .collect::>(); + assert_eq!(sizes, &[61, 60, 30]); + + // `logout` won't fit within 2168 bytes. + shrink_to_fit(&mut commands, 2168).unwrap(); + assert_eq!(commands.len(), 2); + + // `resetEngine` won't fit within 2084 bytes. + shrink_to_fit(&mut commands, 2084).unwrap(); + assert_eq!(commands.len(), 1); + + // `wipeEngine` won't fit at all. + shrink_to_fit(&mut commands, 1024).unwrap(); + assert!(commands.is_empty()); + } +} diff --git a/third_party/rust/sync15/src/enc_payload.rs b/third_party/rust/sync15/src/enc_payload.rs new file mode 100644 index 0000000000..2adc031f70 --- /dev/null +++ b/third_party/rust/sync15/src/enc_payload.rs @@ -0,0 +1,110 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error; +use crate::key_bundle::KeyBundle; +use lazy_static::lazy_static; +use serde::{Deserialize, Serialize}; + +/// A representation of an encrypted payload. Used as the payload in EncryptedBso and +/// also anywhere else the sync keys might be used to encrypt/decrypt, such as send-tab payloads. +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct EncryptedPayload { + #[serde(rename = "IV")] + pub iv: String, + pub hmac: String, + pub ciphertext: String, +} + +impl EncryptedPayload { + #[inline] + pub fn serialized_len(&self) -> usize { + (*EMPTY_ENCRYPTED_PAYLOAD_SIZE) + self.ciphertext.len() + self.hmac.len() + self.iv.len() + } + + pub fn decrypt(&self, key: &KeyBundle) -> error::Result { + key.decrypt(&self.ciphertext, &self.iv, &self.hmac) + } + + pub fn decrypt_into(&self, key: &KeyBundle) -> error::Result + where + for<'a> T: Deserialize<'a>, + { + Ok(serde_json::from_str(&self.decrypt(key)?)?) + } + + pub fn from_cleartext(key: &KeyBundle, cleartext: String) -> error::Result { + let (enc_base64, iv_base64, hmac_base16) = + key.encrypt_bytes_rand_iv(cleartext.as_bytes())?; + Ok(EncryptedPayload { + iv: iv_base64, + hmac: hmac_base16, + ciphertext: enc_base64, + }) + } + + pub fn from_cleartext_payload( + key: &KeyBundle, + cleartext_payload: &T, + ) -> error::Result { + Self::from_cleartext(key, serde_json::to_string(cleartext_payload)?) + } +} + +// Our "postqueue", which chunks records for upload, needs to know this value. +// It's tricky to determine at compile time, so do it once at at runtime. +lazy_static! { + // The number of bytes taken up by padding in a EncryptedPayload. + static ref EMPTY_ENCRYPTED_PAYLOAD_SIZE: usize = serde_json::to_string( + &EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "".into() } + ).unwrap().len(); +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[derive(Serialize, Deserialize, Debug)] + struct TestStruct { + id: String, + age: u32, + meta: String, + } + + #[test] + fn test_roundtrip_crypt_record() { + let key = KeyBundle::new_random().unwrap(); + let payload_json = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data" }); + let payload = + EncryptedPayload::from_cleartext(&key, serde_json::to_string(&payload_json).unwrap()) + .unwrap(); + + let record = payload.decrypt_into::(&key).unwrap(); + assert_eq!(record.id, "aaaaaaaaaaaa"); + assert_eq!(record.age, 105); + assert_eq!(record.meta, "data"); + + // While we're here, check on EncryptedPayload::serialized_len + let val_rec = serde_json::to_string(&serde_json::to_value(&payload).unwrap()).unwrap(); + assert_eq!(payload.serialized_len(), val_rec.len()); + } + + #[test] + fn test_record_bad_hmac() { + let key1 = KeyBundle::new_random().unwrap(); + let json = json!({ "id": "aaaaaaaaaaaa", "deleted": true, }); + + let payload = + EncryptedPayload::from_cleartext(&key1, serde_json::to_string(&json).unwrap()).unwrap(); + + let key2 = KeyBundle::new_random().unwrap(); + let e = payload + .decrypt(&key2) + .expect_err("Should fail because wrong keybundle"); + + // Note: ErrorKind isn't PartialEq, so. + assert!(matches!(e, error::Error::CryptoError(_))); + } +} diff --git a/third_party/rust/sync15/src/engine/bridged_engine.rs b/third_party/rust/sync15/src/engine/bridged_engine.rs new file mode 100644 index 0000000000..85e60726b4 --- /dev/null +++ b/third_party/rust/sync15/src/engine/bridged_engine.rs @@ -0,0 +1,121 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso::{IncomingBso, OutgoingBso}; + +use crate::Guid; + +/// A BridgedEngine acts as a bridge between application-services, rust +/// implemented sync engines and sync engines as defined by Desktop Firefox. +/// +/// [Desktop Firefox has an abstract implementation of a Sync +/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js) +/// with a number of functions each engine is expected to override. Engines +/// implemented in Rust use a different shape (specifically, the +/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts +/// between the 2. +pub trait BridgedEngine { + /// The type returned for errors. + type Error; + + /// Returns the last sync time, in milliseconds, for this engine's + /// collection. This is called before each sync, to determine the lower + /// bound for new records to fetch from the server. + fn last_sync(&self) -> Result; + + /// Sets the last sync time, in milliseconds. This is called throughout + /// the sync, to fast-forward the stored last sync time to match the + /// timestamp on the uploaded records. + fn set_last_sync(&self, last_sync_millis: i64) -> Result<(), Self::Error>; + + /// Returns the sync ID for this engine's collection. This is only used in + /// tests. + fn sync_id(&self) -> Result, Self::Error>; + + /// Resets the sync ID for this engine's collection, returning the new ID. + /// As a side effect, implementations should reset all local Sync state, + /// as in `reset`. + fn reset_sync_id(&self) -> Result; + + /// Ensures that the locally stored sync ID for this engine's collection + /// matches the `new_sync_id` from the server. If the two don't match, + /// implementations should reset all local Sync state, as in `reset`. + /// This method returns the assigned sync ID, which can be either the + /// `new_sync_id`, or a different one if the engine wants to force other + /// devices to reset their Sync state for this collection the next time they + /// sync. + fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result; + + /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstration as it only + /// makes sense for tabs. + /// The arg is a json serialized `ClientData` struct. + fn prepare_for_sync(&self, _client_data: &str) -> Result<(), Self::Error> { + Ok(()) + } + + /// Indicates that the engine is about to start syncing. This is called + /// once per sync, and always before `store_incoming`. + fn sync_started(&self) -> Result<(), Self::Error>; + + /// Stages a batch of incoming Sync records. This is called multiple + /// times per sync, once for each batch. Implementations can use the + /// signal to check if the operation was aborted, and cancel any + /// pending work. + fn store_incoming(&self, incoming_records: Vec) -> Result<(), Self::Error>; + + /// Applies all staged records, reconciling changes on both sides and + /// resolving conflicts. Returns a list of records to upload. + fn apply(&self) -> Result; + + /// Indicates that the given record IDs were uploaded successfully to the + /// server. This is called multiple times per sync, once for each batch + /// upload. + fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<(), Self::Error>; + + /// Indicates that all records have been uploaded. At this point, any record + /// IDs marked for upload that haven't been passed to `set_uploaded`, can be + /// assumed to have failed: for example, because the server rejected a record + /// with an invalid TTL or sort index. + fn sync_finished(&self) -> Result<(), Self::Error>; + + /// Resets all local Sync state, including any change flags, mirrors, and + /// the last sync time, such that the next sync is treated as a first sync + /// with all new local data. Does not erase any local user data. + fn reset(&self) -> Result<(), Self::Error>; + + /// Erases all local user data for this collection, and any Sync metadata. + /// This method is destructive, and unused for most collections. + fn wipe(&self) -> Result<(), Self::Error>; +} + +// TODO: We should replace this with OutgoingChangeset to reduce the number +// of types engines need to deal with. +#[derive(Debug, Default)] +pub struct ApplyResults { + /// List of records + pub records: Vec, + /// The number of incoming records whose contents were merged because they + /// changed on both sides. None indicates we aren't reporting this + /// information. + pub num_reconciled: Option, +} + +impl ApplyResults { + pub fn new(records: Vec, num_reconciled: impl Into>) -> Self { + Self { + records, + num_reconciled: num_reconciled.into(), + } + } +} + +// Shorthand for engines that don't care. +impl From> for ApplyResults { + fn from(records: Vec) -> Self { + Self { + records, + num_reconciled: None, + } + } +} diff --git a/third_party/rust/sync15/src/engine/changeset.rs b/third_party/rust/sync15/src/engine/changeset.rs new file mode 100644 index 0000000000..f69ce1b8fa --- /dev/null +++ b/third_party/rust/sync15/src/engine/changeset.rs @@ -0,0 +1,42 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso::{IncomingBso, OutgoingBso}; +use crate::ServerTimestamp; + +#[derive(Debug, Clone)] +pub struct RecordChangeset { + pub changes: Vec, + /// For GETs, the last sync timestamp that should be persisted after + /// applying the records. + /// For POSTs, this is the XIUS timestamp. + pub timestamp: ServerTimestamp, + pub collection: std::borrow::Cow<'static, str>, +} + +pub type IncomingChangeset = RecordChangeset; +pub type OutgoingChangeset = RecordChangeset; + +impl RecordChangeset { + #[inline] + pub fn new( + collection: impl Into>, + timestamp: ServerTimestamp, + ) -> RecordChangeset { + Self::new_with_changes(collection, timestamp, Vec::new()) + } + + #[inline] + pub fn new_with_changes( + collection: impl Into>, + timestamp: ServerTimestamp, + changes: Vec, + ) -> RecordChangeset { + RecordChangeset { + changes, + timestamp, + collection: collection.into(), + } + } +} diff --git a/third_party/rust/sync15/src/engine/mod.rs b/third_party/rust/sync15/src/engine/mod.rs new file mode 100644 index 0000000000..ef3b7158ea --- /dev/null +++ b/third_party/rust/sync15/src/engine/mod.rs @@ -0,0 +1,36 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! This module is used by crates which need to implement a "sync engine". +//! At a high-level, a "sync engine" is code which knows how to take records +//! from a sync server, apply and reconcile them with the local data, then +//! provide records which should be uploaded to the server. +//! +//! Note that the "sync engine" does not itself talk to the server, nor does +//! it manage the state of the remote server, nor does it do any of the +//! encryption/decryption - that is the responsbility of the "sync client", as +//! implemented in the [client] module (or in some cases, implemented externally) +//! +//! There are currently 2 types of engine: +//! * Code which implements the [crate::engine::sync_engine::SyncEngine] +//! trait. These are the "original" Rust engines, designed to be used with +//! the [crate::client](sync client) +//! * Code which implements the [crate::engine::bridged_engine::BridgedEngine] +//! trait. These engines are a "bridge" between the Desktop JS Sync world and +//! this rust code. +//! While these engines end up doing the same thing, the difference is due to +//! implementation differences between the Desktop Sync client and the Rust +//! client. +//! We intend merging these engines - the first step will be to merge the +//! types and payload management used by these traits, then to combine the +//! requirements into a single trait that captures both use-cases. +mod bridged_engine; +mod changeset; +mod request; +mod sync_engine; + +pub use bridged_engine::{ApplyResults, BridgedEngine}; +pub use changeset::{IncomingChangeset, OutgoingChangeset}; +pub use request::{CollectionRequest, RequestOrder}; +pub use sync_engine::{CollSyncIds, EngineSyncAssociation, SyncEngine, SyncEngineId}; diff --git a/third_party/rust/sync15/src/engine/request.rs b/third_party/rust/sync15/src/engine/request.rs new file mode 100644 index 0000000000..d8e20103a8 --- /dev/null +++ b/third_party/rust/sync15/src/engine/request.rs @@ -0,0 +1,113 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use crate::{Guid, ServerTimestamp}; +use std::borrow::Cow; +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CollectionRequest { + pub collection: Cow<'static, str>, + pub full: bool, + pub ids: Option>, + pub limit: usize, + pub older: Option, + pub newer: Option, + pub order: Option, + pub commit: bool, + pub batch: Option, +} + +impl CollectionRequest { + #[inline] + pub fn new(collection: S) -> CollectionRequest + where + S: Into>, + { + CollectionRequest { + collection: collection.into(), + full: false, + ids: None, + limit: 0, + older: None, + newer: None, + order: None, + commit: false, + batch: None, + } + } + + #[inline] + pub fn ids(mut self, v: V) -> CollectionRequest + where + V: IntoIterator, + V::Item: Into, + { + self.ids = Some(v.into_iter().map(|id| id.into()).collect()); + self + } + + #[inline] + pub fn full(mut self) -> CollectionRequest { + self.full = true; + self + } + + #[inline] + pub fn older_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.older = Some(ts); + self + } + + #[inline] + pub fn newer_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.newer = Some(ts); + self + } + + #[inline] + pub fn sort_by(mut self, order: RequestOrder) -> CollectionRequest { + self.order = Some(order); + self + } + + #[inline] + pub fn limit(mut self, num: usize) -> CollectionRequest { + self.limit = num; + self + } + + #[inline] + pub fn batch(mut self, batch: Option) -> CollectionRequest { + self.batch = batch; + self + } + + #[inline] + pub fn commit(mut self, v: bool) -> CollectionRequest { + self.commit = v; + self + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum RequestOrder { + Oldest, + Newest, + Index, +} + +impl RequestOrder { + #[inline] + pub fn as_str(self) -> &'static str { + match self { + RequestOrder::Oldest => "oldest", + RequestOrder::Newest => "newest", + RequestOrder::Index => "index", + } + } +} + +impl std::fmt::Display for RequestOrder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} diff --git a/third_party/rust/sync15/src/engine/sync_engine.rs b/third_party/rust/sync15/src/engine/sync_engine.rs new file mode 100644 index 0000000000..ebd5138200 --- /dev/null +++ b/third_party/rust/sync15/src/engine/sync_engine.rs @@ -0,0 +1,235 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::client_types::ClientData; +use crate::{telemetry, Guid, ServerTimestamp}; +use anyhow::Result; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CollSyncIds { + pub global: Guid, + pub coll: Guid, +} + +/// Defines how an engine is associated with a particular set of records +/// on a sync storage server. It's either disconnected, or believes it is +/// connected with a specific set of GUIDs. If the server and the engine don't +/// agree on the exact GUIDs, the engine will assume something radical happened +/// so it can't believe anything it thinks it knows about the state of the +/// server (ie, it will "reset" then do a full reconcile) +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EngineSyncAssociation { + /// This store is disconnected (although it may be connected in the future). + Disconnected, + /// Sync is connected, and has the following sync IDs. + Connected(CollSyncIds), +} + +/// The concrete `SyncEngine` implementations +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub enum SyncEngineId { + // Note that we've derived PartialOrd etc, which uses lexicographic ordering + // of the variants. We leverage that such that the higher priority engines + // are listed first. + // This order matches desktop. + Passwords, + Tabs, + Bookmarks, + Addresses, + CreditCards, + History, +} + +impl SyncEngineId { + // Iterate over all possible engines. Note that we've made a policy decision + // that this should enumerate in "order" as defined by PartialCmp, and tests + // enforce this. + pub fn iter() -> impl Iterator { + [ + Self::Passwords, + Self::Tabs, + Self::Bookmarks, + Self::Addresses, + Self::CreditCards, + Self::History, + ] + .into_iter() + } + + // Get the string identifier for this engine. This must match the strings in SyncEngineSelection. + pub fn name(&self) -> &'static str { + match self { + Self::Passwords => "passwords", + Self::History => "history", + Self::Bookmarks => "bookmarks", + Self::Tabs => "tabs", + Self::Addresses => "addresses", + Self::CreditCards => "creditcards", + } + } +} + +impl fmt::Display for SyncEngineId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl TryFrom<&str> for SyncEngineId { + type Error = String; + + fn try_from(value: &str) -> std::result::Result { + match value { + "passwords" => Ok(Self::Passwords), + "history" => Ok(Self::History), + "bookmarks" => Ok(Self::Bookmarks), + "tabs" => Ok(Self::Tabs), + "addresses" => Ok(Self::Addresses), + "creditcards" => Ok(Self::CreditCards), + _ => Err(value.into()), + } + } +} + +/// A "sync engine" is a thing that knows how to sync. It's often implemented +/// by a "store" (which is the generic term responsible for all storage +/// associated with a component, including storage required for sync.) +/// +/// Low-level engine functionality. Engines that need custom reconciliation +/// logic should use this. +/// +/// Different engines will produce errors of different types. To accommodate +/// this, we force them all to return anyhow::Error. +pub trait SyncEngine { + fn collection_name(&self) -> std::borrow::Cow<'static, str>; + + /// Prepares the engine for syncing. The tabs engine currently uses this to + /// store the current list of clients, which it uses to look up device names + /// and types. + /// + /// Note that this method is only called by `sync_multiple`, and only if a + /// command processor is registered. In particular, `prepare_for_sync` will + /// not be called if the store is synced using `sync::synchronize` or + /// `sync_multiple::sync_multiple`. It _will_ be called if the store is + /// synced via the Sync Manager. + /// + /// TODO(issue #2590): This is pretty cludgey and will be hard to extend for + /// any case other than the tabs case. We should find another way to support + /// tabs... + fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> { + Ok(()) + } + + /// Tells the engine what the local encryption key is for the data managed + /// by the engine. This is only used by collections that store data + /// encrypted locally and is unrelated to the encryption used by Sync. + /// The intent is that for such collections, this key can be used to + /// decrypt local data before it is re-encrypted by Sync and sent to the + /// storage servers, and similarly, data from the storage servers will be + /// decrypted by Sync, then encrypted by the local encryption key before + /// being added to the local database. + /// + /// The expectation is that the key value is being maintained by the + /// embedding application in some secure way suitable for the environment + /// in which the app is running - eg, the OS "keychain". The value of the + /// key is implementation dependent - it is expected that the engine and + /// embedding application already have some external agreement about how + /// to generate keys and in what form they are exchanged. Finally, there's + /// an assumption that sync engines are short-lived and only live for a + /// single sync - this means that sync doesn't hold on to the key for an + /// extended period. + /// + /// This will panic if called by an engine that doesn't have explicit + /// support for local encryption keys as that implies a degree of confusion + /// which shouldn't be possible to ignore. + fn set_local_encryption_key(&mut self, _key: &str) -> Result<()> { + unimplemented!("This engine does not support local encryption"); + } + + /// `inbound` is a vector to support the case where + /// `get_collection_requests` returned multiple requests. The changesets are + /// in the same order as the requests were -- e.g. if `vec![req_a, req_b]` + /// was returned from `get_collection_requests`, `inbound` will have the + /// results from `req_a` as its first index, and those from `req_b` as it's + /// second. + fn apply_incoming( + &self, + inbound: Vec, + telem: &mut telemetry::Engine, + ) -> Result; + + fn sync_finished( + &self, + new_timestamp: ServerTimestamp, + records_synced: Vec, + ) -> Result<()>; + + /// The engine is responsible for building the collection request. Engines + /// typically will store a lastModified timestamp and use that to build a + /// request saying "give me full records since that date" - however, other + /// engines might do something fancier. This could even later be extended to + /// handle "backfills" etc + /// + /// To support more advanced use cases, multiple requests can be returned + /// here - either from the same or different collections. The vast majority + /// of engines will just want to return zero or one item in their vector + /// (zero is a valid optimization when the server timestamp is the same as + /// the engine last saw, one when it is not) + /// + /// Important: In the case when more than one collection is requested, it's + /// assumed the last one is the "canonical" one. (That is, it must be for + /// "this" collection, its timestamp is used to represent the sync, etc). + /// (Note that multiple collection request support is currently unused, so + /// it might make sense to delete it - if we need it later, we may find a + /// better shape for our use-case) + fn get_collection_requests( + &self, + server_timestamp: ServerTimestamp, + ) -> Result>; + + /// Get persisted sync IDs. If they don't match the global state we'll be + /// `reset()` with the new IDs. + fn get_sync_assoc(&self) -> Result; + + /// Reset the engine (and associated store) without wiping local data, + /// ready for a "first sync". + /// `assoc` defines how this store is to be associated with sync. + fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()>; + + fn wipe(&self) -> Result<()>; +} + +#[cfg(test)] +mod test { + use super::*; + use std::iter::zip; + + #[test] + fn test_engine_priority() { + fn sorted(mut engines: Vec) -> Vec { + engines.sort(); + engines + } + assert_eq!( + vec![SyncEngineId::Passwords, SyncEngineId::Tabs], + sorted(vec![SyncEngineId::Passwords, SyncEngineId::Tabs]) + ); + assert_eq!( + vec![SyncEngineId::Passwords, SyncEngineId::Tabs], + sorted(vec![SyncEngineId::Tabs, SyncEngineId::Passwords]) + ); + } + + #[test] + fn test_engine_enum_order() { + let unsorted = SyncEngineId::iter().collect::>(); + let mut sorted = SyncEngineId::iter().collect::>(); + sorted.sort(); + + // iterating should supply identical elements in each. + assert!(zip(unsorted, sorted).fold(true, |acc, (a, b)| acc && (a == b))) + } +} diff --git a/third_party/rust/sync15/src/error.rs b/third_party/rust/sync15/src/error.rs new file mode 100644 index 0000000000..e4a043b8ac --- /dev/null +++ b/third_party/rust/sync15/src/error.rs @@ -0,0 +1,138 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use interrupt_support::Interrupted; + +/// This enum is to discriminate `StorageHttpError`, and not used as an error. +#[cfg(feature = "sync-client")] +#[derive(Debug, Clone)] +pub enum ErrorResponse { + NotFound { route: String }, + // 401 + Unauthorized { route: String }, + // 412 + PreconditionFailed { route: String }, + // 5XX + ServerError { route: String, status: u16 }, // TODO: info for "retry-after" and backoff handling etc here. + // Other HTTP responses. + RequestFailed { route: String, status: u16 }, +} + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[cfg(feature = "crypto")] + #[error("Key {0} had wrong length, got {1}, expected {2}")] + BadKeyLength(&'static str, usize, usize), + + #[cfg(feature = "crypto")] + #[error("SHA256 HMAC Mismatch error")] + HmacMismatch, + + #[cfg(feature = "crypto")] + #[error("Crypto/NSS error: {0}")] + CryptoError(#[from] rc_crypto::Error), + + #[cfg(feature = "crypto")] + #[error("Base64 decode error: {0}")] + Base64Decode(#[from] base64::DecodeError), + + #[error("JSON error: {0}")] + JsonError(#[from] serde_json::Error), + + #[error("Bad cleartext UTF8: {0}")] + BadCleartextUtf8(#[from] std::string::FromUtf8Error), + + #[cfg(feature = "crypto")] + #[error("HAWK error: {0}")] + HawkError(#[from] rc_crypto::hawk::Error), + + // + // Errors specific to this module. + // + #[cfg(feature = "sync-client")] + #[error("HTTP status {0} when requesting a token from the tokenserver")] + TokenserverHttpError(u16), + + #[cfg(feature = "sync-client")] + #[error("HTTP storage error: {0:?}")] + StorageHttpError(ErrorResponse), + + #[cfg(feature = "sync-client")] + #[error("Server requested backoff. Retry after {0:?}")] + BackoffError(std::time::SystemTime), + + #[cfg(feature = "sync-client")] + #[error("Outgoing record is too large to upload")] + RecordTooLargeError, + + // Do we want to record the concrete problems? + #[cfg(feature = "sync-client")] + #[error("Not all records were successfully uploaded")] + RecordUploadFailed, + + /// Used for things like a node reassignment or an unexpected syncId + /// implying the app needs to "reset" its understanding of remote storage. + #[cfg(feature = "sync-client")] + #[error("The server has reset the storage for this account")] + StorageResetError, + + #[cfg(feature = "sync-client")] + #[error("Unacceptable URL: {0}")] + UnacceptableUrl(String), + + #[cfg(feature = "sync-client")] + #[error("Missing server timestamp header in request")] + MissingServerTimestamp, + + #[cfg(feature = "sync-client")] + #[error("Unexpected server behavior during batch upload: {0}")] + ServerBatchProblem(&'static str), + + #[cfg(feature = "sync-client")] + #[error("It appears some other client is also trying to setup storage; try again later")] + SetupRace, + + #[cfg(feature = "sync-client")] + #[error("Client upgrade required; server storage version too new")] + ClientUpgradeRequired, + + // This means that our global state machine needs to enter a state (such as + // "FreshStartNeeded", but the allowed_states don't include that state.) + // It typically means we are trying to do a "fast" or "read-only" sync. + #[cfg(feature = "sync-client")] + #[error("Our storage needs setting up and we can't currently do it")] + SetupRequired, + + #[cfg(feature = "sync-client")] + #[error("Store error: {0}")] + StoreError(#[from] anyhow::Error), + + #[cfg(feature = "sync-client")] + #[error("Network error: {0}")] + RequestError(#[from] viaduct::Error), + + #[cfg(feature = "sync-client")] + #[error("Unexpected HTTP status: {0}")] + UnexpectedStatus(#[from] viaduct::UnexpectedStatus), + + #[cfg(feature = "sync-client")] + #[error("URL parse error: {0}")] + MalformedUrl(#[from] url::ParseError), + + #[error("The operation was interrupted.")] + Interrupted(#[from] Interrupted), +} + +#[cfg(feature = "sync-client")] +impl Error { + pub(crate) fn get_backoff(&self) -> Option { + if let Error::BackoffError(time) = self { + Some(*time) + } else { + None + } + } +} diff --git a/third_party/rust/sync15/src/key_bundle.rs b/third_party/rust/sync15/src/key_bundle.rs new file mode 100644 index 0000000000..6e0bdc23c0 --- /dev/null +++ b/third_party/rust/sync15/src/key_bundle.rs @@ -0,0 +1,224 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{Error, Result}; +use rc_crypto::{ + aead::{self, OpeningKey, SealingKey}, + rand, +}; + +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct KeyBundle { + enc_key: Vec, + mac_key: Vec, +} + +impl std::fmt::Debug for KeyBundle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KeyBundle").finish() + } +} + +impl KeyBundle { + /// Construct a key bundle from the already-decoded encrypt and hmac keys. + /// Panics (asserts) if they aren't both 32 bytes. + pub fn new(enc: Vec, mac: Vec) -> Result { + if enc.len() != 32 { + error_support::report_error!( + "sync15-key-bundle", + "Bad key length (enc_key): {} != 32", + enc.len() + ); + return Err(Error::BadKeyLength("enc_key", enc.len(), 32)); + } + if mac.len() != 32 { + error_support::report_error!( + "sync15-key-bundle", + "Bad key length (mac_key): {} != 32", + mac.len() + ); + return Err(Error::BadKeyLength("mac_key", mac.len(), 32)); + } + Ok(KeyBundle { + enc_key: enc, + mac_key: mac, + }) + } + + pub fn new_random() -> Result { + let mut buffer = [0u8; 64]; + rand::fill(&mut buffer)?; + KeyBundle::from_ksync_bytes(&buffer) + } + + pub fn from_ksync_bytes(ksync: &[u8]) -> Result { + if ksync.len() != 64 { + error_support::report_error!( + "sync15-key-bundle", + "Bad key length (kSync): {} != 64", + ksync.len() + ); + return Err(Error::BadKeyLength("kSync", ksync.len(), 64)); + } + Ok(KeyBundle { + enc_key: ksync[0..32].into(), + mac_key: ksync[32..64].into(), + }) + } + + pub fn from_ksync_base64(ksync: &str) -> Result { + let bytes = base64::decode_config(&ksync, base64::URL_SAFE_NO_PAD)?; + KeyBundle::from_ksync_bytes(&bytes) + } + + pub fn from_base64(enc: &str, mac: &str) -> Result { + let enc_bytes = base64::decode(&enc)?; + let mac_bytes = base64::decode(&mac)?; + KeyBundle::new(enc_bytes, mac_bytes) + } + + #[inline] + pub fn encryption_key(&self) -> &[u8] { + &self.enc_key + } + + #[inline] + pub fn hmac_key(&self) -> &[u8] { + &self.mac_key + } + + #[inline] + pub fn to_b64_array(&self) -> [String; 2] { + [base64::encode(&self.enc_key), base64::encode(&self.mac_key)] + } + + /// Decrypt the provided ciphertext with the given iv, and decodes the + /// result as a utf8 string. + pub fn decrypt(&self, enc_base64: &str, iv_base64: &str, hmac_base16: &str) -> Result { + // Decode the expected_hmac into bytes to avoid issues if a client happens to encode + // this as uppercase. This shouldn't happen in practice, but doing it this way is more + // robust and avoids an allocation. + let mut decoded_hmac = vec![0u8; 32]; + if base16::decode_slice(hmac_base16, &mut decoded_hmac).is_err() { + log::warn!("Garbage HMAC verification string: contained non base16 characters"); + return Err(Error::HmacMismatch); + } + let iv = base64::decode(iv_base64)?; + let ciphertext_bytes = base64::decode(enc_base64)?; + let key_bytes = [self.encryption_key(), self.hmac_key()].concat(); + let key = OpeningKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?; + let nonce = aead::Nonce::try_assume_unique_for_key( + &aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, + &iv, + )?; + let ciphertext_and_hmac = [ciphertext_bytes, decoded_hmac].concat(); + let cleartext_bytes = aead::open(&key, nonce, aead::Aad::empty(), &ciphertext_and_hmac)?; + let cleartext = String::from_utf8(cleartext_bytes)?; + Ok(cleartext) + } + + /// Encrypt using the provided IV. + pub fn encrypt_bytes_with_iv( + &self, + cleartext_bytes: &[u8], + iv: &[u8], + ) -> Result<(String, String)> { + let key_bytes = [self.encryption_key(), self.hmac_key()].concat(); + let key = SealingKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?; + let nonce = + aead::Nonce::try_assume_unique_for_key(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, iv)?; + let ciphertext_and_hmac = aead::seal(&key, nonce, aead::Aad::empty(), cleartext_bytes)?; + let ciphertext_len = ciphertext_and_hmac.len() - key.algorithm().tag_len(); + // Do the string conversions here so we don't have to split and copy to 2 vectors. + let (ciphertext, hmac_signature) = ciphertext_and_hmac.split_at(ciphertext_len); + let enc_base64 = base64::encode(&ciphertext); + let hmac_base16 = base16::encode_lower(&hmac_signature); + Ok((enc_base64, hmac_base16)) + } + + /// Generate a random iv and encrypt with it. Return both the encrypted bytes + /// and the generated iv. + pub fn encrypt_bytes_rand_iv( + &self, + cleartext_bytes: &[u8], + ) -> Result<(String, String, String)> { + let mut iv = [0u8; 16]; + rand::fill(&mut iv)?; + let (enc_base64, hmac_base16) = self.encrypt_bytes_with_iv(cleartext_bytes, &iv)?; + let iv_base64 = base64::encode(&iv); + Ok((enc_base64, iv_base64, hmac_base16)) + } + + pub fn encrypt_with_iv(&self, cleartext: &str, iv: &[u8]) -> Result<(String, String)> { + self.encrypt_bytes_with_iv(cleartext.as_bytes(), iv) + } + + pub fn encrypt_rand_iv(&self, cleartext: &str) -> Result<(String, String, String)> { + self.encrypt_bytes_rand_iv(cleartext.as_bytes()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + const HMAC_B16: &str = "b1e6c18ac30deb70236bc0d65a46f7a4dce3b8b0e02cf92182b914e3afa5eebc"; + const IV_B64: &str = "GX8L37AAb2FZJMzIoXlX8w=="; + const HMAC_KEY_B64: &str = "MMntEfutgLTc8FlTLQFms8/xMPmCldqPlq/QQXEjx70="; + const ENC_KEY_B64: &str = "9K/wLdXdw+nrTtXo4ZpECyHFNr4d7aYHqeg3KW9+m6Q="; + + const CIPHERTEXT_B64_PIECES: &[&str] = &[ + "NMsdnRulLwQsVcwxKW9XwaUe7ouJk5Wn80QhbD80l0HEcZGCynh45qIbeYBik0lgcHbK", + "mlIxTJNwU+OeqipN+/j7MqhjKOGIlvbpiPQQLC6/ffF2vbzL0nzMUuSyvaQzyGGkSYM2", + "xUFt06aNivoQTvU2GgGmUK6MvadoY38hhW2LCMkoZcNfgCqJ26lO1O0sEO6zHsk3IVz6", + "vsKiJ2Hq6VCo7hu123wNegmujHWQSGyf8JeudZjKzfi0OFRRvvm4QAKyBWf0MgrW1F8S", + "FDnVfkq8amCB7NhdwhgLWbN+21NitNwWYknoEWe1m6hmGZDgDT32uxzWxCV8QqqrpH/Z", + "ggViEr9uMgoy4lYaWqP7G5WKvvechc62aqnsNEYhH26A5QgzmlNyvB+KPFvPsYzxDnSC", + "jOoRSLx7GG86wT59QZw=", + ]; + + const CLEARTEXT_B64_PIECES: &[&str] = &[ + "eyJpZCI6IjVxUnNnWFdSSlpYciIsImhpc3RVcmkiOiJmaWxlOi8vL1VzZXJzL2phc29u", + "L0xpYnJhcnkvQXBwbGljYXRpb24lMjBTdXBwb3J0L0ZpcmVmb3gvUHJvZmlsZXMva3Nn", + "ZDd3cGsuTG9jYWxTeW5jU2VydmVyL3dlYXZlL2xvZ3MvIiwidGl0bGUiOiJJbmRleCBv", + "ZiBmaWxlOi8vL1VzZXJzL2phc29uL0xpYnJhcnkvQXBwbGljYXRpb24gU3VwcG9ydC9G", + "aXJlZm94L1Byb2ZpbGVzL2tzZ2Q3d3BrLkxvY2FsU3luY1NlcnZlci93ZWF2ZS9sb2dz", + "LyIsInZpc2l0cyI6W3siZGF0ZSI6MTMxOTE0OTAxMjM3MjQyNSwidHlwZSI6MX1dfQ==", + ]; + + #[test] + fn test_decrypt() { + let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap(); + let ciphertext = CIPHERTEXT_B64_PIECES.join(""); + let s = key_bundle.decrypt(&ciphertext, IV_B64, HMAC_B16).unwrap(); + + let cleartext = + String::from_utf8(base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap()).unwrap(); + assert_eq!(&cleartext, &s); + } + + #[test] + fn test_encrypt() { + let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap(); + let iv = base64::decode(IV_B64).unwrap(); + + let cleartext_bytes = base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap(); + let (enc_base64, _hmac_base16) = key_bundle + .encrypt_bytes_with_iv(&cleartext_bytes, &iv) + .unwrap(); + + let expect_ciphertext = CIPHERTEXT_B64_PIECES.join(""); + + assert_eq!(&enc_base64, &expect_ciphertext); + + let (enc_base64_2, iv_base64_2, hmac_base16_2) = + key_bundle.encrypt_bytes_rand_iv(&cleartext_bytes).unwrap(); + assert_ne!(&enc_base64_2, &expect_ciphertext); + + let s = key_bundle + .decrypt(&enc_base64_2, &iv_base64_2, &hmac_base16_2) + .unwrap(); + assert_eq!(&cleartext_bytes, &s.as_bytes()); + } +} diff --git a/third_party/rust/sync15/src/lib.rs b/third_party/rust/sync15/src/lib.rs new file mode 100644 index 0000000000..796284b77f --- /dev/null +++ b/third_party/rust/sync15/src/lib.rs @@ -0,0 +1,40 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![allow(unknown_lints, clippy::implicit_hasher)] +#![warn(rust_2018_idioms)] + +pub mod bso; +#[cfg(feature = "sync-client")] +pub mod client; +// Types to describe client records +mod client_types; +// Note that `clients_engine` should probably be in `sync_client`, but let's not make +// things too nested at this stage... +#[cfg(feature = "sync-client")] +pub mod clients_engine; +#[cfg(feature = "crypto")] +mod enc_payload; +#[cfg(feature = "sync-engine")] +pub mod engine; +mod error; +#[cfg(feature = "crypto")] +mod key_bundle; +mod record_types; +mod server_timestamp; +pub mod telemetry; + +pub use crate::client_types::{ClientData, DeviceType, RemoteClient}; +pub use crate::error::{Error, Result}; +#[cfg(feature = "crypto")] +pub use enc_payload::EncryptedPayload; +#[cfg(feature = "crypto")] +pub use key_bundle::KeyBundle; +pub use server_timestamp::ServerTimestamp; +pub use sync_guid::Guid; + +// For skip_serializing_if +fn skip_if_default(v: &T) -> bool { + *v == T::default() +} diff --git a/third_party/rust/sync15/src/record_types.rs b/third_party/rust/sync15/src/record_types.rs new file mode 100644 index 0000000000..f8f1449fdc --- /dev/null +++ b/third_party/rust/sync15/src/record_types.rs @@ -0,0 +1,51 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::*; +use std::collections::HashMap; +use sync_guid::Guid; + +// Known record formats. + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaGlobalEngine { + pub version: usize, + #[serde(rename = "syncID")] + pub sync_id: Guid, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaGlobalRecord { + #[serde(rename = "syncID")] + pub sync_id: Guid, + #[serde(rename = "storageVersion")] + pub storage_version: usize, + #[serde(default)] + pub engines: HashMap, + #[serde(default)] + pub declined: Vec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)] +pub struct CryptoKeysRecord { + pub id: Guid, + pub collection: String, + pub default: [String; 2], + pub collections: HashMap, +} + +#[cfg(test)] +#[test] +fn test_deserialize_meta_global() { + let record = serde_json::json!({ + "syncID": "abcd1234abcd", + "storageVersion": 1, + }) + .to_string(); + let r = serde_json::from_str::(&record).unwrap(); + assert_eq!(r.sync_id, "abcd1234abcd"); + assert_eq!(r.storage_version, 1); + assert_eq!(r.engines.len(), 0); + assert_eq!(r.declined.len(), 0); +} diff --git a/third_party/rust/sync15/src/server_timestamp.rs b/third_party/rust/sync15/src/server_timestamp.rs new file mode 100644 index 0000000000..7dd7a7fc42 --- /dev/null +++ b/third_party/rust/sync15/src/server_timestamp.rs @@ -0,0 +1,122 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use std::time::Duration; + +/// Typesafe way to manage server timestamps without accidentally mixing them up with +/// local ones. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Default)] +pub struct ServerTimestamp(pub i64); + +impl ServerTimestamp { + pub fn from_float_seconds(ts: f64) -> Self { + let rf = (ts * 1000.0).round(); + if !rf.is_finite() || rf < 0.0 || rf >= i64::max_value() as f64 { + error_support::report_error!("sync15-illegal-timestamp", "Illegal timestamp: {}", ts); + ServerTimestamp(0) + } else { + ServerTimestamp(rf as i64) + } + } + + pub fn from_millis(ts: i64) -> Self { + // Catch it in tests, but just complain and replace with 0 otherwise. + debug_assert!(ts >= 0, "Bad timestamp: {}", ts); + if ts >= 0 { + Self(ts) + } else { + error_support::report_error!( + "sync15-illegal-timestamp", + "Illegal timestamp, substituting 0: {}", + ts + ); + Self(0) + } + } +} + +// This lets us use these in hyper header! blocks. +impl std::str::FromStr for ServerTimestamp { + type Err = std::num::ParseFloatError; + fn from_str(s: &str) -> Result { + let val = f64::from_str(s)?; + Ok(Self::from_float_seconds(val)) + } +} + +impl std::fmt::Display for ServerTimestamp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0 as f64 / 1000.0) + } +} + +impl ServerTimestamp { + pub const EPOCH: ServerTimestamp = ServerTimestamp(0); + + /// Returns None if `other` is later than `self` (Duration may not represent + /// negative timespans in rust). + #[inline] + pub fn duration_since(self, other: ServerTimestamp) -> Option { + let delta = self.0 - other.0; + if delta < 0 { + None + } else { + Some(Duration::from_millis(delta as u64)) + } + } + + /// Get the milliseconds for the timestamp. + #[inline] + pub fn as_millis(self) -> i64 { + self.0 + } +} + +impl serde::ser::Serialize for ServerTimestamp { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_f64(self.0 as f64 / 1000.0) + } +} + +impl<'de> serde::de::Deserialize<'de> for ServerTimestamp { + fn deserialize>(d: D) -> Result { + f64::deserialize(d).map(Self::from_float_seconds) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_server_timestamp() { + let t0 = ServerTimestamp(10_300_150); + let t1 = ServerTimestamp(10_100_050); + assert!(t1.duration_since(t0).is_none()); + assert!(t0.duration_since(t1).is_some()); + let dur = t0.duration_since(t1).unwrap(); + assert_eq!(dur.as_secs(), 200); + assert_eq!(dur.subsec_nanos(), 100_000_000); + } + + #[test] + fn test_serde() { + let ts = ServerTimestamp(123_456); + + // test serialize + let ser = serde_json::to_string(&ts).unwrap(); + assert_eq!("123.456".to_string(), ser); + + // test deserialize of float + let ts: ServerTimestamp = serde_json::from_str(&ser).unwrap(); + assert_eq!(ServerTimestamp(123_456), ts); + + // test deserialize of whole number + let ts: ServerTimestamp = serde_json::from_str("123").unwrap(); + assert_eq!(ServerTimestamp(123_000), ts); + + // test deserialize of negative number + let ts: ServerTimestamp = serde_json::from_str("-123").unwrap(); + assert_eq!(ServerTimestamp(0), ts); + } +} diff --git a/third_party/rust/sync15/src/telemetry.rs b/third_party/rust/sync15/src/telemetry.rs new file mode 100644 index 0000000000..dac018f032 --- /dev/null +++ b/third_party/rust/sync15/src/telemetry.rs @@ -0,0 +1,824 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Manage recording sync telemetry. Assumes some external telemetry +//! library/code which manages submitting. + +use crate::error::Error; +#[cfg(feature = "sync-client")] +use crate::error::ErrorResponse; + +use std::collections::HashMap; +use std::time; + +use serde::{ser, Serialize, Serializer}; + +// A test helper, used by the many test modules below. +#[cfg(test)] +fn assert_json(v: &T, expected: serde_json::Value) +where + T: serde::Serialize, +{ + assert_eq!( + serde_json::to_value(&v).expect("should get a value"), + expected + ); +} + +/// What we record for 'when' and 'took' in a telemetry record. +#[derive(Debug, Serialize)] +struct WhenTook { + when: f64, + #[serde(skip_serializing_if = "crate::skip_if_default")] + took: u64, +} + +/// What we track while recording 'when' and 'took. It serializes as a WhenTook, +/// except when .finished() hasn't been called, in which case it panics. +#[derive(Debug)] +enum Stopwatch { + Started(time::SystemTime, time::Instant), + Finished(WhenTook), +} + +impl Default for Stopwatch { + fn default() -> Self { + Stopwatch::new() + } +} + +impl Stopwatch { + fn new() -> Self { + Stopwatch::Started(time::SystemTime::now(), time::Instant::now()) + } + + // For tests we don't want real timestamps because we test against literals. + #[cfg(test)] + fn finished(&self) -> Self { + Stopwatch::Finished(WhenTook { when: 0.0, took: 0 }) + } + + #[cfg(not(test))] + fn finished(&self) -> Self { + match self { + Stopwatch::Started(st, si) => { + let std = st.duration_since(time::UNIX_EPOCH).unwrap_or_default(); + let when = std.as_secs() as f64; // we don't want sub-sec accuracy. Do we need to write a float? + + let sid = si.elapsed(); + let took = sid.as_secs() * 1000 + (u64::from(sid.subsec_nanos()) / 1_000_000); + Stopwatch::Finished(WhenTook { when, took }) + } + _ => { + unreachable!("can't finish twice"); + } + } + } +} + +impl Serialize for Stopwatch { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + match self { + Stopwatch::Started(_, _) => Err(ser::Error::custom("StopWatch has not been finished")), + Stopwatch::Finished(c) => c.serialize(serializer), + } + } +} + +#[cfg(test)] +mod stopwatch_tests { + use super::*; + + // A wrapper struct because we flatten - this struct should serialize with + // 'when' and 'took' keys (but with no 'sw'.) + #[derive(Debug, Serialize)] + struct WT { + #[serde(flatten)] + sw: Stopwatch, + } + + #[test] + fn test_not_finished() { + let wt = WT { + sw: Stopwatch::new(), + }; + serde_json::to_string(&wt).expect_err("unfinished stopwatch should fail"); + } + + #[test] + fn test() { + assert_json( + &WT { + sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 1 }), + }, + serde_json::json!({"when": 1.0, "took": 1}), + ); + assert_json( + &WT { + sw: Stopwatch::Finished(WhenTook { when: 1.0, took: 0 }), + }, + serde_json::json!({"when": 1.0}), + ); + } +} + +/// A generic "Event" - suitable for all kinds of pings (although this module +/// only cares about the sync ping) +#[derive(Debug, Serialize)] +pub struct Event { + // We use static str references as we expect values to be literals. + object: &'static str, + + method: &'static str, + + // Maybe "value" should be a string? + #[serde(skip_serializing_if = "Option::is_none")] + value: Option<&'static str>, + + // we expect the keys to be literals but values are real strings. + #[serde(skip_serializing_if = "Option::is_none")] + extra: Option>, +} + +impl Event { + pub fn new(object: &'static str, method: &'static str) -> Self { + assert!(object.len() <= 20); + assert!(method.len() <= 20); + Self { + object, + method, + value: None, + extra: None, + } + } + + pub fn value(mut self, v: &'static str) -> Self { + assert!(v.len() <= 80); + self.value = Some(v); + self + } + + pub fn extra(mut self, key: &'static str, val: String) -> Self { + assert!(key.len() <= 15); + assert!(val.len() <= 85); + match self.extra { + None => self.extra = Some(HashMap::new()), + Some(ref e) => assert!(e.len() < 10), + } + self.extra.as_mut().unwrap().insert(key, val); + self + } +} + +#[cfg(test)] +mod test_events { + use super::*; + + #[test] + #[should_panic] + fn test_invalid_length_ctor() { + Event::new("A very long object value", "Method"); + } + + #[test] + #[should_panic] + fn test_invalid_length_extra_key() { + Event::new("O", "M").extra("A very long key value", "v".to_string()); + } + + #[test] + #[should_panic] + fn test_invalid_length_extra_val() { + let l = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ + abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + Event::new("O", "M").extra("k", l.to_string()); + } + + #[test] + #[should_panic] + fn test_too_many_extras() { + let l = "abcdefghijk"; + let mut e = Event::new("Object", "Method"); + for i in 0..l.len() { + e = e.extra(&l[i..=i], "v".to_string()); + } + } + + #[test] + fn test_json() { + assert_json( + &Event::new("Object", "Method").value("Value"), + serde_json::json!({"object": "Object", "method": "Method", "value": "Value"}), + ); + + assert_json( + &Event::new("Object", "Method").extra("one", "one".to_string()), + serde_json::json!({"object": "Object", + "method": "Method", + "extra": {"one": "one"} + }), + ) + } +} + +/// A Sync failure. +#[derive(Debug, Serialize)] +#[serde(tag = "name")] +pub enum SyncFailure { + #[serde(rename = "shutdownerror")] + Shutdown, + + #[serde(rename = "othererror")] + Other { error: String }, + + #[serde(rename = "unexpectederror")] + Unexpected { error: String }, + + #[serde(rename = "autherror")] + Auth { from: &'static str }, + + #[serde(rename = "httperror")] + Http { code: u16 }, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn reprs() { + assert_json( + &SyncFailure::Shutdown, + serde_json::json!({"name": "shutdownerror"}), + ); + + assert_json( + &SyncFailure::Other { + error: "dunno".to_string(), + }, + serde_json::json!({"name": "othererror", "error": "dunno"}), + ); + + assert_json( + &SyncFailure::Unexpected { + error: "dunno".to_string(), + }, + serde_json::json!({"name": "unexpectederror", "error": "dunno"}), + ); + + assert_json( + &SyncFailure::Auth { from: "FxA" }, + serde_json::json!({"name": "autherror", "from": "FxA"}), + ); + + assert_json( + &SyncFailure::Http { code: 500 }, + serde_json::json!({"name": "httperror", "code": 500}), + ); + } +} + +/// Incoming record for an engine's sync +#[derive(Debug, Default, Serialize)] +pub struct EngineIncoming { + #[serde(skip_serializing_if = "crate::skip_if_default")] + applied: u32, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + failed: u32, + + #[serde(rename = "newFailed")] + #[serde(skip_serializing_if = "crate::skip_if_default")] + new_failed: u32, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + reconciled: u32, +} + +impl EngineIncoming { + pub fn new() -> Self { + Self { + ..Default::default() + } + } + + // A helper used via skip_serializing_if + fn is_empty(inc: &Option) -> bool { + match inc { + Some(a) => a.applied == 0 && a.failed == 0 && a.new_failed == 0 && a.reconciled == 0, + None => true, + } + } + + /// Increment the value of `applied` by `n`. + #[inline] + pub fn applied(&mut self, n: u32) { + self.applied += n; + } + + /// Increment the value of `failed` by `n`. + #[inline] + pub fn failed(&mut self, n: u32) { + self.failed += n; + } + + /// Increment the value of `new_failed` by `n`. + #[inline] + pub fn new_failed(&mut self, n: u32) { + self.new_failed += n; + } + + /// Increment the value of `reconciled` by `n`. + #[inline] + pub fn reconciled(&mut self, n: u32) { + self.reconciled += n; + } + + /// Get the value of `applied`. Mostly useful for testing. + #[inline] + pub fn get_applied(&self) -> u32 { + self.applied + } + + /// Get the value of `failed`. Mostly useful for testing. + #[inline] + pub fn get_failed(&self) -> u32 { + self.failed + } + + /// Get the value of `new_failed`. Mostly useful for testing. + #[inline] + pub fn get_new_failed(&self) -> u32 { + self.new_failed + } + + /// Get the value of `reconciled`. Mostly useful for testing. + #[inline] + pub fn get_reconciled(&self) -> u32 { + self.reconciled + } +} + +/// Outgoing record for an engine's sync +#[derive(Debug, Default, Serialize)] +pub struct EngineOutgoing { + #[serde(skip_serializing_if = "crate::skip_if_default")] + sent: usize, + + #[serde(skip_serializing_if = "crate::skip_if_default")] + failed: usize, +} + +impl EngineOutgoing { + pub fn new() -> Self { + EngineOutgoing { + ..Default::default() + } + } + + #[inline] + pub fn sent(&mut self, n: usize) { + self.sent += n; + } + + #[inline] + pub fn failed(&mut self, n: usize) { + self.failed += n; + } +} + +/// One engine's sync. +#[derive(Debug, Serialize)] +pub struct Engine { + name: String, + + #[serde(flatten)] + when_took: Stopwatch, + + #[serde(skip_serializing_if = "EngineIncoming::is_empty")] + incoming: Option, + + #[serde(skip_serializing_if = "Vec::is_empty")] + outgoing: Vec, // one for each batch posted. + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + validation: Option, +} + +impl Engine { + pub fn new(name: impl Into) -> Self { + Self { + name: name.into(), + when_took: Stopwatch::new(), + incoming: None, + outgoing: Vec::new(), + failure: None, + validation: None, + } + } + + pub fn incoming(&mut self, inc: EngineIncoming) { + assert!(self.incoming.is_none()); + self.incoming = Some(inc); + } + + pub fn outgoing(&mut self, out: EngineOutgoing) { + self.outgoing.push(out); + } + + pub fn failure(&mut self, err: impl Into) { + // Currently we take the first error, under the assumption that the + // first is the most important and all others stem from that. + let failure = err.into(); + if self.failure.is_none() { + self.failure = Some(failure); + } else { + log::warn!( + "engine already has recorded a failure of {:?} - ignoring {:?}", + &self.failure, + &failure + ); + } + } + + pub fn validation(&mut self, v: Validation) { + assert!(self.validation.is_none()); + self.validation = Some(v); + } + + fn finished(&mut self) { + self.when_took = self.when_took.finished(); + } +} + +#[derive(Debug, Default, Serialize)] +pub struct Validation { + version: u32, + + #[serde(skip_serializing_if = "Vec::is_empty")] + problems: Vec, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option, +} + +impl Validation { + pub fn with_version(version: u32) -> Validation { + Validation { + version, + ..Validation::default() + } + } + + pub fn problem(&mut self, name: &'static str, count: usize) -> &mut Self { + if count > 0 { + self.problems.push(Problem { name, count }); + } + self + } +} + +#[derive(Debug, Default, Serialize)] +pub struct Problem { + name: &'static str, + #[serde(skip_serializing_if = "crate::skip_if_default")] + count: usize, +} + +#[cfg(test)] +mod engine_tests { + use super::*; + + #[test] + fn test_engine() { + let mut e = Engine::new("test_engine"); + e.finished(); + assert_json(&e, serde_json::json!({"name": "test_engine", "when": 0.0})); + } + + #[test] + fn test_engine_not_finished() { + let e = Engine::new("test_engine"); + serde_json::to_value(&e).expect_err("unfinished stopwatch should fail"); + } + + #[test] + fn test_incoming() { + let mut i = EngineIncoming::new(); + i.applied(1); + i.failed(2); + let mut e = Engine::new("TestEngine"); + e.incoming(i); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", "when": 0.0, "incoming": {"applied": 1, "failed": 2}}), + ); + } + + #[test] + fn test_outgoing() { + let mut o = EngineOutgoing::new(); + o.sent(2); + o.failed(1); + let mut e = Engine::new("TestEngine"); + e.outgoing(o); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", "when": 0.0, "outgoing": [{"sent": 2, "failed": 1}]}), + ); + } + + #[test] + fn test_failure() { + let mut e = Engine::new("TestEngine"); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + assert_json( + &e, + serde_json::json!({"name": "TestEngine", + "when": 0.0, + "failureReason": {"name": "httperror", "code": 500} + }), + ); + } + + #[test] + fn test_raw() { + let mut e = Engine::new("TestEngine"); + let mut inc = EngineIncoming::new(); + inc.applied(10); + e.incoming(inc); + let mut out = EngineOutgoing::new(); + out.sent(1); + e.outgoing(out); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + + assert_eq!(e.outgoing.len(), 1); + assert_eq!(e.incoming.as_ref().unwrap().applied, 10); + assert_eq!(e.outgoing[0].sent, 1); + assert!(e.failure.is_some()); + serde_json::to_string(&e).expect("should get json"); + } +} + +/// A single sync. May have many engines, may have its own failure. +#[derive(Debug, Serialize, Default)] +pub struct SyncTelemetry { + #[serde(flatten)] + when_took: Stopwatch, + + #[serde(skip_serializing_if = "Vec::is_empty")] + engines: Vec, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "failureReason")] + failure: Option, +} + +impl SyncTelemetry { + pub fn new() -> Self { + Default::default() + } + + pub fn engine(&mut self, mut e: Engine) { + e.finished(); + self.engines.push(e); + } + + pub fn failure(&mut self, failure: SyncFailure) { + assert!(self.failure.is_none()); + self.failure = Some(failure); + } + + // Note that unlike other 'finished' methods, this isn't private - someone + // needs to explicitly call this before handling the json payload to + // whatever ends up submitting it. + pub fn finished(&mut self) { + self.when_took = self.when_took.finished(); + } +} + +#[cfg(test)] +mod sync_tests { + use super::*; + + #[test] + fn test_accum() { + let mut s = SyncTelemetry::new(); + let mut inc = EngineIncoming::new(); + inc.applied(10); + let mut e = Engine::new("test_engine"); + e.incoming(inc); + e.failure(SyncFailure::Http { code: 500 }); + e.finished(); + s.engine(e); + s.finished(); + + assert_json( + &s, + serde_json::json!({ + "when": 0.0, + "engines": [{ + "name":"test_engine", + "when":0.0, + "incoming": { + "applied": 10 + }, + "failureReason": { + "name": "httperror", + "code": 500 + } + }] + }), + ); + } + + #[test] + fn test_multi_engine() { + let mut inc_e1 = EngineIncoming::new(); + inc_e1.applied(1); + let mut e1 = Engine::new("test_engine"); + e1.incoming(inc_e1); + + let mut inc_e2 = EngineIncoming::new(); + inc_e2.failed(1); + let mut e2 = Engine::new("test_engine_2"); + e2.incoming(inc_e2); + let mut out_e2 = EngineOutgoing::new(); + out_e2.sent(1); + e2.outgoing(out_e2); + + let mut s = SyncTelemetry::new(); + s.engine(e1); + s.engine(e2); + s.failure(SyncFailure::Http { code: 500 }); + s.finished(); + assert_json( + &s, + serde_json::json!({ + "when": 0.0, + "engines": [{ + "name": "test_engine", + "when": 0.0, + "incoming": { + "applied": 1 + } + },{ + "name": "test_engine_2", + "when": 0.0, + "incoming": { + "failed": 1 + }, + "outgoing": [{ + "sent": 1 + }] + }], + "failureReason": { + "name": "httperror", + "code": 500 + } + }), + ); + } +} + +/// The Sync ping payload, as documented at +/// https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/telemetry/data/sync-ping.html. +/// May have many syncs, may have many events. However, due to the architecture +/// of apps which use these components, this payload is almost certainly not +/// suitable for submitting directly. For example, we will always return a +/// payload with exactly 1 sync, and it will not know certain other fields +/// in the payload, such as the *hashed* FxA device ID (see +/// https://searchfox.org/mozilla-central/rev/c3ebaf6de2d481c262c04bb9657eaf76bf47e2ac/services/sync/modules/browserid_identity.js#185 +/// for an example of how the device ID is constructed). The intention is that +/// consumers of this will use this to create a "real" payload - eg, accumulating +/// until some threshold number of syncs is reached, and contributing +/// additional data which only the consumer knows. +#[derive(Debug, Serialize, Default)] +pub struct SyncTelemetryPing { + version: u32, + + uid: Option, + + #[serde(skip_serializing_if = "Vec::is_empty")] + events: Vec, + + #[serde(skip_serializing_if = "Vec::is_empty")] + syncs: Vec, +} + +impl SyncTelemetryPing { + pub fn new() -> Self { + Self { + version: 1, + ..Default::default() + } + } + + pub fn uid(&mut self, uid: String) { + if let Some(ref existing) = self.uid { + if *existing != uid { + log::warn!("existing uid ${} being replaced by {}", existing, uid); + } + } + self.uid = Some(uid); + } + + pub fn sync(&mut self, mut s: SyncTelemetry) { + s.finished(); + self.syncs.push(s); + } + + pub fn event(&mut self, e: Event) { + self.events.push(e); + } +} + +ffi_support::implement_into_ffi_by_json!(SyncTelemetryPing); + +#[cfg(test)] +mod ping_tests { + use super::*; + #[test] + fn test_ping() { + let engine = Engine::new("test"); + let mut s = SyncTelemetry::new(); + s.engine(engine); + let mut p = SyncTelemetryPing::new(); + p.uid("user-id".into()); + p.sync(s); + let event = Event::new("foo", "bar"); + p.event(event); + assert_json( + &p, + serde_json::json!({ + "events": [{ + "method": "bar", "object": "foo" + }], + "syncs": [{ + "engines": [{ + "name": "test", "when": 0.0 + }], + "when": 0.0 + }], + "uid": "user-id", + "version": 1 + }), + ); + } +} + +impl<'a> From<&'a Error> for SyncFailure { + fn from(e: &Error) -> SyncFailure { + match e { + #[cfg(feature = "sync-client")] + Error::TokenserverHttpError(status) => { + if *status == 401 { + SyncFailure::Auth { + from: "tokenserver", + } + } else { + SyncFailure::Http { code: *status } + } + } + #[cfg(feature = "sync-client")] + Error::BackoffError(_) => SyncFailure::Http { code: 503 }, + #[cfg(feature = "sync-client")] + Error::StorageHttpError(ref e) => match e { + ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 }, + ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" }, + ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 }, + ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status }, + ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status }, + }, + #[cfg(feature = "crypto")] + Error::CryptoError(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + #[cfg(feature = "sync-client")] + Error::RequestError(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + #[cfg(feature = "sync-client")] + Error::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status }, + Error::Interrupted(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + e => SyncFailure::Other { + error: e.to_string(), + }, + } + } +} -- cgit v1.2.3