diff options
Diffstat (limited to 'third_party/rust/sync15')
25 files changed, 7114 insertions, 0 deletions
diff --git a/third_party/rust/sync15/.cargo-checksum.json b/third_party/rust/sync15/.cargo-checksum.json new file mode 100644 index 0000000000..85f142b019 --- /dev/null +++ b/third_party/rust/sync15/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"354427a8d1be8414740ec8f469b8ac2024c9b1faa825cdd82345c4b03bd1d785","README.md":"8b2d669841fa7618a762b7e2cfaabfcabfb87a74c9725013061aedc6ed9e37de","src/bso_record.rs":"a7d53a6db9b0fddf247aefb1af365c94b4e39427479aa9f80b39bcc3532b3d72","src/changeset.rs":"9f29a1e4f953e0e6525bec6f73b88042afc0208bbfb9dcc56e11fe7f4cd55f8a","src/client.rs":"25d7e357021c3893c13ea28aa5bf616ec58832e88b74e123580e4bfd9339025b","src/clients/engine.rs":"2ae025c005f55634be3bc8ecc77110a69fa8680da26fc0e2d304d93a73b54e54","src/clients/mod.rs":"c7e230d4c4bdfbbe6c390bc970ea35920540af915a0051c02cadcbc97ad6880e","src/clients/record.rs":"f555d2cb2d713553280ca07fa55fc8a46bb6e56deb0a430a688c2f6f6a91493e","src/clients/ser.rs":"10aee1110410e3d8f38cbeaf1ea9b00ce8c4eab4d48feb3de206e5fbe38ca46d","src/coll_state.rs":"ad96ecaeda3522b2209abd4c1a3ac8be91744763cdf872d5d5845cb6213931db","src/collection_keys.rs":"5a167524e1d653c2d2ca794b99b3e1afc236d3036c9445bdaec8f34fe7938ec5","src/error.rs":"2651a4212d87716ef774784e8bd864eeb5a981f2be2b5d92edbe07df87b9a60a","src/key_bundle.rs":"ceddf59ac19a5757c967afe1795a810faf2ae65a762e085685d4be811328b2f8","src/lib.rs":"b735f28c60dd737cd59aea075c0ec563aba3ee42c6d38b27efc9eba040dc4448","src/migrate_state.rs":"214f4fc1d98c8a6f10e1dcb7dcaf9dc9d866b4908c09c5cc16fed6d3218828d4","src/record_types.rs":"02bb3d352fb808131d298f9b90d9c95b7e9e0138b97c5401f3b9fdacc5562f44","src/request.rs":"e0c27705dc5ebfb79c78af54e73d436cbac7a87bf6eeb3a06511a5b66fb8e8e1","src/state.rs":"bae333c014065fb02563ce08add2f109d6d18a26a0f24dfd637c636c8dc6dc7c","src/status.rs":"d1efab1e992d0340e602abafd02e049a9d75a51f5b0efe4ebcb3963e156c92d5","src/sync.rs":"4dcb6e015b9670a9af96647e4f25a0fc723cc6f7eef8ceee4d9d7f9af43cc969","src/sync_multiple.rs":"8153d7f26493c8e3c988aea40e2121d105f2d3483e97c9cf99d11deda435f692","src/telemetry.rs":"742ed86dcd18b9d99ba51b5a54121dc6158d5b7ed567249e9d7f5a65c4c40a66","src/token.rs":"2b9dea900809e855cdd4f676a295b8c2a41583208aeb05596d609b72cbd1f818","src/util.rs":"10bba86298c50ffd4316aee7e4212efa36db76f6d2412b57596b5375ca6713dc"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/sync15/Cargo.toml b/third_party/rust/sync15/Cargo.toml new file mode 100644 index 0000000000..66967b2101 --- /dev/null +++ b/third_party/rust/sync15/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sync15" +edition = "2018" +version = "0.1.0" +authors = ["Thom Chiovoloni <tchiovoloni@mozilla.com>"] +license = "MPL-2.0" +exclude = ["/android", "/ios"] + +[features] +default = [] + +[dependencies] +base64 = "0.12" +ffi-support = "0.4" +serde = "1" +serde_derive = "1" +serde_json = "1" +url = "2.1" +log = "0.4" +lazy_static = "1.4" +base16 = "0.2" +rc_crypto = { path = "../support/rc_crypto", features = ["hawk"] } +viaduct = { path = "../viaduct" } +interrupt-support = { path = "../support/interrupt" } +error-support = { path = "../support/error" } +sync-guid = { path = "../support/guid", features = ["random"] } +sync15-traits = {path = "../support/sync15-traits"} +thiserror = "1.0" +anyhow = "1.0" + +[dev-dependencies] +env_logger = { version = "0.7", default-features = false } diff --git a/third_party/rust/sync15/README.md b/third_party/rust/sync15/README.md new file mode 100644 index 0000000000..fadd3fb069 --- /dev/null +++ b/third_party/rust/sync15/README.md @@ -0,0 +1,8 @@ +# Low-level sync-1.5 helper component + +This component contains utility code to be shared between different +data stores that want to sync against a Firefox Sync v1.5 sync server. +It handles things like encrypting/decrypting records, obtaining and +using storage node auth tokens, and so-on. + +It also needs a more complete README... diff --git a/third_party/rust/sync15/src/bso_record.rs b/third_party/rust/sync15/src/bso_record.rs new file mode 100644 index 0000000000..2b3f532402 --- /dev/null +++ b/third_party/rust/sync15/src/bso_record.rs @@ -0,0 +1,452 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error; +use crate::key_bundle::KeyBundle; +use crate::util::ServerTimestamp; +use lazy_static::lazy_static; +use serde::de::{Deserialize, DeserializeOwned}; +use serde::ser::Serialize; +use serde_derive::*; +use std::ops::{Deref, DerefMut}; +pub use sync15_traits::Payload; +use sync_guid::Guid; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct BsoRecord<T> { + pub id: Guid, + + // It's not clear to me if this actually can be empty in practice. + // firefox-ios seems to think it can... + #[serde(default = "String::new")] + pub collection: String, + + #[serde(skip_serializing)] + // If we don't give it a default, we fail to deserialize + // items we wrote out during tests and such. + #[serde(default = "ServerTimestamp::default")] + pub modified: ServerTimestamp, + + #[serde(skip_serializing_if = "Option::is_none")] + pub sortindex: Option<i32>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub ttl: Option<u32>, + + // We do some serde magic here with serde to parse the payload from JSON as we deserialize. + // This avoids having a separate intermediate type that only exists so that we can deserialize + // it's payload field as JSON (Especially since this one is going to exist more-or-less just so + // that we can decrypt the data...) + #[serde( + with = "as_json", + bound(serialize = "T: Serialize", deserialize = "T: DeserializeOwned") + )] + pub payload: T, +} + +impl<T> BsoRecord<T> { + #[inline] + pub fn map_payload<P, F>(self, mapper: F) -> BsoRecord<P> + where + F: FnOnce(T) -> P, + { + BsoRecord { + id: self.id, + collection: self.collection, + modified: self.modified, + sortindex: self.sortindex, + ttl: self.ttl, + payload: mapper(self.payload), + } + } + + #[inline] + pub fn with_payload<P>(self, payload: P) -> BsoRecord<P> { + self.map_payload(|_| payload) + } + + #[inline] + pub fn new_record(id: String, coll: String, payload: T) -> BsoRecord<T> { + BsoRecord { + id: id.into(), + collection: coll, + ttl: None, + sortindex: None, + modified: ServerTimestamp::default(), + payload, + } + } + + pub fn try_map_payload<P, E>( + self, + mapper: impl FnOnce(T) -> Result<P, E>, + ) -> Result<BsoRecord<P>, E> { + self.map_payload(mapper).transpose() + } + + pub fn map_payload_or<P>(self, mapper: impl FnOnce(T) -> Option<P>) -> Option<BsoRecord<P>> { + self.map_payload(mapper).transpose() + } + + #[inline] + pub fn into_timestamped_payload(self) -> (T, ServerTimestamp) { + (self.payload, self.modified) + } +} + +impl<T> BsoRecord<Option<T>> { + /// Helper to improve ergonomics for handling records that might be tombstones. + #[inline] + pub fn transpose(self) -> Option<BsoRecord<T>> { + let BsoRecord { + id, + collection, + modified, + sortindex, + ttl, + payload, + } = self; + match payload { + Some(p) => Some(BsoRecord { + id, + collection, + modified, + sortindex, + ttl, + payload: p, + }), + None => None, + } + } +} + +impl<T, E> BsoRecord<Result<T, E>> { + #[inline] + pub fn transpose(self) -> Result<BsoRecord<T>, E> { + let BsoRecord { + id, + collection, + modified, + sortindex, + ttl, + payload, + } = self; + match payload { + Ok(p) => Ok(BsoRecord { + id, + collection, + modified, + sortindex, + ttl, + payload: p, + }), + Err(e) => Err(e), + } + } +} + +impl<T> Deref for BsoRecord<T> { + type Target = T; + #[inline] + fn deref(&self) -> &T { + &self.payload + } +} + +impl<T> DerefMut for BsoRecord<T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut self.payload + } +} + +impl CleartextBso { + pub fn from_payload(mut payload: Payload, collection: impl Into<String>) -> Self { + let id = payload.id.clone(); + let sortindex: Option<i32> = payload.take_auto_field("sortindex"); + let ttl: Option<u32> = payload.take_auto_field("ttl"); + BsoRecord { + id, + collection: collection.into(), + modified: ServerTimestamp::default(), // Doesn't matter. + sortindex, + ttl, + payload, + } + } +} + +pub type EncryptedBso = BsoRecord<EncryptedPayload>; +pub type CleartextBso = BsoRecord<Payload>; + +// Contains the methods to automatically deserialize the payload to/from json. +mod as_json { + use serde::de::{self, Deserialize, DeserializeOwned, Deserializer}; + use serde::ser::{self, Serialize, Serializer}; + + pub fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error> + where + T: Serialize, + S: Serializer, + { + let j = serde_json::to_string(t).map_err(ser::Error::custom)?; + serializer.serialize_str(&j) + } + + pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error> + where + T: DeserializeOwned, + D: Deserializer<'de>, + { + let j = String::deserialize(deserializer)?; + serde_json::from_str(&j).map_err(de::Error::custom) + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct EncryptedPayload { + #[serde(rename = "IV")] + pub iv: String, + pub hmac: String, + pub ciphertext: String, +} + +// This is a little cludgey but I couldn't think of another way to have easy deserialization +// without a bunch of wrapper types, while still only serializing a single time in the +// postqueue. +lazy_static! { + // The number of bytes taken up by padding in a EncryptedPayload. + static ref EMPTY_ENCRYPTED_PAYLOAD_SIZE: usize = serde_json::to_string( + &EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "".into() } + ).unwrap().len(); +} + +impl EncryptedPayload { + #[inline] + pub fn serialized_len(&self) -> usize { + (*EMPTY_ENCRYPTED_PAYLOAD_SIZE) + self.ciphertext.len() + self.hmac.len() + self.iv.len() + } + + pub fn decrypt_and_parse_payload<T>(&self, key: &KeyBundle) -> error::Result<T> + where + for<'a> T: Deserialize<'a>, + { + let cleartext = key.decrypt(&self.ciphertext, &self.iv, &self.hmac)?; + Ok(serde_json::from_str(&cleartext)?) + } + + pub fn from_cleartext_payload<T: Serialize>( + key: &KeyBundle, + cleartext_payload: &T, + ) -> error::Result<Self> { + let cleartext = serde_json::to_string(cleartext_payload)?; + let (enc_base64, iv_base64, hmac_base16) = + key.encrypt_bytes_rand_iv(&cleartext.as_bytes())?; + Ok(EncryptedPayload { + iv: iv_base64, + hmac: hmac_base16, + ciphertext: enc_base64, + }) + } +} + +impl EncryptedBso { + pub fn decrypt(self, key: &KeyBundle) -> error::Result<CleartextBso> { + let new_payload = self + .payload + .decrypt_and_parse_payload::<Payload>(key)? + .with_auto_field("sortindex", self.sortindex) + .with_auto_field("ttl", self.ttl); + + let result = self.with_payload(new_payload); + Ok(result) + } + + pub fn decrypt_as<T>(self, key: &KeyBundle) -> error::Result<BsoRecord<T>> + where + for<'a> T: Deserialize<'a>, + { + Ok(self.decrypt(key)?.into_record::<T>()?) + } +} + +impl CleartextBso { + pub fn encrypt(self, key: &KeyBundle) -> error::Result<EncryptedBso> { + let encrypted_payload = EncryptedPayload::from_cleartext_payload(key, &self.payload)?; + Ok(self.with_payload(encrypted_payload)) + } + + pub fn into_record<T>(self) -> error::Result<BsoRecord<T>> + where + for<'a> T: Deserialize<'a>, + { + Ok(self.try_map_payload(Payload::into_record)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use serde_json::Value as JsonValue; + + #[test] + fn test_deserialize_enc() { + let serialized = r#"{ + "id": "1234", + "collection": "passwords", + "modified": 12344321.0, + "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}" + }"#; + let record: BsoRecord<EncryptedPayload> = serde_json::from_str(serialized).unwrap(); + assert_eq!(&record.id, "1234"); + assert_eq!(&record.collection, "passwords"); + assert_eq!((record.modified.0 - 12_344_321_000).abs(), 0); + assert_eq!(record.sortindex, None); + assert_eq!(&record.payload.iv, "aaaaa"); + assert_eq!(&record.payload.hmac, "bbbbb"); + assert_eq!(&record.payload.ciphertext, "ccccc"); + } + + #[test] + fn test_deserialize_autofields() { + let serialized = r#"{ + "id": "1234", + "collection": "passwords", + "modified": 12344321.0, + "sortindex": 100, + "ttl": 99, + "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}" + }"#; + let record: BsoRecord<EncryptedPayload> = serde_json::from_str(serialized).unwrap(); + assert_eq!(record.sortindex, Some(100)); + assert_eq!(record.ttl, Some(99)); + } + + #[test] + fn test_serialize_enc() { + let goal = r#"{"id":"1234","collection":"passwords","payload":"{\"IV\":\"aaaaa\",\"hmac\":\"bbbbb\",\"ciphertext\":\"ccccc\"}"}"#; + let record = BsoRecord { + id: "1234".into(), + modified: ServerTimestamp(999), // shouldn't be serialized by client no matter what it's value is + collection: "passwords".into(), + sortindex: None, + ttl: None, + payload: EncryptedPayload { + iv: "aaaaa".into(), + hmac: "bbbbb".into(), + ciphertext: "ccccc".into(), + }, + }; + let actual = serde_json::to_string(&record).unwrap(); + assert_eq!(actual, goal); + + let val_str_payload: serde_json::Value = serde_json::from_str(goal).unwrap(); + assert_eq!( + val_str_payload["payload"].as_str().unwrap().len(), + record.payload.serialized_len() + ) + } + + #[test] + fn test_roundtrip_crypt_tombstone() { + let orig_record = CleartextBso::from_payload( + Payload::from_json(json!({ "id": "aaaaaaaaaaaa", "deleted": true, })).unwrap(), + "dummy", + ); + + assert!(orig_record.is_tombstone()); + + let keybundle = KeyBundle::new_random().unwrap(); + + let encrypted = orig_record.clone().encrypt(&keybundle).unwrap(); + + // While we're here, check on EncryptedPayload::serialized_len + let val_rec = + serde_json::from_str::<JsonValue>(&serde_json::to_string(&encrypted).unwrap()).unwrap(); + + assert_eq!( + encrypted.payload.serialized_len(), + val_rec["payload"].as_str().unwrap().len() + ); + + let decrypted: CleartextBso = encrypted.decrypt(&keybundle).unwrap(); + assert!(decrypted.is_tombstone()); + assert_eq!(decrypted, orig_record); + } + + #[test] + fn test_roundtrip_crypt_record() { + let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data" }); + let orig_record = + CleartextBso::from_payload(Payload::from_json(payload.clone()).unwrap(), "dummy"); + + assert!(!orig_record.is_tombstone()); + + let keybundle = KeyBundle::new_random().unwrap(); + + let encrypted = orig_record.clone().encrypt(&keybundle).unwrap(); + + // While we're here, check on EncryptedPayload::serialized_len + let val_rec = + serde_json::from_str::<JsonValue>(&serde_json::to_string(&encrypted).unwrap()).unwrap(); + assert_eq!( + encrypted.payload.serialized_len(), + val_rec["payload"].as_str().unwrap().len() + ); + + let decrypted = encrypted.decrypt(&keybundle).unwrap(); + assert!(!decrypted.is_tombstone()); + assert_eq!(decrypted, orig_record); + assert_eq!(serde_json::to_value(decrypted.payload).unwrap(), payload); + } + + #[test] + fn test_record_auto_fields() { + let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data", "sortindex": 100, "ttl": 99 }); + let bso = CleartextBso::from_payload(Payload::from_json(payload).unwrap(), "dummy"); + + // We don't want the keys ending up in the actual record data on the server. + assert!(!bso.payload.data.contains_key("sortindex")); + assert!(!bso.payload.data.contains_key("ttl")); + + // But we do want them in the BsoRecord. + assert_eq!(bso.sortindex, Some(100)); + assert_eq!(bso.ttl, Some(99)); + + let keybundle = KeyBundle::new_random().unwrap(); + let encrypted = bso.encrypt(&keybundle).unwrap(); + + let decrypted = encrypted.decrypt(&keybundle).unwrap(); + // We add auto fields during decryption. + assert_eq!(decrypted.payload.data["sortindex"], 100); + assert_eq!(decrypted.payload.data["ttl"], 99); + + assert_eq!(decrypted.sortindex, Some(100)); + assert_eq!(decrypted.ttl, Some(99)); + } + #[test] + fn test_record_bad_hmac() { + let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data", "sortindex": 100, "ttl": 99 }); + let bso = CleartextBso::from_payload(Payload::from_json(payload).unwrap(), "dummy"); + + let keybundle = KeyBundle::new_random().unwrap(); + let encrypted = bso.encrypt(&keybundle).unwrap(); + let keybundle2 = KeyBundle::new_random().unwrap(); + + let e = encrypted + .decrypt(&keybundle2) + .expect_err("Should fail because wrong keybundle"); + + // Note: ErrorKind isn't PartialEq, so. + match e.kind() { + error::ErrorKind::CryptoError(_) => { + // yay. + } + other => { + panic!("Expected Crypto Error, got {:?}", other); + } + } + } +} diff --git a/third_party/rust/sync15/src/changeset.rs b/third_party/rust/sync15/src/changeset.rs new file mode 100644 index 0000000000..24bd888bf8 --- /dev/null +++ b/third_party/rust/sync15/src/changeset.rs @@ -0,0 +1,145 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso_record::{CleartextBso, EncryptedBso}; +use crate::client::{Sync15ClientResponse, Sync15StorageClient}; +use crate::error::{self, ErrorKind, ErrorResponse, Result}; +use crate::key_bundle::KeyBundle; +use crate::request::{CollectionRequest, NormalResponseHandler, UploadInfo}; +use crate::util::ServerTimestamp; +use crate::CollState; +use std::borrow::Cow; + +pub use sync15_traits::{IncomingChangeset, OutgoingChangeset, RecordChangeset}; + +pub fn encrypt_outgoing(o: OutgoingChangeset, key: &KeyBundle) -> Result<Vec<EncryptedBso>> { + let RecordChangeset { + changes, + collection, + .. + } = o; + changes + .into_iter() + .map(|change| CleartextBso::from_payload(change, collection.clone()).encrypt(key)) + .collect() +} + +pub fn fetch_incoming( + client: &Sync15StorageClient, + state: &mut CollState, + collection_request: &CollectionRequest, +) -> Result<IncomingChangeset> { + let collection = collection_request.collection.clone(); + let (records, timestamp) = match client.get_encrypted_records(collection_request)? { + Sync15ClientResponse::Success { + record, + last_modified, + .. + } => (record, last_modified), + other => return Err(other.create_storage_error().into()), + }; + // xxx - duplication below of `timestamp` smells wrong + state.last_modified = timestamp; + let mut result = IncomingChangeset::new(collection, timestamp); + result.changes.reserve(records.len()); + for record in records { + // if we see a HMAC error, we've made an explicit decision to + // NOT handle it here, but restart the global state machine. + // That should cause us to re-read crypto/keys and things should + // work (although if for some reason crypto/keys was updated but + // not all storage was wiped we are probably screwed.) + let decrypted = record.decrypt(&state.key)?; + result.changes.push(decrypted.into_timestamped_payload()); + } + Ok(result) +} + +#[derive(Debug, Clone)] +pub struct CollectionUpdate<'a> { + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + to_update: Vec<EncryptedBso>, + fully_atomic: bool, +} + +impl<'a> CollectionUpdate<'a> { + pub fn new( + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + records: Vec<EncryptedBso>, + fully_atomic: bool, + ) -> CollectionUpdate<'a> { + CollectionUpdate { + client, + state, + collection, + xius, + to_update: records, + fully_atomic, + } + } + + pub fn new_from_changeset( + client: &'a Sync15StorageClient, + state: &'a CollState, + changeset: OutgoingChangeset, + fully_atomic: bool, + ) -> Result<CollectionUpdate<'a>> { + let collection = changeset.collection.clone(); + let xius = changeset.timestamp; + if xius < state.last_modified { + // We know we are going to fail the XIUS check... + return Err( + ErrorKind::StorageHttpError(ErrorResponse::PreconditionFailed { + route: collection.into_owned(), + }) + .into(), + ); + } + let to_update = crate::changeset::encrypt_outgoing(changeset, &state.key)?; + Ok(CollectionUpdate::new( + client, + state, + collection, + xius, + to_update, + fully_atomic, + )) + } + + /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise + /// returns an empty vec. + pub fn upload(self) -> error::Result<UploadInfo> { + let mut failed = vec![]; + let mut q = self.client.new_post_queue( + &self.collection, + &self.state.config, + self.xius, + NormalResponseHandler::new(!self.fully_atomic), + )?; + + for record in self.to_update.into_iter() { + let enqueued = q.enqueue(&record)?; + if !enqueued && self.fully_atomic { + return Err(ErrorKind::RecordTooLargeError.into()); + } + } + + q.flush(true)?; + let mut info = q.completed_upload_info(); + info.failed_ids.append(&mut failed); + if self.fully_atomic { + assert_eq!( + info.failed_ids.len(), + 0, + "Bug: Should have failed by now if we aren't allowing dropped records" + ); + } + Ok(info) + } +} diff --git a/third_party/rust/sync15/src/client.rs b/third_party/rust/sync15/src/client.rs new file mode 100644 index 0000000000..98ed0dbd5d --- /dev/null +++ b/third_party/rust/sync15/src/client.rs @@ -0,0 +1,452 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso_record::{BsoRecord, EncryptedBso}; +use crate::error::{self, ErrorKind, ErrorResponse}; +use crate::record_types::MetaGlobalRecord; +use crate::request::{ + BatchPoster, CollectionRequest, InfoCollections, InfoConfiguration, PostQueue, PostResponse, + PostResponseHandler, +}; +use crate::token; +use crate::util::ServerTimestamp; +use serde_json::Value; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use url::Url; +use viaduct::{ + header_names::{self, AUTHORIZATION}, + Method, Request, Response, +}; + +/// A response from a GET request on a Sync15StorageClient, encapsulating all +/// the variants users of this client needs to care about. +#[derive(Debug, Clone)] +pub enum Sync15ClientResponse<T> { + Success { + status: u16, + record: T, + last_modified: ServerTimestamp, + route: String, + }, + Error(ErrorResponse), +} + +fn parse_seconds(seconds_str: &str) -> Option<u32> { + let secs = seconds_str.parse::<f64>().ok()?.ceil(); + // Note: u32 doesn't impl TryFrom<f64> :( + if !secs.is_finite() || secs < 0.0 || secs >= f64::from(u32::max_value()) { + Some(secs as u32) + } else { + log::warn!("invalid backoff value: {}", secs); + None + } +} + +impl<T> Sync15ClientResponse<T> { + pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result<Self> + where + for<'a> T: serde::de::Deserialize<'a>, + { + let route: String = resp.url.path().into(); + // Android seems to respect retry_after even on success requests, so we + // will too if it's present. This also lets us handle both backoff-like + // properties in the same place. + let retry_after = resp + .headers + .get(header_names::RETRY_AFTER) + .and_then(parse_seconds); + + let backoff = resp + .headers + .get(header_names::X_WEAVE_BACKOFF) + .and_then(parse_seconds); + + if let Some(b) = backoff { + backoff_listener.note_backoff(b); + } + if let Some(ra) = retry_after { + backoff_listener.note_retry_after(ra); + } + + Ok(if resp.is_success() { + let record: T = resp.json()?; + let last_modified = resp + .headers + .get(header_names::X_LAST_MODIFIED) + .and_then(|s| ServerTimestamp::from_str(s).ok()) + .ok_or_else(|| ErrorKind::MissingServerTimestamp)?; + log::info!( + "Successful request to \"{}\", incoming x-last-modified={:?}", + route, + last_modified + ); + + Sync15ClientResponse::Success { + status: resp.status, + record, + last_modified, + route, + } + } else { + let status = resp.status; + log::info!("Request \"{}\" was an error (status={})", route, status); + match status { + 404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }), + 401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }), + 412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }), + 500..=600 => { + Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status }) + } + _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }), + } + }) + } + + pub fn create_storage_error(self) -> ErrorKind { + let inner = match self { + Sync15ClientResponse::Success { status, route, .. } => { + // This should never happen as callers are expected to have + // already special-cased this response, so warn if it does. + // (or maybe we could panic?) + log::warn!("Converting success response into an error"); + ErrorResponse::RequestFailed { status, route } + } + Sync15ClientResponse::Error(e) => e, + }; + ErrorKind::StorageHttpError(inner) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Sync15StorageClientInit { + pub key_id: String, + pub access_token: String, + pub tokenserver_url: Url, +} + +/// A trait containing the methods required to run through the setup state +/// machine. This is factored out into a separate trait to make mocking +/// easier. +pub trait SetupStorageClient { + fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>>; + fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>>; + fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>>; + fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>>; + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result<ServerTimestamp>; + fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()>; + fn wipe_all_remote(&self) -> error::Result<()>; +} + +#[derive(Debug, Default)] +pub struct BackoffState { + pub backoff_secs: AtomicU32, + pub retry_after_secs: AtomicU32, +} + +pub(crate) type BackoffListener = std::sync::Arc<BackoffState>; + +pub(crate) fn new_backoff_listener() -> BackoffListener { + std::sync::Arc::new(BackoffState::default()) +} + +impl BackoffState { + pub fn note_backoff(&self, noted: u32) { + crate::util::atomic_update_max(&self.backoff_secs, noted) + } + + pub fn note_retry_after(&self, noted: u32) { + crate::util::atomic_update_max(&self.retry_after_secs, noted) + } + + pub fn get_backoff_secs(&self) -> u32 { + self.backoff_secs.load(Ordering::SeqCst) + } + + pub fn get_retry_after_secs(&self) -> u32 { + self.retry_after_secs.load(Ordering::SeqCst) + } + + pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option<std::time::Duration> { + let bo = self.get_backoff_secs(); + let ra = self.get_retry_after_secs(); + let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) }); + if secs > 0 { + Some(std::time::Duration::from_secs(secs)) + } else { + None + } + } + + pub fn reset(&self) { + self.backoff_secs.store(0, Ordering::SeqCst); + self.retry_after_secs.store(0, Ordering::SeqCst); + } +} + +#[derive(Debug)] +pub struct Sync15StorageClient { + tsc: token::TokenProvider, + pub(crate) backoff: BackoffListener, +} + +impl SetupStorageClient for Sync15StorageClient { + fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>> { + self.relative_storage_request(Method::Get, "info/configuration") + } + + fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> { + self.relative_storage_request(Method::Get, "info/collections") + } + + fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> { + // meta/global is a Bso, so there's an extra dance to do. + let got: Sync15ClientResponse<BsoRecord<MetaGlobalRecord>> = + self.relative_storage_request(Method::Get, "storage/meta/global")?; + Ok(match got { + Sync15ClientResponse::Success { + record, + last_modified, + route, + status, + } => { + log::debug!( + "Got meta global with modified = {}; last-modified = {}", + record.modified, + last_modified + ); + Sync15ClientResponse::Success { + record: record.payload, + last_modified, + route, + status, + } + } + Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e), + }) + } + + fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>> { + self.relative_storage_request(Method::Get, "storage/crypto/keys") + } + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result<ServerTimestamp> { + let bso = BsoRecord::new_record("global".into(), "meta".into(), global); + self.put("storage/meta/global", xius, &bso) + } + + fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()> { + self.put("storage/crypto/keys", xius, keys)?; + Ok(()) + } + + fn wipe_all_remote(&self) -> error::Result<()> { + let s = self.tsc.api_endpoint()?; + let url = Url::parse(&s)?; + + let req = self.build_request(Method::Delete, url)?; + match self.exec_request::<Value>(req, false) { + Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. })) + | Ok(Sync15ClientResponse::Success { .. }) => Ok(()), + Ok(resp) => Err(resp.create_storage_error().into()), + Err(e) => Err(e), + } + } +} + +impl Sync15StorageClient { + pub fn new(init_params: Sync15StorageClientInit) -> error::Result<Sync15StorageClient> { + rc_crypto::ensure_initialized(); + let tsc = token::TokenProvider::new( + init_params.tokenserver_url, + init_params.access_token, + init_params.key_id, + )?; + Ok(Sync15StorageClient { + tsc, + backoff: new_backoff_listener(), + }) + } + + pub fn get_encrypted_records( + &self, + collection_request: &CollectionRequest, + ) -> error::Result<Sync15ClientResponse<Vec<EncryptedBso>>> { + self.collection_request(Method::Get, collection_request) + } + + #[inline] + fn authorized(&self, req: Request) -> error::Result<Request> { + let hawk_header_value = self.tsc.authorization(&req)?; + Ok(req.header(AUTHORIZATION, hawk_header_value)?) + } + + // TODO: probably want a builder-like API to do collection requests (e.g. something + // that occupies roughly the same conceptual role as the Collection class in desktop) + fn build_request(&self, method: Method, url: Url) -> error::Result<Request> { + self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?) + } + + fn relative_storage_request<P, T>( + &self, + method: Method, + relative_path: P, + ) -> error::Result<Sync15ClientResponse<T>> + where + P: AsRef<str>, + for<'a> T: serde::de::Deserialize<'a>, + { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(relative_path.as_ref())?; + self.exec_request(self.build_request(method, url)?, false) + } + + fn exec_request<T>( + &self, + req: Request, + require_success: bool, + ) -> error::Result<Sync15ClientResponse<T>> + where + for<'a> T: serde::de::Deserialize<'a>, + { + log::trace!( + "request: {} {} ({:?})", + req.method, + req.url.path(), + req.url.query() + ); + let resp = req.send()?; + + let result = Sync15ClientResponse::from_response(resp, &self.backoff)?; + match result { + Sync15ClientResponse::Success { .. } => Ok(result), + _ => { + if require_success { + Err(result.create_storage_error().into()) + } else { + Ok(result) + } + } + } + } + + fn collection_request<T>( + &self, + method: Method, + r: &CollectionRequest, + ) -> error::Result<Sync15ClientResponse<T>> + where + for<'a> T: serde::de::Deserialize<'a>, + { + let url = r.build_url(Url::parse(&self.tsc.api_endpoint()?)?)?; + self.exec_request(self.build_request(method, url)?, false) + } + + pub fn new_post_queue<'a, F: PostResponseHandler>( + &'a self, + coll: &str, + config: &InfoConfiguration, + ts: ServerTimestamp, + on_response: F, + ) -> error::Result<PostQueue<PostWrapper<'a>, F>> { + let pw = PostWrapper { + client: self, + coll: coll.into(), + }; + Ok(PostQueue::new(config, ts, pw, on_response)) + } + + fn put<P, B>( + &self, + relative_path: P, + xius: ServerTimestamp, + body: &B, + ) -> error::Result<ServerTimestamp> + where + P: AsRef<str>, + B: serde::ser::Serialize, + { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(relative_path.as_ref())?; + + let req = self + .build_request(Method::Put, url)? + .json(body) + .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?; + + let resp = self.exec_request::<Value>(req, true)?; + // Note: we pass `true` for require_success, so this panic never happens. + if let Sync15ClientResponse::Success { last_modified, .. } = resp { + Ok(last_modified) + } else { + unreachable!("Error returned exec_request when `require_success` was true"); + } + } + + pub fn hashed_uid(&self) -> error::Result<String> { + self.tsc.hashed_uid() + } + + pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> { + let s = self.tsc.api_endpoint()? + "/"; + let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?; + log::debug!("Wiping: {:?}", url); + let req = self.build_request(Method::Delete, url)?; + match self.exec_request::<Value>(req, false) { + Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. })) + | Ok(Sync15ClientResponse::Success { .. }) => Ok(()), + Ok(resp) => Err(resp.create_storage_error().into()), + Err(e) => Err(e), + } + } +} + +pub struct PostWrapper<'a> { + client: &'a Sync15StorageClient, + coll: String, +} + +impl<'a> BatchPoster for PostWrapper<'a> { + fn post<T, O>( + &self, + bytes: Vec<u8>, + xius: ServerTimestamp, + batch: Option<String>, + commit: bool, + _: &PostQueue<T, O>, + ) -> error::Result<PostResponse> { + let url = CollectionRequest::new(self.coll.clone()) + .batch(batch) + .commit(commit) + .build_url(Url::parse(&self.client.tsc.api_endpoint()?)?)?; + + let req = self + .client + .build_request(Method::Post, url)? + .header(header_names::CONTENT_TYPE, "application/json")? + .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))? + .body(bytes); + self.client.exec_request(req, false) + } +} + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn test_send() { + fn ensure_send<T: Send>() {} + // Compile will fail if not send. + ensure_send::<Sync15StorageClient>(); + } +} diff --git a/third_party/rust/sync15/src/clients/engine.rs b/third_party/rust/sync15/src/clients/engine.rs new file mode 100644 index 0000000000..a9e77d109d --- /dev/null +++ b/third_party/rust/sync15/src/clients/engine.rs @@ -0,0 +1,706 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; + +use crate::{ + bso_record::Payload, + changeset::{CollectionUpdate, IncomingChangeset, OutgoingChangeset}, + client::Sync15StorageClient, + coll_state::CollState, + collection_keys::CollectionKeys, + key_bundle::KeyBundle, + request::{CollectionRequest, InfoConfiguration}, + state::GlobalState, +}; +use interrupt_support::Interruptee; +use sync15_traits::client::ClientData; + +use super::{ + record::{ClientRecord, CommandRecord}, + ser::shrink_to_fit, + Command, CommandProcessor, CommandStatus, RemoteClient, CLIENTS_TTL, +}; +use crate::error::Result; + +const COLLECTION_NAME: &str = "clients"; + +/// The driver for the clients engine. Internal; split out from the `Engine` +/// struct to make testing easier. +struct Driver<'a> { + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + recent_clients: HashMap<String, RemoteClient>, +} + +impl<'a> Driver<'a> { + fn new( + command_processor: &'a dyn CommandProcessor, + interruptee: &'a dyn Interruptee, + config: &'a InfoConfiguration, + ) -> Driver<'a> { + Driver { + command_processor, + interruptee, + config, + recent_clients: HashMap::new(), + } + } + + fn note_recent_client(&mut self, client: &ClientRecord) { + self.recent_clients.insert(client.id.clone(), client.into()); + } + + fn sync( + &mut self, + inbound: IncomingChangeset, + should_refresh_client: bool, + ) -> Result<OutgoingChangeset> { + let mut outgoing = OutgoingChangeset::new(COLLECTION_NAME, inbound.timestamp); + outgoing.timestamp = inbound.timestamp; + + self.interruptee.err_if_interrupted()?; + let outgoing_commands = self.command_processor.fetch_outgoing_commands()?; + + let mut has_own_client_record = false; + + for (payload, _) in inbound.changes { + self.interruptee.err_if_interrupted()?; + + // Unpack the client record. We should never have tombstones in the + // clients collection, so we don't check for `is_tombstone`. + // https://github.com/mozilla/application-services/issues/1801 + // tracks deleting these from the server. + let client: ClientRecord = payload.into_record()?; + + if client.id == self.command_processor.settings().fxa_device_id { + log::debug!("Found my record on the server"); + // If we see our own client record, apply any incoming commands, + // remove them from the list, and reupload the record. Any + // commands that we don't understand also go back in the list. + // https://github.com/mozilla/application-services/issues/1800 + // tracks if that's the right thing to do. + has_own_client_record = true; + let mut current_client_record = self.current_client_record(); + for c in &client.commands { + let status = match c.as_command() { + Some(command) => self.command_processor.apply_incoming_command(command)?, + None => CommandStatus::Unsupported, + }; + match status { + CommandStatus::Applied => {} + CommandStatus::Ignored => { + log::debug!("Ignored command {:?}", c); + } + CommandStatus::Unsupported => { + log::warn!("Don't know how to apply command {:?}", c); + current_client_record.commands.push(c.clone()); + } + } + } + + // The clients collection has a hard limit on the payload size, + // after which the server starts rejecting our records. Large + // command lists can cause us to exceed this, so we truncate + // the list. + shrink_to_fit( + &mut current_client_record.commands, + self.memcache_max_record_payload_size(), + )?; + + // Add the new client record to our map of recently synced + // clients, so that downstream consumers like synced tabs can + // access them. + self.note_recent_client(¤t_client_record); + + // We periodically upload our own client record, even if it + // doesn't change, to keep it fresh. + // (but this part sucks - if the ttl on the server happens to be + // different (as some other client did something strange) we + // still want the records to compare equal - but the ttl hack + // doesn't allow that.) + let mut client_compare = client.clone(); + client_compare.ttl = current_client_record.ttl; + if should_refresh_client || client_compare != current_client_record { + log::debug!("Will update our client record on the server"); + outgoing + .changes + .push(Payload::from_record(current_client_record)?); + } + } else { + // Add the other client to our map of recently synced clients. + self.note_recent_client(&client); + + // Bail if we don't have any outgoing commands to write into + // the other client's record. + if outgoing_commands.is_empty() { + continue; + } + + // Determine if we have new commands, that aren't already in the + // client's command list. + let current_commands: HashSet<Command> = client + .commands + .iter() + .filter_map(|c| c.as_command()) + .collect(); + let mut new_outgoing_commands = outgoing_commands + .difference(¤t_commands) + .cloned() + .collect::<Vec<_>>(); + // Sort, to ensure deterministic ordering for tests. + new_outgoing_commands.sort(); + let mut new_client = client.clone(); + new_client + .commands + .extend(new_outgoing_commands.into_iter().map(CommandRecord::from)); + if new_client.commands.len() == client.commands.len() { + continue; + } + + // Hooray, we added new commands! Make sure the record still + // fits in the maximum record size, or the server will reject + // our upload. + shrink_to_fit( + &mut new_client.commands, + self.memcache_max_record_payload_size(), + )?; + + // We want to ensure the TTL for all records we write, which + // may not be true for incoming ones - so make sure it is. + new_client.ttl = CLIENTS_TTL; + outgoing.changes.push(Payload::from_record(new_client)?); + } + } + + // Upload a record for our own client, if we didn't replace it already. + if !has_own_client_record { + let current_client_record = self.current_client_record(); + self.note_recent_client(¤t_client_record); + outgoing + .changes + .push(Payload::from_record(current_client_record)?); + } + + Ok(outgoing) + } + + /// Builds a fresh client record for this device. + fn current_client_record(&self) -> ClientRecord { + let settings = self.command_processor.settings(); + ClientRecord { + id: settings.fxa_device_id.clone(), + name: settings.device_name.clone(), + typ: Some(settings.device_type.as_str().into()), + commands: Vec::new(), + fxa_device_id: Some(settings.fxa_device_id.clone()), + version: None, + protocols: vec!["1.5".into()], + form_factor: None, + os: None, + app_package: None, + application: None, + device: None, + ttl: CLIENTS_TTL, + } + } + + fn max_record_payload_size(&self) -> usize { + let payload_max = self.config.max_record_payload_bytes; + if payload_max <= self.config.max_post_bytes { + self.config.max_post_bytes.saturating_sub(4096) + } else { + payload_max + } + } + + /// Collections stored in memcached ("tabs", "clients" or "meta") have a + /// different max size than ones stored in the normal storage server db. + /// In practice, the real limit here is 1M (bug 1300451 comment 40), but + /// there's overhead involved that is hard to calculate on the client, so we + /// use 512k to be safe (at the recommendation of the server team). Note + /// that if the server reports a lower limit (via info/configuration), we + /// respect that limit instead. See also bug 1403052. + fn memcache_max_record_payload_size(&self) -> usize { + self.max_record_payload_size().min(512 * 1024) + } +} + +pub struct Engine<'a> { + pub command_processor: &'a dyn CommandProcessor, + pub interruptee: &'a dyn Interruptee, + pub recent_clients: HashMap<String, RemoteClient>, +} + +impl<'a> Engine<'a> { + /// Creates a new clients engine that delegates to the given command + /// processor to apply incoming commands. + pub fn new<'b>( + command_processor: &'b dyn CommandProcessor, + interruptee: &'b dyn Interruptee, + ) -> Engine<'b> { + Engine { + command_processor, + interruptee, + recent_clients: HashMap::new(), + } + } + + /// Syncs the clients collection. This works a little differently than + /// other collections: + /// + /// 1. It can't be disabled or declined. + /// 2. The sync ID and last sync time aren't meaningful, since we always + /// fetch all client records on every sync. As such, the + /// `LocalCollStateMachine` that we use for other engines doesn't + /// apply to it. + /// 3. It doesn't persist state directly, but relies on the sync manager + /// to persist device settings, and process commands. + /// 4. Failing to sync the clients collection is fatal, and aborts the + /// sync. + /// + /// For these reasons, we implement this engine directly in the `sync15` + /// crate, and provide a specialized `sync` method instead of implementing + /// `sync15::Store`. + pub fn sync( + &mut self, + storage_client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + should_refresh_client: bool, + ) -> Result<()> { + log::info!("Syncing collection clients"); + + let coll_keys = + CollectionKeys::from_encrypted_bso(global_state.keys.clone(), &root_sync_key)?; + let mut coll_state = CollState { + config: global_state.config.clone(), + last_modified: global_state + .collections + .get(COLLECTION_NAME) + .cloned() + .unwrap_or_default(), + key: coll_keys.key_for_collection(COLLECTION_NAME).clone(), + }; + + let inbound = self.fetch_incoming(&storage_client, &mut coll_state)?; + + let mut driver = Driver::new( + self.command_processor, + self.interruptee, + &global_state.config, + ); + + let outgoing = driver.sync(inbound, should_refresh_client)?; + self.recent_clients = driver.recent_clients; + + coll_state.last_modified = outgoing.timestamp; + + self.interruptee.err_if_interrupted()?; + let upload_info = + CollectionUpdate::new_from_changeset(&storage_client, &coll_state, outgoing, true)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + + log::info!("Finished syncing clients"); + Ok(()) + } + + fn fetch_incoming( + &self, + storage_client: &Sync15StorageClient, + coll_state: &mut CollState, + ) -> Result<IncomingChangeset> { + // Note that, unlike other stores, we always fetch the full collection + // on every sync, so `inbound` will return all clients, not just the + // ones that changed since the last sync. + let coll_request = CollectionRequest::new(COLLECTION_NAME).full(); + + self.interruptee.err_if_interrupted()?; + let inbound = crate::changeset::fetch_incoming(&storage_client, coll_state, &coll_request)?; + + Ok(inbound) + } + + pub fn local_client_id(&self) -> String { + // Bit dirty but it's the easiest way to reach to our own + // device ID without refactoring the whole sync manager crate. + self.command_processor.settings().fxa_device_id.clone() + } + + pub fn get_client_data(&self) -> ClientData { + ClientData { + local_client_id: self.local_client_id(), + recent_clients: self.recent_clients.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use crate::clients::{CommandStatus, DeviceType, Settings}; + use crate::util::ServerTimestamp; + use anyhow::Result; + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + + use super::*; + + struct TestProcessor { + settings: Settings, + outgoing_commands: HashSet<Command>, + } + + impl CommandProcessor for TestProcessor { + fn settings(&self) -> &Settings { + &self.settings + } + + fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> { + Ok(if let Command::Reset(name) = command { + if name == "forms" { + CommandStatus::Unsupported + } else { + CommandStatus::Applied + } + } else { + CommandStatus::Ignored + }) + } + + fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> { + Ok(self.outgoing_commands.clone()) + } + } + + fn inbound_from_clients(clients: Value) -> IncomingChangeset { + if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(|c| (Payload::from_json(c).unwrap(), ServerTimestamp(0))) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [ + Command::Wipe("bookmarks".into()), + Command::Reset("history".into()), + ] + .iter() + .cloned() + .collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [], + "fxaDeviceId": "deviceCCCCCC", + }, { + "id": "deviceAAAAAA", + "name": "Laptop with a different name", + "type": "desktop", + "commands": [{ + "command": "wipeEngine", + "args": ["logins"] + }, { + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + }])); + + // Passing false for `should_refresh_client` - it should be ignored + // because we've changed the commands. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing.changes.sort_by(|a, b| a.id.cmp(&b.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + RemoteClient { + fxa_device_id: Some("deviceCCCCCC".to_string()), + device_name: "Fenix".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::<Vec<RemoteClient>>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [{ + "command": "displayURI", + "args": ["http://example.com", "Fennec", "Example page"], + "flowID": "flooooooooow", + }, { + "command": "resetEngine", + "args": ["forms"], + }, { + "command": "logout", + "args": [], + }], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + "ttl": CLIENTS_TTL, + }, { + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }, { + "command": "wipeEngine", + "args": ["bookmarks"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + "ttl": CLIENTS_TTL, + }, { + "id": "deviceCCCCCC", + "name": "Fenix", + "type": "mobile", + "commands": [{ + "command": "wipeEngine", + "args": ["bookmarks"], + }, { + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "deviceCCCCCC", + "ttl": CLIENTS_TTL, + }]); + if let Value::Array(expected) = expected { + for (i, record) in expected.into_iter().enumerate() { + assert_eq!(outgoing.changes[i], Payload::from_json(record).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } + + #[test] + fn test_clients_sync_explicit_refresh() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: [].iter().cloned().collect(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let inbound = inbound_from_clients(json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + "ttl": CLIENTS_TTL, + }, { + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + "ttl": CLIENTS_TTL, + }])); + + let outgoing = driver + .sync(inbound.clone(), false) + .expect("Should sync clients"); + // should be no outgoing changes. + assert_eq!(outgoing.changes.len(), 0); + + // Make sure the list of recently synced remote clients is correct and + // still includes our record we didn't update. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + // Do it again - still no changes, but force a refresh. + let outgoing = driver.sync(inbound, true).expect("Should sync clients"); + assert_eq!(outgoing.changes.len(), 1); + + // Do it again - but this time with our own client record needing + // some change. + let inbound = inbound_from_clients(json!([{ + "id": "deviceAAAAAA", + "name": "Laptop with New Name", + "type": "desktop", + "commands": [], + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + }])); + let outgoing = driver.sync(inbound, false).expect("Should sync clients"); + // should still be outgoing because the name changed. + assert_eq!(outgoing.changes.len(), 1); + } + + #[test] + fn test_fresh_client_record() { + let processor = TestProcessor { + settings: Settings { + fxa_device_id: "deviceAAAAAA".into(), + device_name: "Laptop".into(), + device_type: DeviceType::Desktop, + }, + outgoing_commands: HashSet::new(), + }; + + let config = InfoConfiguration::default(); + + let mut driver = Driver::new(&processor, &NeverInterrupts, &config); + + let clients = json!([{ + "id": "deviceBBBBBB", + "name": "iPhone", + "type": "mobile", + "commands": [{ + "command": "resetEngine", + "args": ["history"], + }], + "fxaDeviceId": "iPhooooooone", + "protocols": ["1.5"], + "device": "iPhone", + }]); + + let inbound = if let Value::Array(clients) = clients { + let changes = clients + .into_iter() + .map(|c| (Payload::from_json(c).unwrap(), ServerTimestamp(0))) + .collect(); + IncomingChangeset { + changes, + timestamp: ServerTimestamp(0), + collection: COLLECTION_NAME.into(), + } + } else { + unreachable!("`clients` must be an array of client records") + }; + + // Passing false here for should_refresh_client, but it should be + // ignored as we don't have an existing record yet. + let mut outgoing = driver.sync(inbound, false).expect("Should sync clients"); + outgoing.changes.sort_by(|a, b| a.id.cmp(&b.id)); + + // Make sure the list of recently synced remote clients is correct. + let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"]; + let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>(); + actual_ids.sort(); + assert_eq!(actual_ids, expected_ids); + + let expected_remote_clients = &[ + RemoteClient { + fxa_device_id: Some("deviceAAAAAA".to_string()), + device_name: "Laptop".into(), + device_type: Some(DeviceType::Desktop), + }, + RemoteClient { + fxa_device_id: Some("iPhooooooone".to_string()), + device_name: "iPhone".into(), + device_type: Some(DeviceType::Mobile), + }, + ]; + let actual_remote_clients = expected_ids + .iter() + .filter_map(|&id| driver.recent_clients.get(id)) + .cloned() + .collect::<Vec<RemoteClient>>(); + assert_eq!(actual_remote_clients, expected_remote_clients); + + let expected = json!([{ + "id": "deviceAAAAAA", + "name": "Laptop", + "type": "desktop", + "fxaDeviceId": "deviceAAAAAA", + "protocols": ["1.5"], + "ttl": CLIENTS_TTL, + }]); + if let Value::Array(expected) = expected { + for (i, record) in expected.into_iter().enumerate() { + assert_eq!(outgoing.changes[i], Payload::from_json(record).unwrap()); + } + } else { + unreachable!("`expected_clients` must be an array of client records") + } + } +} diff --git a/third_party/rust/sync15/src/clients/mod.rs b/third_party/rust/sync15/src/clients/mod.rs new file mode 100644 index 0000000000..c1b7c9f32e --- /dev/null +++ b/third_party/rust/sync15/src/clients/mod.rs @@ -0,0 +1,94 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::HashSet; + +mod engine; +mod record; +mod ser; + +use anyhow::Result; +pub use engine::Engine; +pub use sync15_traits::client::{ClientData, DeviceType, RemoteClient}; + +// These are what desktop uses. +const CLIENTS_TTL: u32 = 1_814_400; // 21 days +pub(crate) const CLIENTS_TTL_REFRESH: u64 = 604_800; // 7 days + +/// A command processor applies incoming commands like wipes and resets for all +/// stores, and returns commands to send to other clients. It also manages +/// settings like the device name and type, which is stored in the special +/// `clients` collection. +/// +/// In practice, this trait only has one implementation, in the sync manager. +/// It's split this way because the clients engine depends on internal `sync15` +/// structures, and can't be implemented as a syncable store...but `sync15` +/// doesn't know anything about multiple engines. This lets the sync manager +/// provide its own implementation for handling wipe and reset commands for all +/// the engines that it manages. +pub trait CommandProcessor { + fn settings(&self) -> &Settings; + + /// Fetches commands to send to other clients. An error return value means + /// commands couldn't be fetched, and halts the sync. + fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>>; + + /// Applies a command sent to this client from another client. This method + /// should return a `CommandStatus` indicating whether the command was + /// processed. + /// + /// An error return value means the sync manager encountered an error + /// applying the command, and halts the sync to prevent unexpected behavior + /// (for example, merging local and remote bookmarks, when we were told to + /// wipe our local bookmarks). + fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus>; +} + +/// Indicates if a command was applied successfully, ignored, or not supported. +/// Applied and ignored commands are removed from our client record, and never +/// retried. Unsupported commands are put back into our record, and retried on +/// subsequent syncs. This is to handle clients adding support for new data +/// types. +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum CommandStatus { + Applied, + Ignored, + Unsupported, +} + +impl From<&record::ClientRecord> for RemoteClient { + fn from(record: &record::ClientRecord) -> RemoteClient { + RemoteClient { + fxa_device_id: record.fxa_device_id.clone(), + device_name: record.name.clone(), + device_type: record.typ.as_ref().and_then(DeviceType::try_from_str), + } + } +} + +/// Information about this device to include in its client record. This should +/// be persisted across syncs, as part of the sync manager state. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct Settings { + /// The FxA device ID of this client, also used as this client's record ID + /// in the clients collection. + pub fxa_device_id: String, + /// The name of this client. This should match the client's name in the + /// FxA device manager. + pub device_name: String, + /// The type of this client: mobile, tablet, desktop, or other. + pub device_type: DeviceType, +} + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum Command { + /// Erases all local data. + WipeAll, + /// Erases all local data for a specific engine. + Wipe(String), + /// Resets local sync state for all engines. + ResetAll, + /// Resets local sync state for a specific engine. + Reset(String), +} diff --git a/third_party/rust/sync15/src/clients/record.rs b/third_party/rust/sync15/src/clients/record.rs new file mode 100644 index 0000000000..f263b5ac88 --- /dev/null +++ b/third_party/rust/sync15/src/clients/record.rs @@ -0,0 +1,158 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::*; + +use super::Command; + +/// The serialized form of a client record. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientRecord { + #[serde(rename = "id")] + pub id: String, + + pub name: String, + + #[serde(default, rename = "type")] + pub typ: Option<String>, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub commands: Vec<CommandRecord>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub fxa_device_id: Option<String>, + + /// `version`, `protocols`, `formfactor`, `os`, `appPackage`, `application`, + /// and `device` are unused and optional in all implementations (Desktop, + /// iOS, and Fennec), but we round-trip them. + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub version: Option<String>, + + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub protocols: Vec<String>, + + #[serde( + default, + rename = "formfactor", + skip_serializing_if = "Option::is_none" + )] + pub form_factor: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub os: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub app_package: Option<String>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub application: Option<String>, + + /// The model of the device, like "iPhone" or "iPod touch" on iOS. Note + /// that this is _not_ the client ID (`id`) or the FxA device ID + /// (`fxa_device_id`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub device: Option<String>, + + // This field is somewhat magic - it's moved to and from the + // BSO record, so is not expected to be on the unencrypted payload + // when incoming and are not put on the unencrypted payload when outgoing. + // There are hysterical raisens for this, which we should fix. + // https://github.com/mozilla/application-services/issues/2712 + #[serde(default)] + pub ttl: u32, +} + +/// The serialized form of a client command. +#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CommandRecord { + /// The command name. This is a string, not an enum, because we want to + /// round-trip commands that we don't support yet. + #[serde(rename = "command")] + pub name: String, + + /// Extra, command-specific arguments. Note that we must send an empty + /// array if the command expects no arguments. + #[serde(default)] + pub args: Vec<String>, + + /// Some commands, like repair, send a "flow ID" that other cliennts can + /// record in their telemetry. We don't currently send commands with + /// flow IDs, but we round-trip them. + #[serde(default, rename = "flowID", skip_serializing_if = "Option::is_none")] + pub flow_id: Option<String>, +} + +impl CommandRecord { + /// Converts a serialized command into one that we can apply. Returns `None` + /// if we don't support the command. + pub fn as_command(&self) -> Option<Command> { + match self.name.as_str() { + "wipeEngine" => self.args.get(0).map(|e| Command::Wipe(e.into())), + "wipeAll" => Some(Command::WipeAll), + "resetEngine" => self.args.get(0).map(|e| Command::Reset(e.into())), + "resetAll" => Some(Command::ResetAll), + _ => None, + } + } +} + +impl From<Command> for CommandRecord { + fn from(command: Command) -> CommandRecord { + match command { + Command::Wipe(engine) => CommandRecord { + name: "wipeEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::WipeAll => CommandRecord { + name: "wipeAll".into(), + args: Vec::new(), + flow_id: None, + }, + Command::Reset(engine) => CommandRecord { + name: "resetEngine".into(), + args: vec![engine], + flow_id: None, + }, + Command::ResetAll => CommandRecord { + name: "resetAll".into(), + args: Vec::new(), + flow_id: None, + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sync15_traits::Payload; + + #[test] + fn test_ttl() { + // The ttl hacks in place mean that magically the ttl field from the + // client record should make it down to a BSO. + let record = ClientRecord { + id: "id".into(), + name: "my device".into(), + typ: Some("type".into()), + commands: Vec::new(), + fxa_device_id: Some("12345".into()), + version: None, + protocols: vec!["1.5".into()], + form_factor: None, + os: None, + app_package: None, + application: None, + device: None, + ttl: 123, + }; + let p = Payload::from_record(record).unwrap(); + let bso = crate::CleartextBso::from_payload(p, "clients"); + assert_eq!(bso.ttl, Some(123)); + } +} diff --git a/third_party/rust/sync15/src/clients/ser.rs b/third_party/rust/sync15/src/clients/ser.rs new file mode 100644 index 0000000000..0e8b85b0b8 --- /dev/null +++ b/third_party/rust/sync15/src/clients/ser.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::Result; +use serde::Serialize; +use std::io::{self, Write}; + +/// A writer that counts the number of bytes it's asked to write, and discards +/// the data. Used to calculate the serialized size of the commands list. +#[derive(Clone, Copy, Default)] +pub struct WriteCount(usize); + +impl WriteCount { + #[inline] + pub fn len(self) -> usize { + self.0 + } +} + +impl Write for WriteCount { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0 += buf.len(); + Ok(buf.len()) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +/// Returns the size of the given value, in bytes, when serialized to JSON. +fn compute_serialized_size<T: Serialize>(value: &T) -> Result<usize> { + let mut w = WriteCount::default(); + serde_json::to_writer(&mut w, value)?; + Ok(w.len()) +} + +/// Truncates `list` to fit within `payload_size_max_bytes` when serialized to +/// JSON. +pub fn shrink_to_fit<T: Serialize>(list: &mut Vec<T>, payload_size_max_bytes: usize) -> Result<()> { + let size = compute_serialized_size(&list)?; + // See bug 535326 comment 8 for an explanation of the estimation + match ((payload_size_max_bytes / 4) * 3).checked_sub(1500) { + Some(max_serialized_size) => { + if size > max_serialized_size { + // Estimate a little more than the direct fraction to maximize packing + let cutoff = (list.len() * max_serialized_size - 1) / size + 1; + list.truncate(cutoff + 1); + // Keep dropping off the last entry until the data fits. + while compute_serialized_size(&list)? > max_serialized_size { + if list.pop().is_none() { + break; + } + } + } + Ok(()) + } + None => { + list.clear(); + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::clients::record::CommandRecord; + + #[test] + fn test_compute_serialized_size() { + assert_eq!(compute_serialized_size(&1).unwrap(), 1); + assert_eq!(compute_serialized_size(&"hi").unwrap(), 4); + assert_eq!( + compute_serialized_size(&["hi", "hello", "bye"]).unwrap(), + 20 + ); + } + + #[test] + fn test_shrink_to_fit() { + let mut commands = vec![ + CommandRecord { + name: "wipeEngine".into(), + args: vec!["bookmarks".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "resetEngine".into(), + args: vec!["history".into()], + flow_id: Some("flow".into()), + }, + CommandRecord { + name: "logout".into(), + args: Vec::new(), + flow_id: None, + }, + ]; + + // 4096 bytes is enough to fit all three commands. + shrink_to_fit(&mut commands, 4096).unwrap(); + assert_eq!(commands.len(), 3); + + let sizes = commands + .iter() + .map(|c| compute_serialized_size(c).unwrap()) + .collect::<Vec<_>>(); + assert_eq!(sizes, &[61, 60, 30]); + + // `logout` won't fit within 2168 bytes. + shrink_to_fit(&mut commands, 2168).unwrap(); + assert_eq!(commands.len(), 2); + + // `resetEngine` won't fit within 2084 bytes. + shrink_to_fit(&mut commands, 2084).unwrap(); + assert_eq!(commands.len(), 1); + + // `wipeEngine` won't fit at all. + shrink_to_fit(&mut commands, 1024).unwrap(); + assert!(commands.is_empty()); + } +} diff --git a/third_party/rust/sync15/src/coll_state.rs b/third_party/rust/sync15/src/coll_state.rs new file mode 100644 index 0000000000..7cb5ea874f --- /dev/null +++ b/third_party/rust/sync15/src/coll_state.rs @@ -0,0 +1,350 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::collection_keys::CollectionKeys; +use crate::error; +use crate::key_bundle::KeyBundle; +use crate::request::InfoConfiguration; +use crate::state::GlobalState; +use crate::sync::Store; +use crate::util::ServerTimestamp; + +pub use sync15_traits::{CollSyncIds, StoreSyncAssociation}; + +/// Holds state for a collection. In general, only the CollState is +/// needed to sync a collection (but a valid GlobalState is needed to obtain +/// a CollState) +#[derive(Debug, Clone)] +pub struct CollState { + pub config: InfoConfiguration, + // initially from meta/global, updated after an xius POST/PUT. + pub last_modified: ServerTimestamp, + pub key: KeyBundle, +} + +#[derive(Debug)] +pub enum LocalCollState { + /// The state is unknown, with the StoreSyncAssociation the collection + /// reports. + Unknown { assoc: StoreSyncAssociation }, + + /// The engine has been declined. This is a "terminal" state. + Declined, + + /// There's no such collection in meta/global. We could possibly update + /// meta/global, but currently all known collections are there by default, + /// so this is, basically, an error condition. + NoSuchCollection, + + /// Either the global or collection sync ID has changed - we will reset the engine. + SyncIdChanged { ids: CollSyncIds }, + + /// The collection is ready to sync. + Ready { key: KeyBundle }, +} + +pub struct LocalCollStateMachine<'state> { + global_state: &'state GlobalState, + root_key: &'state KeyBundle, +} + +impl<'state> LocalCollStateMachine<'state> { + fn advance(&self, from: LocalCollState, store: &dyn Store) -> error::Result<LocalCollState> { + let name = &store.collection_name().to_string(); + let meta_global = &self.global_state.global; + match from { + LocalCollState::Unknown { assoc } => { + if meta_global.declined.contains(name) { + return Ok(LocalCollState::Declined); + } + match meta_global.engines.get(name) { + Some(engine_meta) => match assoc { + StoreSyncAssociation::Disconnected => Ok(LocalCollState::SyncIdChanged { + ids: CollSyncIds { + global: meta_global.sync_id.clone(), + coll: engine_meta.sync_id.clone(), + }, + }), + StoreSyncAssociation::Connected(ref ids) + if ids.global == meta_global.sync_id + && ids.coll == engine_meta.sync_id => + { + let coll_keys = CollectionKeys::from_encrypted_bso( + self.global_state.keys.clone(), + self.root_key, + )?; + Ok(LocalCollState::Ready { + key: coll_keys.key_for_collection(name).clone(), + }) + } + _ => Ok(LocalCollState::SyncIdChanged { + ids: CollSyncIds { + global: meta_global.sync_id.clone(), + coll: engine_meta.sync_id.clone(), + }, + }), + }, + None => Ok(LocalCollState::NoSuchCollection), + } + } + + LocalCollState::Declined => unreachable!("can't advance from declined"), + + LocalCollState::NoSuchCollection => unreachable!("the collection is unknown"), + + LocalCollState::SyncIdChanged { ids } => { + let assoc = StoreSyncAssociation::Connected(ids); + log::info!("Resetting {} store", store.collection_name()); + store.reset(&assoc)?; + Ok(LocalCollState::Unknown { assoc }) + } + + LocalCollState::Ready { .. } => unreachable!("can't advance from ready"), + } + } + + // A little whimsy - a portmanteau of far and fast + fn run_and_run_as_farst_as_you_can( + &mut self, + store: &dyn Store, + ) -> error::Result<Option<CollState>> { + let mut s = LocalCollState::Unknown { + assoc: store.get_sync_assoc()?, + }; + // This is a simple state machine and should never take more than + // 10 goes around. + let mut count = 0; + loop { + log::trace!("LocalCollState in {:?}", s); + match s { + LocalCollState::Ready { key } => { + let name = store.collection_name(); + let config = self.global_state.config.clone(); + let last_modified = self + .global_state + .collections + .get(name.as_ref()) + .cloned() + .unwrap_or_default(); + return Ok(Some(CollState { + config, + last_modified, + key, + })); + } + LocalCollState::Declined | LocalCollState::NoSuchCollection => return Ok(None), + + _ => { + count += 1; + if count > 10 { + log::warn!("LocalCollStateMachine appears to be looping"); + return Ok(None); + } + // should we have better loop detection? Our limit of 10 + // goes is probably OK for now, but not really ideal. + s = self.advance(s, store)?; + } + }; + } + } + + pub fn get_state( + store: &dyn Store, + global_state: &'state GlobalState, + root_key: &'state KeyBundle, + ) -> error::Result<Option<CollState>> { + let mut gingerbread_man = Self { + global_state, + root_key, + }; + gingerbread_man.run_and_run_as_farst_as_you_can(store) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::changeset::{IncomingChangeset, OutgoingChangeset}; + use crate::collection_keys::CollectionKeys; + use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord}; + use crate::request::{CollectionRequest, InfoCollections, InfoConfiguration}; + use crate::telemetry; + use anyhow::Result; + use std::cell::{Cell, RefCell}; + use std::collections::HashMap; + use sync_guid::Guid; + + fn get_global_state(root_key: &KeyBundle) -> GlobalState { + let keys = CollectionKeys::new_random() + .unwrap() + .to_encrypted_bso(&root_key) + .unwrap(); + GlobalState { + config: InfoConfiguration::default(), + collections: InfoCollections::new(HashMap::new()), + global: MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + declined: vec![], + }, + global_timestamp: ServerTimestamp::default(), + keys, + } + } + + struct TestStore { + collection_name: &'static str, + assoc: Cell<StoreSyncAssociation>, + num_resets: RefCell<usize>, + } + + impl TestStore { + fn new(collection_name: &'static str, assoc: StoreSyncAssociation) -> Self { + Self { + collection_name, + assoc: Cell::new(assoc), + num_resets: RefCell::new(0), + } + } + fn get_num_resets(&self) -> usize { + *self.num_resets.borrow() + } + } + + impl Store for TestStore { + fn collection_name(&self) -> std::borrow::Cow<'static, str> { + self.collection_name.into() + } + + fn apply_incoming( + &self, + _inbound: Vec<IncomingChangeset>, + _telem: &mut telemetry::Engine, + ) -> Result<OutgoingChangeset> { + unreachable!("these tests shouldn't call these"); + } + + fn sync_finished( + &self, + _new_timestamp: ServerTimestamp, + _records_synced: Vec<Guid>, + ) -> Result<()> { + unreachable!("these tests shouldn't call these"); + } + + fn get_collection_requests( + &self, + _server_timestamp: ServerTimestamp, + ) -> Result<Vec<CollectionRequest>> { + unreachable!("these tests shouldn't call these"); + } + + fn get_sync_assoc(&self) -> Result<StoreSyncAssociation> { + Ok(self.assoc.replace(StoreSyncAssociation::Disconnected)) + } + + fn reset(&self, new_assoc: &StoreSyncAssociation) -> Result<()> { + self.assoc.replace(new_assoc.clone()); + *self.num_resets.borrow_mut() += 1; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + unreachable!("these tests shouldn't call these"); + } + } + + #[test] + fn test_unknown() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let store = TestStore::new("unknown", StoreSyncAssociation::Disconnected); + let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work"); + assert!(cs.is_none(), "unknown collection name can't sync"); + assert_eq!(store.get_num_resets(), 0); + } + + #[test] + fn test_known_no_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let store = TestStore::new("bookmarks", StoreSyncAssociation::Disconnected); + let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!( + store.assoc.replace(StoreSyncAssociation::Disconnected), + StoreSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }) + ); + assert_eq!(store.get_num_resets(), 1); + } + + #[test] + fn test_known_wrong_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let store = TestStore::new( + "bookmarks", + StoreSyncAssociation::Connected(CollSyncIds { + global: "syncIDXXXXXX".into(), + coll: "syncIDYYYYYY".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!( + store.assoc.replace(StoreSyncAssociation::Disconnected), + StoreSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }) + ); + assert_eq!(store.get_num_resets(), 1); + } + + #[test] + fn test_known_good_state() { + let root_key = KeyBundle::new_random().expect("should work"); + let gs = get_global_state(&root_key); + let store = TestStore::new( + "bookmarks", + StoreSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work"); + assert!(cs.is_some(), "collection can sync"); + assert_eq!(store.get_num_resets(), 0); + } + + #[test] + fn test_declined() { + let root_key = KeyBundle::new_random().expect("should work"); + let mut gs = get_global_state(&root_key); + gs.global.declined.push("bookmarks".to_string()); + let store = TestStore::new( + "bookmarks", + StoreSyncAssociation::Connected(CollSyncIds { + global: "syncIDAAAAAA".into(), + coll: "syncIDBBBBBB".into(), + }), + ); + let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work"); + assert!(cs.is_none(), "declined collection can sync"); + assert_eq!(store.get_num_resets(), 0); + } +} diff --git a/third_party/rust/sync15/src/collection_keys.rs b/third_party/rust/sync15/src/collection_keys.rs new file mode 100644 index 0000000000..d66fc235b4 --- /dev/null +++ b/third_party/rust/sync15/src/collection_keys.rs @@ -0,0 +1,64 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso_record::{EncryptedBso, Payload}; +use crate::error::Result; +use crate::key_bundle::KeyBundle; +use crate::record_types::CryptoKeysRecord; +use crate::util::ServerTimestamp; +use std::collections::HashMap; + +#[derive(Clone, Debug, PartialEq)] +pub struct CollectionKeys { + pub timestamp: ServerTimestamp, + pub default: KeyBundle, + pub collections: HashMap<String, KeyBundle>, +} + +impl CollectionKeys { + pub fn new_random() -> Result<CollectionKeys> { + let default = KeyBundle::new_random()?; + Ok(CollectionKeys { + timestamp: ServerTimestamp(0), + default, + collections: HashMap::new(), + }) + } + + pub fn from_encrypted_bso( + record: EncryptedBso, + root_key: &KeyBundle, + ) -> Result<CollectionKeys> { + let keys = record.decrypt_as::<CryptoKeysRecord>(root_key)?; + Ok(CollectionKeys { + timestamp: keys.modified, + default: KeyBundle::from_base64(&keys.payload.default[0], &keys.payload.default[1])?, + collections: keys + .payload + .collections + .into_iter() + .map(|kv| Ok((kv.0, KeyBundle::from_base64(&kv.1[0], &kv.1[1])?))) + .collect::<Result<HashMap<String, KeyBundle>>>()?, + }) + } + + pub fn to_encrypted_bso(&self, root_key: &KeyBundle) -> Result<EncryptedBso> { + let record = CryptoKeysRecord { + id: "keys".into(), + collection: "crypto".into(), + default: self.default.to_b64_array(), + collections: self + .collections + .iter() + .map(|kv| (kv.0.clone(), kv.1.to_b64_array())) + .collect(), + }; + let bso = crate::CleartextBso::from_payload(Payload::from_record(record)?, "crypto"); + Ok(bso.encrypt(root_key)?) + } + + pub fn key_for_collection<'a>(&'a self, collection: &str) -> &'a KeyBundle { + self.collections.get(collection).unwrap_or(&self.default) + } +} diff --git a/third_party/rust/sync15/src/error.rs b/third_party/rust/sync15/src/error.rs new file mode 100644 index 0000000000..7818f7bfc9 --- /dev/null +++ b/third_party/rust/sync15/src/error.rs @@ -0,0 +1,141 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use interrupt_support::Interrupted; +use rc_crypto::hawk; +use std::string; +use std::time::SystemTime; +use sync15_traits::request::UnacceptableBaseUrl; +/// This enum is to discriminate `StorageHttpError`, and not used as an error. +#[derive(Debug, Clone)] +pub enum ErrorResponse { + NotFound { route: String }, + // 401 + Unauthorized { route: String }, + // 412 + PreconditionFailed { route: String }, + // 5XX + ServerError { route: String, status: u16 }, // TODO: info for "retry-after" and backoff handling etc here. + // Other HTTP responses. + RequestFailed { route: String, status: u16 }, +} + +#[derive(Debug, thiserror::Error)] +pub enum ErrorKind { + #[error("Key {0} had wrong length, got {1}, expected {2}")] + BadKeyLength(&'static str, usize, usize), + + #[error("SHA256 HMAC Mismatch error")] + HmacMismatch, + + #[error("HTTP status {0} when requesting a token from the tokenserver")] + TokenserverHttpError(u16), + + #[error("HTTP storage error: {0:?}")] + StorageHttpError(ErrorResponse), + + #[error("Server requested backoff. Retry after {0:?}")] + BackoffError(SystemTime), + + #[error("Outgoing record is too large to upload")] + RecordTooLargeError, + + // Do we want to record the concrete problems? + #[error("Not all records were successfully uploaded")] + RecordUploadFailed, + + /// Used for things like a node reassignment or an unexpected syncId + /// implying the app needs to "reset" its understanding of remote storage. + #[error("The server has reset the storage for this account")] + StorageResetError, + + #[error("Unacceptable URL: {0}")] + UnacceptableUrl(String), + + #[error("Missing server timestamp header in request")] + MissingServerTimestamp, + + #[error("Unexpected server behavior during batch upload: {0}")] + ServerBatchProblem(&'static str), + + #[error("It appears some other client is also trying to setup storage; try again later")] + SetupRace, + + #[error("Client upgrade required; server storage version too new")] + ClientUpgradeRequired, + + // This means that our global state machine needs to enter a state (such as + // "FreshStartNeeded", but the allowed_states don't include that state.) + // It typically means we are trying to do a "fast" or "read-only" sync. + #[error("Our storage needs setting up and we can't currently do it")] + SetupRequired, + + #[error("Store error: {0}")] + StoreError(#[from] anyhow::Error), + + #[error("Crypto/NSS error: {0}")] + CryptoError(#[from] rc_crypto::Error), + + #[error("Base64 decode error: {0}")] + Base64Decode(#[from] base64::DecodeError), + + #[error("JSON error: {0}")] + JsonError(#[from] serde_json::Error), + + #[error("Bad cleartext UTF8: {0}")] + BadCleartextUtf8(#[from] string::FromUtf8Error), + + #[error("Network error: {0}")] + RequestError(#[from] viaduct::Error), + + #[error("Unexpected HTTP status: {0}")] + UnexpectedStatus(#[from] viaduct::UnexpectedStatus), + + #[error("HAWK error: {0}")] + HawkError(#[from] hawk::Error), + + #[error("URL parse error: {0}")] + MalformedUrl(#[from] url::ParseError), + + #[error("The operation was interrupted.")] + Interrupted(#[from] Interrupted), +} + +error_support::define_error! { + ErrorKind { + (CryptoError, rc_crypto::Error), + (Base64Decode, base64::DecodeError), + (JsonError, serde_json::Error), + (BadCleartextUtf8, std::string::FromUtf8Error), + (RequestError, viaduct::Error), + (UnexpectedStatus, viaduct::UnexpectedStatus), + (MalformedUrl, url::ParseError), + // A bit dubious, since we only want this to happen inside `synchronize` + (StoreError, anyhow::Error), + (Interrupted, Interrupted), + (HawkError, hawk::Error), + } +} + +impl From<UnacceptableBaseUrl> for ErrorKind { + fn from(e: UnacceptableBaseUrl) -> ErrorKind { + ErrorKind::UnacceptableUrl(e.to_string()) + } +} + +impl From<UnacceptableBaseUrl> for Error { + fn from(e: UnacceptableBaseUrl) -> Self { + Error::from(ErrorKind::from(e)) + } +} + +impl Error { + pub(crate) fn get_backoff(&self) -> Option<SystemTime> { + if let ErrorKind::BackoffError(time) = self.kind() { + Some(*time) + } else { + None + } + } +} diff --git a/third_party/rust/sync15/src/key_bundle.rs b/third_party/rust/sync15/src/key_bundle.rs new file mode 100644 index 0000000000..c3eeb716ae --- /dev/null +++ b/third_party/rust/sync15/src/key_bundle.rs @@ -0,0 +1,212 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{ErrorKind, Result}; +use rc_crypto::{ + aead::{self, OpeningKey, SealingKey}, + rand, +}; + +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct KeyBundle { + enc_key: Vec<u8>, + mac_key: Vec<u8>, +} + +impl std::fmt::Debug for KeyBundle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KeyBundle").finish() + } +} + +impl KeyBundle { + /// Construct a key bundle from the already-decoded encrypt and hmac keys. + /// Panics (asserts) if they aren't both 32 bytes. + pub fn new(enc: Vec<u8>, mac: Vec<u8>) -> Result<KeyBundle> { + if enc.len() != 32 { + log::error!("Bad key length (enc_key): {} != 32", enc.len()); + return Err(ErrorKind::BadKeyLength("enc_key", enc.len(), 32).into()); + } + if mac.len() != 32 { + log::error!("Bad key length (mac_key): {} != 32", mac.len()); + return Err(ErrorKind::BadKeyLength("mac_key", mac.len(), 32).into()); + } + Ok(KeyBundle { + enc_key: enc, + mac_key: mac, + }) + } + + pub fn new_random() -> Result<KeyBundle> { + let mut buffer = [0u8; 64]; + rand::fill(&mut buffer)?; + KeyBundle::from_ksync_bytes(&buffer) + } + + pub fn from_ksync_bytes(ksync: &[u8]) -> Result<KeyBundle> { + if ksync.len() != 64 { + log::error!("Bad key length (kSync): {} != 64", ksync.len()); + return Err(ErrorKind::BadKeyLength("kSync", ksync.len(), 64).into()); + } + Ok(KeyBundle { + enc_key: ksync[0..32].into(), + mac_key: ksync[32..64].into(), + }) + } + + pub fn from_ksync_base64(ksync: &str) -> Result<KeyBundle> { + let bytes = base64::decode_config(&ksync, base64::URL_SAFE_NO_PAD)?; + KeyBundle::from_ksync_bytes(&bytes) + } + + pub fn from_base64(enc: &str, mac: &str) -> Result<KeyBundle> { + let enc_bytes = base64::decode(&enc)?; + let mac_bytes = base64::decode(&mac)?; + KeyBundle::new(enc_bytes, mac_bytes) + } + + #[inline] + pub fn encryption_key(&self) -> &[u8] { + &self.enc_key + } + + #[inline] + pub fn hmac_key(&self) -> &[u8] { + &self.mac_key + } + + #[inline] + pub fn to_b64_array(&self) -> [String; 2] { + [base64::encode(&self.enc_key), base64::encode(&self.mac_key)] + } + + /// Decrypt the provided ciphertext with the given iv, and decodes the + /// result as a utf8 string. + pub fn decrypt(&self, enc_base64: &str, iv_base64: &str, hmac_base16: &str) -> Result<String> { + // Decode the expected_hmac into bytes to avoid issues if a client happens to encode + // this as uppercase. This shouldn't happen in practice, but doing it this way is more + // robust and avoids an allocation. + let mut decoded_hmac = vec![0u8; 32]; + if base16::decode_slice(hmac_base16, &mut decoded_hmac).is_err() { + log::warn!("Garbage HMAC verification string: contained non base16 characters"); + return Err(ErrorKind::HmacMismatch.into()); + } + let iv = base64::decode(iv_base64)?; + let ciphertext_bytes = base64::decode(enc_base64)?; + let key_bytes = [self.encryption_key(), self.hmac_key()].concat(); + let key = OpeningKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?; + let nonce = aead::Nonce::try_assume_unique_for_key( + &aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, + &iv, + )?; + let ciphertext_and_hmac = [ciphertext_bytes, decoded_hmac].concat(); + let cleartext_bytes = aead::open(&key, nonce, aead::Aad::empty(), &ciphertext_and_hmac)?; + let cleartext = String::from_utf8(cleartext_bytes)?; + Ok(cleartext) + } + + /// Encrypt using the provided IV. + pub fn encrypt_bytes_with_iv( + &self, + cleartext_bytes: &[u8], + iv: &[u8], + ) -> Result<(String, String)> { + let key_bytes = [self.encryption_key(), self.hmac_key()].concat(); + let key = SealingKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?; + let nonce = + aead::Nonce::try_assume_unique_for_key(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, iv)?; + let ciphertext_and_hmac = aead::seal(&key, nonce, aead::Aad::empty(), cleartext_bytes)?; + let ciphertext_len = ciphertext_and_hmac.len() - key.algorithm().tag_len(); + // Do the string conversions here so we don't have to split and copy to 2 vectors. + let (ciphertext, hmac_signature) = ciphertext_and_hmac.split_at(ciphertext_len); + let enc_base64 = base64::encode(&ciphertext); + let hmac_base16 = base16::encode_lower(&hmac_signature); + Ok((enc_base64, hmac_base16)) + } + + /// Generate a random iv and encrypt with it. Return both the encrypted bytes + /// and the generated iv. + pub fn encrypt_bytes_rand_iv( + &self, + cleartext_bytes: &[u8], + ) -> Result<(String, String, String)> { + let mut iv = [0u8; 16]; + rand::fill(&mut iv)?; + let (enc_base64, hmac_base16) = self.encrypt_bytes_with_iv(cleartext_bytes, &iv)?; + let iv_base64 = base64::encode(&iv); + Ok((enc_base64, iv_base64, hmac_base16)) + } + + pub fn encrypt_with_iv(&self, cleartext: &str, iv: &[u8]) -> Result<(String, String)> { + self.encrypt_bytes_with_iv(cleartext.as_bytes(), iv) + } + + pub fn encrypt_rand_iv(&self, cleartext: &str) -> Result<(String, String, String)> { + self.encrypt_bytes_rand_iv(cleartext.as_bytes()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + const HMAC_B16: &str = "b1e6c18ac30deb70236bc0d65a46f7a4dce3b8b0e02cf92182b914e3afa5eebc"; + const IV_B64: &str = "GX8L37AAb2FZJMzIoXlX8w=="; + const HMAC_KEY_B64: &str = "MMntEfutgLTc8FlTLQFms8/xMPmCldqPlq/QQXEjx70="; + const ENC_KEY_B64: &str = "9K/wLdXdw+nrTtXo4ZpECyHFNr4d7aYHqeg3KW9+m6Q="; + + const CIPHERTEXT_B64_PIECES: &[&str] = &[ + "NMsdnRulLwQsVcwxKW9XwaUe7ouJk5Wn80QhbD80l0HEcZGCynh45qIbeYBik0lgcHbK", + "mlIxTJNwU+OeqipN+/j7MqhjKOGIlvbpiPQQLC6/ffF2vbzL0nzMUuSyvaQzyGGkSYM2", + "xUFt06aNivoQTvU2GgGmUK6MvadoY38hhW2LCMkoZcNfgCqJ26lO1O0sEO6zHsk3IVz6", + "vsKiJ2Hq6VCo7hu123wNegmujHWQSGyf8JeudZjKzfi0OFRRvvm4QAKyBWf0MgrW1F8S", + "FDnVfkq8amCB7NhdwhgLWbN+21NitNwWYknoEWe1m6hmGZDgDT32uxzWxCV8QqqrpH/Z", + "ggViEr9uMgoy4lYaWqP7G5WKvvechc62aqnsNEYhH26A5QgzmlNyvB+KPFvPsYzxDnSC", + "jOoRSLx7GG86wT59QZw=", + ]; + + const CLEARTEXT_B64_PIECES: &[&str] = &[ + "eyJpZCI6IjVxUnNnWFdSSlpYciIsImhpc3RVcmkiOiJmaWxlOi8vL1VzZXJzL2phc29u", + "L0xpYnJhcnkvQXBwbGljYXRpb24lMjBTdXBwb3J0L0ZpcmVmb3gvUHJvZmlsZXMva3Nn", + "ZDd3cGsuTG9jYWxTeW5jU2VydmVyL3dlYXZlL2xvZ3MvIiwidGl0bGUiOiJJbmRleCBv", + "ZiBmaWxlOi8vL1VzZXJzL2phc29uL0xpYnJhcnkvQXBwbGljYXRpb24gU3VwcG9ydC9G", + "aXJlZm94L1Byb2ZpbGVzL2tzZ2Q3d3BrLkxvY2FsU3luY1NlcnZlci93ZWF2ZS9sb2dz", + "LyIsInZpc2l0cyI6W3siZGF0ZSI6MTMxOTE0OTAxMjM3MjQyNSwidHlwZSI6MX1dfQ==", + ]; + + #[test] + fn test_decrypt() { + let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap(); + let ciphertext = CIPHERTEXT_B64_PIECES.join(""); + let s = key_bundle.decrypt(&ciphertext, IV_B64, HMAC_B16).unwrap(); + + let cleartext = + String::from_utf8(base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap()).unwrap(); + assert_eq!(&cleartext, &s); + } + + #[test] + fn test_encrypt() { + let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap(); + let iv = base64::decode(IV_B64).unwrap(); + + let cleartext_bytes = base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap(); + let (enc_base64, _hmac_base16) = key_bundle + .encrypt_bytes_with_iv(&cleartext_bytes, &iv) + .unwrap(); + + let expect_ciphertext = CIPHERTEXT_B64_PIECES.join(""); + + assert_eq!(&enc_base64, &expect_ciphertext); + + let (enc_base64_2, iv_base64_2, hmac_base16_2) = + key_bundle.encrypt_bytes_rand_iv(&cleartext_bytes).unwrap(); + assert_ne!(&enc_base64_2, &expect_ciphertext); + + let s = key_bundle + .decrypt(&enc_base64_2, &iv_base64_2, &hmac_base16_2) + .unwrap(); + assert_eq!(&cleartext_bytes, &s.as_bytes()); + } +} diff --git a/third_party/rust/sync15/src/lib.rs b/third_party/rust/sync15/src/lib.rs new file mode 100644 index 0000000000..4420a72f0d --- /dev/null +++ b/third_party/rust/sync15/src/lib.rs @@ -0,0 +1,45 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![allow(unknown_lints, clippy::implicit_hasher)] +#![warn(rust_2018_idioms)] + +mod bso_record; +pub mod changeset; +mod client; +pub mod clients; +mod coll_state; +mod collection_keys; +mod error; +mod key_bundle; +mod migrate_state; +mod record_types; +mod request; +mod state; +mod status; +mod sync; +mod sync_multiple; +pub mod telemetry; +mod token; +mod util; + +// Re-export some of the types callers are likely to want for convenience. +pub use crate::bso_record::{BsoRecord, CleartextBso, EncryptedBso, EncryptedPayload, Payload}; +pub use crate::changeset::{IncomingChangeset, OutgoingChangeset, RecordChangeset}; +pub use crate::client::{ + SetupStorageClient, Sync15ClientResponse, Sync15StorageClient, Sync15StorageClientInit, +}; +pub use crate::coll_state::{CollState, CollSyncIds, StoreSyncAssociation}; +pub use crate::collection_keys::CollectionKeys; +pub use crate::error::{Error, ErrorKind, Result}; +pub use crate::key_bundle::KeyBundle; +pub use crate::migrate_state::extract_v1_state; +pub use crate::request::CollectionRequest; +pub use crate::state::{GlobalState, SetupStateMachine}; +pub use crate::status::{ServiceStatus, SyncResult}; +pub use crate::sync::{synchronize, Store}; +pub use crate::sync_multiple::{ + sync_multiple, sync_multiple_with_command_processor, MemoryCachedState, SyncRequestInfo, +}; +pub use crate::util::ServerTimestamp; diff --git a/third_party/rust/sync15/src/migrate_state.rs b/third_party/rust/sync15/src/migrate_state.rs new file mode 100644 index 0000000000..73d64676b7 --- /dev/null +++ b/third_party/rust/sync15/src/migrate_state.rs @@ -0,0 +1,203 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::record_types::MetaGlobalRecord; +use crate::state::PersistedGlobalState; +use crate::CollSyncIds; +use serde_json::Value; + +/// Given a string persisted as our old GlobalState V1 struct, extract out +/// the sync IDs for the collection, plus a string which should be used as the +/// new "global persisted state" (which holds the declined engines). +/// Returns (None, None) in early error cases (eg, invalid JSON, wrong schema +/// version etc). Otherwise, you can expect the returned global state to be +/// Some, even if the CollSyncIds is None (which can happen if the engine is +/// missing, or flagged for reset) +pub fn extract_v1_state( + state: Option<String>, + collection: &'static str, +) -> (Option<CollSyncIds>, Option<String>) { + let state = match state { + Some(s) => s, + None => return (None, None), + }; + let j: serde_json::Value = match serde_json::from_str(&state) { + Ok(j) => j, + Err(_) => return (None, None), + }; + if Some("V1") != j.get("schema_version").and_then(Value::as_str) { + return (None, None); + } + + let empty = Vec::<serde_json::Value>::new(); + // Get the global and payload out so we can obtain declined first. + let global = match j.get("global").and_then(Value::as_object) { + None => return (None, None), + Some(v) => v, + }; + // payload is itself a string holding json - so re-parse. + let meta_global = match global["payload"] + .as_str() + .and_then(|s| serde_json::from_str::<MetaGlobalRecord>(s).ok()) + { + Some(p) => p, + None => return (None, None), + }; + let pgs = PersistedGlobalState::V2 { + declined: Some(meta_global.declined), + }; + let new_global_state = serde_json::to_string(&pgs).ok(); + + // See if the collection needs a reset. + for change in j + .get("engine_state_changes") + .and_then(Value::as_array) + .unwrap_or(&empty) + { + if change.as_str() == Some("ResetAll") { + return (None, new_global_state); + } + // other resets we care about are objects - `"Reset":name` and + // `"ResetAllExcept":[name, name]` + if let Some(change_ob) = change.as_object() { + if change_ob.get("Reset").and_then(Value::as_str) == Some(collection) { + // this engine is reset. + return (None, new_global_state); + } + if let Some(except_array) = change_ob.get("ResetAllExcept").and_then(Value::as_array) { + // We have what appears to be a valid list of exceptions to reset. + // If every one lists an engine that isn't us, we are being reset. + if except_array + .iter() + .filter_map(Value::as_str) + .all(|s| s != collection) + { + return (None, new_global_state); + } + } + } + } + + // Try and find the sync guids in the global payload. + let gsid = meta_global.sync_id; + let ids = meta_global.engines.get(collection).map(|coll| CollSyncIds { + global: gsid, + coll: coll.sync_id.clone(), + }); + (ids, new_global_state) +} + +#[cfg(test)] +mod tests { + use super::*; + + // Test our destructuring of the old persisted global state. + + fn get_state_with_engine_changes_and_declined(changes: &str, declined: &str) -> String { + // This is a copy of the V1 persisted state. + // Note some things have been omitted or trimmed from what's actually persisted + // (eg, top-level "config" is removed, "collections" is removed (that's only timestamps) + // hmac keys have array elts removed, global/payload has engines removed, etc) + // Note also that all `{` and `}` have been doubled for use in format!(), + // which we use to patch-in engine_state_changes. + format!( + r#"{{ + "schema_version":"V1", + "global":{{ + "id":"global", + "collection":"", + "payload":"{{\"syncID\":\"qZKAMjhyV6Ti\",\"storageVersion\":5,\"engines\":{{\"addresses\":{{\"version\":1,\"syncID\":\"8M-HfX6dm-pD\"}},\"bookmarks\":{{\"version\":2,\"syncID\":\"AVXtnKkH5OTi\"}}}},\"declined\":[{declined}]}}" + }}, + "keys":{{"timestamp":1548214240.34,"default":{{"enc_key":[36,76],"mac_key":[222,241]}},"collections":{{}}}}, + "engine_state_changes":[ + {changes} + ] + }}"#, + changes = changes, + declined = declined + ) + } + + fn get_state_with_engine_changes(changes: &str) -> String { + get_state_with_engine_changes_and_declined(changes, "") + } + + fn make_csids(global: &str, coll: &str) -> Option<CollSyncIds> { + Some(CollSyncIds { + global: global.into(), + coll: coll.into(), + }) + } + + fn extract_v1_ids_only(state: Option<String>, collection: &'static str) -> Option<CollSyncIds> { + let (sync_ids, new_state) = extract_v1_state(state, collection); + // tests which use this never have declined, so make sure our + // state reflects that. + let expected_state = serde_json::to_string(&PersistedGlobalState::V2 { + declined: Some(Vec::<String>::new()), + }) + .expect("should stringify"); + assert_eq!(new_state, Some(expected_state)); + sync_ids + } + + #[test] + fn test_extract_state_simple() { + let s = get_state_with_engine_changes(""); + assert_eq!( + extract_v1_ids_only(Some(s.clone()), "addresses"), + make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD") + ); + assert_eq!( + extract_v1_ids_only(Some(s), "bookmarks"), + make_csids("qZKAMjhyV6Ti", "AVXtnKkH5OTi") + ); + } + + #[test] + fn test_extract_state_simple_with_declined() { + // Note that 'declined' is stringified json, hence the extra back-slashes. + let s = get_state_with_engine_changes_and_declined("", "\\\"foo\\\""); + let expected_state = serde_json::to_string(&PersistedGlobalState::V2 { + declined: Some(vec!["foo".to_string()]), + }) + .unwrap(); + assert_eq!( + extract_v1_state(Some(s), "addresses"), + ( + make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD"), + Some(expected_state) + ) + ); + } + + #[test] + fn test_extract_with_engine_reset_all() { + let s = get_state_with_engine_changes("\"ResetAll\""); + assert_eq!(extract_v1_ids_only(Some(s), "addresses"), None); + } + + #[test] + fn test_extract_with_engine_reset() { + let s = get_state_with_engine_changes("{\"Reset\" : \"addresses\"}"); + assert_eq!(extract_v1_ids_only(Some(s.clone()), "addresses"), None); + // bookmarks wasn't reset. + assert_eq!( + extract_v1_ids_only(Some(s), "bookmarks"), + make_csids("qZKAMjhyV6Ti", "AVXtnKkH5OTi") + ); + } + + #[test] + fn test_extract_with_engine_reset_except() { + let s = get_state_with_engine_changes("{\"ResetAllExcept\" : [\"addresses\"]}"); + // addresses is the exception + assert_eq!( + extract_v1_ids_only(Some(s.clone()), "addresses"), + make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD") + ); + // bookmarks was reset. + assert_eq!(extract_v1_ids_only(Some(s), "bookmarks"), None); + } +} diff --git a/third_party/rust/sync15/src/record_types.rs b/third_party/rust/sync15/src/record_types.rs new file mode 100644 index 0000000000..f8f1449fdc --- /dev/null +++ b/third_party/rust/sync15/src/record_types.rs @@ -0,0 +1,51 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::*; +use std::collections::HashMap; +use sync_guid::Guid; + +// Known record formats. + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaGlobalEngine { + pub version: usize, + #[serde(rename = "syncID")] + pub sync_id: Guid, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MetaGlobalRecord { + #[serde(rename = "syncID")] + pub sync_id: Guid, + #[serde(rename = "storageVersion")] + pub storage_version: usize, + #[serde(default)] + pub engines: HashMap<String, MetaGlobalEngine>, + #[serde(default)] + pub declined: Vec<String>, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)] +pub struct CryptoKeysRecord { + pub id: Guid, + pub collection: String, + pub default: [String; 2], + pub collections: HashMap<String, [String; 2]>, +} + +#[cfg(test)] +#[test] +fn test_deserialize_meta_global() { + let record = serde_json::json!({ + "syncID": "abcd1234abcd", + "storageVersion": 1, + }) + .to_string(); + let r = serde_json::from_str::<MetaGlobalRecord>(&record).unwrap(); + assert_eq!(r.sync_id, "abcd1234abcd"); + assert_eq!(r.storage_version, 1); + assert_eq!(r.engines.len(), 0); + assert_eq!(r.declined.len(), 0); +} diff --git a/third_party/rust/sync15/src/request.rs b/third_party/rust/sync15/src/request.rs new file mode 100644 index 0000000000..9e09f53b68 --- /dev/null +++ b/third_party/rust/sync15/src/request.rs @@ -0,0 +1,1249 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso_record::EncryptedBso; +use crate::client::Sync15ClientResponse; +use crate::error::{self, ErrorKind, Result}; +use crate::util::ServerTimestamp; +use serde_derive::*; +use std::collections::HashMap; +use std::default::Default; +use std::ops::Deref; +pub use sync15_traits::{CollectionRequest, RequestOrder}; +use sync_guid::Guid; +use viaduct::status_codes; + +/// Manages a pair of (byte, count) limits for a PostQueue, such as +/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records). +#[derive(Debug, Clone)] +struct LimitTracker { + max_bytes: usize, + max_records: usize, + cur_bytes: usize, + cur_records: usize, +} + +impl LimitTracker { + pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker { + LimitTracker { + max_bytes, + max_records, + cur_bytes: 0, + cur_records: 0, + } + } + + pub fn clear(&mut self) { + self.cur_records = 0; + self.cur_bytes = 0; + } + + pub fn can_add_record(&self, payload_size: usize) -> bool { + // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that + // don't have https://github.com/mozilla-services/server-syncstorage/issues/73 + self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes + } + + pub fn can_never_add(&self, record_size: usize) -> bool { + record_size >= self.max_bytes + } + + pub fn record_added(&mut self, record_size: usize) { + assert!( + self.can_add_record(record_size), + "LimitTracker::record_added caller must check can_add_record" + ); + self.cur_records += 1; + self.cur_bytes += record_size; + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct InfoConfiguration { + /// The maximum size in bytes of the overall HTTP request body that will be accepted by the + /// server. + #[serde(default = "default_max_request_bytes")] + pub max_request_bytes: usize, + + /// The maximum number of records that can be uploaded to a collection in a single POST request. + #[serde(default = "usize::max_value")] + pub max_post_records: usize, + + /// The maximum combined size in bytes of the record payloads that can be uploaded to a + /// collection in a single POST request. + #[serde(default = "usize::max_value")] + pub max_post_bytes: usize, + + /// The maximum total number of records that can be uploaded to a collection as part of a + /// batched upload. + #[serde(default = "usize::max_value")] + pub max_total_records: usize, + + /// The maximum total combined size in bytes of the record payloads that can be uploaded to a + /// collection as part of a batched upload. + #[serde(default = "usize::max_value")] + pub max_total_bytes: usize, + + /// The maximum size of an individual BSO payload, in bytes. + #[serde(default = "default_max_record_payload_bytes")] + pub max_record_payload_bytes: usize, +} + +// This is annoying but seems to be the only way to do it... +fn default_max_request_bytes() -> usize { + 260 * 1024 +} +fn default_max_record_payload_bytes() -> usize { + 256 * 1024 +} + +impl Default for InfoConfiguration { + #[inline] + fn default() -> InfoConfiguration { + InfoConfiguration { + max_request_bytes: default_max_request_bytes(), + max_record_payload_bytes: default_max_record_payload_bytes(), + max_post_records: usize::max_value(), + max_post_bytes: usize::max_value(), + max_total_records: usize::max_value(), + max_total_bytes: usize::max_value(), + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>); + +impl InfoCollections { + pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections { + InfoCollections(collections) + } +} + +impl Deref for InfoCollections { + type Target = HashMap<String, ServerTimestamp>; + + fn deref(&self) -> &HashMap<String, ServerTimestamp> { + &self.0 + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct UploadResult { + batch: Option<String>, + /// Maps record id => why failed + #[serde(default = "HashMap::new")] + pub failed: HashMap<Guid, String>, + /// Vec of ids + #[serde(default = "Vec::new")] + pub success: Vec<Guid>, +} + +pub type PostResponse = Sync15ClientResponse<UploadResult>; + +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum BatchState { + Unsupported, + NoBatch, + InBatch(String), +} + +#[derive(Debug)] +pub struct PostQueue<Post, OnResponse> { + poster: Post, + on_response: OnResponse, + post_limits: LimitTracker, + batch_limits: LimitTracker, + max_payload_bytes: usize, + max_request_bytes: usize, + queued: Vec<u8>, + batch: BatchState, + last_modified: ServerTimestamp, +} + +pub trait BatchPoster { + /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing + /// Important: Poster should not report non-success HTTP statuses as errors!! + fn post<P, O>( + &self, + body: Vec<u8>, + xius: ServerTimestamp, + batch: Option<String>, + commit: bool, + queue: &PostQueue<P, O>, + ) -> Result<PostResponse>; +} + +// We don't just use a FnMut here since we want to override it in mocking for RefCell<TestType>, +// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this +// is somewhat better for documentation. +pub trait PostResponseHandler { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>; +} + +#[derive(Debug, Clone)] +pub(crate) struct NormalResponseHandler { + pub failed_ids: Vec<Guid>, + pub successful_ids: Vec<Guid>, + pub allow_failed: bool, + pub pending_failed: Vec<Guid>, + pub pending_success: Vec<Guid>, +} + +impl NormalResponseHandler { + pub fn new(allow_failed: bool) -> NormalResponseHandler { + NormalResponseHandler { + failed_ids: vec![], + successful_ids: vec![], + pending_failed: vec![], + pending_success: vec![], + allow_failed, + } + } +} + +impl PostResponseHandler for NormalResponseHandler { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> { + match r { + Sync15ClientResponse::Success { record, .. } => { + if !record.failed.is_empty() && !self.allow_failed { + return Err(ErrorKind::RecordUploadFailed.into()); + } + for id in record.success.iter() { + self.pending_success.push(id.clone()); + } + for kv in record.failed.iter() { + self.pending_failed.push(kv.0.clone()); + } + if !mid_batch { + self.successful_ids.append(&mut self.pending_success); + self.failed_ids.append(&mut self.pending_failed); + } + Ok(()) + } + _ => Err(r.create_storage_error().into()), + } + } +} + +impl<Poster, OnResponse> PostQueue<Poster, OnResponse> +where + Poster: BatchPoster, + OnResponse: PostResponseHandler, +{ + pub fn new( + config: &InfoConfiguration, + ts: ServerTimestamp, + poster: Poster, + on_response: OnResponse, + ) -> PostQueue<Poster, OnResponse> { + PostQueue { + poster, + on_response, + last_modified: ts, + post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records), + batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records), + batch: BatchState::NoBatch, + max_payload_bytes: config.max_record_payload_bytes, + max_request_bytes: config.max_request_bytes, + queued: Vec::new(), + } + } + + #[inline] + fn in_batch(&self) -> bool { + !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch) + } + + pub fn enqueue(&mut self, record: &EncryptedBso) -> Result<bool> { + let payload_length = record.payload.serialized_len(); + + if self.post_limits.can_never_add(payload_length) + || self.batch_limits.can_never_add(payload_length) + || payload_length >= self.max_payload_bytes + { + log::warn!( + "Single record too large to submit to server ({} b)", + payload_length + ); + return Ok(false); + } + + // Write directly into `queued` but undo if necessary (the vast majority of the time + // it won't be necessary). If we hit a problem we need to undo that, but the only error + // case we have to worry about right now is in flush() + let item_start = self.queued.len(); + + // This is conservative but can't hurt. + self.queued.reserve(payload_length + 2); + + // Either the first character in an array, or a comma separating + // it from the previous item. + let c = if self.queued.is_empty() { b'[' } else { b',' }; + self.queued.push(c); + + // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object + // keys, which is impossible. If you decide to change this part, you *need* to call + // `self.queued.truncate(item_start)` here in the failure case! + serde_json::to_writer(&mut self.queued, &record).unwrap(); + + let item_end = self.queued.len(); + + debug_assert!( + item_end >= payload_length, + "EncryptedPayload::serialized_len is bugged" + ); + + // The + 1 is only relevant for the final record, which will have a trailing ']'. + let item_len = item_end - item_start + 1; + + if item_len >= self.max_request_bytes { + self.queued.truncate(item_start); + log::warn!( + "Single record too large to submit to server ({} b)", + item_len + ); + return Ok(false); + } + + let can_post_record = self.post_limits.can_add_record(payload_length); + let can_batch_record = self.batch_limits.can_add_record(payload_length); + let can_send_record = self.queued.len() < self.max_request_bytes; + + if !can_post_record || !can_send_record || !can_batch_record { + log::debug!( + "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})", + can_post_record, + can_send_record, + can_batch_record + ); + // "unwrite" the record. + self.queued.truncate(item_start); + // Flush whatever we have queued. + self.flush(!can_batch_record)?; + // And write it again. + let c = if self.queued.is_empty() { b'[' } else { b',' }; + self.queued.push(c); + serde_json::to_writer(&mut self.queued, &record).unwrap(); + } + + self.post_limits.record_added(payload_length); + self.batch_limits.record_added(payload_length); + + Ok(true) + } + + pub fn flush(&mut self, want_commit: bool) -> Result<()> { + if self.queued.is_empty() { + assert!( + !self.in_batch(), + "Bug: Somehow we're in a batch but have no queued records" + ); + // Nothing to do! + return Ok(()); + } + + self.queued.push(b']'); + let batch_id = match &self.batch { + // Not the first post and we know we have no batch semantics. + BatchState::Unsupported => None, + // First commit in possible batch + BatchState::NoBatch => Some("true".into()), + // In a batch and we have a batch id. + BatchState::InBatch(ref s) => Some(s.clone()), + }; + + log::info!( + "Posting {} records of {} bytes", + self.post_limits.cur_records, + self.queued.len() + ); + + let is_commit = want_commit && batch_id.is_some(); + // Weird syntax for calling a function object that is a property. + let resp_or_error = self.poster.post( + self.queued.clone(), + self.last_modified, + batch_id, + is_commit, + self, + ); + + self.queued.truncate(0); + + if want_commit || self.batch == BatchState::Unsupported { + self.batch_limits.clear(); + } + self.post_limits.clear(); + + let resp = resp_or_error?; + + let (status, last_modified, record) = match resp { + Sync15ClientResponse::Success { + status, + last_modified, + ref record, + .. + } => (status, last_modified, record), + _ => { + self.on_response.handle_response(resp, !want_commit)?; + // on_response() should always fail! + unreachable!(); + } + }; + + if want_commit || self.batch == BatchState::Unsupported { + self.last_modified = last_modified; + } + + if want_commit { + log::debug!("Committed batch {:?}", self.batch); + self.batch = BatchState::NoBatch; + self.on_response.handle_response(resp, false)?; + return Ok(()); + } + + if status != status_codes::ACCEPTED { + if self.in_batch() { + return Err(ErrorKind::ServerBatchProblem( + "Server responded non-202 success code while a batch was in progress", + ) + .into()); + } + self.last_modified = last_modified; + self.batch = BatchState::Unsupported; + self.batch_limits.clear(); + self.on_response.handle_response(resp, false)?; + return Ok(()); + } + + let batch_id = record + .batch + .as_ref() + .ok_or_else(|| { + ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID") + })? + .clone(); + + match &self.batch { + BatchState::Unsupported => { + log::warn!("Server changed its mind about supporting batching mid-batch..."); + } + + BatchState::InBatch(ref cur_id) => { + if cur_id != &batch_id { + return Err(ErrorKind::ServerBatchProblem( + "Invalid server response: 202 without a batch ID", + ) + .into()); + } + } + _ => {} + } + + // Can't change this in match arms without NLL + self.batch = BatchState::InBatch(batch_id); + self.last_modified = last_modified; + + self.on_response.handle_response(resp, true)?; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct UploadInfo { + pub successful_ids: Vec<Guid>, + pub failed_ids: Vec<Guid>, + pub modified_timestamp: ServerTimestamp, +} + +impl<Poster> PostQueue<Poster, NormalResponseHandler> { + // TODO: should take by move + pub fn completed_upload_info(&mut self) -> UploadInfo { + let mut result = UploadInfo { + successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()), + failed_ids: Vec::with_capacity( + self.on_response.failed_ids.len() + + self.on_response.pending_failed.len() + + self.on_response.pending_success.len(), + ), + modified_timestamp: self.last_modified, + }; + + result + .successful_ids + .append(&mut self.on_response.successful_ids); + + result.failed_ids.append(&mut self.on_response.failed_ids); + result + .failed_ids + .append(&mut self.on_response.pending_failed); + result + .failed_ids + .append(&mut self.on_response.pending_success); + + result + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::bso_record::{BsoRecord, EncryptedPayload}; + use lazy_static::lazy_static; + use std::cell::RefCell; + use std::collections::VecDeque; + use std::rc::Rc; + use url::Url; + #[test] + fn test_url_building() { + let base = Url::parse("https://example.com/sync").unwrap(); + let empty = CollectionRequest::new("foo") + .build_url(base.clone()) + .unwrap(); + assert_eq!(empty.as_str(), "https://example.com/sync/storage/foo"); + let batch_start = CollectionRequest::new("bar") + .batch(Some("true".into())) + .commit(false) + .build_url(base.clone()) + .unwrap(); + assert_eq!( + batch_start.as_str(), + "https://example.com/sync/storage/bar?batch=true" + ); + let batch_commit = CollectionRequest::new("asdf") + .batch(Some("1234abc".into())) + .commit(true) + .build_url(base.clone()) + .unwrap(); + assert_eq!( + batch_commit.as_str(), + "https://example.com/sync/storage/asdf?batch=1234abc&commit=true" + ); + + let idreq = CollectionRequest::new("wutang") + .full() + .ids(&["rza", "gza"]) + .build_url(base.clone()) + .unwrap(); + assert_eq!( + idreq.as_str(), + "https://example.com/sync/storage/wutang?full=1&ids=rza%2Cgza" + ); + + let complex = CollectionRequest::new("specific") + .full() + .limit(10) + .sort_by(RequestOrder::Oldest) + .older_than(ServerTimestamp(9_876_540)) + .newer_than(ServerTimestamp(1_234_560)) + .build_url(base) + .unwrap(); + assert_eq!(complex.as_str(), + "https://example.com/sync/storage/specific?full=1&limit=10&older=9876.54&newer=1234.56&sort=oldest"); + } + + #[derive(Debug, Clone)] + struct PostedData { + body: String, + xius: ServerTimestamp, + batch: Option<String>, + commit: bool, + payload_bytes: usize, + records: usize, + } + + impl PostedData { + fn records_as_json(&self) -> Vec<serde_json::Value> { + let values = + serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json"); + // Check that they actually deserialize as what we want + let records_or_err = serde_json::from_value::<Vec<EncryptedBso>>(values.clone()); + records_or_err.expect("Failed to deserialize data"); + serde_json::from_value(values).unwrap() + } + } + + #[derive(Debug, Clone)] + struct BatchInfo { + id: Option<String>, + posts: Vec<PostedData>, + bytes: usize, + records: usize, + } + + #[derive(Debug, Clone)] + struct TestPoster { + all_posts: Vec<PostedData>, + responses: VecDeque<PostResponse>, + batches: Vec<BatchInfo>, + cur_batch: Option<BatchInfo>, + cfg: InfoConfiguration, + } + + type TestPosterRef = Rc<RefCell<TestPoster>>; + impl TestPoster { + pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef + where + T: Into<VecDeque<PostResponse>>, + { + Rc::new(RefCell::new(TestPoster { + all_posts: vec![], + responses: responses.into(), + batches: vec![], + cur_batch: None, + cfg: cfg.clone(), + })) + } + // Adds &mut + fn do_post<T, O>( + &mut self, + body: &[u8], + xius: ServerTimestamp, + batch: Option<String>, + commit: bool, + queue: &PostQueue<T, O>, + ) -> Result<PostResponse> { + let mut post = PostedData { + body: String::from_utf8(body.into()).expect("Posted invalid utf8..."), + batch: batch.clone(), + xius, + commit, + payload_bytes: 0, + records: 0, + }; + + assert!(body.len() <= self.cfg.max_request_bytes); + + let (num_records, record_payload_bytes) = { + let recs = post.records_as_json(); + assert!(recs.len() <= self.cfg.max_post_records); + assert!(recs.len() <= self.cfg.max_total_records); + let payload_bytes: usize = recs + .iter() + .map(|r| { + let len = r["payload"] + .as_str() + .expect("Non string payload property") + .len(); + assert!(len <= self.cfg.max_record_payload_bytes); + len + }) + .sum(); + assert!(payload_bytes <= self.cfg.max_post_bytes); + assert!(payload_bytes <= self.cfg.max_total_bytes); + + assert_eq!(queue.post_limits.cur_bytes, payload_bytes); + assert_eq!(queue.post_limits.cur_records, recs.len()); + (recs.len(), payload_bytes) + }; + post.payload_bytes = record_payload_bytes; + post.records = num_records; + + self.all_posts.push(post.clone()); + let response = self.responses.pop_front().unwrap(); + + let record = match response { + Sync15ClientResponse::Success { ref record, .. } => record, + _ => { + panic!("only success codes are used in this test"); + } + }; + + if self.cur_batch.is_none() { + assert!( + batch.is_none() || batch == Some("true".into()), + "We shouldn't be in a batch now" + ); + self.cur_batch = Some(BatchInfo { + id: record.batch.clone(), + posts: vec![], + records: 0, + bytes: 0, + }); + } else { + assert_eq!( + batch, + self.cur_batch.as_ref().unwrap().id, + "We're in a batch but got the wrong batch id" + ); + } + + { + let batch = self.cur_batch.as_mut().unwrap(); + batch.posts.push(post); + batch.records += num_records; + batch.bytes += record_payload_bytes; + + assert!(batch.bytes <= self.cfg.max_total_bytes); + assert!(batch.records <= self.cfg.max_total_records); + + assert_eq!(batch.records, queue.batch_limits.cur_records); + assert_eq!(batch.bytes, queue.batch_limits.cur_bytes); + } + + if commit || record.batch.is_none() { + let batch = self.cur_batch.take().unwrap(); + self.batches.push(batch); + } + + Ok(response) + } + + fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) -> Result<()> { + assert_eq!(mid_batch, self.cur_batch.is_some()); + Ok(()) + } + } + impl BatchPoster for TestPosterRef { + fn post<T, O>( + &self, + body: Vec<u8>, + xius: ServerTimestamp, + batch: Option<String>, + commit: bool, + queue: &PostQueue<T, O>, + ) -> Result<PostResponse> { + self.borrow_mut().do_post(&body, xius, batch, commit, queue) + } + } + + impl PostResponseHandler for TestPosterRef { + fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> { + self.borrow_mut().do_handle_response(r, mid_batch) + } + } + + type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>; + + fn pq_test_setup( + cfg: InfoConfiguration, + lm: i64, + resps: Vec<PostResponse>, + ) -> (MockedPostQueue, TestPosterRef) { + let tester = TestPoster::new(&cfg, resps); + let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone()); + (pq, tester) + } + + fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse { + assert!(status_codes::is_success_code(status)); + Sync15ClientResponse::Success { + status, + last_modified: ServerTimestamp(lm), + record: UploadResult { + batch: batch.into().map(Into::into), + failed: HashMap::new(), + success: vec![], + }, + route: "test/path".into(), + } + } + + lazy_static! { + // ~40b + static ref PAYLOAD_OVERHEAD: usize = { + let payload = EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "".into() + }; + serde_json::to_string(&payload).unwrap().len() + }; + // ~80b + static ref TOTAL_RECORD_OVERHEAD: usize = { + let val = serde_json::to_value(BsoRecord { + id: "".into(), + collection: "".into(), + modified: ServerTimestamp(0), + sortindex: None, + ttl: None, + payload: EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "".into() + }, + }).unwrap(); + serde_json::to_string(&val).unwrap().len() + }; + // There's some subtlety in how we calulate this having to do with the fact that + // the quotes in the payload are escaped but the escape chars count to the request len + // and *not* to the payload len (the payload len check happens after json parsing the + // top level object). + static ref NON_PAYLOAD_OVERHEAD: usize = { + *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD + }; + } + + // Actual record size (for max_request_len) will be larger by some amount + fn make_record(payload_size: usize) -> EncryptedBso { + assert!(payload_size > *PAYLOAD_OVERHEAD); + let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD; + BsoRecord { + id: "".into(), + collection: "".into(), + modified: ServerTimestamp(0), + sortindex: None, + ttl: None, + payload: EncryptedPayload { + iv: "".into(), + hmac: "".into(), + ciphertext: "x".repeat(ciphertext_len), + }, + } + } + + fn request_bytes_for_payloads(payloads: &[usize]) -> usize { + 1 + payloads + .iter() + .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD) + .sum::<usize>() + } + + #[test] + fn test_pq_basic() { + let cfg = InfoConfiguration { + max_request_bytes: 1000, + max_record_payload_bytes: 1000, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![fake_response(status_codes::OK, time + 100_000, None)], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 1); + assert_eq!(t.batches[0].bytes, 100); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + fn test_pq_max_request_bytes_no_batch() { + let cfg = InfoConfiguration { + max_request_bytes: 250, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::OK, time + 100_000, None), + fake_response(status_codes::OK, time + 200_000, None), + ], + ); + + // Note that the total record overhead is around 85 bytes + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r] + pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post. + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 2); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 2); + assert_eq!(t.batches[0].bytes, payload_size * 2); + assert_eq!(t.batches[0].posts[0].batch, Some("true".into())); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size]) + ); + + assert_eq!(t.batches[1].posts.len(), 1); + assert_eq!(t.batches[1].records, 1); + assert_eq!(t.batches[1].bytes, payload_size); + // We know at this point that the server does not support batching. + assert_eq!(t.batches[1].posts[0].batch, None); + assert_eq!(t.batches[1].posts[0].commit, false); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size]) + ); + } + + #[test] + fn test_pq_max_record_payload_bytes_no_batch() { + let cfg = InfoConfiguration { + max_record_payload_bytes: 150, + max_request_bytes: 350, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::OK, time + 100_000, None), + fake_response(status_codes::OK, time + 200_000, None), + ], + ); + + // Note that the total record overhead is around 85 bytes + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] + let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102 + assert!(!enqueued, "Should not have fit"); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 2); + assert_eq!(t.batches[0].bytes, payload_size * 2); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size]) + ); + } + + #[test] + fn test_pq_single_batch() { + let cfg = InfoConfiguration::default(); + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![fake_response( + status_codes::ACCEPTED, + time + 100_000, + Some("1234"), + )], + ); + + let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.enqueue(&make_record(payload_size)).unwrap(); + pq.flush(true).unwrap(); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 1); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts.len(), 1); + assert_eq!(t.batches[0].records, 3); + assert_eq!(t.batches[0].bytes, payload_size * 3); + assert_eq!(t.batches[0].posts[0].commit, true); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[payload_size, payload_size, payload_size]) + ); + } + + #[test] + fn test_pq_multi_post_batch_bytes() { + let cfg = InfoConfiguration { + max_post_bytes: 200, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 2); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[0].records, 3); + assert_eq!(t.batches[0].bytes, 300); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 2); + assert_eq!(t.batches[0].posts[0].payload_bytes, 200); + assert_eq!(t.batches[0].posts[0].commit, false); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 1); + assert_eq!(t.batches[0].posts[1].payload_bytes, 100); + assert_eq!(t.batches[0].posts[1].commit, true); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + fn test_pq_multi_post_batch_records() { + let cfg = InfoConfiguration { + max_post_records: 3, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 3); + assert_eq!(t.batches.len(), 1); + assert_eq!(t.batches[0].posts.len(), 3); + assert_eq!(t.batches[0].records, 7); + assert_eq!(t.batches[0].bytes, 700); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert_eq!(t.batches[0].posts[0].commit, false); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 3); + assert_eq!(t.batches[0].posts[1].payload_bytes, 300); + assert_eq!(t.batches[0].posts[1].commit, false); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[2].records, 1); + assert_eq!(t.batches[0].posts[2].payload_bytes, 100); + assert_eq!(t.batches[0].posts[2].commit, true); + assert_eq!( + t.batches[0].posts[2].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_pq_multi_post_multi_batch_records() { + let cfg = InfoConfiguration { + max_post_records: 3, + max_total_records: 5, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), + fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + COMMIT + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.flush(true).unwrap(); // COMMIT + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 4); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[1].posts.len(), 2); + + assert_eq!(t.batches[0].records, 5); + assert_eq!(t.batches[1].records, 4); + + assert_eq!(t.batches[0].bytes, 500); + assert_eq!(t.batches[1].bytes, 400); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert_eq!(t.batches[0].posts[0].commit, false); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 2); + assert_eq!(t.batches[0].posts[1].payload_bytes, 200); + assert_eq!(t.batches[0].posts[1].commit, true); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[1].posts[0].records, 3); + assert_eq!(t.batches[1].posts[0].payload_bytes, 300); + assert_eq!(t.batches[1].posts[0].commit, false); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); + assert_eq!(t.batches[1].posts[1].records, 1); + assert_eq!(t.batches[1].posts[1].payload_bytes, 100); + assert_eq!(t.batches[1].posts[1].commit, true); + assert_eq!( + t.batches[1].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_pq_multi_post_multi_batch_bytes() { + let cfg = InfoConfiguration { + max_post_bytes: 300, + max_total_bytes: 500, + ..InfoConfiguration::default() + }; + let time = 11_111_111_000; + let (mut pq, tester) = pq_test_setup( + cfg, + time, + vec![ + fake_response(status_codes::ACCEPTED, time, Some("1234")), + fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit + fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), + fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit + ], + ); + + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time); + // POST + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + // POST + COMMIT + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time + 100_000); + pq.enqueue(&make_record(100)).unwrap(); + pq.enqueue(&make_record(100)).unwrap(); + + // POST + pq.enqueue(&make_record(100)).unwrap(); + assert_eq!(pq.last_modified.0, time + 100_000); + pq.flush(true).unwrap(); // COMMIT + + assert_eq!(pq.last_modified.0, time + 200_000); + + let t = tester.borrow(); + assert!(t.cur_batch.is_none()); + assert_eq!(t.all_posts.len(), 4); + assert_eq!(t.batches.len(), 2); + assert_eq!(t.batches[0].posts.len(), 2); + assert_eq!(t.batches[1].posts.len(), 2); + + assert_eq!(t.batches[0].records, 5); + assert_eq!(t.batches[1].records, 4); + + assert_eq!(t.batches[0].bytes, 500); + assert_eq!(t.batches[1].bytes, 400); + + assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[0].posts[0].records, 3); + assert_eq!(t.batches[0].posts[0].payload_bytes, 300); + assert_eq!(t.batches[0].posts[0].commit, false); + assert_eq!( + t.batches[0].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); + assert_eq!(t.batches[0].posts[1].records, 2); + assert_eq!(t.batches[0].posts[1].payload_bytes, 200); + assert_eq!(t.batches[0].posts[1].commit, true); + assert_eq!( + t.batches[0].posts[1].body.len(), + request_bytes_for_payloads(&[100, 100]) + ); + + assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); + assert_eq!(t.batches[1].posts[0].records, 3); + assert_eq!(t.batches[1].posts[0].payload_bytes, 300); + assert_eq!(t.batches[1].posts[0].commit, false); + assert_eq!( + t.batches[1].posts[0].body.len(), + request_bytes_for_payloads(&[100, 100, 100]) + ); + + assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); + assert_eq!(t.batches[1].posts[1].records, 1); + assert_eq!(t.batches[1].posts[1].payload_bytes, 100); + assert_eq!(t.batches[1].posts[1].commit, true); + assert_eq!( + t.batches[1].posts[1].body.len(), + request_bytes_for_payloads(&[100]) + ); + } + + // TODO: Test + // + // - error cases!!! We don't test our handling of server errors at all! + // - mixed bytes/record limits + // + // A lot of these have good examples in test_postqueue.js on deskftop sync +} diff --git a/third_party/rust/sync15/src/state.rs b/third_party/rust/sync15/src/state.rs new file mode 100644 index 0000000000..e25f72305a --- /dev/null +++ b/third_party/rust/sync15/src/state.rs @@ -0,0 +1,1130 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; + +use crate::bso_record::EncryptedBso; +use crate::client::{SetupStorageClient, Sync15ClientResponse}; +use crate::collection_keys::CollectionKeys; +use crate::error::{self, ErrorKind, ErrorResponse}; +use crate::key_bundle::KeyBundle; +use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord}; +use crate::request::{InfoCollections, InfoConfiguration}; +use crate::util::ServerTimestamp; +use interrupt_support::Interruptee; +use serde_derive::*; +use sync_guid::Guid; + +use self::SetupState::*; + +const STORAGE_VERSION: usize = 5; + +/// Maps names to storage versions for engines to include in a fresh +/// `meta/global` record. We include engines that we don't implement +/// because they'll be disabled on other clients if we omit them +/// (bug 1479929). +const DEFAULT_ENGINES: &[(&str, usize)] = &[ + ("passwords", 1), + ("clients", 1), + ("addons", 1), + ("addresses", 1), + ("bookmarks", 2), + ("creditcards", 1), + ("forms", 1), + ("history", 1), + ("prefs", 2), + ("tabs", 1), +]; + +// Declined engines to include in a fresh `meta/global` record. +const DEFAULT_DECLINED: &[&str] = &[]; + +/// State that we require the app to persist to storage for us. +/// It's a little unfortunate we need this, because it's only tracking +/// "declined engines", and even then, only needed in practice when there's +/// no meta/global so we need to create one. It's extra unfortunate because we +/// want to move away from "globally declined" engines anyway, moving towards +/// allowing engines to be enabled or disabled per client rather than globally. +/// +/// Apps are expected to treat this as opaque, so we support serializing it. +/// Note that this structure is *not* used to *change* the declined engines +/// list - that will be done in the future, but the API exposed for that +/// purpose will also take a mutable PersistedGlobalState. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "schema_version")] +pub enum PersistedGlobalState { + /// V1 was when we persisted the entire GlobalState, keys and all! + + /// V2 is just tracking the globally declined list. + /// None means "I've no idea" and theoretically should only happen on the + /// very first sync for an app. + V2 { declined: Option<Vec<String>> }, +} + +impl Default for PersistedGlobalState { + #[inline] + fn default() -> PersistedGlobalState { + PersistedGlobalState::V2 { declined: None } + } +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub(crate) struct EngineChangesNeeded { + pub local_resets: HashSet<String>, + pub remote_wipes: HashSet<String>, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct RemoteEngineState { + info_collections: HashSet<String>, + declined: HashSet<String>, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct EngineStateInput { + local_declined: HashSet<String>, + remote: Option<RemoteEngineState>, + user_changes: HashMap<String, bool>, +} + +#[derive(Debug, Default, Clone, PartialEq)] +struct EngineStateOutput { + // The new declined. + declined: HashSet<String>, + // Which engines need resets or wipes. + changes_needed: EngineChangesNeeded, +} + +fn compute_engine_states(input: EngineStateInput) -> EngineStateOutput { + use crate::util::*; + log::debug!("compute_engine_states: input {:?}", input); + let (must_enable, must_disable) = partition_by_value(&input.user_changes); + let have_remote = input.remote.is_some(); + let RemoteEngineState { + info_collections, + declined: remote_declined, + } = input.remote.clone().unwrap_or_default(); + + let both_declined_and_remote = set_intersection(&info_collections, &remote_declined); + if !both_declined_and_remote.is_empty() { + // Should we wipe these too? + log::warn!( + "Remote state contains engines which are in both info/collections and meta/global's declined: {:?}", + both_declined_and_remote, + ); + } + + let most_recent_declined_list = if have_remote { + &remote_declined + } else { + &input.local_declined + }; + + let result_declined = set_difference( + &set_union(most_recent_declined_list, &must_disable), + &must_enable, + ); + + let output = EngineStateOutput { + changes_needed: EngineChangesNeeded { + // Anything now declined which wasn't in our declined list before gets a reset. + local_resets: set_difference(&result_declined, &input.local_declined), + // Anything remote that we just declined gets a wipe. In the future + // we might want to consider wiping things in both remote declined + // and info/collections, but we'll let other clients pick up their + // own mess for now. + remote_wipes: set_intersection(&info_collections, &must_disable), + }, + declined: result_declined, + }; + // No PII here and this helps debug problems. + log::debug!("compute_engine_states: output {:?}", output); + output +} + +impl PersistedGlobalState { + fn set_declined(&mut self, new_declined: Vec<String>) { + match self { + Self::V2 { ref mut declined } => *declined = Some(new_declined), + } + } + pub(crate) fn get_declined(&self) -> &[String] { + match self { + Self::V2 { declined: Some(d) } => &d, + Self::V2 { declined: None } => &[], + } + } +} + +/// Holds global Sync state, including server upload limits, the +/// last-fetched collection modified times, `meta/global` record, and +/// encrypted copies of the crypto/keys resourse (which we hold as encrypted +/// both to avoid keeping them in memory longer than necessary, and guard against +/// the wrong (ie, a different user's) root key being passed in. +#[derive(Debug, Clone)] +pub struct GlobalState { + pub config: InfoConfiguration, + pub collections: InfoCollections, + pub global: MetaGlobalRecord, + pub global_timestamp: ServerTimestamp, + pub keys: EncryptedBso, +} + +/// Creates a fresh `meta/global` record, using the default engine selections, +/// and declined engines from our PersistedGlobalState. +fn new_global(pgs: &PersistedGlobalState) -> error::Result<MetaGlobalRecord> { + let sync_id = Guid::random(); + let mut engines: HashMap<String, _> = HashMap::new(); + for (name, version) in DEFAULT_ENGINES.iter() { + let sync_id = Guid::random(); + engines.insert( + (*name).to_string(), + MetaGlobalEngine { + version: *version, + sync_id, + }, + ); + } + // We only need our PersistedGlobalState to fill out a new meta/global - if + // we previously saw a meta/global then we would have updated it with what + // it was at the time. + let declined = match pgs { + PersistedGlobalState::V2 { declined: Some(d) } => d.clone(), + _ => DEFAULT_DECLINED.iter().map(ToString::to_string).collect(), + }; + + Ok(MetaGlobalRecord { + sync_id, + storage_version: STORAGE_VERSION, + engines, + declined, + }) +} + +fn fixup_meta_global(global: &mut MetaGlobalRecord) -> bool { + let mut changed_any = false; + for &(name, version) in DEFAULT_ENGINES.iter() { + let had_engine = global.engines.contains_key(name); + let should_have_engine = !global.declined.iter().any(|c| c == name); + if had_engine != should_have_engine { + if should_have_engine { + log::debug!("SyncID for engine {:?} was missing", name); + global.engines.insert( + name.to_string(), + MetaGlobalEngine { + version, + sync_id: Guid::random(), + }, + ); + } else { + log::debug!("SyncID for engine {:?} was present, but shouldn't be", name); + global.engines.remove(name); + } + changed_any = true; + } + } + changed_any +} + +pub struct SetupStateMachine<'a> { + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + // `allowed_states` is designed so that we can arrange for the concept of + // a "fast" sync - so we decline to advance if we need to setup from scratch. + // The idea is that if we need to sync before going to sleep we should do + // it as fast as possible. However, in practice this isn't going to do + // what we expect - a "fast sync" that finds lots to do is almost certainly + // going to take longer than a "full sync" that finds nothing to do. + // We should almost certainly remove this and instead allow for a "time + // budget", after which we get interrupted. Later... + allowed_states: Vec<&'static str>, + sequence: Vec<&'static str>, + engine_updates: Option<&'a HashMap<String, bool>>, + interruptee: &'a dyn Interruptee, + pub(crate) changes_needed: Option<EngineChangesNeeded>, +} + +impl<'a> SetupStateMachine<'a> { + /// Creates a state machine for a "classic" Sync 1.5 client that supports + /// all states, including uploading a fresh `meta/global` and `crypto/keys` + /// after a node reassignment. + pub fn for_full_sync( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + engine_updates: Option<&'a HashMap<String, bool>>, + interruptee: &'a dyn Interruptee, + ) -> SetupStateMachine<'a> { + SetupStateMachine::with_allowed_states( + client, + root_key, + pgs, + interruptee, + engine_updates, + vec![ + "Initial", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + "FreshStartRequired", + "WithPreviousState", + ], + ) + } + + /// Creates a state machine for a fast sync, which only uses locally + /// cached global state, and bails if `meta/global` or `crypto/keys` + /// are missing or out-of-date. This is useful in cases where it's + /// important to get to ready as quickly as possible, like syncing before + /// sleep, or when conserving time or battery life. + pub fn for_fast_sync( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + engine_updates: Option<&'a HashMap<String, bool>>, + interruptee: &'a dyn Interruptee, + ) -> SetupStateMachine<'a> { + SetupStateMachine::with_allowed_states( + client, + root_key, + pgs, + interruptee, + engine_updates, + vec!["Ready", "WithPreviousState"], + ) + } + + /// Creates a state machine for a read-only sync, where the client can't + /// upload `meta/global` or `crypto/keys`. Useful for clients that only + /// sync specific collections, like Lockbox. + pub fn for_readonly_sync( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + interruptee: &'a dyn Interruptee, + ) -> SetupStateMachine<'a> { + SetupStateMachine::with_allowed_states( + client, + root_key, + pgs, + interruptee, + // No engine updates for a readonly sync + None, + // We don't allow a FreshStart in a read-only sync. + vec![ + "Initial", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + "WithPreviousState", + ], + ) + } + + fn with_allowed_states( + client: &'a dyn SetupStorageClient, + root_key: &'a KeyBundle, + pgs: &'a mut PersistedGlobalState, + interruptee: &'a dyn Interruptee, + engine_updates: Option<&'a HashMap<String, bool>>, + allowed_states: Vec<&'static str>, + ) -> SetupStateMachine<'a> { + SetupStateMachine { + client, + root_key, + pgs, + sequence: Vec::new(), + allowed_states, + engine_updates, + interruptee, + changes_needed: None, + } + } + + fn advance(&mut self, from: SetupState) -> error::Result<SetupState> { + match from { + // Fetch `info/configuration` with current server limits, and + // `info/collections` with collection last modified times. + Initial => { + let config = match self.client.fetch_info_configuration()? { + Sync15ClientResponse::Success { record, .. } => record, + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + InfoConfiguration::default() + } + other => return Err(other.create_storage_error().into()), + }; + Ok(InitialWithConfig { config }) + } + + // XXX - we could consider combining these Initial* states, because we don't + // attempt to support filling in "missing" global state - *any* 404 in them + // means `FreshStart`. + // IOW, in all cases, they either `Err()`, move to `FreshStartRequired`, or + // advance to a specific next state. + InitialWithConfig { config } => { + match self.client.fetch_info_collections()? { + Sync15ClientResponse::Success { + record: collections, + .. + } => Ok(InitialWithInfo { + config, + collections, + }), + // If the server doesn't have a `crypto/keys`, start over + // and reupload our `meta/global` and `crypto/keys`. + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error().into()), + } + } + + InitialWithInfo { + config, + collections, + } => { + match self.client.fetch_meta_global()? { + Sync15ClientResponse::Success { + record: mut global, + last_modified: mut global_timestamp, + .. + } => { + // If the server has a newer storage version, we can't + // sync until our client is updated. + if global.storage_version > STORAGE_VERSION { + return Err(ErrorKind::ClientUpgradeRequired.into()); + } + + // If the server has an older storage version, wipe and + // reupload. + if global.storage_version < STORAGE_VERSION { + Ok(FreshStartRequired { config }) + } else { + log::info!("Have info/collections and meta/global. Computing new engine states"); + let initial_global_declined: HashSet<String> = + global.declined.iter().cloned().collect(); + let result = compute_engine_states(EngineStateInput { + local_declined: self.pgs.get_declined().iter().cloned().collect(), + user_changes: self.engine_updates.cloned().unwrap_or_default(), + remote: Some(RemoteEngineState { + declined: initial_global_declined.clone(), + info_collections: collections.keys().cloned().collect(), + }), + }); + // Persist the new declined. + self.pgs + .set_declined(result.declined.iter().cloned().collect()); + // If the declined engines differ from remote, fix that. + let fixed_declined = if result.declined != initial_global_declined { + global.declined = result.declined.iter().cloned().collect(); + log::info!( + "Uploading new declined {:?} to meta/global with timestamp {:?}", + global.declined, + global_timestamp, + ); + true + } else { + false + }; + // If there are missing syncIds, we need to fix those as well + let fixed_ids = if fixup_meta_global(&mut global) { + log::info!( + "Uploading corrected meta/global with timestamp {:?}", + global_timestamp, + ); + true + } else { + false + }; + + if fixed_declined || fixed_ids { + global_timestamp = + self.client.put_meta_global(global_timestamp, &global)?; + log::debug!("new global_timestamp: {:?}", global_timestamp); + } + // Update the set of changes needed. + if self.changes_needed.is_some() { + // Should never happen (we prevent state machine + // loops elsewhere) but if it did, the info is stale + // anyway. + log::warn!("Already have a set of changes needed, Overwriting..."); + } + self.changes_needed = Some(result.changes_needed); + Ok(InitialWithMetaGlobal { + config, + collections, + global, + global_timestamp, + }) + } + } + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error().into()), + } + } + + InitialWithMetaGlobal { + config, + collections, + global, + global_timestamp, + } => { + // Now try and get keys etc - if we fresh-start we'll re-use declined. + match self.client.fetch_crypto_keys()? { + Sync15ClientResponse::Success { + record, + last_modified, + .. + } => { + // Note that collection/keys is itself a bso, so the + // json body also carries the timestamp. If they aren't + // identical something has screwed up and we should die. + assert_eq!(last_modified, record.modified); + let state = GlobalState { + config, + collections, + global, + global_timestamp, + keys: record, + }; + Ok(Ready { state }) + } + // If the server doesn't have a `crypto/keys`, start over + // and reupload our `meta/global` and `crypto/keys`. + Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => { + Ok(FreshStartRequired { config }) + } + other => Err(other.create_storage_error().into()), + } + } + + // We've got old state that's likely to be OK. + // We keep things simple here - if there's evidence of a new/missing + // meta/global or new/missing keys we just restart from scratch. + WithPreviousState { old_state } => match self.client.fetch_info_collections()? { + Sync15ClientResponse::Success { + record: collections, + .. + } => Ok( + if self.engine_updates.is_none() + && is_same_timestamp(old_state.global_timestamp, &collections, "meta") + && is_same_timestamp(old_state.keys.modified, &collections, "crypto") + { + Ready { + state: GlobalState { + collections, + ..old_state + }, + } + } else { + InitialWithConfig { + config: old_state.config, + } + }, + ), + _ => Ok(InitialWithConfig { + config: old_state.config, + }), + }, + + Ready { state } => Ok(Ready { state }), + + FreshStartRequired { config } => { + // Wipe the server. + log::info!("Fresh start: wiping remote"); + self.client.wipe_all_remote()?; + + // Upload a fresh `meta/global`... + log::info!("Uploading meta/global"); + let computed = compute_engine_states(EngineStateInput { + local_declined: self.pgs.get_declined().iter().cloned().collect(), + user_changes: self.engine_updates.cloned().unwrap_or_default(), + remote: None, + }); + self.pgs + .set_declined(computed.declined.iter().cloned().collect()); + + self.changes_needed = Some(computed.changes_needed); + + let new_global = new_global(self.pgs)?; + + self.client + .put_meta_global(ServerTimestamp::default(), &new_global)?; + + // ...And a fresh `crypto/keys`. + let new_keys = CollectionKeys::new_random()?.to_encrypted_bso(&self.root_key)?; + self.client + .put_crypto_keys(ServerTimestamp::default(), &new_keys)?; + + // TODO(lina): Can we pass along server timestamps from the PUTs + // above, and avoid re-fetching the `m/g` and `c/k` we just + // uploaded? + // OTOH(mark): restarting the state machine keeps life simple and rare. + Ok(InitialWithConfig { config }) + } + } + } + + /// Runs through the state machine to the ready state. + pub fn run_to_ready(&mut self, state: Option<GlobalState>) -> error::Result<GlobalState> { + let mut s = match state { + Some(old_state) => WithPreviousState { old_state }, + None => Initial, + }; + loop { + self.interruptee.err_if_interrupted()?; + let label = &s.label(); + log::trace!("global state: {:?}", label); + match s { + Ready { state } => { + self.sequence.push(label); + return Ok(state); + } + // If we already started over once before, we're likely in a + // cycle, and should try again later. Intermediate states + // aren't a problem, just the initial ones. + FreshStartRequired { .. } | WithPreviousState { .. } | Initial => { + if self.sequence.contains(&label) { + // Is this really the correct error? + return Err(ErrorKind::SetupRace.into()); + } + } + _ => { + if !self.allowed_states.contains(&label) { + return Err(ErrorKind::SetupRequired.into()); + } + } + }; + self.sequence.push(label); + s = self.advance(s)?; + } + } +} + +/// States in the remote setup process. +/// TODO(lina): Add link once #56 is merged. +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum SetupState { + // These "Initial" states are only ever used when starting from scratch. + Initial, + InitialWithConfig { + config: InfoConfiguration, + }, + InitialWithInfo { + config: InfoConfiguration, + collections: InfoCollections, + }, + InitialWithMetaGlobal { + config: InfoConfiguration, + collections: InfoCollections, + global: MetaGlobalRecord, + global_timestamp: ServerTimestamp, + }, + WithPreviousState { + old_state: GlobalState, + }, + Ready { + state: GlobalState, + }, + FreshStartRequired { + config: InfoConfiguration, + }, +} + +impl SetupState { + fn label(&self) -> &'static str { + match self { + Initial { .. } => "Initial", + InitialWithConfig { .. } => "InitialWithConfig", + InitialWithInfo { .. } => "InitialWithInfo", + InitialWithMetaGlobal { .. } => "InitialWithMetaGlobal", + Ready { .. } => "Ready", + WithPreviousState { .. } => "WithPreviousState", + FreshStartRequired { .. } => "FreshStartRequired", + } + } +} + +/// Whether we should skip fetching an item. Used when we already have timestamps +/// and want to check if we should reuse our existing state. The state's fairly +/// cheap to recreate and very bad to use if it is wrong, so we insist on the +/// *exact* timestamp matching and not a simple "later than" check. +fn is_same_timestamp(local: ServerTimestamp, collections: &InfoCollections, key: &str) -> bool { + collections.get(key).map_or(false, |ts| local == *ts) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::bso_record::{BsoRecord, EncryptedBso, EncryptedPayload, Payload}; + use crate::record_types::CryptoKeysRecord; + use interrupt_support::NeverInterrupts; + + struct InMemoryClient { + info_configuration: error::Result<Sync15ClientResponse<InfoConfiguration>>, + info_collections: error::Result<Sync15ClientResponse<InfoCollections>>, + meta_global: error::Result<Sync15ClientResponse<MetaGlobalRecord>>, + crypto_keys: error::Result<Sync15ClientResponse<BsoRecord<EncryptedPayload>>>, + } + + impl SetupStorageClient for InMemoryClient { + fn fetch_info_configuration( + &self, + ) -> error::Result<Sync15ClientResponse<InfoConfiguration>> { + match &self.info_configuration { + Ok(client_response) => Ok(client_response.clone()), + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> { + match &self.info_collections { + Ok(collections) => Ok(collections.clone()), + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> { + match &self.meta_global { + Ok(global) => Ok(global.clone()), + // TODO(lina): Special handling for 404s, we want to ensure we + // handle missing keys and other server errors correctly. + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn put_meta_global( + &self, + xius: ServerTimestamp, + global: &MetaGlobalRecord, + ) -> error::Result<ServerTimestamp> { + // Ensure that the meta/global record we uploaded is "fixed up" + assert!(DEFAULT_ENGINES + .iter() + .filter(|e| e.0 != "logins") + .all(|&(k, _v)| global.engines.contains_key(k))); + assert!(!global.engines.contains_key("logins")); + assert_eq!(global.declined, vec!["logins".to_string()]); + // return a different timestamp. + Ok(ServerTimestamp(xius.0 + 1)) + } + + fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>> { + match &self.crypto_keys { + Ok(keys) => Ok(keys.clone()), + // TODO(lina): Same as above, for 404s. + Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError { + status: 500, + route: "test/path".into(), + })), + } + } + + fn put_crypto_keys( + &self, + xius: ServerTimestamp, + _keys: &EncryptedBso, + ) -> error::Result<()> { + assert_eq!(xius, ServerTimestamp(888_800)); + Err(ErrorKind::StorageHttpError(ErrorResponse::ServerError { + status: 500, + route: "crypto/keys".to_string(), + }) + .into()) + } + + fn wipe_all_remote(&self) -> error::Result<()> { + Ok(()) + } + } + + fn mocked_success_ts<T>(t: T, ts: i64) -> error::Result<Sync15ClientResponse<T>> { + Ok(Sync15ClientResponse::Success { + status: 200, + record: t, + last_modified: ServerTimestamp(ts), + route: "test/path".into(), + }) + } + + fn mocked_success<T>(t: T) -> error::Result<Sync15ClientResponse<T>> { + mocked_success_ts(t, 0) + } + + // for tests, we want a BSO with a specific timestamp, which we never + // need in non-test-code as the timestamp comes from the server. + impl CollectionKeys { + pub fn to_encrypted_bso_with_timestamp( + &self, + root_key: &KeyBundle, + modified: ServerTimestamp, + ) -> error::Result<EncryptedBso> { + let record = CryptoKeysRecord { + id: "keys".into(), + collection: "crypto".into(), + default: self.default.to_b64_array(), + collections: self + .collections + .iter() + .map(|kv| (kv.0.clone(), kv.1.to_b64_array())) + .collect(), + }; + let mut bso = + crate::CleartextBso::from_payload(Payload::from_record(record)?, "crypto"); + bso.modified = modified; + Ok(bso.encrypt(root_key)?) + } + } + + #[test] + fn test_state_machine_ready_from_empty() { + let root_key = KeyBundle::new_random().unwrap(); + let keys = CollectionKeys { + timestamp: ServerTimestamp(123_400), + default: KeyBundle::new_random().unwrap(), + collections: HashMap::new(), + }; + let mg = MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + // We ensure that the record we upload doesn't have a logins record. + declined: vec!["logins".to_string()], + }; + let client = InMemoryClient { + info_configuration: mocked_success(InfoConfiguration::default()), + info_collections: mocked_success(InfoCollections::new( + vec![("meta", 123_456), ("crypto", 145_000)] + .into_iter() + .map(|(key, value)| (key.to_owned(), ServerTimestamp(value))) + .collect(), + )), + meta_global: mocked_success_ts(mg, 999_000), + crypto_keys: mocked_success_ts( + keys.to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(888_000)) + .expect("should always work in this test"), + 888_000, + ), + }; + let mut pgs = PersistedGlobalState::V2 { declined: None }; + + let mut state_machine = + SetupStateMachine::for_full_sync(&client, &root_key, &mut pgs, None, &NeverInterrupts); + assert!( + state_machine.run_to_ready(None).is_ok(), + "Should drive state machine to ready" + ); + assert_eq!( + state_machine.sequence, + vec![ + "Initial", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + ], + "Should cycle through all states" + ); + } + + #[test] + fn test_from_previous_state_declined() { + let _ = env_logger::try_init(); + // The state-machine sequence where we didn't use the previous state + // (ie, where the state machine restarted) + let sm_seq_restarted = vec![ + "WithPreviousState", + "InitialWithConfig", + "InitialWithInfo", + "InitialWithMetaGlobal", + "Ready", + ]; + // The state-machine sequence where we used the previous state. + let sm_seq_used_previous = vec!["WithPreviousState", "Ready"]; + + // do the actual test. + fn do_test( + client: &dyn SetupStorageClient, + root_key: &KeyBundle, + mut pgs: &mut PersistedGlobalState, + engine_updates: Option<&HashMap<String, bool>>, + old_state: GlobalState, + expected_states: &[&str], + ) { + let mut state_machine = SetupStateMachine::for_full_sync( + client, + root_key, + &mut pgs, + engine_updates, + &NeverInterrupts, + ); + assert!( + state_machine.run_to_ready(Some(old_state)).is_ok(), + "Should drive state machine to ready" + ); + assert_eq!(state_machine.sequence, expected_states); + } + + // and all the complicated setup... + let ts_metaglobal = 123_456; + let ts_keys = 145_000; + let root_key = KeyBundle::new_random().unwrap(); + let keys = CollectionKeys { + timestamp: ServerTimestamp(ts_keys + 1), + default: KeyBundle::new_random().unwrap(), + collections: HashMap::new(), + }; + let mg = MetaGlobalRecord { + sync_id: "syncIDAAAAAA".into(), + storage_version: 5usize, + engines: vec![( + "bookmarks", + MetaGlobalEngine { + version: 1usize, + sync_id: "syncIDBBBBBB".into(), + }, + )] + .into_iter() + .map(|(key, value)| (key.to_owned(), value)) + .collect(), + // We ensure that the record we upload doesn't have a logins record. + declined: vec!["logins".to_string()], + }; + let collections = InfoCollections::new( + vec![("meta", ts_metaglobal), ("crypto", ts_keys)] + .into_iter() + .map(|(key, value)| (key.to_owned(), ServerTimestamp(value))) + .collect(), + ); + let client = InMemoryClient { + info_configuration: mocked_success(InfoConfiguration::default()), + info_collections: mocked_success(collections.clone()), + meta_global: mocked_success_ts(mg.clone(), ts_metaglobal), + crypto_keys: mocked_success_ts( + keys.to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys)) + .expect("should always work in this test"), + ts_keys, + ), + }; + + // First a test where the "previous" global state is OK to reuse. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys)) + .expect("should always work in this test"), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_used_previous, + ); + } + + // Now where the meta/global record on the server is later. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(999_999), + keys: keys + .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys)) + .expect("should always work in this test"), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_restarted, + ); + } + + // Where keys on the server is later. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections: collections.clone(), + global: mg.clone(), + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(999_999)) + .expect("should always work in this test"), + }; + do_test( + &client, + &root_key, + &mut pgs, + None, + old_state, + &sm_seq_restarted, + ); + } + + // Where there are engine-state changes. + { + let mut pgs = PersistedGlobalState::V2 { declined: None }; + // A "previous" global state. + let old_state = GlobalState { + config: InfoConfiguration::default(), + collections, + global: mg, + global_timestamp: ServerTimestamp(ts_metaglobal), + keys: keys + .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys)) + .expect("should always work in this test"), + }; + let mut engine_updates = HashMap::<String, bool>::new(); + engine_updates.insert("logins".to_string(), false); + do_test( + &client, + &root_key, + &mut pgs, + Some(&engine_updates), + old_state, + &sm_seq_restarted, + ); + let declined = match pgs { + PersistedGlobalState::V2 { declined: d } => d, + }; + // and check we now consider logins as declined. + assert_eq!(declined, Some(vec!["logins".to_string()])); + } + } + + fn string_set(s: &[&str]) -> HashSet<String> { + s.iter().map(ToString::to_string).collect() + } + fn string_map<T: Clone>(s: &[(&str, T)]) -> HashMap<String, T> { + s.iter().map(|v| (v.0.to_string(), v.1.clone())).collect() + } + #[test] + fn test_engine_states() { + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: None, + user_changes: Default::default(), + }), + EngineStateOutput { + declined: string_set(&["foo", "bar"]), + // No wipes, no resets + changes_needed: Default::default(), + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: Some(RemoteEngineState { + declined: string_set(&["foo"]), + info_collections: string_set(&["bar"]) + }), + user_changes: Default::default(), + }), + EngineStateOutput { + // Now we have `foo`. + declined: string_set(&["foo"]), + // No wipes, no resets, should just be a local update. + changes_needed: Default::default(), + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["foo", "bar"]), + remote: Some(RemoteEngineState { + declined: string_set(&["foo", "bar", "quux"]), + info_collections: string_set(&[]) + }), + user_changes: Default::default(), + }), + EngineStateOutput { + // Now we have `foo`. + declined: string_set(&["foo", "bar", "quux"]), + changes_needed: EngineChangesNeeded { + // Should reset `quux`. + local_resets: string_set(&["quux"]), + // No wipes, though. + remote_wipes: string_set(&[]), + } + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["bar", "baz"]), + remote: Some(RemoteEngineState { + declined: string_set(&["bar", "baz",]), + info_collections: string_set(&["quux"]) + }), + // Change a declined engine to undeclined. + user_changes: string_map(&[("bar", true)]), + }), + EngineStateOutput { + declined: string_set(&["baz"]), + // No wipes, just undecline it. + changes_needed: Default::default() + } + ); + assert_eq!( + compute_engine_states(EngineStateInput { + local_declined: string_set(&["bar", "baz"]), + remote: Some(RemoteEngineState { + declined: string_set(&["bar", "baz"]), + info_collections: string_set(&["foo"]) + }), + // Change an engine which exists remotely to declined. + user_changes: string_map(&[("foo", false)]), + }), + EngineStateOutput { + declined: string_set(&["baz", "bar", "foo"]), + // No wipes, just undecline it. + changes_needed: EngineChangesNeeded { + // Should reset our local foo + local_resets: string_set(&["foo"]), + // And wipe the server. + remote_wipes: string_set(&["foo"]), + } + } + ); + } +} diff --git a/third_party/rust/sync15/src/status.rs b/third_party/rust/sync15/src/status.rs new file mode 100644 index 0000000000..b88c710f02 --- /dev/null +++ b/third_party/rust/sync15/src/status.rs @@ -0,0 +1,109 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{Error, ErrorKind, ErrorResponse}; +use crate::telemetry::SyncTelemetryPing; +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; + +/// The general status of sync - should probably be moved to the "sync manager" +/// once we have one! +#[derive(Debug, Clone, PartialEq)] +pub enum ServiceStatus { + /// Everything is fine. + Ok, + /// Some general network issue. + NetworkError, + /// Some apparent issue with the servers. + ServiceError, + /// Some external FxA action needs to be taken. + AuthenticationError, + /// We declined to do anything for backoff or rate-limiting reasons. + BackedOff, + /// We were interrupted. + Interrupted, + /// Something else - you need to check the logs for more details. May + /// or may not be transient, we really don't know. + OtherError, +} + +impl ServiceStatus { + // This is a bit naive and probably will not survive in this form in the + // SyncManager - eg, we'll want to handle backoff etc. + pub fn from_err(err: &Error) -> ServiceStatus { + match err.kind() { + // HTTP based errors. + ErrorKind::TokenserverHttpError(status) => { + // bit of a shame the tokenserver is different to storage... + if *status == 401 { + ServiceStatus::AuthenticationError + } else { + ServiceStatus::ServiceError + } + } + // BackoffError is also from the tokenserver. + ErrorKind::BackoffError(_) => ServiceStatus::ServiceError, + ErrorKind::StorageHttpError(ref e) => match e { + ErrorResponse::Unauthorized { .. } => ServiceStatus::AuthenticationError, + _ => ServiceStatus::ServiceError, + }, + + // Network errors. + ErrorKind::RequestError(_) + | ErrorKind::UnexpectedStatus(_) + | ErrorKind::HawkError(_) => ServiceStatus::NetworkError, + + ErrorKind::Interrupted(_) => ServiceStatus::Interrupted, + _ => ServiceStatus::OtherError, + } + } +} + +/// The result of a sync request. This too is from the "sync manager", but only +/// has a fraction of the things it will have when we actually build that. +#[derive(Debug)] +pub struct SyncResult { + /// The general health. + pub service_status: ServiceStatus, + + /// The set of declined engines, if we know them. + pub declined: Option<Vec<String>>, + + /// The result of the sync. + pub result: Result<(), Error>, + + /// The result for each engine. + /// Note that we expect the `String` to be replaced with an enum later. + pub engine_results: HashMap<String, Result<(), Error>>, + + pub telemetry: SyncTelemetryPing, + + pub next_sync_after: Option<std::time::SystemTime>, +} + +// If `r` has a BackoffError, then returns the later backoff value. +fn advance_backoff(cur_best: SystemTime, r: &Result<(), Error>) -> SystemTime { + if let Err(e) = r { + if let Some(time) = e.get_backoff() { + return std::cmp::max(time, cur_best); + } + } + cur_best +} + +impl SyncResult { + pub(crate) fn set_sync_after(&mut self, backoff_duration: Duration) { + let now = SystemTime::now(); + let toplevel = advance_backoff(now + backoff_duration, &self.result); + let sync_after = self + .engine_results + .values() + .fold(toplevel, |b, r| advance_backoff(b, r)); + if sync_after <= now { + self.next_sync_after = None; + } else { + self.next_sync_after = Some(sync_after); + } + } +} diff --git a/third_party/rust/sync15/src/sync.rs b/third_party/rust/sync15/src/sync.rs new file mode 100644 index 0000000000..a4030b5440 --- /dev/null +++ b/third_party/rust/sync15/src/sync.rs @@ -0,0 +1,127 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::changeset::CollectionUpdate; +use crate::client::Sync15StorageClient; +use crate::clients; +use crate::coll_state::LocalCollStateMachine; +use crate::error::Error; +use crate::key_bundle::KeyBundle; +use crate::state::GlobalState; +use crate::telemetry; +use interrupt_support::Interruptee; + +pub use sync15_traits::{IncomingChangeset, Store}; + +pub fn synchronize( + client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + store: &dyn Store, + fully_atomic: bool, + telem_engine: &mut telemetry::Engine, + interruptee: &dyn Interruptee, +) -> Result<(), crate::Error> { + synchronize_with_clients_engine( + client, + global_state, + root_sync_key, + None, + store, + fully_atomic, + telem_engine, + interruptee, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn synchronize_with_clients_engine( + client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + clients: Option<&clients::Engine<'_>>, + store: &dyn Store, + fully_atomic: bool, + telem_engine: &mut telemetry::Engine, + interruptee: &dyn Interruptee, +) -> Result<(), Error> { + let collection = store.collection_name(); + log::info!("Syncing collection {}", collection); + + // our global state machine is ready - get the collection machine going. + let mut coll_state = match LocalCollStateMachine::get_state(store, global_state, root_sync_key)? + { + Some(coll_state) => coll_state, + None => { + // XXX - this is either "error" or "declined". + log::warn!( + "can't setup for the {} collection - hopefully it works later", + collection + ); + return Ok(()); + } + }; + + if let Some(clients) = clients { + store.prepare_for_sync(&|| clients.get_client_data())?; + } + + let collection_requests = store.get_collection_requests(coll_state.last_modified)?; + let incoming = if collection_requests.is_empty() { + log::info!("skipping incoming for {} - not needed.", collection); + vec![IncomingChangeset::new(collection, coll_state.last_modified)] + } else { + assert_eq!(collection_requests.last().unwrap().collection, collection); + + let count = collection_requests.len(); + collection_requests + .into_iter() + .enumerate() + .map(|(idx, collection_request)| { + interruptee.err_if_interrupted()?; + let incoming_changes = + crate::changeset::fetch_incoming(client, &mut coll_state, &collection_request)?; + + log::info!( + "Downloaded {} remote changes (request {} of {})", + incoming_changes.changes.len(), + idx, + count, + ); + Ok(incoming_changes) + }) + .collect::<Result<Vec<_>, Error>>()? + }; + + let new_timestamp = incoming.last().expect("must have >= 1").timestamp; + let mut outgoing = store.apply_incoming(incoming, telem_engine)?; + + interruptee.err_if_interrupted()?; + // Bump the timestamps now just incase the upload fails. + // xxx - duplication below smells wrong + outgoing.timestamp = new_timestamp; + coll_state.last_modified = new_timestamp; + + log::info!("Uploading {} outgoing changes", outgoing.changes.len()); + let upload_info = + CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + // ideally we'd report this per-batch, but for now, let's just report it + // as a total. + let mut telem_outgoing = telemetry::EngineOutgoing::new(); + telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len()); + telem_outgoing.failed(upload_info.failed_ids.len()); + telem_engine.outgoing(telem_outgoing); + + store.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?; + + log::info!("Sync finished!"); + Ok(()) +} diff --git a/third_party/rust/sync15/src/sync_multiple.rs b/third_party/rust/sync15/src/sync_multiple.rs new file mode 100644 index 0000000000..84a75a28e1 --- /dev/null +++ b/third_party/rust/sync15/src/sync_multiple.rs @@ -0,0 +1,493 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// This helps you perform a sync of multiple stores and helps you manage +// global and local state between syncs. + +use crate::client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit}; +use crate::clients::{self, CommandProcessor, CLIENTS_TTL_REFRESH}; +use crate::coll_state::StoreSyncAssociation; +use crate::error::Error; +use crate::key_bundle::KeyBundle; +use crate::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine}; +use crate::status::{ServiceStatus, SyncResult}; +use crate::sync::{self, Store}; +use crate::telemetry; +use interrupt_support::Interruptee; +use std::collections::HashMap; +use std::mem; +use std::result; +use std::time::{Duration, SystemTime}; + +/// Info about the client to use. We reuse the client unless +/// we discover the client_init has changed, in which case we re-create one. +#[derive(Debug)] +struct ClientInfo { + // the client_init used to create `client`. + client_init: Sync15StorageClientInit, + // the client (our tokenserver state machine state, and our http library's state) + client: Sync15StorageClient, +} + +impl ClientInfo { + fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> { + Ok(Self { + client_init: ci.clone(), + client: Sync15StorageClient::new(ci.clone())?, + }) + } +} + +/// Info we want callers to store *in memory* for us so that subsequent +/// syncs are faster. This should never be persisted to storage as it holds +/// sensitive information, such as the sync decryption keys. +#[derive(Debug, Default)] +pub struct MemoryCachedState { + last_client_info: Option<ClientInfo>, + last_global_state: Option<GlobalState>, + // These are just stored in memory, as persisting an invalid value far in the + // future has the potential to break sync for good. + next_sync_after: Option<SystemTime>, + next_client_refresh_after: Option<SystemTime>, +} + +impl MemoryCachedState { + // Called we notice the cached state is stale. + pub fn clear_sensitive_info(&mut self) { + self.last_client_info = None; + self.last_global_state = None; + // Leave the backoff time, as there's no reason to think it's not still + // true. + } + pub fn get_next_sync_after(&self) -> Option<SystemTime> { + self.next_sync_after + } + pub fn should_refresh_client(&self) -> bool { + match self.next_client_refresh_after { + Some(t) => SystemTime::now() > t, + None => true, + } + } + pub fn note_client_refresh(&mut self) { + self.next_client_refresh_after = + Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH)); + } +} + +/// Sync multiple stores +/// * `stores` - The stores to sync +/// * `persisted_global_state` - The global state to use, or None if never +/// before provided. At the end of the sync, and even when the sync fails, +/// the value in this cell should be persisted to permanent storage and +/// provided next time the sync is called. +/// * `last_client_info` - The client state to use, or None if never before +/// provided. At the end of the sync, the value should be persisted +/// *in memory only* - it should not be persisted to disk. +/// * `storage_init` - Information about how the sync http client should be +/// configured. +/// * `root_sync_key` - The KeyBundle used for encryption. +/// +/// Returns a map, keyed by name and holding an error value - if any store +/// fails, the sync will continue on to other stores, but the error will be +/// places in this map. The absence of a name in the map implies the store +/// succeeded. +pub fn sync_multiple( + stores: &[&dyn Store], + persisted_global_state: &mut Option<String>, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option<SyncRequestInfo<'_>>, +) -> SyncResult { + sync_multiple_with_command_processor( + None, + stores, + persisted_global_state, + mem_cached_state, + storage_init, + root_sync_key, + interruptee, + req_info, + ) +} + +/// Like `sync_multiple`, but specifies an optional command processor to handle +/// commands from the clients collection. This function is called by the sync +/// manager, which provides its own processor. +#[allow(clippy::too_many_arguments)] +pub fn sync_multiple_with_command_processor( + command_processor: Option<&dyn CommandProcessor>, + stores: &[&dyn Store], + persisted_global_state: &mut Option<String>, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option<SyncRequestInfo<'_>>, +) -> SyncResult { + log::info!("Syncing {} stores", stores.len()); + let mut sync_result = SyncResult { + service_status: ServiceStatus::OtherError, + result: Ok(()), + declined: None, + next_sync_after: None, + engine_results: HashMap::with_capacity(stores.len()), + telemetry: telemetry::SyncTelemetryPing::new(), + }; + let backoff = crate::client::new_backoff_listener(); + let req_info = req_info.unwrap_or_default(); + let driver = SyncMultipleDriver { + command_processor, + stores, + storage_init, + interruptee, + engines_to_state_change: req_info.engines_to_state_change, + backoff: backoff.clone(), + root_sync_key, + result: &mut sync_result, + persisted_global_state, + mem_cached_state, + saw_auth_error: false, + ignore_soft_backoff: req_info.is_user_action, + }; + match driver.sync() { + Ok(()) => { + log::debug!( + "sync was successful, final status={:?}", + sync_result.service_status + ); + } + Err(e) => { + log::warn!( + "sync failed: {}, final status={:?}\nBacktrace: {:?}", + e, + sync_result.service_status, + e.backtrace() + ); + sync_result.result = Err(e); + } + } + // Respect `backoff` value when computing the next sync time even if we were + // ignoring it during the sync + sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default()); + mem_cached_state.next_sync_after = sync_result.next_sync_after; + log::trace!("Sync result: {:?}", sync_result); + sync_result +} + +/// This is essentially a bag of information that the sync manager knows, but +/// otherwise we won't. It should probably be rethought if it gains many more +/// fields. +#[derive(Debug, Default)] +pub struct SyncRequestInfo<'a> { + pub engines_to_state_change: Option<&'a HashMap<String, bool>>, + pub is_user_action: bool, +} + +// The sync multiple driver +struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + command_processor: Option<&'info dyn CommandProcessor>, + stores: &'info [&'info dyn Store], + storage_init: &'info Sync15StorageClientInit, + root_sync_key: &'info KeyBundle, + interruptee: &'info dyn Interruptee, + backoff: BackoffListener, + engines_to_state_change: Option<&'info HashMap<String, bool>>, + result: &'res mut SyncResult, + persisted_global_state: &'pgs mut Option<String>, + mem_cached_state: &'mcs mut MemoryCachedState, + ignore_soft_backoff: bool, + saw_auth_error: bool, +} + +impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + /// The actual worker for sync_multiple. + fn sync(mut self) -> result::Result<(), Error> { + log::info!("Loading/initializing persisted state"); + let mut pgs = self.prepare_persisted_state(); + + log::info!("Preparing client info"); + let client_info = self.prepare_client_info()?; + + if self.was_interrupted() { + return Ok(()); + } + + log::info!("Entering sync state machine"); + // Advance the state machine to the point where it can perform a full + // sync. This may involve uploading meta/global, crypto/keys etc. + let mut global_state = self.run_state_machine(&client_info, &mut pgs)?; + + if self.was_interrupted() { + return Ok(()); + } + + // Set the service status to OK here - we may adjust it based on an individual + // store failing. + self.result.service_status = ServiceStatus::Ok; + + let clients_engine = if let Some(command_processor) = self.command_processor { + log::info!("Synchronizing clients engine"); + let should_refresh = self.mem_cached_state.should_refresh_client(); + let mut engine = clients::Engine::new(command_processor, self.interruptee); + if let Err(e) = engine.sync( + &client_info.client, + &global_state, + &self.root_sync_key, + should_refresh, + ) { + // Record telemetry with the error just in case... + let mut telem_sync = telemetry::SyncTelemetry::new(); + let mut telem_engine = telemetry::Engine::new("clients"); + telem_engine.failure(&e); + telem_sync.engine(telem_engine); + self.result.service_status = ServiceStatus::from_err(&e); + + // ...And bail, because a clients engine sync failure is fatal. + return Err(e); + } + // We don't record telemetry for successful clients engine + // syncs, since we only keep client records in memory, we + // expect the counts to be the same most times, and a + // failure aborts the entire sync. + if self.was_interrupted() { + return Ok(()); + } + self.mem_cached_state.note_client_refresh(); + Some(engine) + } else { + None + }; + + log::info!("Synchronizing stores"); + + let telem_sync = self.sync_stores(&client_info, &mut global_state, clients_engine.as_ref()); + self.result.telemetry.sync(telem_sync); + + log::info!("Finished syncing stores."); + + if !self.saw_auth_error { + log::trace!("Updating persisted global state"); + self.mem_cached_state.last_client_info = Some(client_info); + self.mem_cached_state.last_global_state = Some(global_state); + } + + Ok(()) + } + + fn was_interrupted(&mut self) -> bool { + if self.interruptee.was_interrupted() { + log::info!("Interrupted, bailing out"); + self.result.service_status = ServiceStatus::Interrupted; + true + } else { + false + } + } + + fn sync_stores( + &mut self, + client_info: &ClientInfo, + global_state: &mut GlobalState, + clients: Option<&clients::Engine<'_>>, + ) -> telemetry::SyncTelemetry { + let mut telem_sync = telemetry::SyncTelemetry::new(); + for store in self.stores { + let name = store.collection_name(); + if self + .backoff + .get_required_wait(self.ignore_soft_backoff) + .is_some() + { + log::warn!("Got backoff, bailing out of sync early"); + break; + } + if global_state.global.declined.iter().any(|e| e == &*name) { + log::info!("The {} engine is declined. Skipping", name); + continue; + } + log::info!("Syncing {} engine!", name); + + let mut telem_engine = telemetry::Engine::new(&*name); + let result = sync::synchronize_with_clients_engine( + &client_info.client, + &global_state, + self.root_sync_key, + clients, + *store, + true, + &mut telem_engine, + self.interruptee, + ); + + match result { + Ok(()) => log::info!("Sync of {} was successful!", name), + Err(ref e) => { + log::warn!("Sync of {} failed! {:?}", name, e); + let this_status = ServiceStatus::from_err(&e); + // The only error which forces us to discard our state is an + // auth error. + self.saw_auth_error = + self.saw_auth_error || this_status == ServiceStatus::AuthenticationError; + telem_engine.failure(e); + // If the failure from the store looks like anything other than + // a "store error" we don't bother trying the others. + if this_status != ServiceStatus::OtherError { + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + self.result.service_status = this_status; + break; + } + } + } + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + if self.was_interrupted() { + break; + } + } + telem_sync + } + + fn run_state_machine( + &mut self, + client_info: &ClientInfo, + pgs: &mut PersistedGlobalState, + ) -> result::Result<GlobalState, Error> { + let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None); + + let mut state_machine = SetupStateMachine::for_full_sync( + &client_info.client, + &self.root_sync_key, + pgs, + self.engines_to_state_change, + self.interruptee, + ); + + log::info!("Advancing state machine to ready (full)"); + let res = state_machine.run_to_ready(last_state); + // Grab this now even though we don't need it until later to avoid a + // lifetime issue + let changes = state_machine.changes_needed.take(); + // The state machine might have updated our persisted_global_state, so + // update the caller's repr of it. + *self.persisted_global_state = Some(serde_json::to_string(&pgs)?); + + // Now that we've gone through the state machine, store the declined list in + // the sync_result + self.result.declined = Some(pgs.get_declined().to_vec()); + log::debug!( + "Declined engines list after state machine set to: {:?}", + self.result.declined, + ); + + if let Some(c) = changes { + self.wipe_or_reset_engines(c, &client_info.client)?; + } + let state = match res { + Err(e) => { + self.result.service_status = ServiceStatus::from_err(&e); + return Err(e); + } + Ok(state) => state, + }; + self.result.telemetry.uid(client_info.client.hashed_uid()?); + // As for client_info, put None back now so we start from scratch on error. + self.mem_cached_state.last_global_state = None; + Ok(state) + } + + fn wipe_or_reset_engines( + &mut self, + changes: EngineChangesNeeded, + client: &Sync15StorageClient, + ) -> result::Result<(), Error> { + if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() { + return Ok(()); + } + for e in &changes.remote_wipes { + log::info!("Engine {:?} just got disabled locally, wiping server", e); + client.wipe_remote_engine(&e)?; + } + + for s in self.stores { + let name = s.collection_name(); + if changes.local_resets.contains(&*name) { + log::info!("Resetting engine {}, as it was declined remotely", name); + s.reset(&StoreSyncAssociation::Disconnected)?; + } + } + + Ok(()) + } + + fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> { + let mut client_info = match mem::replace(&mut self.mem_cached_state.last_client_info, None) + { + Some(client_info) => { + // if our storage_init has changed it probably means the user has + // changed, courtesy of the 'kid' in the structure. Thus, we can't + // reuse the client or the memory cached state. We do keep the disk + // state as currently that's only the declined list. + if client_info.client_init != *self.storage_init { + log::info!("Discarding all state as the account might have changed"); + *self.mem_cached_state = MemoryCachedState::default(); + ClientInfo::new(self.storage_init)? + } else { + log::debug!("Reusing memory-cached client_info"); + // we can reuse it (which should be the common path) + client_info + } + } + None => { + log::debug!("mem_cached_state was stale or missing, need setup"); + // We almost certainly have no other state here, but to be safe, we + // throw away any memory state we do have. + self.mem_cached_state.clear_sensitive_info(); + ClientInfo::new(self.storage_init)? + } + }; + // Ensure we use the correct listener here rather than on all the branches + // above, since it seems less error prone. + client_info.client.backoff = self.backoff.clone(); + Ok(client_info) + } + + fn prepare_persisted_state(&mut self) -> PersistedGlobalState { + // Note that any failure to use a persisted state means we also decline + // to use our memory cached state, so that we fully rebuild that + // persisted state for next time. + match self.persisted_global_state { + Some(persisted_string) if !persisted_string.is_empty() => { + match serde_json::from_str::<PersistedGlobalState>(&persisted_string) { + Ok(state) => { + log::trace!("Read persisted state: {:?}", state); + // Note that we don't set `result.declined` from the + // data in state - it remains None, which explicitly + // indicates "we don't have updated info". + state + } + _ => { + // Don't log the error since it might contain sensitive + // info (although currently it only contains the declined engines list) + log::error!( + "Failed to parse PersistedGlobalState from JSON! Falling back to default" + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } + _ => { + log::info!( + "The application didn't give us persisted state - \ + this is only expected on the very first run for a given user." + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } +} diff --git a/third_party/rust/sync15/src/telemetry.rs b/third_party/rust/sync15/src/telemetry.rs new file mode 100644 index 0000000000..eeeb558bca --- /dev/null +++ b/third_party/rust/sync15/src/telemetry.rs @@ -0,0 +1,46 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! Note: this mostly just reexports the things from sync15_traits::telemetry. + +use crate::error::{Error, ErrorKind, ErrorResponse}; + +pub use sync15_traits::telemetry::*; + +impl<'a> From<&'a Error> for SyncFailure { + fn from(e: &Error) -> SyncFailure { + match e.kind() { + ErrorKind::TokenserverHttpError(status) => { + if *status == 401 { + SyncFailure::Auth { + from: "tokenserver", + } + } else { + SyncFailure::Http { code: *status } + } + } + ErrorKind::BackoffError(_) => SyncFailure::Http { code: 503 }, + ErrorKind::StorageHttpError(ref e) => match e { + ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 }, + ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" }, + ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 }, + ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status }, + ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status }, + }, + ErrorKind::CryptoError(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + ErrorKind::RequestError(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + ErrorKind::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status }, + ErrorKind::Interrupted(ref e) => SyncFailure::Unexpected { + error: e.to_string(), + }, + e => SyncFailure::Other { + error: e.to_string(), + }, + } + } +} diff --git a/third_party/rust/sync15/src/token.rs b/third_party/rust/sync15/src/token.rs new file mode 100644 index 0000000000..2cceb1e34e --- /dev/null +++ b/third_party/rust/sync15/src/token.rs @@ -0,0 +1,617 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::{self, ErrorKind, Result}; +use crate::util::ServerTimestamp; +use rc_crypto::hawk; +use serde_derive::*; +use std::borrow::{Borrow, Cow}; +use std::cell::RefCell; +use std::fmt; +use std::time::{Duration, SystemTime}; +use url::Url; +use viaduct::{header_names, Request}; + +const RETRY_AFTER_DEFAULT_MS: u64 = 10000; + +// The TokenserverToken is the token as received directly from the token server +// and deserialized from JSON. +#[derive(Deserialize, Clone, PartialEq, Eq)] +struct TokenserverToken { + id: String, + key: String, + api_endpoint: String, + uid: u64, + duration: u64, + hashed_fxa_uid: String, +} + +impl std::fmt::Debug for TokenserverToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TokenserverToken") + .field("api_endpoint", &self.api_endpoint) + .field("uid", &self.uid) + .field("duration", &self.duration) + .field("hashed_fxa_uid", &self.hashed_fxa_uid) + .finish() + } +} + +// The struct returned by the TokenFetcher - the token itself and the +// server timestamp. +struct TokenFetchResult { + token: TokenserverToken, + server_timestamp: ServerTimestamp, +} + +// The trait for fetching tokens - we'll provide a "real" implementation but +// tests will re-implement it. +trait TokenFetcher { + fn fetch_token(&self) -> super::Result<TokenFetchResult>; + // We allow the trait to tell us what the time is so tests can get funky. + fn now(&self) -> SystemTime; +} + +// Our "real" token fetcher, implementing the TokenFetcher trait, which hits +// the token server +#[derive(Debug)] +struct TokenServerFetcher { + // The stuff needed to fetch a token. + server_url: Url, + access_token: String, + key_id: String, +} + +fn fixup_server_url(mut url: Url) -> Result<Url> { + // The given `url` is the end-point as returned by .well-known/fxa-client-configuration, + // or as directly specified by self-hosters. As a result, it may or may not have + // the sync 1.5 suffix of "/1.0/sync/1.5", so add it on here if it does not. + if url.as_str().ends_with("1.0/sync/1.5") { + Ok(url) + } else if url.as_str().ends_with("1.0/sync/1.5/") { + // Shouldn't ever be Err() here, but the result is `Result<PathSegmentsMut, ()>` + // and I don't want to unwrap or add a new error type just for PathSegmentsMut failing. + if let Ok(mut path) = url.path_segments_mut() { + path.pop(); + } + Ok(url) + } else { + // We deliberately don't use `.join()` here in order to preserve all path components. + // For example, "http://example.com/token" should produce "http://example.com/token/1.0/sync/1.5" + // but using `.join()` would produce "http://example.com/1.0/sync/1.5". + if let Ok(mut path) = url.path_segments_mut() { + path.pop_if_empty(); + path.extend(&["1.0", "sync", "1.5"]); + } + Ok(url) + } +} + +impl TokenServerFetcher { + fn new(base_url: Url, access_token: String, key_id: String) -> Result<TokenServerFetcher> { + Ok(TokenServerFetcher { + server_url: fixup_server_url(base_url)?, + access_token, + key_id, + }) + } +} + +impl TokenFetcher for TokenServerFetcher { + fn fetch_token(&self) -> Result<TokenFetchResult> { + log::debug!("Fetching token from {}", self.server_url); + let resp = Request::get(self.server_url.clone()) + .header( + header_names::AUTHORIZATION, + format!("Bearer {}", self.access_token), + )? + .header(header_names::X_KEYID, self.key_id.clone())? + .send()?; + + if !resp.is_success() { + log::warn!("Non-success status when fetching token: {}", resp.status); + // TODO: the body should be JSON and contain a status parameter we might need? + log::trace!(" Response body {}", resp.text()); + // XXX - shouldn't we "chain" these errors - ie, a BackoffError could + // have a TokenserverHttpError as its cause? + if let Some(res) = resp.headers.get_as::<f64, _>(header_names::RETRY_AFTER) { + let ms = res + .ok() + .map_or(RETRY_AFTER_DEFAULT_MS, |f| (f * 1000f64) as u64); + let when = self.now() + Duration::from_millis(ms); + return Err(ErrorKind::BackoffError(when).into()); + } + let status = resp.status; + return Err(ErrorKind::TokenserverHttpError(status).into()); + } + + let token: TokenserverToken = resp.json()?; + let server_timestamp = resp + .headers + .try_get::<ServerTimestamp, _>(header_names::X_TIMESTAMP) + .ok_or_else(|| ErrorKind::MissingServerTimestamp)?; + Ok(TokenFetchResult { + token, + server_timestamp, + }) + } + + fn now(&self) -> SystemTime { + SystemTime::now() + } +} + +// The context stored by our TokenProvider when it has a TokenState::Token +// state. +struct TokenContext { + token: TokenserverToken, + credentials: hawk::Credentials, + server_timestamp: ServerTimestamp, + valid_until: SystemTime, +} + +// hawk::Credentials doesn't implement debug -_- +impl fmt::Debug for TokenContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> ::std::result::Result<(), fmt::Error> { + f.debug_struct("TokenContext") + .field("token", &self.token) + .field("credentials", &"(omitted)") + .field("server_timestamp", &self.server_timestamp) + .field("valid_until", &self.valid_until) + .finish() + } +} + +impl TokenContext { + fn new( + token: TokenserverToken, + credentials: hawk::Credentials, + server_timestamp: ServerTimestamp, + valid_until: SystemTime, + ) -> Self { + Self { + token, + credentials, + server_timestamp, + valid_until, + } + } + + fn is_valid(&self, now: SystemTime) -> bool { + // We could consider making the duration a little shorter - if it + // only has 1 second validity there seems a reasonable chance it will + // have expired by the time it gets presented to the remote that wants + // it. + // Either way though, we will eventually need to handle a token being + // rejected as a non-fatal error and recover, so maybe we don't care? + now < self.valid_until + } + + fn authorization(&self, req: &Request) -> Result<String> { + let url = &req.url; + + let path_and_query = match url.query() { + None => Cow::from(url.path()), + Some(qs) => Cow::from(format!("{}?{}", url.path(), qs)), + }; + + let host = url + .host_str() + .ok_or_else(|| ErrorKind::UnacceptableUrl("Storage URL has no host".into()))?; + + // Known defaults exist for https? (among others), so this should be impossible + let port = url.port_or_known_default().ok_or_else(|| { + ErrorKind::UnacceptableUrl( + "Storage URL has no port and no default port is known for the protocol".into(), + ) + })?; + + let header = + hawk::RequestBuilder::new(req.method.as_str(), host, port, path_and_query.borrow()) + .request() + .make_header(&self.credentials)?; + + Ok(format!("Hawk {}", header)) + } +} + +// The state our TokenProvider holds to reflect the state of the token. +#[derive(Debug)] +enum TokenState { + // We've never fetched a token. + NoToken, + // Have a token and last we checked it remained valid. + Token(TokenContext), + // We failed to fetch a token. First elt is the error, second elt is + // the api_endpoint we had before we failed to fetch a new token (or + // None if the very first attempt at fetching a token failed) + Failed(Option<error::Error>, Option<String>), + // Previously failed and told to back-off for SystemTime duration. Second + // elt is the api_endpoint we had before we hit the backoff error. + // XXX - should we roll Backoff and Failed together? + Backoff(SystemTime, Option<String>), + // api_endpoint changed - we are never going to get a token nor move out + // of this state. + NodeReassigned, +} + +/// The generic TokenProvider implementation - long lived and fetches tokens +/// on demand (eg, when first needed, or when an existing one expires.) +#[derive(Debug)] +struct TokenProviderImpl<TF: TokenFetcher> { + fetcher: TF, + // Our token state (ie, whether we have a token, and if not, why not) + current_state: RefCell<TokenState>, +} + +impl<TF: TokenFetcher> TokenProviderImpl<TF> { + fn new(fetcher: TF) -> Self { + // We check this at the real entrypoint of the application, but tests + // can/do bypass that, so check this here too. + rc_crypto::ensure_initialized(); + TokenProviderImpl { + fetcher, + current_state: RefCell::new(TokenState::NoToken), + } + } + + // Uses our fetcher to grab a new token and if successfull, derives other + // info from that token into a usable TokenContext. + fn fetch_context(&self) -> Result<TokenContext> { + let result = self.fetcher.fetch_token()?; + let token = result.token; + let valid_until = SystemTime::now() + Duration::from_secs(token.duration); + + let credentials = hawk::Credentials { + id: token.id.clone(), + key: hawk::Key::new(token.key.as_bytes(), hawk::SHA256)?, + }; + + Ok(TokenContext::new( + token, + credentials, + result.server_timestamp, + valid_until, + )) + } + + // Attempt to fetch a new token and return a new state reflecting that + // operation. If it worked a TokenState will be returned, but errors may + // cause other states. + fn fetch_token(&self, previous_endpoint: Option<&str>) -> TokenState { + match self.fetch_context() { + Ok(tc) => { + // We got a new token - check that the endpoint is the same + // as a previous endpoint we saw (if any) + match previous_endpoint { + Some(prev) => { + if prev == tc.token.api_endpoint { + TokenState::Token(tc) + } else { + log::warn!( + "api_endpoint changed from {} to {}", + prev, + tc.token.api_endpoint + ); + TokenState::NodeReassigned + } + } + None => { + // Never had an api_endpoint in the past, so this is OK. + TokenState::Token(tc) + } + } + } + Err(e) => { + // Early to avoid nll issues... + if let ErrorKind::BackoffError(be) = e.kind() { + return TokenState::Backoff(*be, previous_endpoint.map(ToString::to_string)); + } + TokenState::Failed(Some(e), previous_endpoint.map(ToString::to_string)) + } + } + } + + // Given the state we are currently in, return a new current state. + // Returns None if the current state should be used (eg, if we are + // holding a token that remains valid) or Some() if the state has changed + // (which may have changed to a state with a token or an error state) + fn advance_state(&self, state: &TokenState) -> Option<TokenState> { + match state { + TokenState::NoToken => Some(self.fetch_token(None)), + TokenState::Failed(_, existing_endpoint) => { + Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str))) + } + TokenState::Token(existing_context) => { + if existing_context.is_valid(self.fetcher.now()) { + None + } else { + Some(self.fetch_token(Some(existing_context.token.api_endpoint.as_str()))) + } + } + TokenState::Backoff(ref until, ref existing_endpoint) => { + if let Ok(remaining) = until.duration_since(self.fetcher.now()) { + log::debug!("enforcing existing backoff - {:?} remains", remaining); + None + } else { + // backoff period is over + Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str))) + } + } + TokenState::NodeReassigned => { + // We never leave this state. + None + } + } + } + + fn with_token<T, F>(&self, func: F) -> Result<T> + where + F: FnOnce(&TokenContext) -> Result<T>, + { + // first get a mutable ref to our existing state, advance to the + // state we will use, then re-stash that state for next time. + let state: &mut TokenState = &mut self.current_state.borrow_mut(); + if let Some(new_state) = self.advance_state(state) { + *state = new_state; + } + + // Now re-fetch the state we should use for this call - if it's + // anything other than TokenState::Token we will fail. + match state { + TokenState::NoToken => { + // it should be impossible to get here. + panic!("Can't be in NoToken state after advancing"); + } + TokenState::Token(ref token_context) => { + // make the call. + func(token_context) + } + TokenState::Failed(e, _) => { + // We swap the error out of the state enum and return it. + Err(e.take().unwrap()) + } + TokenState::NodeReassigned => { + // this is unrecoverable. + Err(ErrorKind::StorageResetError.into()) + } + TokenState::Backoff(ref remaining, _) => { + Err(ErrorKind::BackoffError(*remaining).into()) + } + } + } + + fn hashed_uid(&self) -> Result<String> { + self.with_token(|ctx| Ok(ctx.token.hashed_fxa_uid.clone())) + } + + fn authorization(&self, req: &Request) -> Result<String> { + self.with_token(|ctx| ctx.authorization(req)) + } + + fn api_endpoint(&self) -> Result<String> { + self.with_token(|ctx| Ok(ctx.token.api_endpoint.clone())) + } + // TODO: we probably want a "drop_token/context" type method so that when + // using a token with some validity fails the caller can force a new one + // (in which case the new token request will probably fail with a 401) +} + +// The public concrete object exposed by this module +#[derive(Debug)] +pub struct TokenProvider { + imp: TokenProviderImpl<TokenServerFetcher>, +} + +impl TokenProvider { + pub fn new(url: Url, access_token: String, key_id: String) -> Result<Self> { + let fetcher = TokenServerFetcher::new(url, access_token, key_id)?; + Ok(Self { + imp: TokenProviderImpl::new(fetcher), + }) + } + + pub fn hashed_uid(&self) -> Result<String> { + self.imp.hashed_uid() + } + + pub fn authorization(&self, req: &Request) -> Result<String> { + self.imp.authorization(req) + } + + pub fn api_endpoint(&self) -> Result<String> { + self.imp.api_endpoint() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::Cell; + + struct TestFetcher<FF, FN> + where + FF: Fn() -> Result<TokenFetchResult>, + FN: Fn() -> SystemTime, + { + fetch: FF, + now: FN, + } + impl<FF, FN> TokenFetcher for TestFetcher<FF, FN> + where + FF: Fn() -> Result<TokenFetchResult>, + FN: Fn() -> SystemTime, + { + fn fetch_token(&self) -> Result<TokenFetchResult> { + (self.fetch)() + } + fn now(&self) -> SystemTime { + (self.now)() + } + } + + fn make_tsc<FF, FN>(fetch: FF, now: FN) -> TokenProviderImpl<TestFetcher<FF, FN>> + where + FF: Fn() -> Result<TokenFetchResult>, + FN: Fn() -> SystemTime, + { + let fetcher: TestFetcher<FF, FN> = TestFetcher { fetch, now }; + TokenProviderImpl::new(fetcher) + } + + #[test] + fn test_endpoint() { + // Use a cell to avoid the closure having a mutable ref to this scope. + let counter: Cell<u32> = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + Ok(TokenFetchResult { + token: TokenserverToken { + id: "id".to_string(), + key: "key".to_string(), + api_endpoint: "api_endpoint".to_string(), + uid: 1, + duration: 1000, + hashed_fxa_uid: "hash".to_string(), + }, + server_timestamp: ServerTimestamp(0i64), + }) + }; + + let tsc = make_tsc(fetch, SystemTime::now); + + let e = tsc.api_endpoint().expect("should work"); + assert_eq!(e, "api_endpoint".to_string()); + assert_eq!(counter.get(), 1); + + let e2 = tsc.api_endpoint().expect("should work"); + assert_eq!(e2, "api_endpoint".to_string()); + // should not have re-fetched. + assert_eq!(counter.get(), 1); + } + + #[test] + fn test_backoff() { + let counter: Cell<u32> = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + let when = SystemTime::now() + Duration::from_millis(10000); + Err(error::Error::from(ErrorKind::BackoffError(when))) + }; + let now: Cell<SystemTime> = Cell::new(SystemTime::now()); + let tsc = make_tsc(fetch, || now.get()); + + tsc.api_endpoint().expect_err("should bail"); + // XXX - check error type. + assert_eq!(counter.get(), 1); + // try and get another token - should not re-fetch as backoff is still + // in progress. + tsc.api_endpoint().expect_err("should bail"); + assert_eq!(counter.get(), 1); + + // Advance the clock. + now.set(now.get() + Duration::new(20, 0)); + + // Our token fetch mock is still returning a backoff error, so we + // still fail, but should have re-hit the fetch function. + tsc.api_endpoint().expect_err("should bail"); + assert_eq!(counter.get(), 2); + } + + #[test] + fn test_validity() { + let counter: Cell<u32> = Cell::new(0); + let fetch = || { + counter.set(counter.get() + 1); + Ok(TokenFetchResult { + token: TokenserverToken { + id: "id".to_string(), + key: "key".to_string(), + api_endpoint: "api_endpoint".to_string(), + uid: 1, + duration: 10, + hashed_fxa_uid: "hash".to_string(), + }, + server_timestamp: ServerTimestamp(0i64), + }) + }; + let now: Cell<SystemTime> = Cell::new(SystemTime::now()); + let tsc = make_tsc(fetch, || now.get()); + + tsc.api_endpoint().expect("should get a valid token"); + assert_eq!(counter.get(), 1); + + // try and get another token - should not re-fetch as the old one + // remains valid. + tsc.api_endpoint().expect("should reuse existing token"); + assert_eq!(counter.get(), 1); + + // Advance the clock. + now.set(now.get() + Duration::new(20, 0)); + + // We should discard our token and fetch a new one. + tsc.api_endpoint().expect("should re-fetch"); + assert_eq!(counter.get(), 2); + } + + #[test] + fn test_server_url() { + assert_eq!( + fixup_server_url( + Url::parse("https://token.services.mozilla.com/1.0/sync/1.5").unwrap() + ) + .unwrap() + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://token.services.mozilla.com/1.0/sync/1.5/").unwrap() + ) + .unwrap() + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://token.services.mozilla.com").unwrap()) + .unwrap() + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://token.services.mozilla.com/").unwrap()) + .unwrap() + .as_str(), + "https://token.services.mozilla.com/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5").unwrap() + ) + .unwrap() + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url( + Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5/").unwrap() + ) + .unwrap() + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://selfhosted.example.com/token/").unwrap()) + .unwrap() + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + assert_eq!( + fixup_server_url(Url::parse("https://selfhosted.example.com/token").unwrap()) + .unwrap() + .as_str(), + "https://selfhosted.example.com/token/1.0/sync/1.5" + ); + } +} diff --git a/third_party/rust/sync15/src/util.rs b/third_party/rust/sync15/src/util.rs new file mode 100644 index 0000000000..9bd28dc4de --- /dev/null +++ b/third_party/rust/sync15/src/util.rs @@ -0,0 +1,104 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicU32, Ordering}; + +pub use sync15_traits::ServerTimestamp; + +/// Finds the maximum of the current value and the argument `val`, and sets the +/// new value to the result. +/// +/// Note: `AtomicFoo::fetch_max` is unstable, and can't really be implemented as +/// a single atomic operation from outside the stdlib ;-; +pub(crate) fn atomic_update_max(v: &AtomicU32, new: u32) { + // For loads (and the compare_exchange_weak second ordering argument) this + // is too strong, we could probably get away with Acquire (or maybe Relaxed + // because we don't need the result?). In either case, this fn isn't called + // from a hot spot so whatever. + let mut cur = v.load(Ordering::SeqCst); + while cur < new { + // we're already handling the failure case so there's no reason not to + // use _weak here. + match v.compare_exchange_weak(cur, new, Ordering::SeqCst, Ordering::SeqCst) { + Ok(_) => { + // Success. + break; + } + Err(new_cur) => { + // Interrupted, keep trying. + cur = new_cur + } + } + } +} + +// Slight wrappers around the builtin methods for doing this. +pub(crate) fn set_union(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> { + a.union(b).cloned().collect() +} + +pub(crate) fn set_difference(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> { + a.difference(b).cloned().collect() +} + +pub(crate) fn set_intersection(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> { + a.intersection(b).cloned().collect() +} + +pub(crate) fn partition_by_value(v: &HashMap<String, bool>) -> (HashSet<String>, HashSet<String>) { + let mut true_: HashSet<String> = HashSet::new(); + let mut false_: HashSet<String> = HashSet::new(); + for (s, val) in v { + if *val { + true_.insert(s.clone()); + } else { + false_.insert(s.clone()); + } + } + (true_, false_) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_set_ops() { + fn hash_set(s: &[&str]) -> HashSet<String> { + s.iter() + .copied() + .map(ToOwned::to_owned) + .collect::<HashSet<_>>() + } + + assert_eq!( + set_union(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["a", "b", "c", "d"]), + ); + + assert_eq!( + set_difference(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["a", "c"]), + ); + assert_eq!( + set_intersection(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])), + hash_set(&["b"]), + ); + let m: HashMap<String, bool> = [ + ("foo", true), + ("bar", true), + ("baz", false), + ("quux", false), + ] + .iter() + .copied() + .map(|(a, b)| (a.to_owned(), b)) + .collect(); + assert_eq!( + partition_by_value(&m), + (hash_set(&["foo", "bar"]), hash_set(&["baz", "quux"])), + ); + } +} |