summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15')
-rw-r--r--third_party/rust/sync15/.cargo-checksum.json1
-rw-r--r--third_party/rust/sync15/Cargo.toml32
-rw-r--r--third_party/rust/sync15/README.md8
-rw-r--r--third_party/rust/sync15/src/bso_record.rs452
-rw-r--r--third_party/rust/sync15/src/changeset.rs145
-rw-r--r--third_party/rust/sync15/src/client.rs452
-rw-r--r--third_party/rust/sync15/src/clients/engine.rs706
-rw-r--r--third_party/rust/sync15/src/clients/mod.rs94
-rw-r--r--third_party/rust/sync15/src/clients/record.rs158
-rw-r--r--third_party/rust/sync15/src/clients/ser.rs125
-rw-r--r--third_party/rust/sync15/src/coll_state.rs350
-rw-r--r--third_party/rust/sync15/src/collection_keys.rs64
-rw-r--r--third_party/rust/sync15/src/error.rs141
-rw-r--r--third_party/rust/sync15/src/key_bundle.rs212
-rw-r--r--third_party/rust/sync15/src/lib.rs45
-rw-r--r--third_party/rust/sync15/src/migrate_state.rs203
-rw-r--r--third_party/rust/sync15/src/record_types.rs51
-rw-r--r--third_party/rust/sync15/src/request.rs1249
-rw-r--r--third_party/rust/sync15/src/state.rs1130
-rw-r--r--third_party/rust/sync15/src/status.rs109
-rw-r--r--third_party/rust/sync15/src/sync.rs127
-rw-r--r--third_party/rust/sync15/src/sync_multiple.rs493
-rw-r--r--third_party/rust/sync15/src/telemetry.rs46
-rw-r--r--third_party/rust/sync15/src/token.rs617
-rw-r--r--third_party/rust/sync15/src/util.rs104
25 files changed, 7114 insertions, 0 deletions
diff --git a/third_party/rust/sync15/.cargo-checksum.json b/third_party/rust/sync15/.cargo-checksum.json
new file mode 100644
index 0000000000..85f142b019
--- /dev/null
+++ b/third_party/rust/sync15/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"Cargo.toml":"354427a8d1be8414740ec8f469b8ac2024c9b1faa825cdd82345c4b03bd1d785","README.md":"8b2d669841fa7618a762b7e2cfaabfcabfb87a74c9725013061aedc6ed9e37de","src/bso_record.rs":"a7d53a6db9b0fddf247aefb1af365c94b4e39427479aa9f80b39bcc3532b3d72","src/changeset.rs":"9f29a1e4f953e0e6525bec6f73b88042afc0208bbfb9dcc56e11fe7f4cd55f8a","src/client.rs":"25d7e357021c3893c13ea28aa5bf616ec58832e88b74e123580e4bfd9339025b","src/clients/engine.rs":"2ae025c005f55634be3bc8ecc77110a69fa8680da26fc0e2d304d93a73b54e54","src/clients/mod.rs":"c7e230d4c4bdfbbe6c390bc970ea35920540af915a0051c02cadcbc97ad6880e","src/clients/record.rs":"f555d2cb2d713553280ca07fa55fc8a46bb6e56deb0a430a688c2f6f6a91493e","src/clients/ser.rs":"10aee1110410e3d8f38cbeaf1ea9b00ce8c4eab4d48feb3de206e5fbe38ca46d","src/coll_state.rs":"ad96ecaeda3522b2209abd4c1a3ac8be91744763cdf872d5d5845cb6213931db","src/collection_keys.rs":"5a167524e1d653c2d2ca794b99b3e1afc236d3036c9445bdaec8f34fe7938ec5","src/error.rs":"2651a4212d87716ef774784e8bd864eeb5a981f2be2b5d92edbe07df87b9a60a","src/key_bundle.rs":"ceddf59ac19a5757c967afe1795a810faf2ae65a762e085685d4be811328b2f8","src/lib.rs":"b735f28c60dd737cd59aea075c0ec563aba3ee42c6d38b27efc9eba040dc4448","src/migrate_state.rs":"214f4fc1d98c8a6f10e1dcb7dcaf9dc9d866b4908c09c5cc16fed6d3218828d4","src/record_types.rs":"02bb3d352fb808131d298f9b90d9c95b7e9e0138b97c5401f3b9fdacc5562f44","src/request.rs":"e0c27705dc5ebfb79c78af54e73d436cbac7a87bf6eeb3a06511a5b66fb8e8e1","src/state.rs":"bae333c014065fb02563ce08add2f109d6d18a26a0f24dfd637c636c8dc6dc7c","src/status.rs":"d1efab1e992d0340e602abafd02e049a9d75a51f5b0efe4ebcb3963e156c92d5","src/sync.rs":"4dcb6e015b9670a9af96647e4f25a0fc723cc6f7eef8ceee4d9d7f9af43cc969","src/sync_multiple.rs":"8153d7f26493c8e3c988aea40e2121d105f2d3483e97c9cf99d11deda435f692","src/telemetry.rs":"742ed86dcd18b9d99ba51b5a54121dc6158d5b7ed567249e9d7f5a65c4c40a66","src/token.rs":"2b9dea900809e855cdd4f676a295b8c2a41583208aeb05596d609b72cbd1f818","src/util.rs":"10bba86298c50ffd4316aee7e4212efa36db76f6d2412b57596b5375ca6713dc"},"package":null} \ No newline at end of file
diff --git a/third_party/rust/sync15/Cargo.toml b/third_party/rust/sync15/Cargo.toml
new file mode 100644
index 0000000000..66967b2101
--- /dev/null
+++ b/third_party/rust/sync15/Cargo.toml
@@ -0,0 +1,32 @@
+[package]
+name = "sync15"
+edition = "2018"
+version = "0.1.0"
+authors = ["Thom Chiovoloni <tchiovoloni@mozilla.com>"]
+license = "MPL-2.0"
+exclude = ["/android", "/ios"]
+
+[features]
+default = []
+
+[dependencies]
+base64 = "0.12"
+ffi-support = "0.4"
+serde = "1"
+serde_derive = "1"
+serde_json = "1"
+url = "2.1"
+log = "0.4"
+lazy_static = "1.4"
+base16 = "0.2"
+rc_crypto = { path = "../support/rc_crypto", features = ["hawk"] }
+viaduct = { path = "../viaduct" }
+interrupt-support = { path = "../support/interrupt" }
+error-support = { path = "../support/error" }
+sync-guid = { path = "../support/guid", features = ["random"] }
+sync15-traits = {path = "../support/sync15-traits"}
+thiserror = "1.0"
+anyhow = "1.0"
+
+[dev-dependencies]
+env_logger = { version = "0.7", default-features = false }
diff --git a/third_party/rust/sync15/README.md b/third_party/rust/sync15/README.md
new file mode 100644
index 0000000000..fadd3fb069
--- /dev/null
+++ b/third_party/rust/sync15/README.md
@@ -0,0 +1,8 @@
+# Low-level sync-1.5 helper component
+
+This component contains utility code to be shared between different
+data stores that want to sync against a Firefox Sync v1.5 sync server.
+It handles things like encrypting/decrypting records, obtaining and
+using storage node auth tokens, and so-on.
+
+It also needs a more complete README...
diff --git a/third_party/rust/sync15/src/bso_record.rs b/third_party/rust/sync15/src/bso_record.rs
new file mode 100644
index 0000000000..2b3f532402
--- /dev/null
+++ b/third_party/rust/sync15/src/bso_record.rs
@@ -0,0 +1,452 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error;
+use crate::key_bundle::KeyBundle;
+use crate::util::ServerTimestamp;
+use lazy_static::lazy_static;
+use serde::de::{Deserialize, DeserializeOwned};
+use serde::ser::Serialize;
+use serde_derive::*;
+use std::ops::{Deref, DerefMut};
+pub use sync15_traits::Payload;
+use sync_guid::Guid;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct BsoRecord<T> {
+ pub id: Guid,
+
+ // It's not clear to me if this actually can be empty in practice.
+ // firefox-ios seems to think it can...
+ #[serde(default = "String::new")]
+ pub collection: String,
+
+ #[serde(skip_serializing)]
+ // If we don't give it a default, we fail to deserialize
+ // items we wrote out during tests and such.
+ #[serde(default = "ServerTimestamp::default")]
+ pub modified: ServerTimestamp,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub sortindex: Option<i32>,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ttl: Option<u32>,
+
+ // We do some serde magic here with serde to parse the payload from JSON as we deserialize.
+ // This avoids having a separate intermediate type that only exists so that we can deserialize
+ // it's payload field as JSON (Especially since this one is going to exist more-or-less just so
+ // that we can decrypt the data...)
+ #[serde(
+ with = "as_json",
+ bound(serialize = "T: Serialize", deserialize = "T: DeserializeOwned")
+ )]
+ pub payload: T,
+}
+
+impl<T> BsoRecord<T> {
+ #[inline]
+ pub fn map_payload<P, F>(self, mapper: F) -> BsoRecord<P>
+ where
+ F: FnOnce(T) -> P,
+ {
+ BsoRecord {
+ id: self.id,
+ collection: self.collection,
+ modified: self.modified,
+ sortindex: self.sortindex,
+ ttl: self.ttl,
+ payload: mapper(self.payload),
+ }
+ }
+
+ #[inline]
+ pub fn with_payload<P>(self, payload: P) -> BsoRecord<P> {
+ self.map_payload(|_| payload)
+ }
+
+ #[inline]
+ pub fn new_record(id: String, coll: String, payload: T) -> BsoRecord<T> {
+ BsoRecord {
+ id: id.into(),
+ collection: coll,
+ ttl: None,
+ sortindex: None,
+ modified: ServerTimestamp::default(),
+ payload,
+ }
+ }
+
+ pub fn try_map_payload<P, E>(
+ self,
+ mapper: impl FnOnce(T) -> Result<P, E>,
+ ) -> Result<BsoRecord<P>, E> {
+ self.map_payload(mapper).transpose()
+ }
+
+ pub fn map_payload_or<P>(self, mapper: impl FnOnce(T) -> Option<P>) -> Option<BsoRecord<P>> {
+ self.map_payload(mapper).transpose()
+ }
+
+ #[inline]
+ pub fn into_timestamped_payload(self) -> (T, ServerTimestamp) {
+ (self.payload, self.modified)
+ }
+}
+
+impl<T> BsoRecord<Option<T>> {
+ /// Helper to improve ergonomics for handling records that might be tombstones.
+ #[inline]
+ pub fn transpose(self) -> Option<BsoRecord<T>> {
+ let BsoRecord {
+ id,
+ collection,
+ modified,
+ sortindex,
+ ttl,
+ payload,
+ } = self;
+ match payload {
+ Some(p) => Some(BsoRecord {
+ id,
+ collection,
+ modified,
+ sortindex,
+ ttl,
+ payload: p,
+ }),
+ None => None,
+ }
+ }
+}
+
+impl<T, E> BsoRecord<Result<T, E>> {
+ #[inline]
+ pub fn transpose(self) -> Result<BsoRecord<T>, E> {
+ let BsoRecord {
+ id,
+ collection,
+ modified,
+ sortindex,
+ ttl,
+ payload,
+ } = self;
+ match payload {
+ Ok(p) => Ok(BsoRecord {
+ id,
+ collection,
+ modified,
+ sortindex,
+ ttl,
+ payload: p,
+ }),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl<T> Deref for BsoRecord<T> {
+ type Target = T;
+ #[inline]
+ fn deref(&self) -> &T {
+ &self.payload
+ }
+}
+
+impl<T> DerefMut for BsoRecord<T> {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.payload
+ }
+}
+
+impl CleartextBso {
+ pub fn from_payload(mut payload: Payload, collection: impl Into<String>) -> Self {
+ let id = payload.id.clone();
+ let sortindex: Option<i32> = payload.take_auto_field("sortindex");
+ let ttl: Option<u32> = payload.take_auto_field("ttl");
+ BsoRecord {
+ id,
+ collection: collection.into(),
+ modified: ServerTimestamp::default(), // Doesn't matter.
+ sortindex,
+ ttl,
+ payload,
+ }
+ }
+}
+
+pub type EncryptedBso = BsoRecord<EncryptedPayload>;
+pub type CleartextBso = BsoRecord<Payload>;
+
+// Contains the methods to automatically deserialize the payload to/from json.
+mod as_json {
+ use serde::de::{self, Deserialize, DeserializeOwned, Deserializer};
+ use serde::ser::{self, Serialize, Serializer};
+
+ pub fn serialize<T, S>(t: &T, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ T: Serialize,
+ S: Serializer,
+ {
+ let j = serde_json::to_string(t).map_err(ser::Error::custom)?;
+ serializer.serialize_str(&j)
+ }
+
+ pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
+ where
+ T: DeserializeOwned,
+ D: Deserializer<'de>,
+ {
+ let j = String::deserialize(deserializer)?;
+ serde_json::from_str(&j).map_err(de::Error::custom)
+ }
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug)]
+pub struct EncryptedPayload {
+ #[serde(rename = "IV")]
+ pub iv: String,
+ pub hmac: String,
+ pub ciphertext: String,
+}
+
+// This is a little cludgey but I couldn't think of another way to have easy deserialization
+// without a bunch of wrapper types, while still only serializing a single time in the
+// postqueue.
+lazy_static! {
+ // The number of bytes taken up by padding in a EncryptedPayload.
+ static ref EMPTY_ENCRYPTED_PAYLOAD_SIZE: usize = serde_json::to_string(
+ &EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "".into() }
+ ).unwrap().len();
+}
+
+impl EncryptedPayload {
+ #[inline]
+ pub fn serialized_len(&self) -> usize {
+ (*EMPTY_ENCRYPTED_PAYLOAD_SIZE) + self.ciphertext.len() + self.hmac.len() + self.iv.len()
+ }
+
+ pub fn decrypt_and_parse_payload<T>(&self, key: &KeyBundle) -> error::Result<T>
+ where
+ for<'a> T: Deserialize<'a>,
+ {
+ let cleartext = key.decrypt(&self.ciphertext, &self.iv, &self.hmac)?;
+ Ok(serde_json::from_str(&cleartext)?)
+ }
+
+ pub fn from_cleartext_payload<T: Serialize>(
+ key: &KeyBundle,
+ cleartext_payload: &T,
+ ) -> error::Result<Self> {
+ let cleartext = serde_json::to_string(cleartext_payload)?;
+ let (enc_base64, iv_base64, hmac_base16) =
+ key.encrypt_bytes_rand_iv(&cleartext.as_bytes())?;
+ Ok(EncryptedPayload {
+ iv: iv_base64,
+ hmac: hmac_base16,
+ ciphertext: enc_base64,
+ })
+ }
+}
+
+impl EncryptedBso {
+ pub fn decrypt(self, key: &KeyBundle) -> error::Result<CleartextBso> {
+ let new_payload = self
+ .payload
+ .decrypt_and_parse_payload::<Payload>(key)?
+ .with_auto_field("sortindex", self.sortindex)
+ .with_auto_field("ttl", self.ttl);
+
+ let result = self.with_payload(new_payload);
+ Ok(result)
+ }
+
+ pub fn decrypt_as<T>(self, key: &KeyBundle) -> error::Result<BsoRecord<T>>
+ where
+ for<'a> T: Deserialize<'a>,
+ {
+ Ok(self.decrypt(key)?.into_record::<T>()?)
+ }
+}
+
+impl CleartextBso {
+ pub fn encrypt(self, key: &KeyBundle) -> error::Result<EncryptedBso> {
+ let encrypted_payload = EncryptedPayload::from_cleartext_payload(key, &self.payload)?;
+ Ok(self.with_payload(encrypted_payload))
+ }
+
+ pub fn into_record<T>(self) -> error::Result<BsoRecord<T>>
+ where
+ for<'a> T: Deserialize<'a>,
+ {
+ Ok(self.try_map_payload(Payload::into_record)?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use serde_json::json;
+ use serde_json::Value as JsonValue;
+
+ #[test]
+ fn test_deserialize_enc() {
+ let serialized = r#"{
+ "id": "1234",
+ "collection": "passwords",
+ "modified": 12344321.0,
+ "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}"
+ }"#;
+ let record: BsoRecord<EncryptedPayload> = serde_json::from_str(serialized).unwrap();
+ assert_eq!(&record.id, "1234");
+ assert_eq!(&record.collection, "passwords");
+ assert_eq!((record.modified.0 - 12_344_321_000).abs(), 0);
+ assert_eq!(record.sortindex, None);
+ assert_eq!(&record.payload.iv, "aaaaa");
+ assert_eq!(&record.payload.hmac, "bbbbb");
+ assert_eq!(&record.payload.ciphertext, "ccccc");
+ }
+
+ #[test]
+ fn test_deserialize_autofields() {
+ let serialized = r#"{
+ "id": "1234",
+ "collection": "passwords",
+ "modified": 12344321.0,
+ "sortindex": 100,
+ "ttl": 99,
+ "payload": "{\"IV\": \"aaaaa\", \"hmac\": \"bbbbb\", \"ciphertext\": \"ccccc\"}"
+ }"#;
+ let record: BsoRecord<EncryptedPayload> = serde_json::from_str(serialized).unwrap();
+ assert_eq!(record.sortindex, Some(100));
+ assert_eq!(record.ttl, Some(99));
+ }
+
+ #[test]
+ fn test_serialize_enc() {
+ let goal = r#"{"id":"1234","collection":"passwords","payload":"{\"IV\":\"aaaaa\",\"hmac\":\"bbbbb\",\"ciphertext\":\"ccccc\"}"}"#;
+ let record = BsoRecord {
+ id: "1234".into(),
+ modified: ServerTimestamp(999), // shouldn't be serialized by client no matter what it's value is
+ collection: "passwords".into(),
+ sortindex: None,
+ ttl: None,
+ payload: EncryptedPayload {
+ iv: "aaaaa".into(),
+ hmac: "bbbbb".into(),
+ ciphertext: "ccccc".into(),
+ },
+ };
+ let actual = serde_json::to_string(&record).unwrap();
+ assert_eq!(actual, goal);
+
+ let val_str_payload: serde_json::Value = serde_json::from_str(goal).unwrap();
+ assert_eq!(
+ val_str_payload["payload"].as_str().unwrap().len(),
+ record.payload.serialized_len()
+ )
+ }
+
+ #[test]
+ fn test_roundtrip_crypt_tombstone() {
+ let orig_record = CleartextBso::from_payload(
+ Payload::from_json(json!({ "id": "aaaaaaaaaaaa", "deleted": true, })).unwrap(),
+ "dummy",
+ );
+
+ assert!(orig_record.is_tombstone());
+
+ let keybundle = KeyBundle::new_random().unwrap();
+
+ let encrypted = orig_record.clone().encrypt(&keybundle).unwrap();
+
+ // While we're here, check on EncryptedPayload::serialized_len
+ let val_rec =
+ serde_json::from_str::<JsonValue>(&serde_json::to_string(&encrypted).unwrap()).unwrap();
+
+ assert_eq!(
+ encrypted.payload.serialized_len(),
+ val_rec["payload"].as_str().unwrap().len()
+ );
+
+ let decrypted: CleartextBso = encrypted.decrypt(&keybundle).unwrap();
+ assert!(decrypted.is_tombstone());
+ assert_eq!(decrypted, orig_record);
+ }
+
+ #[test]
+ fn test_roundtrip_crypt_record() {
+ let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data" });
+ let orig_record =
+ CleartextBso::from_payload(Payload::from_json(payload.clone()).unwrap(), "dummy");
+
+ assert!(!orig_record.is_tombstone());
+
+ let keybundle = KeyBundle::new_random().unwrap();
+
+ let encrypted = orig_record.clone().encrypt(&keybundle).unwrap();
+
+ // While we're here, check on EncryptedPayload::serialized_len
+ let val_rec =
+ serde_json::from_str::<JsonValue>(&serde_json::to_string(&encrypted).unwrap()).unwrap();
+ assert_eq!(
+ encrypted.payload.serialized_len(),
+ val_rec["payload"].as_str().unwrap().len()
+ );
+
+ let decrypted = encrypted.decrypt(&keybundle).unwrap();
+ assert!(!decrypted.is_tombstone());
+ assert_eq!(decrypted, orig_record);
+ assert_eq!(serde_json::to_value(decrypted.payload).unwrap(), payload);
+ }
+
+ #[test]
+ fn test_record_auto_fields() {
+ let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data", "sortindex": 100, "ttl": 99 });
+ let bso = CleartextBso::from_payload(Payload::from_json(payload).unwrap(), "dummy");
+
+ // We don't want the keys ending up in the actual record data on the server.
+ assert!(!bso.payload.data.contains_key("sortindex"));
+ assert!(!bso.payload.data.contains_key("ttl"));
+
+ // But we do want them in the BsoRecord.
+ assert_eq!(bso.sortindex, Some(100));
+ assert_eq!(bso.ttl, Some(99));
+
+ let keybundle = KeyBundle::new_random().unwrap();
+ let encrypted = bso.encrypt(&keybundle).unwrap();
+
+ let decrypted = encrypted.decrypt(&keybundle).unwrap();
+ // We add auto fields during decryption.
+ assert_eq!(decrypted.payload.data["sortindex"], 100);
+ assert_eq!(decrypted.payload.data["ttl"], 99);
+
+ assert_eq!(decrypted.sortindex, Some(100));
+ assert_eq!(decrypted.ttl, Some(99));
+ }
+ #[test]
+ fn test_record_bad_hmac() {
+ let payload = json!({ "id": "aaaaaaaaaaaa", "age": 105, "meta": "data", "sortindex": 100, "ttl": 99 });
+ let bso = CleartextBso::from_payload(Payload::from_json(payload).unwrap(), "dummy");
+
+ let keybundle = KeyBundle::new_random().unwrap();
+ let encrypted = bso.encrypt(&keybundle).unwrap();
+ let keybundle2 = KeyBundle::new_random().unwrap();
+
+ let e = encrypted
+ .decrypt(&keybundle2)
+ .expect_err("Should fail because wrong keybundle");
+
+ // Note: ErrorKind isn't PartialEq, so.
+ match e.kind() {
+ error::ErrorKind::CryptoError(_) => {
+ // yay.
+ }
+ other => {
+ panic!("Expected Crypto Error, got {:?}", other);
+ }
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/changeset.rs b/third_party/rust/sync15/src/changeset.rs
new file mode 100644
index 0000000000..24bd888bf8
--- /dev/null
+++ b/third_party/rust/sync15/src/changeset.rs
@@ -0,0 +1,145 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso_record::{CleartextBso, EncryptedBso};
+use crate::client::{Sync15ClientResponse, Sync15StorageClient};
+use crate::error::{self, ErrorKind, ErrorResponse, Result};
+use crate::key_bundle::KeyBundle;
+use crate::request::{CollectionRequest, NormalResponseHandler, UploadInfo};
+use crate::util::ServerTimestamp;
+use crate::CollState;
+use std::borrow::Cow;
+
+pub use sync15_traits::{IncomingChangeset, OutgoingChangeset, RecordChangeset};
+
+pub fn encrypt_outgoing(o: OutgoingChangeset, key: &KeyBundle) -> Result<Vec<EncryptedBso>> {
+ let RecordChangeset {
+ changes,
+ collection,
+ ..
+ } = o;
+ changes
+ .into_iter()
+ .map(|change| CleartextBso::from_payload(change, collection.clone()).encrypt(key))
+ .collect()
+}
+
+pub fn fetch_incoming(
+ client: &Sync15StorageClient,
+ state: &mut CollState,
+ collection_request: &CollectionRequest,
+) -> Result<IncomingChangeset> {
+ let collection = collection_request.collection.clone();
+ let (records, timestamp) = match client.get_encrypted_records(collection_request)? {
+ Sync15ClientResponse::Success {
+ record,
+ last_modified,
+ ..
+ } => (record, last_modified),
+ other => return Err(other.create_storage_error().into()),
+ };
+ // xxx - duplication below of `timestamp` smells wrong
+ state.last_modified = timestamp;
+ let mut result = IncomingChangeset::new(collection, timestamp);
+ result.changes.reserve(records.len());
+ for record in records {
+ // if we see a HMAC error, we've made an explicit decision to
+ // NOT handle it here, but restart the global state machine.
+ // That should cause us to re-read crypto/keys and things should
+ // work (although if for some reason crypto/keys was updated but
+ // not all storage was wiped we are probably screwed.)
+ let decrypted = record.decrypt(&state.key)?;
+ result.changes.push(decrypted.into_timestamped_payload());
+ }
+ Ok(result)
+}
+
+#[derive(Debug, Clone)]
+pub struct CollectionUpdate<'a> {
+ client: &'a Sync15StorageClient,
+ state: &'a CollState,
+ collection: Cow<'static, str>,
+ xius: ServerTimestamp,
+ to_update: Vec<EncryptedBso>,
+ fully_atomic: bool,
+}
+
+impl<'a> CollectionUpdate<'a> {
+ pub fn new(
+ client: &'a Sync15StorageClient,
+ state: &'a CollState,
+ collection: Cow<'static, str>,
+ xius: ServerTimestamp,
+ records: Vec<EncryptedBso>,
+ fully_atomic: bool,
+ ) -> CollectionUpdate<'a> {
+ CollectionUpdate {
+ client,
+ state,
+ collection,
+ xius,
+ to_update: records,
+ fully_atomic,
+ }
+ }
+
+ pub fn new_from_changeset(
+ client: &'a Sync15StorageClient,
+ state: &'a CollState,
+ changeset: OutgoingChangeset,
+ fully_atomic: bool,
+ ) -> Result<CollectionUpdate<'a>> {
+ let collection = changeset.collection.clone();
+ let xius = changeset.timestamp;
+ if xius < state.last_modified {
+ // We know we are going to fail the XIUS check...
+ return Err(
+ ErrorKind::StorageHttpError(ErrorResponse::PreconditionFailed {
+ route: collection.into_owned(),
+ })
+ .into(),
+ );
+ }
+ let to_update = crate::changeset::encrypt_outgoing(changeset, &state.key)?;
+ Ok(CollectionUpdate::new(
+ client,
+ state,
+ collection,
+ xius,
+ to_update,
+ fully_atomic,
+ ))
+ }
+
+ /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise
+ /// returns an empty vec.
+ pub fn upload(self) -> error::Result<UploadInfo> {
+ let mut failed = vec![];
+ let mut q = self.client.new_post_queue(
+ &self.collection,
+ &self.state.config,
+ self.xius,
+ NormalResponseHandler::new(!self.fully_atomic),
+ )?;
+
+ for record in self.to_update.into_iter() {
+ let enqueued = q.enqueue(&record)?;
+ if !enqueued && self.fully_atomic {
+ return Err(ErrorKind::RecordTooLargeError.into());
+ }
+ }
+
+ q.flush(true)?;
+ let mut info = q.completed_upload_info();
+ info.failed_ids.append(&mut failed);
+ if self.fully_atomic {
+ assert_eq!(
+ info.failed_ids.len(),
+ 0,
+ "Bug: Should have failed by now if we aren't allowing dropped records"
+ );
+ }
+ Ok(info)
+ }
+}
diff --git a/third_party/rust/sync15/src/client.rs b/third_party/rust/sync15/src/client.rs
new file mode 100644
index 0000000000..98ed0dbd5d
--- /dev/null
+++ b/third_party/rust/sync15/src/client.rs
@@ -0,0 +1,452 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso_record::{BsoRecord, EncryptedBso};
+use crate::error::{self, ErrorKind, ErrorResponse};
+use crate::record_types::MetaGlobalRecord;
+use crate::request::{
+ BatchPoster, CollectionRequest, InfoCollections, InfoConfiguration, PostQueue, PostResponse,
+ PostResponseHandler,
+};
+use crate::token;
+use crate::util::ServerTimestamp;
+use serde_json::Value;
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU32, Ordering};
+use url::Url;
+use viaduct::{
+ header_names::{self, AUTHORIZATION},
+ Method, Request, Response,
+};
+
+/// A response from a GET request on a Sync15StorageClient, encapsulating all
+/// the variants users of this client needs to care about.
+#[derive(Debug, Clone)]
+pub enum Sync15ClientResponse<T> {
+ Success {
+ status: u16,
+ record: T,
+ last_modified: ServerTimestamp,
+ route: String,
+ },
+ Error(ErrorResponse),
+}
+
+fn parse_seconds(seconds_str: &str) -> Option<u32> {
+ let secs = seconds_str.parse::<f64>().ok()?.ceil();
+ // Note: u32 doesn't impl TryFrom<f64> :(
+ if !secs.is_finite() || secs < 0.0 || secs >= f64::from(u32::max_value()) {
+ Some(secs as u32)
+ } else {
+ log::warn!("invalid backoff value: {}", secs);
+ None
+ }
+}
+
+impl<T> Sync15ClientResponse<T> {
+ pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result<Self>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let route: String = resp.url.path().into();
+ // Android seems to respect retry_after even on success requests, so we
+ // will too if it's present. This also lets us handle both backoff-like
+ // properties in the same place.
+ let retry_after = resp
+ .headers
+ .get(header_names::RETRY_AFTER)
+ .and_then(parse_seconds);
+
+ let backoff = resp
+ .headers
+ .get(header_names::X_WEAVE_BACKOFF)
+ .and_then(parse_seconds);
+
+ if let Some(b) = backoff {
+ backoff_listener.note_backoff(b);
+ }
+ if let Some(ra) = retry_after {
+ backoff_listener.note_retry_after(ra);
+ }
+
+ Ok(if resp.is_success() {
+ let record: T = resp.json()?;
+ let last_modified = resp
+ .headers
+ .get(header_names::X_LAST_MODIFIED)
+ .and_then(|s| ServerTimestamp::from_str(s).ok())
+ .ok_or_else(|| ErrorKind::MissingServerTimestamp)?;
+ log::info!(
+ "Successful request to \"{}\", incoming x-last-modified={:?}",
+ route,
+ last_modified
+ );
+
+ Sync15ClientResponse::Success {
+ status: resp.status,
+ record,
+ last_modified,
+ route,
+ }
+ } else {
+ let status = resp.status;
+ log::info!("Request \"{}\" was an error (status={})", route, status);
+ match status {
+ 404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }),
+ 401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }),
+ 412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }),
+ 500..=600 => {
+ Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status })
+ }
+ _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }),
+ }
+ })
+ }
+
+ pub fn create_storage_error(self) -> ErrorKind {
+ let inner = match self {
+ Sync15ClientResponse::Success { status, route, .. } => {
+ // This should never happen as callers are expected to have
+ // already special-cased this response, so warn if it does.
+ // (or maybe we could panic?)
+ log::warn!("Converting success response into an error");
+ ErrorResponse::RequestFailed { status, route }
+ }
+ Sync15ClientResponse::Error(e) => e,
+ };
+ ErrorKind::StorageHttpError(inner)
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct Sync15StorageClientInit {
+ pub key_id: String,
+ pub access_token: String,
+ pub tokenserver_url: Url,
+}
+
+/// A trait containing the methods required to run through the setup state
+/// machine. This is factored out into a separate trait to make mocking
+/// easier.
+pub trait SetupStorageClient {
+ fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>>;
+ fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>>;
+ fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>>;
+ fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>>;
+
+ fn put_meta_global(
+ &self,
+ xius: ServerTimestamp,
+ global: &MetaGlobalRecord,
+ ) -> error::Result<ServerTimestamp>;
+ fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()>;
+ fn wipe_all_remote(&self) -> error::Result<()>;
+}
+
+#[derive(Debug, Default)]
+pub struct BackoffState {
+ pub backoff_secs: AtomicU32,
+ pub retry_after_secs: AtomicU32,
+}
+
+pub(crate) type BackoffListener = std::sync::Arc<BackoffState>;
+
+pub(crate) fn new_backoff_listener() -> BackoffListener {
+ std::sync::Arc::new(BackoffState::default())
+}
+
+impl BackoffState {
+ pub fn note_backoff(&self, noted: u32) {
+ crate::util::atomic_update_max(&self.backoff_secs, noted)
+ }
+
+ pub fn note_retry_after(&self, noted: u32) {
+ crate::util::atomic_update_max(&self.retry_after_secs, noted)
+ }
+
+ pub fn get_backoff_secs(&self) -> u32 {
+ self.backoff_secs.load(Ordering::SeqCst)
+ }
+
+ pub fn get_retry_after_secs(&self) -> u32 {
+ self.retry_after_secs.load(Ordering::SeqCst)
+ }
+
+ pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option<std::time::Duration> {
+ let bo = self.get_backoff_secs();
+ let ra = self.get_retry_after_secs();
+ let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) });
+ if secs > 0 {
+ Some(std::time::Duration::from_secs(secs))
+ } else {
+ None
+ }
+ }
+
+ pub fn reset(&self) {
+ self.backoff_secs.store(0, Ordering::SeqCst);
+ self.retry_after_secs.store(0, Ordering::SeqCst);
+ }
+}
+
+#[derive(Debug)]
+pub struct Sync15StorageClient {
+ tsc: token::TokenProvider,
+ pub(crate) backoff: BackoffListener,
+}
+
+impl SetupStorageClient for Sync15StorageClient {
+ fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>> {
+ self.relative_storage_request(Method::Get, "info/configuration")
+ }
+
+ fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> {
+ self.relative_storage_request(Method::Get, "info/collections")
+ }
+
+ fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> {
+ // meta/global is a Bso, so there's an extra dance to do.
+ let got: Sync15ClientResponse<BsoRecord<MetaGlobalRecord>> =
+ self.relative_storage_request(Method::Get, "storage/meta/global")?;
+ Ok(match got {
+ Sync15ClientResponse::Success {
+ record,
+ last_modified,
+ route,
+ status,
+ } => {
+ log::debug!(
+ "Got meta global with modified = {}; last-modified = {}",
+ record.modified,
+ last_modified
+ );
+ Sync15ClientResponse::Success {
+ record: record.payload,
+ last_modified,
+ route,
+ status,
+ }
+ }
+ Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e),
+ })
+ }
+
+ fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>> {
+ self.relative_storage_request(Method::Get, "storage/crypto/keys")
+ }
+
+ fn put_meta_global(
+ &self,
+ xius: ServerTimestamp,
+ global: &MetaGlobalRecord,
+ ) -> error::Result<ServerTimestamp> {
+ let bso = BsoRecord::new_record("global".into(), "meta".into(), global);
+ self.put("storage/meta/global", xius, &bso)
+ }
+
+ fn put_crypto_keys(&self, xius: ServerTimestamp, keys: &EncryptedBso) -> error::Result<()> {
+ self.put("storage/crypto/keys", xius, keys)?;
+ Ok(())
+ }
+
+ fn wipe_all_remote(&self) -> error::Result<()> {
+ let s = self.tsc.api_endpoint()?;
+ let url = Url::parse(&s)?;
+
+ let req = self.build_request(Method::Delete, url)?;
+ match self.exec_request::<Value>(req, false) {
+ Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
+ | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
+ Ok(resp) => Err(resp.create_storage_error().into()),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl Sync15StorageClient {
+ pub fn new(init_params: Sync15StorageClientInit) -> error::Result<Sync15StorageClient> {
+ rc_crypto::ensure_initialized();
+ let tsc = token::TokenProvider::new(
+ init_params.tokenserver_url,
+ init_params.access_token,
+ init_params.key_id,
+ )?;
+ Ok(Sync15StorageClient {
+ tsc,
+ backoff: new_backoff_listener(),
+ })
+ }
+
+ pub fn get_encrypted_records(
+ &self,
+ collection_request: &CollectionRequest,
+ ) -> error::Result<Sync15ClientResponse<Vec<EncryptedBso>>> {
+ self.collection_request(Method::Get, collection_request)
+ }
+
+ #[inline]
+ fn authorized(&self, req: Request) -> error::Result<Request> {
+ let hawk_header_value = self.tsc.authorization(&req)?;
+ Ok(req.header(AUTHORIZATION, hawk_header_value)?)
+ }
+
+ // TODO: probably want a builder-like API to do collection requests (e.g. something
+ // that occupies roughly the same conceptual role as the Collection class in desktop)
+ fn build_request(&self, method: Method, url: Url) -> error::Result<Request> {
+ self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?)
+ }
+
+ fn relative_storage_request<P, T>(
+ &self,
+ method: Method,
+ relative_path: P,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ P: AsRef<str>,
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(relative_path.as_ref())?;
+ self.exec_request(self.build_request(method, url)?, false)
+ }
+
+ fn exec_request<T>(
+ &self,
+ req: Request,
+ require_success: bool,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ log::trace!(
+ "request: {} {} ({:?})",
+ req.method,
+ req.url.path(),
+ req.url.query()
+ );
+ let resp = req.send()?;
+
+ let result = Sync15ClientResponse::from_response(resp, &self.backoff)?;
+ match result {
+ Sync15ClientResponse::Success { .. } => Ok(result),
+ _ => {
+ if require_success {
+ Err(result.create_storage_error().into())
+ } else {
+ Ok(result)
+ }
+ }
+ }
+ }
+
+ fn collection_request<T>(
+ &self,
+ method: Method,
+ r: &CollectionRequest,
+ ) -> error::Result<Sync15ClientResponse<T>>
+ where
+ for<'a> T: serde::de::Deserialize<'a>,
+ {
+ let url = r.build_url(Url::parse(&self.tsc.api_endpoint()?)?)?;
+ self.exec_request(self.build_request(method, url)?, false)
+ }
+
+ pub fn new_post_queue<'a, F: PostResponseHandler>(
+ &'a self,
+ coll: &str,
+ config: &InfoConfiguration,
+ ts: ServerTimestamp,
+ on_response: F,
+ ) -> error::Result<PostQueue<PostWrapper<'a>, F>> {
+ let pw = PostWrapper {
+ client: self,
+ coll: coll.into(),
+ };
+ Ok(PostQueue::new(config, ts, pw, on_response))
+ }
+
+ fn put<P, B>(
+ &self,
+ relative_path: P,
+ xius: ServerTimestamp,
+ body: &B,
+ ) -> error::Result<ServerTimestamp>
+ where
+ P: AsRef<str>,
+ B: serde::ser::Serialize,
+ {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(relative_path.as_ref())?;
+
+ let req = self
+ .build_request(Method::Put, url)?
+ .json(body)
+ .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?;
+
+ let resp = self.exec_request::<Value>(req, true)?;
+ // Note: we pass `true` for require_success, so this panic never happens.
+ if let Sync15ClientResponse::Success { last_modified, .. } = resp {
+ Ok(last_modified)
+ } else {
+ unreachable!("Error returned exec_request when `require_success` was true");
+ }
+ }
+
+ pub fn hashed_uid(&self) -> error::Result<String> {
+ self.tsc.hashed_uid()
+ }
+
+ pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> {
+ let s = self.tsc.api_endpoint()? + "/";
+ let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?;
+ log::debug!("Wiping: {:?}", url);
+ let req = self.build_request(Method::Delete, url)?;
+ match self.exec_request::<Value>(req, false) {
+ Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
+ | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
+ Ok(resp) => Err(resp.create_storage_error().into()),
+ Err(e) => Err(e),
+ }
+ }
+}
+
+pub struct PostWrapper<'a> {
+ client: &'a Sync15StorageClient,
+ coll: String,
+}
+
+impl<'a> BatchPoster for PostWrapper<'a> {
+ fn post<T, O>(
+ &self,
+ bytes: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ _: &PostQueue<T, O>,
+ ) -> error::Result<PostResponse> {
+ let url = CollectionRequest::new(self.coll.clone())
+ .batch(batch)
+ .commit(commit)
+ .build_url(Url::parse(&self.client.tsc.api_endpoint()?)?)?;
+
+ let req = self
+ .client
+ .build_request(Method::Post, url)?
+ .header(header_names::CONTENT_TYPE, "application/json")?
+ .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?
+ .body(bytes);
+ self.client.exec_request(req, false)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ #[test]
+ fn test_send() {
+ fn ensure_send<T: Send>() {}
+ // Compile will fail if not send.
+ ensure_send::<Sync15StorageClient>();
+ }
+}
diff --git a/third_party/rust/sync15/src/clients/engine.rs b/third_party/rust/sync15/src/clients/engine.rs
new file mode 100644
index 0000000000..a9e77d109d
--- /dev/null
+++ b/third_party/rust/sync15/src/clients/engine.rs
@@ -0,0 +1,706 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::collections::{HashMap, HashSet};
+
+use crate::{
+ bso_record::Payload,
+ changeset::{CollectionUpdate, IncomingChangeset, OutgoingChangeset},
+ client::Sync15StorageClient,
+ coll_state::CollState,
+ collection_keys::CollectionKeys,
+ key_bundle::KeyBundle,
+ request::{CollectionRequest, InfoConfiguration},
+ state::GlobalState,
+};
+use interrupt_support::Interruptee;
+use sync15_traits::client::ClientData;
+
+use super::{
+ record::{ClientRecord, CommandRecord},
+ ser::shrink_to_fit,
+ Command, CommandProcessor, CommandStatus, RemoteClient, CLIENTS_TTL,
+};
+use crate::error::Result;
+
+const COLLECTION_NAME: &str = "clients";
+
+/// The driver for the clients engine. Internal; split out from the `Engine`
+/// struct to make testing easier.
+struct Driver<'a> {
+ command_processor: &'a dyn CommandProcessor,
+ interruptee: &'a dyn Interruptee,
+ config: &'a InfoConfiguration,
+ recent_clients: HashMap<String, RemoteClient>,
+}
+
+impl<'a> Driver<'a> {
+ fn new(
+ command_processor: &'a dyn CommandProcessor,
+ interruptee: &'a dyn Interruptee,
+ config: &'a InfoConfiguration,
+ ) -> Driver<'a> {
+ Driver {
+ command_processor,
+ interruptee,
+ config,
+ recent_clients: HashMap::new(),
+ }
+ }
+
+ fn note_recent_client(&mut self, client: &ClientRecord) {
+ self.recent_clients.insert(client.id.clone(), client.into());
+ }
+
+ fn sync(
+ &mut self,
+ inbound: IncomingChangeset,
+ should_refresh_client: bool,
+ ) -> Result<OutgoingChangeset> {
+ let mut outgoing = OutgoingChangeset::new(COLLECTION_NAME, inbound.timestamp);
+ outgoing.timestamp = inbound.timestamp;
+
+ self.interruptee.err_if_interrupted()?;
+ let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
+
+ let mut has_own_client_record = false;
+
+ for (payload, _) in inbound.changes {
+ self.interruptee.err_if_interrupted()?;
+
+ // Unpack the client record. We should never have tombstones in the
+ // clients collection, so we don't check for `is_tombstone`.
+ // https://github.com/mozilla/application-services/issues/1801
+ // tracks deleting these from the server.
+ let client: ClientRecord = payload.into_record()?;
+
+ if client.id == self.command_processor.settings().fxa_device_id {
+ log::debug!("Found my record on the server");
+ // If we see our own client record, apply any incoming commands,
+ // remove them from the list, and reupload the record. Any
+ // commands that we don't understand also go back in the list.
+ // https://github.com/mozilla/application-services/issues/1800
+ // tracks if that's the right thing to do.
+ has_own_client_record = true;
+ let mut current_client_record = self.current_client_record();
+ for c in &client.commands {
+ let status = match c.as_command() {
+ Some(command) => self.command_processor.apply_incoming_command(command)?,
+ None => CommandStatus::Unsupported,
+ };
+ match status {
+ CommandStatus::Applied => {}
+ CommandStatus::Ignored => {
+ log::debug!("Ignored command {:?}", c);
+ }
+ CommandStatus::Unsupported => {
+ log::warn!("Don't know how to apply command {:?}", c);
+ current_client_record.commands.push(c.clone());
+ }
+ }
+ }
+
+ // The clients collection has a hard limit on the payload size,
+ // after which the server starts rejecting our records. Large
+ // command lists can cause us to exceed this, so we truncate
+ // the list.
+ shrink_to_fit(
+ &mut current_client_record.commands,
+ self.memcache_max_record_payload_size(),
+ )?;
+
+ // Add the new client record to our map of recently synced
+ // clients, so that downstream consumers like synced tabs can
+ // access them.
+ self.note_recent_client(&current_client_record);
+
+ // We periodically upload our own client record, even if it
+ // doesn't change, to keep it fresh.
+ // (but this part sucks - if the ttl on the server happens to be
+ // different (as some other client did something strange) we
+ // still want the records to compare equal - but the ttl hack
+ // doesn't allow that.)
+ let mut client_compare = client.clone();
+ client_compare.ttl = current_client_record.ttl;
+ if should_refresh_client || client_compare != current_client_record {
+ log::debug!("Will update our client record on the server");
+ outgoing
+ .changes
+ .push(Payload::from_record(current_client_record)?);
+ }
+ } else {
+ // Add the other client to our map of recently synced clients.
+ self.note_recent_client(&client);
+
+ // Bail if we don't have any outgoing commands to write into
+ // the other client's record.
+ if outgoing_commands.is_empty() {
+ continue;
+ }
+
+ // Determine if we have new commands, that aren't already in the
+ // client's command list.
+ let current_commands: HashSet<Command> = client
+ .commands
+ .iter()
+ .filter_map(|c| c.as_command())
+ .collect();
+ let mut new_outgoing_commands = outgoing_commands
+ .difference(&current_commands)
+ .cloned()
+ .collect::<Vec<_>>();
+ // Sort, to ensure deterministic ordering for tests.
+ new_outgoing_commands.sort();
+ let mut new_client = client.clone();
+ new_client
+ .commands
+ .extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
+ if new_client.commands.len() == client.commands.len() {
+ continue;
+ }
+
+ // Hooray, we added new commands! Make sure the record still
+ // fits in the maximum record size, or the server will reject
+ // our upload.
+ shrink_to_fit(
+ &mut new_client.commands,
+ self.memcache_max_record_payload_size(),
+ )?;
+
+ // We want to ensure the TTL for all records we write, which
+ // may not be true for incoming ones - so make sure it is.
+ new_client.ttl = CLIENTS_TTL;
+ outgoing.changes.push(Payload::from_record(new_client)?);
+ }
+ }
+
+ // Upload a record for our own client, if we didn't replace it already.
+ if !has_own_client_record {
+ let current_client_record = self.current_client_record();
+ self.note_recent_client(&current_client_record);
+ outgoing
+ .changes
+ .push(Payload::from_record(current_client_record)?);
+ }
+
+ Ok(outgoing)
+ }
+
+ /// Builds a fresh client record for this device.
+ fn current_client_record(&self) -> ClientRecord {
+ let settings = self.command_processor.settings();
+ ClientRecord {
+ id: settings.fxa_device_id.clone(),
+ name: settings.device_name.clone(),
+ typ: Some(settings.device_type.as_str().into()),
+ commands: Vec::new(),
+ fxa_device_id: Some(settings.fxa_device_id.clone()),
+ version: None,
+ protocols: vec!["1.5".into()],
+ form_factor: None,
+ os: None,
+ app_package: None,
+ application: None,
+ device: None,
+ ttl: CLIENTS_TTL,
+ }
+ }
+
+ fn max_record_payload_size(&self) -> usize {
+ let payload_max = self.config.max_record_payload_bytes;
+ if payload_max <= self.config.max_post_bytes {
+ self.config.max_post_bytes.saturating_sub(4096)
+ } else {
+ payload_max
+ }
+ }
+
+ /// Collections stored in memcached ("tabs", "clients" or "meta") have a
+ /// different max size than ones stored in the normal storage server db.
+ /// In practice, the real limit here is 1M (bug 1300451 comment 40), but
+ /// there's overhead involved that is hard to calculate on the client, so we
+ /// use 512k to be safe (at the recommendation of the server team). Note
+ /// that if the server reports a lower limit (via info/configuration), we
+ /// respect that limit instead. See also bug 1403052.
+ fn memcache_max_record_payload_size(&self) -> usize {
+ self.max_record_payload_size().min(512 * 1024)
+ }
+}
+
+pub struct Engine<'a> {
+ pub command_processor: &'a dyn CommandProcessor,
+ pub interruptee: &'a dyn Interruptee,
+ pub recent_clients: HashMap<String, RemoteClient>,
+}
+
+impl<'a> Engine<'a> {
+ /// Creates a new clients engine that delegates to the given command
+ /// processor to apply incoming commands.
+ pub fn new<'b>(
+ command_processor: &'b dyn CommandProcessor,
+ interruptee: &'b dyn Interruptee,
+ ) -> Engine<'b> {
+ Engine {
+ command_processor,
+ interruptee,
+ recent_clients: HashMap::new(),
+ }
+ }
+
+ /// Syncs the clients collection. This works a little differently than
+ /// other collections:
+ ///
+ /// 1. It can't be disabled or declined.
+ /// 2. The sync ID and last sync time aren't meaningful, since we always
+ /// fetch all client records on every sync. As such, the
+ /// `LocalCollStateMachine` that we use for other engines doesn't
+ /// apply to it.
+ /// 3. It doesn't persist state directly, but relies on the sync manager
+ /// to persist device settings, and process commands.
+ /// 4. Failing to sync the clients collection is fatal, and aborts the
+ /// sync.
+ ///
+ /// For these reasons, we implement this engine directly in the `sync15`
+ /// crate, and provide a specialized `sync` method instead of implementing
+ /// `sync15::Store`.
+ pub fn sync(
+ &mut self,
+ storage_client: &Sync15StorageClient,
+ global_state: &GlobalState,
+ root_sync_key: &KeyBundle,
+ should_refresh_client: bool,
+ ) -> Result<()> {
+ log::info!("Syncing collection clients");
+
+ let coll_keys =
+ CollectionKeys::from_encrypted_bso(global_state.keys.clone(), &root_sync_key)?;
+ let mut coll_state = CollState {
+ config: global_state.config.clone(),
+ last_modified: global_state
+ .collections
+ .get(COLLECTION_NAME)
+ .cloned()
+ .unwrap_or_default(),
+ key: coll_keys.key_for_collection(COLLECTION_NAME).clone(),
+ };
+
+ let inbound = self.fetch_incoming(&storage_client, &mut coll_state)?;
+
+ let mut driver = Driver::new(
+ self.command_processor,
+ self.interruptee,
+ &global_state.config,
+ );
+
+ let outgoing = driver.sync(inbound, should_refresh_client)?;
+ self.recent_clients = driver.recent_clients;
+
+ coll_state.last_modified = outgoing.timestamp;
+
+ self.interruptee.err_if_interrupted()?;
+ let upload_info =
+ CollectionUpdate::new_from_changeset(&storage_client, &coll_state, outgoing, true)?
+ .upload()?;
+
+ log::info!(
+ "Upload success ({} records success, {} records failed)",
+ upload_info.successful_ids.len(),
+ upload_info.failed_ids.len()
+ );
+
+ log::info!("Finished syncing clients");
+ Ok(())
+ }
+
+ fn fetch_incoming(
+ &self,
+ storage_client: &Sync15StorageClient,
+ coll_state: &mut CollState,
+ ) -> Result<IncomingChangeset> {
+ // Note that, unlike other stores, we always fetch the full collection
+ // on every sync, so `inbound` will return all clients, not just the
+ // ones that changed since the last sync.
+ let coll_request = CollectionRequest::new(COLLECTION_NAME).full();
+
+ self.interruptee.err_if_interrupted()?;
+ let inbound = crate::changeset::fetch_incoming(&storage_client, coll_state, &coll_request)?;
+
+ Ok(inbound)
+ }
+
+ pub fn local_client_id(&self) -> String {
+ // Bit dirty but it's the easiest way to reach to our own
+ // device ID without refactoring the whole sync manager crate.
+ self.command_processor.settings().fxa_device_id.clone()
+ }
+
+ pub fn get_client_data(&self) -> ClientData {
+ ClientData {
+ local_client_id: self.local_client_id(),
+ recent_clients: self.recent_clients.clone(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::clients::{CommandStatus, DeviceType, Settings};
+ use crate::util::ServerTimestamp;
+ use anyhow::Result;
+ use interrupt_support::NeverInterrupts;
+ use serde_json::{json, Value};
+
+ use super::*;
+
+ struct TestProcessor {
+ settings: Settings,
+ outgoing_commands: HashSet<Command>,
+ }
+
+ impl CommandProcessor for TestProcessor {
+ fn settings(&self) -> &Settings {
+ &self.settings
+ }
+
+ fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> {
+ Ok(if let Command::Reset(name) = command {
+ if name == "forms" {
+ CommandStatus::Unsupported
+ } else {
+ CommandStatus::Applied
+ }
+ } else {
+ CommandStatus::Ignored
+ })
+ }
+
+ fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> {
+ Ok(self.outgoing_commands.clone())
+ }
+ }
+
+ fn inbound_from_clients(clients: Value) -> IncomingChangeset {
+ if let Value::Array(clients) = clients {
+ let changes = clients
+ .into_iter()
+ .map(|c| (Payload::from_json(c).unwrap(), ServerTimestamp(0)))
+ .collect();
+ IncomingChangeset {
+ changes,
+ timestamp: ServerTimestamp(0),
+ collection: COLLECTION_NAME.into(),
+ }
+ } else {
+ unreachable!("`clients` must be an array of client records")
+ }
+ }
+
+ #[test]
+ fn test_clients_sync() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: [
+ Command::Wipe("bookmarks".into()),
+ Command::Reset("history".into()),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }, {
+ "id": "deviceCCCCCC",
+ "name": "Fenix",
+ "type": "mobile",
+ "commands": [],
+ "fxaDeviceId": "deviceCCCCCC",
+ }, {
+ "id": "deviceAAAAAA",
+ "name": "Laptop with a different name",
+ "type": "desktop",
+ "commands": [{
+ "command": "wipeEngine",
+ "args": ["logins"]
+ }, {
+ "command": "displayURI",
+ "args": ["http://example.com", "Fennec", "Example page"],
+ "flowID": "flooooooooow",
+ }, {
+ "command": "resetEngine",
+ "args": ["forms"],
+ }, {
+ "command": "logout",
+ "args": [],
+ }],
+ "fxaDeviceId": "deviceAAAAAA",
+ }]));
+
+ // Passing false for `should_refresh_client` - it should be ignored
+ // because we've changed the commands.
+ let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ outgoing.changes.sort_by(|a, b| a.id.cmp(&b.id));
+
+ // Make sure the list of recently synced remote clients is correct.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ let expected_remote_clients = &[
+ RemoteClient {
+ fxa_device_id: Some("deviceAAAAAA".to_string()),
+ device_name: "Laptop".into(),
+ device_type: Some(DeviceType::Desktop),
+ },
+ RemoteClient {
+ fxa_device_id: Some("iPhooooooone".to_string()),
+ device_name: "iPhone".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ RemoteClient {
+ fxa_device_id: Some("deviceCCCCCC".to_string()),
+ device_name: "Fenix".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ ];
+ let actual_remote_clients = expected_ids
+ .iter()
+ .filter_map(|&id| driver.recent_clients.get(id))
+ .cloned()
+ .collect::<Vec<RemoteClient>>();
+ assert_eq!(actual_remote_clients, expected_remote_clients);
+
+ let expected = json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "commands": [{
+ "command": "displayURI",
+ "args": ["http://example.com", "Fennec", "Example page"],
+ "flowID": "flooooooooow",
+ }, {
+ "command": "resetEngine",
+ "args": ["forms"],
+ }, {
+ "command": "logout",
+ "args": [],
+ }],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ "ttl": CLIENTS_TTL,
+ }, {
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }, {
+ "command": "wipeEngine",
+ "args": ["bookmarks"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ "ttl": CLIENTS_TTL,
+ }, {
+ "id": "deviceCCCCCC",
+ "name": "Fenix",
+ "type": "mobile",
+ "commands": [{
+ "command": "wipeEngine",
+ "args": ["bookmarks"],
+ }, {
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "deviceCCCCCC",
+ "ttl": CLIENTS_TTL,
+ }]);
+ if let Value::Array(expected) = expected {
+ for (i, record) in expected.into_iter().enumerate() {
+ assert_eq!(outgoing.changes[i], Payload::from_json(record).unwrap());
+ }
+ } else {
+ unreachable!("`expected_clients` must be an array of client records")
+ }
+ }
+
+ #[test]
+ fn test_clients_sync_explicit_refresh() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: [].iter().cloned().collect(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ "ttl": CLIENTS_TTL,
+ }, {
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "commands": [],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ "ttl": CLIENTS_TTL,
+ }]));
+
+ let outgoing = driver
+ .sync(inbound.clone(), false)
+ .expect("Should sync clients");
+ // should be no outgoing changes.
+ assert_eq!(outgoing.changes.len(), 0);
+
+ // Make sure the list of recently synced remote clients is correct and
+ // still includes our record we didn't update.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ // Do it again - still no changes, but force a refresh.
+ let outgoing = driver.sync(inbound, true).expect("Should sync clients");
+ assert_eq!(outgoing.changes.len(), 1);
+
+ // Do it again - but this time with our own client record needing
+ // some change.
+ let inbound = inbound_from_clients(json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop with New Name",
+ "type": "desktop",
+ "commands": [],
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ }]));
+ let outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ // should still be outgoing because the name changed.
+ assert_eq!(outgoing.changes.len(), 1);
+ }
+
+ #[test]
+ fn test_fresh_client_record() {
+ let processor = TestProcessor {
+ settings: Settings {
+ fxa_device_id: "deviceAAAAAA".into(),
+ device_name: "Laptop".into(),
+ device_type: DeviceType::Desktop,
+ },
+ outgoing_commands: HashSet::new(),
+ };
+
+ let config = InfoConfiguration::default();
+
+ let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
+
+ let clients = json!([{
+ "id": "deviceBBBBBB",
+ "name": "iPhone",
+ "type": "mobile",
+ "commands": [{
+ "command": "resetEngine",
+ "args": ["history"],
+ }],
+ "fxaDeviceId": "iPhooooooone",
+ "protocols": ["1.5"],
+ "device": "iPhone",
+ }]);
+
+ let inbound = if let Value::Array(clients) = clients {
+ let changes = clients
+ .into_iter()
+ .map(|c| (Payload::from_json(c).unwrap(), ServerTimestamp(0)))
+ .collect();
+ IncomingChangeset {
+ changes,
+ timestamp: ServerTimestamp(0),
+ collection: COLLECTION_NAME.into(),
+ }
+ } else {
+ unreachable!("`clients` must be an array of client records")
+ };
+
+ // Passing false here for should_refresh_client, but it should be
+ // ignored as we don't have an existing record yet.
+ let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
+ outgoing.changes.sort_by(|a, b| a.id.cmp(&b.id));
+
+ // Make sure the list of recently synced remote clients is correct.
+ let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
+ let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
+ actual_ids.sort();
+ assert_eq!(actual_ids, expected_ids);
+
+ let expected_remote_clients = &[
+ RemoteClient {
+ fxa_device_id: Some("deviceAAAAAA".to_string()),
+ device_name: "Laptop".into(),
+ device_type: Some(DeviceType::Desktop),
+ },
+ RemoteClient {
+ fxa_device_id: Some("iPhooooooone".to_string()),
+ device_name: "iPhone".into(),
+ device_type: Some(DeviceType::Mobile),
+ },
+ ];
+ let actual_remote_clients = expected_ids
+ .iter()
+ .filter_map(|&id| driver.recent_clients.get(id))
+ .cloned()
+ .collect::<Vec<RemoteClient>>();
+ assert_eq!(actual_remote_clients, expected_remote_clients);
+
+ let expected = json!([{
+ "id": "deviceAAAAAA",
+ "name": "Laptop",
+ "type": "desktop",
+ "fxaDeviceId": "deviceAAAAAA",
+ "protocols": ["1.5"],
+ "ttl": CLIENTS_TTL,
+ }]);
+ if let Value::Array(expected) = expected {
+ for (i, record) in expected.into_iter().enumerate() {
+ assert_eq!(outgoing.changes[i], Payload::from_json(record).unwrap());
+ }
+ } else {
+ unreachable!("`expected_clients` must be an array of client records")
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/clients/mod.rs b/third_party/rust/sync15/src/clients/mod.rs
new file mode 100644
index 0000000000..c1b7c9f32e
--- /dev/null
+++ b/third_party/rust/sync15/src/clients/mod.rs
@@ -0,0 +1,94 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::collections::HashSet;
+
+mod engine;
+mod record;
+mod ser;
+
+use anyhow::Result;
+pub use engine::Engine;
+pub use sync15_traits::client::{ClientData, DeviceType, RemoteClient};
+
+// These are what desktop uses.
+const CLIENTS_TTL: u32 = 1_814_400; // 21 days
+pub(crate) const CLIENTS_TTL_REFRESH: u64 = 604_800; // 7 days
+
+/// A command processor applies incoming commands like wipes and resets for all
+/// stores, and returns commands to send to other clients. It also manages
+/// settings like the device name and type, which is stored in the special
+/// `clients` collection.
+///
+/// In practice, this trait only has one implementation, in the sync manager.
+/// It's split this way because the clients engine depends on internal `sync15`
+/// structures, and can't be implemented as a syncable store...but `sync15`
+/// doesn't know anything about multiple engines. This lets the sync manager
+/// provide its own implementation for handling wipe and reset commands for all
+/// the engines that it manages.
+pub trait CommandProcessor {
+ fn settings(&self) -> &Settings;
+
+ /// Fetches commands to send to other clients. An error return value means
+ /// commands couldn't be fetched, and halts the sync.
+ fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>>;
+
+ /// Applies a command sent to this client from another client. This method
+ /// should return a `CommandStatus` indicating whether the command was
+ /// processed.
+ ///
+ /// An error return value means the sync manager encountered an error
+ /// applying the command, and halts the sync to prevent unexpected behavior
+ /// (for example, merging local and remote bookmarks, when we were told to
+ /// wipe our local bookmarks).
+ fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus>;
+}
+
+/// Indicates if a command was applied successfully, ignored, or not supported.
+/// Applied and ignored commands are removed from our client record, and never
+/// retried. Unsupported commands are put back into our record, and retried on
+/// subsequent syncs. This is to handle clients adding support for new data
+/// types.
+#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
+pub enum CommandStatus {
+ Applied,
+ Ignored,
+ Unsupported,
+}
+
+impl From<&record::ClientRecord> for RemoteClient {
+ fn from(record: &record::ClientRecord) -> RemoteClient {
+ RemoteClient {
+ fxa_device_id: record.fxa_device_id.clone(),
+ device_name: record.name.clone(),
+ device_type: record.typ.as_ref().and_then(DeviceType::try_from_str),
+ }
+ }
+}
+
+/// Information about this device to include in its client record. This should
+/// be persisted across syncs, as part of the sync manager state.
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct Settings {
+ /// The FxA device ID of this client, also used as this client's record ID
+ /// in the clients collection.
+ pub fxa_device_id: String,
+ /// The name of this client. This should match the client's name in the
+ /// FxA device manager.
+ pub device_name: String,
+ /// The type of this client: mobile, tablet, desktop, or other.
+ pub device_type: DeviceType,
+}
+
+#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub enum Command {
+ /// Erases all local data.
+ WipeAll,
+ /// Erases all local data for a specific engine.
+ Wipe(String),
+ /// Resets local sync state for all engines.
+ ResetAll,
+ /// Resets local sync state for a specific engine.
+ Reset(String),
+}
diff --git a/third_party/rust/sync15/src/clients/record.rs b/third_party/rust/sync15/src/clients/record.rs
new file mode 100644
index 0000000000..f263b5ac88
--- /dev/null
+++ b/third_party/rust/sync15/src/clients/record.rs
@@ -0,0 +1,158 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use serde_derive::*;
+
+use super::Command;
+
+/// The serialized form of a client record.
+#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ClientRecord {
+ #[serde(rename = "id")]
+ pub id: String,
+
+ pub name: String,
+
+ #[serde(default, rename = "type")]
+ pub typ: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub commands: Vec<CommandRecord>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub fxa_device_id: Option<String>,
+
+ /// `version`, `protocols`, `formfactor`, `os`, `appPackage`, `application`,
+ /// and `device` are unused and optional in all implementations (Desktop,
+ /// iOS, and Fennec), but we round-trip them.
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub version: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub protocols: Vec<String>,
+
+ #[serde(
+ default,
+ rename = "formfactor",
+ skip_serializing_if = "Option::is_none"
+ )]
+ pub form_factor: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub os: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub app_package: Option<String>,
+
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub application: Option<String>,
+
+ /// The model of the device, like "iPhone" or "iPod touch" on iOS. Note
+ /// that this is _not_ the client ID (`id`) or the FxA device ID
+ /// (`fxa_device_id`).
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub device: Option<String>,
+
+ // This field is somewhat magic - it's moved to and from the
+ // BSO record, so is not expected to be on the unencrypted payload
+ // when incoming and are not put on the unencrypted payload when outgoing.
+ // There are hysterical raisens for this, which we should fix.
+ // https://github.com/mozilla/application-services/issues/2712
+ #[serde(default)]
+ pub ttl: u32,
+}
+
+/// The serialized form of a client command.
+#[derive(Clone, Debug, Eq, Deserialize, Hash, PartialEq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CommandRecord {
+ /// The command name. This is a string, not an enum, because we want to
+ /// round-trip commands that we don't support yet.
+ #[serde(rename = "command")]
+ pub name: String,
+
+ /// Extra, command-specific arguments. Note that we must send an empty
+ /// array if the command expects no arguments.
+ #[serde(default)]
+ pub args: Vec<String>,
+
+ /// Some commands, like repair, send a "flow ID" that other cliennts can
+ /// record in their telemetry. We don't currently send commands with
+ /// flow IDs, but we round-trip them.
+ #[serde(default, rename = "flowID", skip_serializing_if = "Option::is_none")]
+ pub flow_id: Option<String>,
+}
+
+impl CommandRecord {
+ /// Converts a serialized command into one that we can apply. Returns `None`
+ /// if we don't support the command.
+ pub fn as_command(&self) -> Option<Command> {
+ match self.name.as_str() {
+ "wipeEngine" => self.args.get(0).map(|e| Command::Wipe(e.into())),
+ "wipeAll" => Some(Command::WipeAll),
+ "resetEngine" => self.args.get(0).map(|e| Command::Reset(e.into())),
+ "resetAll" => Some(Command::ResetAll),
+ _ => None,
+ }
+ }
+}
+
+impl From<Command> for CommandRecord {
+ fn from(command: Command) -> CommandRecord {
+ match command {
+ Command::Wipe(engine) => CommandRecord {
+ name: "wipeEngine".into(),
+ args: vec![engine],
+ flow_id: None,
+ },
+ Command::WipeAll => CommandRecord {
+ name: "wipeAll".into(),
+ args: Vec::new(),
+ flow_id: None,
+ },
+ Command::Reset(engine) => CommandRecord {
+ name: "resetEngine".into(),
+ args: vec![engine],
+ flow_id: None,
+ },
+ Command::ResetAll => CommandRecord {
+ name: "resetAll".into(),
+ args: Vec::new(),
+ flow_id: None,
+ },
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sync15_traits::Payload;
+
+ #[test]
+ fn test_ttl() {
+ // The ttl hacks in place mean that magically the ttl field from the
+ // client record should make it down to a BSO.
+ let record = ClientRecord {
+ id: "id".into(),
+ name: "my device".into(),
+ typ: Some("type".into()),
+ commands: Vec::new(),
+ fxa_device_id: Some("12345".into()),
+ version: None,
+ protocols: vec!["1.5".into()],
+ form_factor: None,
+ os: None,
+ app_package: None,
+ application: None,
+ device: None,
+ ttl: 123,
+ };
+ let p = Payload::from_record(record).unwrap();
+ let bso = crate::CleartextBso::from_payload(p, "clients");
+ assert_eq!(bso.ttl, Some(123));
+ }
+}
diff --git a/third_party/rust/sync15/src/clients/ser.rs b/third_party/rust/sync15/src/clients/ser.rs
new file mode 100644
index 0000000000..0e8b85b0b8
--- /dev/null
+++ b/third_party/rust/sync15/src/clients/ser.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::Result;
+use serde::Serialize;
+use std::io::{self, Write};
+
+/// A writer that counts the number of bytes it's asked to write, and discards
+/// the data. Used to calculate the serialized size of the commands list.
+#[derive(Clone, Copy, Default)]
+pub struct WriteCount(usize);
+
+impl WriteCount {
+ #[inline]
+ pub fn len(self) -> usize {
+ self.0
+ }
+}
+
+impl Write for WriteCount {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0 += buf.len();
+ Ok(buf.len())
+ }
+
+ #[inline]
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+/// Returns the size of the given value, in bytes, when serialized to JSON.
+fn compute_serialized_size<T: Serialize>(value: &T) -> Result<usize> {
+ let mut w = WriteCount::default();
+ serde_json::to_writer(&mut w, value)?;
+ Ok(w.len())
+}
+
+/// Truncates `list` to fit within `payload_size_max_bytes` when serialized to
+/// JSON.
+pub fn shrink_to_fit<T: Serialize>(list: &mut Vec<T>, payload_size_max_bytes: usize) -> Result<()> {
+ let size = compute_serialized_size(&list)?;
+ // See bug 535326 comment 8 for an explanation of the estimation
+ match ((payload_size_max_bytes / 4) * 3).checked_sub(1500) {
+ Some(max_serialized_size) => {
+ if size > max_serialized_size {
+ // Estimate a little more than the direct fraction to maximize packing
+ let cutoff = (list.len() * max_serialized_size - 1) / size + 1;
+ list.truncate(cutoff + 1);
+ // Keep dropping off the last entry until the data fits.
+ while compute_serialized_size(&list)? > max_serialized_size {
+ if list.pop().is_none() {
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+ None => {
+ list.clear();
+ Ok(())
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::clients::record::CommandRecord;
+
+ #[test]
+ fn test_compute_serialized_size() {
+ assert_eq!(compute_serialized_size(&1).unwrap(), 1);
+ assert_eq!(compute_serialized_size(&"hi").unwrap(), 4);
+ assert_eq!(
+ compute_serialized_size(&["hi", "hello", "bye"]).unwrap(),
+ 20
+ );
+ }
+
+ #[test]
+ fn test_shrink_to_fit() {
+ let mut commands = vec![
+ CommandRecord {
+ name: "wipeEngine".into(),
+ args: vec!["bookmarks".into()],
+ flow_id: Some("flow".into()),
+ },
+ CommandRecord {
+ name: "resetEngine".into(),
+ args: vec!["history".into()],
+ flow_id: Some("flow".into()),
+ },
+ CommandRecord {
+ name: "logout".into(),
+ args: Vec::new(),
+ flow_id: None,
+ },
+ ];
+
+ // 4096 bytes is enough to fit all three commands.
+ shrink_to_fit(&mut commands, 4096).unwrap();
+ assert_eq!(commands.len(), 3);
+
+ let sizes = commands
+ .iter()
+ .map(|c| compute_serialized_size(c).unwrap())
+ .collect::<Vec<_>>();
+ assert_eq!(sizes, &[61, 60, 30]);
+
+ // `logout` won't fit within 2168 bytes.
+ shrink_to_fit(&mut commands, 2168).unwrap();
+ assert_eq!(commands.len(), 2);
+
+ // `resetEngine` won't fit within 2084 bytes.
+ shrink_to_fit(&mut commands, 2084).unwrap();
+ assert_eq!(commands.len(), 1);
+
+ // `wipeEngine` won't fit at all.
+ shrink_to_fit(&mut commands, 1024).unwrap();
+ assert!(commands.is_empty());
+ }
+}
diff --git a/third_party/rust/sync15/src/coll_state.rs b/third_party/rust/sync15/src/coll_state.rs
new file mode 100644
index 0000000000..7cb5ea874f
--- /dev/null
+++ b/third_party/rust/sync15/src/coll_state.rs
@@ -0,0 +1,350 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::collection_keys::CollectionKeys;
+use crate::error;
+use crate::key_bundle::KeyBundle;
+use crate::request::InfoConfiguration;
+use crate::state::GlobalState;
+use crate::sync::Store;
+use crate::util::ServerTimestamp;
+
+pub use sync15_traits::{CollSyncIds, StoreSyncAssociation};
+
+/// Holds state for a collection. In general, only the CollState is
+/// needed to sync a collection (but a valid GlobalState is needed to obtain
+/// a CollState)
+#[derive(Debug, Clone)]
+pub struct CollState {
+ pub config: InfoConfiguration,
+ // initially from meta/global, updated after an xius POST/PUT.
+ pub last_modified: ServerTimestamp,
+ pub key: KeyBundle,
+}
+
+#[derive(Debug)]
+pub enum LocalCollState {
+ /// The state is unknown, with the StoreSyncAssociation the collection
+ /// reports.
+ Unknown { assoc: StoreSyncAssociation },
+
+ /// The engine has been declined. This is a "terminal" state.
+ Declined,
+
+ /// There's no such collection in meta/global. We could possibly update
+ /// meta/global, but currently all known collections are there by default,
+ /// so this is, basically, an error condition.
+ NoSuchCollection,
+
+ /// Either the global or collection sync ID has changed - we will reset the engine.
+ SyncIdChanged { ids: CollSyncIds },
+
+ /// The collection is ready to sync.
+ Ready { key: KeyBundle },
+}
+
+pub struct LocalCollStateMachine<'state> {
+ global_state: &'state GlobalState,
+ root_key: &'state KeyBundle,
+}
+
+impl<'state> LocalCollStateMachine<'state> {
+ fn advance(&self, from: LocalCollState, store: &dyn Store) -> error::Result<LocalCollState> {
+ let name = &store.collection_name().to_string();
+ let meta_global = &self.global_state.global;
+ match from {
+ LocalCollState::Unknown { assoc } => {
+ if meta_global.declined.contains(name) {
+ return Ok(LocalCollState::Declined);
+ }
+ match meta_global.engines.get(name) {
+ Some(engine_meta) => match assoc {
+ StoreSyncAssociation::Disconnected => Ok(LocalCollState::SyncIdChanged {
+ ids: CollSyncIds {
+ global: meta_global.sync_id.clone(),
+ coll: engine_meta.sync_id.clone(),
+ },
+ }),
+ StoreSyncAssociation::Connected(ref ids)
+ if ids.global == meta_global.sync_id
+ && ids.coll == engine_meta.sync_id =>
+ {
+ let coll_keys = CollectionKeys::from_encrypted_bso(
+ self.global_state.keys.clone(),
+ self.root_key,
+ )?;
+ Ok(LocalCollState::Ready {
+ key: coll_keys.key_for_collection(name).clone(),
+ })
+ }
+ _ => Ok(LocalCollState::SyncIdChanged {
+ ids: CollSyncIds {
+ global: meta_global.sync_id.clone(),
+ coll: engine_meta.sync_id.clone(),
+ },
+ }),
+ },
+ None => Ok(LocalCollState::NoSuchCollection),
+ }
+ }
+
+ LocalCollState::Declined => unreachable!("can't advance from declined"),
+
+ LocalCollState::NoSuchCollection => unreachable!("the collection is unknown"),
+
+ LocalCollState::SyncIdChanged { ids } => {
+ let assoc = StoreSyncAssociation::Connected(ids);
+ log::info!("Resetting {} store", store.collection_name());
+ store.reset(&assoc)?;
+ Ok(LocalCollState::Unknown { assoc })
+ }
+
+ LocalCollState::Ready { .. } => unreachable!("can't advance from ready"),
+ }
+ }
+
+ // A little whimsy - a portmanteau of far and fast
+ fn run_and_run_as_farst_as_you_can(
+ &mut self,
+ store: &dyn Store,
+ ) -> error::Result<Option<CollState>> {
+ let mut s = LocalCollState::Unknown {
+ assoc: store.get_sync_assoc()?,
+ };
+ // This is a simple state machine and should never take more than
+ // 10 goes around.
+ let mut count = 0;
+ loop {
+ log::trace!("LocalCollState in {:?}", s);
+ match s {
+ LocalCollState::Ready { key } => {
+ let name = store.collection_name();
+ let config = self.global_state.config.clone();
+ let last_modified = self
+ .global_state
+ .collections
+ .get(name.as_ref())
+ .cloned()
+ .unwrap_or_default();
+ return Ok(Some(CollState {
+ config,
+ last_modified,
+ key,
+ }));
+ }
+ LocalCollState::Declined | LocalCollState::NoSuchCollection => return Ok(None),
+
+ _ => {
+ count += 1;
+ if count > 10 {
+ log::warn!("LocalCollStateMachine appears to be looping");
+ return Ok(None);
+ }
+ // should we have better loop detection? Our limit of 10
+ // goes is probably OK for now, but not really ideal.
+ s = self.advance(s, store)?;
+ }
+ };
+ }
+ }
+
+ pub fn get_state(
+ store: &dyn Store,
+ global_state: &'state GlobalState,
+ root_key: &'state KeyBundle,
+ ) -> error::Result<Option<CollState>> {
+ let mut gingerbread_man = Self {
+ global_state,
+ root_key,
+ };
+ gingerbread_man.run_and_run_as_farst_as_you_can(store)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::changeset::{IncomingChangeset, OutgoingChangeset};
+ use crate::collection_keys::CollectionKeys;
+ use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord};
+ use crate::request::{CollectionRequest, InfoCollections, InfoConfiguration};
+ use crate::telemetry;
+ use anyhow::Result;
+ use std::cell::{Cell, RefCell};
+ use std::collections::HashMap;
+ use sync_guid::Guid;
+
+ fn get_global_state(root_key: &KeyBundle) -> GlobalState {
+ let keys = CollectionKeys::new_random()
+ .unwrap()
+ .to_encrypted_bso(&root_key)
+ .unwrap();
+ GlobalState {
+ config: InfoConfiguration::default(),
+ collections: InfoCollections::new(HashMap::new()),
+ global: MetaGlobalRecord {
+ sync_id: "syncIDAAAAAA".into(),
+ storage_version: 5usize,
+ engines: vec![(
+ "bookmarks",
+ MetaGlobalEngine {
+ version: 1usize,
+ sync_id: "syncIDBBBBBB".into(),
+ },
+ )]
+ .into_iter()
+ .map(|(key, value)| (key.to_owned(), value))
+ .collect(),
+ declined: vec![],
+ },
+ global_timestamp: ServerTimestamp::default(),
+ keys,
+ }
+ }
+
+ struct TestStore {
+ collection_name: &'static str,
+ assoc: Cell<StoreSyncAssociation>,
+ num_resets: RefCell<usize>,
+ }
+
+ impl TestStore {
+ fn new(collection_name: &'static str, assoc: StoreSyncAssociation) -> Self {
+ Self {
+ collection_name,
+ assoc: Cell::new(assoc),
+ num_resets: RefCell::new(0),
+ }
+ }
+ fn get_num_resets(&self) -> usize {
+ *self.num_resets.borrow()
+ }
+ }
+
+ impl Store for TestStore {
+ fn collection_name(&self) -> std::borrow::Cow<'static, str> {
+ self.collection_name.into()
+ }
+
+ fn apply_incoming(
+ &self,
+ _inbound: Vec<IncomingChangeset>,
+ _telem: &mut telemetry::Engine,
+ ) -> Result<OutgoingChangeset> {
+ unreachable!("these tests shouldn't call these");
+ }
+
+ fn sync_finished(
+ &self,
+ _new_timestamp: ServerTimestamp,
+ _records_synced: Vec<Guid>,
+ ) -> Result<()> {
+ unreachable!("these tests shouldn't call these");
+ }
+
+ fn get_collection_requests(
+ &self,
+ _server_timestamp: ServerTimestamp,
+ ) -> Result<Vec<CollectionRequest>> {
+ unreachable!("these tests shouldn't call these");
+ }
+
+ fn get_sync_assoc(&self) -> Result<StoreSyncAssociation> {
+ Ok(self.assoc.replace(StoreSyncAssociation::Disconnected))
+ }
+
+ fn reset(&self, new_assoc: &StoreSyncAssociation) -> Result<()> {
+ self.assoc.replace(new_assoc.clone());
+ *self.num_resets.borrow_mut() += 1;
+ Ok(())
+ }
+
+ fn wipe(&self) -> Result<()> {
+ unreachable!("these tests shouldn't call these");
+ }
+ }
+
+ #[test]
+ fn test_unknown() {
+ let root_key = KeyBundle::new_random().expect("should work");
+ let gs = get_global_state(&root_key);
+ let store = TestStore::new("unknown", StoreSyncAssociation::Disconnected);
+ let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work");
+ assert!(cs.is_none(), "unknown collection name can't sync");
+ assert_eq!(store.get_num_resets(), 0);
+ }
+
+ #[test]
+ fn test_known_no_state() {
+ let root_key = KeyBundle::new_random().expect("should work");
+ let gs = get_global_state(&root_key);
+ let store = TestStore::new("bookmarks", StoreSyncAssociation::Disconnected);
+ let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work");
+ assert!(cs.is_some(), "collection can sync");
+ assert_eq!(
+ store.assoc.replace(StoreSyncAssociation::Disconnected),
+ StoreSyncAssociation::Connected(CollSyncIds {
+ global: "syncIDAAAAAA".into(),
+ coll: "syncIDBBBBBB".into(),
+ })
+ );
+ assert_eq!(store.get_num_resets(), 1);
+ }
+
+ #[test]
+ fn test_known_wrong_state() {
+ let root_key = KeyBundle::new_random().expect("should work");
+ let gs = get_global_state(&root_key);
+ let store = TestStore::new(
+ "bookmarks",
+ StoreSyncAssociation::Connected(CollSyncIds {
+ global: "syncIDXXXXXX".into(),
+ coll: "syncIDYYYYYY".into(),
+ }),
+ );
+ let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work");
+ assert!(cs.is_some(), "collection can sync");
+ assert_eq!(
+ store.assoc.replace(StoreSyncAssociation::Disconnected),
+ StoreSyncAssociation::Connected(CollSyncIds {
+ global: "syncIDAAAAAA".into(),
+ coll: "syncIDBBBBBB".into(),
+ })
+ );
+ assert_eq!(store.get_num_resets(), 1);
+ }
+
+ #[test]
+ fn test_known_good_state() {
+ let root_key = KeyBundle::new_random().expect("should work");
+ let gs = get_global_state(&root_key);
+ let store = TestStore::new(
+ "bookmarks",
+ StoreSyncAssociation::Connected(CollSyncIds {
+ global: "syncIDAAAAAA".into(),
+ coll: "syncIDBBBBBB".into(),
+ }),
+ );
+ let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work");
+ assert!(cs.is_some(), "collection can sync");
+ assert_eq!(store.get_num_resets(), 0);
+ }
+
+ #[test]
+ fn test_declined() {
+ let root_key = KeyBundle::new_random().expect("should work");
+ let mut gs = get_global_state(&root_key);
+ gs.global.declined.push("bookmarks".to_string());
+ let store = TestStore::new(
+ "bookmarks",
+ StoreSyncAssociation::Connected(CollSyncIds {
+ global: "syncIDAAAAAA".into(),
+ coll: "syncIDBBBBBB".into(),
+ }),
+ );
+ let cs = LocalCollStateMachine::get_state(&store, &gs, &root_key).expect("should work");
+ assert!(cs.is_none(), "declined collection can sync");
+ assert_eq!(store.get_num_resets(), 0);
+ }
+}
diff --git a/third_party/rust/sync15/src/collection_keys.rs b/third_party/rust/sync15/src/collection_keys.rs
new file mode 100644
index 0000000000..d66fc235b4
--- /dev/null
+++ b/third_party/rust/sync15/src/collection_keys.rs
@@ -0,0 +1,64 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso_record::{EncryptedBso, Payload};
+use crate::error::Result;
+use crate::key_bundle::KeyBundle;
+use crate::record_types::CryptoKeysRecord;
+use crate::util::ServerTimestamp;
+use std::collections::HashMap;
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct CollectionKeys {
+ pub timestamp: ServerTimestamp,
+ pub default: KeyBundle,
+ pub collections: HashMap<String, KeyBundle>,
+}
+
+impl CollectionKeys {
+ pub fn new_random() -> Result<CollectionKeys> {
+ let default = KeyBundle::new_random()?;
+ Ok(CollectionKeys {
+ timestamp: ServerTimestamp(0),
+ default,
+ collections: HashMap::new(),
+ })
+ }
+
+ pub fn from_encrypted_bso(
+ record: EncryptedBso,
+ root_key: &KeyBundle,
+ ) -> Result<CollectionKeys> {
+ let keys = record.decrypt_as::<CryptoKeysRecord>(root_key)?;
+ Ok(CollectionKeys {
+ timestamp: keys.modified,
+ default: KeyBundle::from_base64(&keys.payload.default[0], &keys.payload.default[1])?,
+ collections: keys
+ .payload
+ .collections
+ .into_iter()
+ .map(|kv| Ok((kv.0, KeyBundle::from_base64(&kv.1[0], &kv.1[1])?)))
+ .collect::<Result<HashMap<String, KeyBundle>>>()?,
+ })
+ }
+
+ pub fn to_encrypted_bso(&self, root_key: &KeyBundle) -> Result<EncryptedBso> {
+ let record = CryptoKeysRecord {
+ id: "keys".into(),
+ collection: "crypto".into(),
+ default: self.default.to_b64_array(),
+ collections: self
+ .collections
+ .iter()
+ .map(|kv| (kv.0.clone(), kv.1.to_b64_array()))
+ .collect(),
+ };
+ let bso = crate::CleartextBso::from_payload(Payload::from_record(record)?, "crypto");
+ Ok(bso.encrypt(root_key)?)
+ }
+
+ pub fn key_for_collection<'a>(&'a self, collection: &str) -> &'a KeyBundle {
+ self.collections.get(collection).unwrap_or(&self.default)
+ }
+}
diff --git a/third_party/rust/sync15/src/error.rs b/third_party/rust/sync15/src/error.rs
new file mode 100644
index 0000000000..7818f7bfc9
--- /dev/null
+++ b/third_party/rust/sync15/src/error.rs
@@ -0,0 +1,141 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use interrupt_support::Interrupted;
+use rc_crypto::hawk;
+use std::string;
+use std::time::SystemTime;
+use sync15_traits::request::UnacceptableBaseUrl;
+/// This enum is to discriminate `StorageHttpError`, and not used as an error.
+#[derive(Debug, Clone)]
+pub enum ErrorResponse {
+ NotFound { route: String },
+ // 401
+ Unauthorized { route: String },
+ // 412
+ PreconditionFailed { route: String },
+ // 5XX
+ ServerError { route: String, status: u16 }, // TODO: info for "retry-after" and backoff handling etc here.
+ // Other HTTP responses.
+ RequestFailed { route: String, status: u16 },
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ErrorKind {
+ #[error("Key {0} had wrong length, got {1}, expected {2}")]
+ BadKeyLength(&'static str, usize, usize),
+
+ #[error("SHA256 HMAC Mismatch error")]
+ HmacMismatch,
+
+ #[error("HTTP status {0} when requesting a token from the tokenserver")]
+ TokenserverHttpError(u16),
+
+ #[error("HTTP storage error: {0:?}")]
+ StorageHttpError(ErrorResponse),
+
+ #[error("Server requested backoff. Retry after {0:?}")]
+ BackoffError(SystemTime),
+
+ #[error("Outgoing record is too large to upload")]
+ RecordTooLargeError,
+
+ // Do we want to record the concrete problems?
+ #[error("Not all records were successfully uploaded")]
+ RecordUploadFailed,
+
+ /// Used for things like a node reassignment or an unexpected syncId
+ /// implying the app needs to "reset" its understanding of remote storage.
+ #[error("The server has reset the storage for this account")]
+ StorageResetError,
+
+ #[error("Unacceptable URL: {0}")]
+ UnacceptableUrl(String),
+
+ #[error("Missing server timestamp header in request")]
+ MissingServerTimestamp,
+
+ #[error("Unexpected server behavior during batch upload: {0}")]
+ ServerBatchProblem(&'static str),
+
+ #[error("It appears some other client is also trying to setup storage; try again later")]
+ SetupRace,
+
+ #[error("Client upgrade required; server storage version too new")]
+ ClientUpgradeRequired,
+
+ // This means that our global state machine needs to enter a state (such as
+ // "FreshStartNeeded", but the allowed_states don't include that state.)
+ // It typically means we are trying to do a "fast" or "read-only" sync.
+ #[error("Our storage needs setting up and we can't currently do it")]
+ SetupRequired,
+
+ #[error("Store error: {0}")]
+ StoreError(#[from] anyhow::Error),
+
+ #[error("Crypto/NSS error: {0}")]
+ CryptoError(#[from] rc_crypto::Error),
+
+ #[error("Base64 decode error: {0}")]
+ Base64Decode(#[from] base64::DecodeError),
+
+ #[error("JSON error: {0}")]
+ JsonError(#[from] serde_json::Error),
+
+ #[error("Bad cleartext UTF8: {0}")]
+ BadCleartextUtf8(#[from] string::FromUtf8Error),
+
+ #[error("Network error: {0}")]
+ RequestError(#[from] viaduct::Error),
+
+ #[error("Unexpected HTTP status: {0}")]
+ UnexpectedStatus(#[from] viaduct::UnexpectedStatus),
+
+ #[error("HAWK error: {0}")]
+ HawkError(#[from] hawk::Error),
+
+ #[error("URL parse error: {0}")]
+ MalformedUrl(#[from] url::ParseError),
+
+ #[error("The operation was interrupted.")]
+ Interrupted(#[from] Interrupted),
+}
+
+error_support::define_error! {
+ ErrorKind {
+ (CryptoError, rc_crypto::Error),
+ (Base64Decode, base64::DecodeError),
+ (JsonError, serde_json::Error),
+ (BadCleartextUtf8, std::string::FromUtf8Error),
+ (RequestError, viaduct::Error),
+ (UnexpectedStatus, viaduct::UnexpectedStatus),
+ (MalformedUrl, url::ParseError),
+ // A bit dubious, since we only want this to happen inside `synchronize`
+ (StoreError, anyhow::Error),
+ (Interrupted, Interrupted),
+ (HawkError, hawk::Error),
+ }
+}
+
+impl From<UnacceptableBaseUrl> for ErrorKind {
+ fn from(e: UnacceptableBaseUrl) -> ErrorKind {
+ ErrorKind::UnacceptableUrl(e.to_string())
+ }
+}
+
+impl From<UnacceptableBaseUrl> for Error {
+ fn from(e: UnacceptableBaseUrl) -> Self {
+ Error::from(ErrorKind::from(e))
+ }
+}
+
+impl Error {
+ pub(crate) fn get_backoff(&self) -> Option<SystemTime> {
+ if let ErrorKind::BackoffError(time) = self.kind() {
+ Some(*time)
+ } else {
+ None
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/key_bundle.rs b/third_party/rust/sync15/src/key_bundle.rs
new file mode 100644
index 0000000000..c3eeb716ae
--- /dev/null
+++ b/third_party/rust/sync15/src/key_bundle.rs
@@ -0,0 +1,212 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::{ErrorKind, Result};
+use rc_crypto::{
+ aead::{self, OpeningKey, SealingKey},
+ rand,
+};
+
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct KeyBundle {
+ enc_key: Vec<u8>,
+ mac_key: Vec<u8>,
+}
+
+impl std::fmt::Debug for KeyBundle {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("KeyBundle").finish()
+ }
+}
+
+impl KeyBundle {
+ /// Construct a key bundle from the already-decoded encrypt and hmac keys.
+ /// Panics (asserts) if they aren't both 32 bytes.
+ pub fn new(enc: Vec<u8>, mac: Vec<u8>) -> Result<KeyBundle> {
+ if enc.len() != 32 {
+ log::error!("Bad key length (enc_key): {} != 32", enc.len());
+ return Err(ErrorKind::BadKeyLength("enc_key", enc.len(), 32).into());
+ }
+ if mac.len() != 32 {
+ log::error!("Bad key length (mac_key): {} != 32", mac.len());
+ return Err(ErrorKind::BadKeyLength("mac_key", mac.len(), 32).into());
+ }
+ Ok(KeyBundle {
+ enc_key: enc,
+ mac_key: mac,
+ })
+ }
+
+ pub fn new_random() -> Result<KeyBundle> {
+ let mut buffer = [0u8; 64];
+ rand::fill(&mut buffer)?;
+ KeyBundle::from_ksync_bytes(&buffer)
+ }
+
+ pub fn from_ksync_bytes(ksync: &[u8]) -> Result<KeyBundle> {
+ if ksync.len() != 64 {
+ log::error!("Bad key length (kSync): {} != 64", ksync.len());
+ return Err(ErrorKind::BadKeyLength("kSync", ksync.len(), 64).into());
+ }
+ Ok(KeyBundle {
+ enc_key: ksync[0..32].into(),
+ mac_key: ksync[32..64].into(),
+ })
+ }
+
+ pub fn from_ksync_base64(ksync: &str) -> Result<KeyBundle> {
+ let bytes = base64::decode_config(&ksync, base64::URL_SAFE_NO_PAD)?;
+ KeyBundle::from_ksync_bytes(&bytes)
+ }
+
+ pub fn from_base64(enc: &str, mac: &str) -> Result<KeyBundle> {
+ let enc_bytes = base64::decode(&enc)?;
+ let mac_bytes = base64::decode(&mac)?;
+ KeyBundle::new(enc_bytes, mac_bytes)
+ }
+
+ #[inline]
+ pub fn encryption_key(&self) -> &[u8] {
+ &self.enc_key
+ }
+
+ #[inline]
+ pub fn hmac_key(&self) -> &[u8] {
+ &self.mac_key
+ }
+
+ #[inline]
+ pub fn to_b64_array(&self) -> [String; 2] {
+ [base64::encode(&self.enc_key), base64::encode(&self.mac_key)]
+ }
+
+ /// Decrypt the provided ciphertext with the given iv, and decodes the
+ /// result as a utf8 string.
+ pub fn decrypt(&self, enc_base64: &str, iv_base64: &str, hmac_base16: &str) -> Result<String> {
+ // Decode the expected_hmac into bytes to avoid issues if a client happens to encode
+ // this as uppercase. This shouldn't happen in practice, but doing it this way is more
+ // robust and avoids an allocation.
+ let mut decoded_hmac = vec![0u8; 32];
+ if base16::decode_slice(hmac_base16, &mut decoded_hmac).is_err() {
+ log::warn!("Garbage HMAC verification string: contained non base16 characters");
+ return Err(ErrorKind::HmacMismatch.into());
+ }
+ let iv = base64::decode(iv_base64)?;
+ let ciphertext_bytes = base64::decode(enc_base64)?;
+ let key_bytes = [self.encryption_key(), self.hmac_key()].concat();
+ let key = OpeningKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?;
+ let nonce = aead::Nonce::try_assume_unique_for_key(
+ &aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256,
+ &iv,
+ )?;
+ let ciphertext_and_hmac = [ciphertext_bytes, decoded_hmac].concat();
+ let cleartext_bytes = aead::open(&key, nonce, aead::Aad::empty(), &ciphertext_and_hmac)?;
+ let cleartext = String::from_utf8(cleartext_bytes)?;
+ Ok(cleartext)
+ }
+
+ /// Encrypt using the provided IV.
+ pub fn encrypt_bytes_with_iv(
+ &self,
+ cleartext_bytes: &[u8],
+ iv: &[u8],
+ ) -> Result<(String, String)> {
+ let key_bytes = [self.encryption_key(), self.hmac_key()].concat();
+ let key = SealingKey::new(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, &key_bytes)?;
+ let nonce =
+ aead::Nonce::try_assume_unique_for_key(&aead::LEGACY_SYNC_AES_256_CBC_HMAC_SHA256, iv)?;
+ let ciphertext_and_hmac = aead::seal(&key, nonce, aead::Aad::empty(), cleartext_bytes)?;
+ let ciphertext_len = ciphertext_and_hmac.len() - key.algorithm().tag_len();
+ // Do the string conversions here so we don't have to split and copy to 2 vectors.
+ let (ciphertext, hmac_signature) = ciphertext_and_hmac.split_at(ciphertext_len);
+ let enc_base64 = base64::encode(&ciphertext);
+ let hmac_base16 = base16::encode_lower(&hmac_signature);
+ Ok((enc_base64, hmac_base16))
+ }
+
+ /// Generate a random iv and encrypt with it. Return both the encrypted bytes
+ /// and the generated iv.
+ pub fn encrypt_bytes_rand_iv(
+ &self,
+ cleartext_bytes: &[u8],
+ ) -> Result<(String, String, String)> {
+ let mut iv = [0u8; 16];
+ rand::fill(&mut iv)?;
+ let (enc_base64, hmac_base16) = self.encrypt_bytes_with_iv(cleartext_bytes, &iv)?;
+ let iv_base64 = base64::encode(&iv);
+ Ok((enc_base64, iv_base64, hmac_base16))
+ }
+
+ pub fn encrypt_with_iv(&self, cleartext: &str, iv: &[u8]) -> Result<(String, String)> {
+ self.encrypt_bytes_with_iv(cleartext.as_bytes(), iv)
+ }
+
+ pub fn encrypt_rand_iv(&self, cleartext: &str) -> Result<(String, String, String)> {
+ self.encrypt_bytes_rand_iv(cleartext.as_bytes())
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ const HMAC_B16: &str = "b1e6c18ac30deb70236bc0d65a46f7a4dce3b8b0e02cf92182b914e3afa5eebc";
+ const IV_B64: &str = "GX8L37AAb2FZJMzIoXlX8w==";
+ const HMAC_KEY_B64: &str = "MMntEfutgLTc8FlTLQFms8/xMPmCldqPlq/QQXEjx70=";
+ const ENC_KEY_B64: &str = "9K/wLdXdw+nrTtXo4ZpECyHFNr4d7aYHqeg3KW9+m6Q=";
+
+ const CIPHERTEXT_B64_PIECES: &[&str] = &[
+ "NMsdnRulLwQsVcwxKW9XwaUe7ouJk5Wn80QhbD80l0HEcZGCynh45qIbeYBik0lgcHbK",
+ "mlIxTJNwU+OeqipN+/j7MqhjKOGIlvbpiPQQLC6/ffF2vbzL0nzMUuSyvaQzyGGkSYM2",
+ "xUFt06aNivoQTvU2GgGmUK6MvadoY38hhW2LCMkoZcNfgCqJ26lO1O0sEO6zHsk3IVz6",
+ "vsKiJ2Hq6VCo7hu123wNegmujHWQSGyf8JeudZjKzfi0OFRRvvm4QAKyBWf0MgrW1F8S",
+ "FDnVfkq8amCB7NhdwhgLWbN+21NitNwWYknoEWe1m6hmGZDgDT32uxzWxCV8QqqrpH/Z",
+ "ggViEr9uMgoy4lYaWqP7G5WKvvechc62aqnsNEYhH26A5QgzmlNyvB+KPFvPsYzxDnSC",
+ "jOoRSLx7GG86wT59QZw=",
+ ];
+
+ const CLEARTEXT_B64_PIECES: &[&str] = &[
+ "eyJpZCI6IjVxUnNnWFdSSlpYciIsImhpc3RVcmkiOiJmaWxlOi8vL1VzZXJzL2phc29u",
+ "L0xpYnJhcnkvQXBwbGljYXRpb24lMjBTdXBwb3J0L0ZpcmVmb3gvUHJvZmlsZXMva3Nn",
+ "ZDd3cGsuTG9jYWxTeW5jU2VydmVyL3dlYXZlL2xvZ3MvIiwidGl0bGUiOiJJbmRleCBv",
+ "ZiBmaWxlOi8vL1VzZXJzL2phc29uL0xpYnJhcnkvQXBwbGljYXRpb24gU3VwcG9ydC9G",
+ "aXJlZm94L1Byb2ZpbGVzL2tzZ2Q3d3BrLkxvY2FsU3luY1NlcnZlci93ZWF2ZS9sb2dz",
+ "LyIsInZpc2l0cyI6W3siZGF0ZSI6MTMxOTE0OTAxMjM3MjQyNSwidHlwZSI6MX1dfQ==",
+ ];
+
+ #[test]
+ fn test_decrypt() {
+ let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap();
+ let ciphertext = CIPHERTEXT_B64_PIECES.join("");
+ let s = key_bundle.decrypt(&ciphertext, IV_B64, HMAC_B16).unwrap();
+
+ let cleartext =
+ String::from_utf8(base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap()).unwrap();
+ assert_eq!(&cleartext, &s);
+ }
+
+ #[test]
+ fn test_encrypt() {
+ let key_bundle = KeyBundle::from_base64(ENC_KEY_B64, HMAC_KEY_B64).unwrap();
+ let iv = base64::decode(IV_B64).unwrap();
+
+ let cleartext_bytes = base64::decode(&CLEARTEXT_B64_PIECES.join("")).unwrap();
+ let (enc_base64, _hmac_base16) = key_bundle
+ .encrypt_bytes_with_iv(&cleartext_bytes, &iv)
+ .unwrap();
+
+ let expect_ciphertext = CIPHERTEXT_B64_PIECES.join("");
+
+ assert_eq!(&enc_base64, &expect_ciphertext);
+
+ let (enc_base64_2, iv_base64_2, hmac_base16_2) =
+ key_bundle.encrypt_bytes_rand_iv(&cleartext_bytes).unwrap();
+ assert_ne!(&enc_base64_2, &expect_ciphertext);
+
+ let s = key_bundle
+ .decrypt(&enc_base64_2, &iv_base64_2, &hmac_base16_2)
+ .unwrap();
+ assert_eq!(&cleartext_bytes, &s.as_bytes());
+ }
+}
diff --git a/third_party/rust/sync15/src/lib.rs b/third_party/rust/sync15/src/lib.rs
new file mode 100644
index 0000000000..4420a72f0d
--- /dev/null
+++ b/third_party/rust/sync15/src/lib.rs
@@ -0,0 +1,45 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#![allow(unknown_lints, clippy::implicit_hasher)]
+#![warn(rust_2018_idioms)]
+
+mod bso_record;
+pub mod changeset;
+mod client;
+pub mod clients;
+mod coll_state;
+mod collection_keys;
+mod error;
+mod key_bundle;
+mod migrate_state;
+mod record_types;
+mod request;
+mod state;
+mod status;
+mod sync;
+mod sync_multiple;
+pub mod telemetry;
+mod token;
+mod util;
+
+// Re-export some of the types callers are likely to want for convenience.
+pub use crate::bso_record::{BsoRecord, CleartextBso, EncryptedBso, EncryptedPayload, Payload};
+pub use crate::changeset::{IncomingChangeset, OutgoingChangeset, RecordChangeset};
+pub use crate::client::{
+ SetupStorageClient, Sync15ClientResponse, Sync15StorageClient, Sync15StorageClientInit,
+};
+pub use crate::coll_state::{CollState, CollSyncIds, StoreSyncAssociation};
+pub use crate::collection_keys::CollectionKeys;
+pub use crate::error::{Error, ErrorKind, Result};
+pub use crate::key_bundle::KeyBundle;
+pub use crate::migrate_state::extract_v1_state;
+pub use crate::request::CollectionRequest;
+pub use crate::state::{GlobalState, SetupStateMachine};
+pub use crate::status::{ServiceStatus, SyncResult};
+pub use crate::sync::{synchronize, Store};
+pub use crate::sync_multiple::{
+ sync_multiple, sync_multiple_with_command_processor, MemoryCachedState, SyncRequestInfo,
+};
+pub use crate::util::ServerTimestamp;
diff --git a/third_party/rust/sync15/src/migrate_state.rs b/third_party/rust/sync15/src/migrate_state.rs
new file mode 100644
index 0000000000..73d64676b7
--- /dev/null
+++ b/third_party/rust/sync15/src/migrate_state.rs
@@ -0,0 +1,203 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::record_types::MetaGlobalRecord;
+use crate::state::PersistedGlobalState;
+use crate::CollSyncIds;
+use serde_json::Value;
+
+/// Given a string persisted as our old GlobalState V1 struct, extract out
+/// the sync IDs for the collection, plus a string which should be used as the
+/// new "global persisted state" (which holds the declined engines).
+/// Returns (None, None) in early error cases (eg, invalid JSON, wrong schema
+/// version etc). Otherwise, you can expect the returned global state to be
+/// Some, even if the CollSyncIds is None (which can happen if the engine is
+/// missing, or flagged for reset)
+pub fn extract_v1_state(
+ state: Option<String>,
+ collection: &'static str,
+) -> (Option<CollSyncIds>, Option<String>) {
+ let state = match state {
+ Some(s) => s,
+ None => return (None, None),
+ };
+ let j: serde_json::Value = match serde_json::from_str(&state) {
+ Ok(j) => j,
+ Err(_) => return (None, None),
+ };
+ if Some("V1") != j.get("schema_version").and_then(Value::as_str) {
+ return (None, None);
+ }
+
+ let empty = Vec::<serde_json::Value>::new();
+ // Get the global and payload out so we can obtain declined first.
+ let global = match j.get("global").and_then(Value::as_object) {
+ None => return (None, None),
+ Some(v) => v,
+ };
+ // payload is itself a string holding json - so re-parse.
+ let meta_global = match global["payload"]
+ .as_str()
+ .and_then(|s| serde_json::from_str::<MetaGlobalRecord>(s).ok())
+ {
+ Some(p) => p,
+ None => return (None, None),
+ };
+ let pgs = PersistedGlobalState::V2 {
+ declined: Some(meta_global.declined),
+ };
+ let new_global_state = serde_json::to_string(&pgs).ok();
+
+ // See if the collection needs a reset.
+ for change in j
+ .get("engine_state_changes")
+ .and_then(Value::as_array)
+ .unwrap_or(&empty)
+ {
+ if change.as_str() == Some("ResetAll") {
+ return (None, new_global_state);
+ }
+ // other resets we care about are objects - `"Reset":name` and
+ // `"ResetAllExcept":[name, name]`
+ if let Some(change_ob) = change.as_object() {
+ if change_ob.get("Reset").and_then(Value::as_str) == Some(collection) {
+ // this engine is reset.
+ return (None, new_global_state);
+ }
+ if let Some(except_array) = change_ob.get("ResetAllExcept").and_then(Value::as_array) {
+ // We have what appears to be a valid list of exceptions to reset.
+ // If every one lists an engine that isn't us, we are being reset.
+ if except_array
+ .iter()
+ .filter_map(Value::as_str)
+ .all(|s| s != collection)
+ {
+ return (None, new_global_state);
+ }
+ }
+ }
+ }
+
+ // Try and find the sync guids in the global payload.
+ let gsid = meta_global.sync_id;
+ let ids = meta_global.engines.get(collection).map(|coll| CollSyncIds {
+ global: gsid,
+ coll: coll.sync_id.clone(),
+ });
+ (ids, new_global_state)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Test our destructuring of the old persisted global state.
+
+ fn get_state_with_engine_changes_and_declined(changes: &str, declined: &str) -> String {
+ // This is a copy of the V1 persisted state.
+ // Note some things have been omitted or trimmed from what's actually persisted
+ // (eg, top-level "config" is removed, "collections" is removed (that's only timestamps)
+ // hmac keys have array elts removed, global/payload has engines removed, etc)
+ // Note also that all `{` and `}` have been doubled for use in format!(),
+ // which we use to patch-in engine_state_changes.
+ format!(
+ r#"{{
+ "schema_version":"V1",
+ "global":{{
+ "id":"global",
+ "collection":"",
+ "payload":"{{\"syncID\":\"qZKAMjhyV6Ti\",\"storageVersion\":5,\"engines\":{{\"addresses\":{{\"version\":1,\"syncID\":\"8M-HfX6dm-pD\"}},\"bookmarks\":{{\"version\":2,\"syncID\":\"AVXtnKkH5OTi\"}}}},\"declined\":[{declined}]}}"
+ }},
+ "keys":{{"timestamp":1548214240.34,"default":{{"enc_key":[36,76],"mac_key":[222,241]}},"collections":{{}}}},
+ "engine_state_changes":[
+ {changes}
+ ]
+ }}"#,
+ changes = changes,
+ declined = declined
+ )
+ }
+
+ fn get_state_with_engine_changes(changes: &str) -> String {
+ get_state_with_engine_changes_and_declined(changes, "")
+ }
+
+ fn make_csids(global: &str, coll: &str) -> Option<CollSyncIds> {
+ Some(CollSyncIds {
+ global: global.into(),
+ coll: coll.into(),
+ })
+ }
+
+ fn extract_v1_ids_only(state: Option<String>, collection: &'static str) -> Option<CollSyncIds> {
+ let (sync_ids, new_state) = extract_v1_state(state, collection);
+ // tests which use this never have declined, so make sure our
+ // state reflects that.
+ let expected_state = serde_json::to_string(&PersistedGlobalState::V2 {
+ declined: Some(Vec::<String>::new()),
+ })
+ .expect("should stringify");
+ assert_eq!(new_state, Some(expected_state));
+ sync_ids
+ }
+
+ #[test]
+ fn test_extract_state_simple() {
+ let s = get_state_with_engine_changes("");
+ assert_eq!(
+ extract_v1_ids_only(Some(s.clone()), "addresses"),
+ make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD")
+ );
+ assert_eq!(
+ extract_v1_ids_only(Some(s), "bookmarks"),
+ make_csids("qZKAMjhyV6Ti", "AVXtnKkH5OTi")
+ );
+ }
+
+ #[test]
+ fn test_extract_state_simple_with_declined() {
+ // Note that 'declined' is stringified json, hence the extra back-slashes.
+ let s = get_state_with_engine_changes_and_declined("", "\\\"foo\\\"");
+ let expected_state = serde_json::to_string(&PersistedGlobalState::V2 {
+ declined: Some(vec!["foo".to_string()]),
+ })
+ .unwrap();
+ assert_eq!(
+ extract_v1_state(Some(s), "addresses"),
+ (
+ make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD"),
+ Some(expected_state)
+ )
+ );
+ }
+
+ #[test]
+ fn test_extract_with_engine_reset_all() {
+ let s = get_state_with_engine_changes("\"ResetAll\"");
+ assert_eq!(extract_v1_ids_only(Some(s), "addresses"), None);
+ }
+
+ #[test]
+ fn test_extract_with_engine_reset() {
+ let s = get_state_with_engine_changes("{\"Reset\" : \"addresses\"}");
+ assert_eq!(extract_v1_ids_only(Some(s.clone()), "addresses"), None);
+ // bookmarks wasn't reset.
+ assert_eq!(
+ extract_v1_ids_only(Some(s), "bookmarks"),
+ make_csids("qZKAMjhyV6Ti", "AVXtnKkH5OTi")
+ );
+ }
+
+ #[test]
+ fn test_extract_with_engine_reset_except() {
+ let s = get_state_with_engine_changes("{\"ResetAllExcept\" : [\"addresses\"]}");
+ // addresses is the exception
+ assert_eq!(
+ extract_v1_ids_only(Some(s.clone()), "addresses"),
+ make_csids("qZKAMjhyV6Ti", "8M-HfX6dm-pD")
+ );
+ // bookmarks was reset.
+ assert_eq!(extract_v1_ids_only(Some(s), "bookmarks"), None);
+ }
+}
diff --git a/third_party/rust/sync15/src/record_types.rs b/third_party/rust/sync15/src/record_types.rs
new file mode 100644
index 0000000000..f8f1449fdc
--- /dev/null
+++ b/third_party/rust/sync15/src/record_types.rs
@@ -0,0 +1,51 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use serde_derive::*;
+use std::collections::HashMap;
+use sync_guid::Guid;
+
+// Known record formats.
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MetaGlobalEngine {
+ pub version: usize,
+ #[serde(rename = "syncID")]
+ pub sync_id: Guid,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MetaGlobalRecord {
+ #[serde(rename = "syncID")]
+ pub sync_id: Guid,
+ #[serde(rename = "storageVersion")]
+ pub storage_version: usize,
+ #[serde(default)]
+ pub engines: HashMap<String, MetaGlobalEngine>,
+ #[serde(default)]
+ pub declined: Vec<String>,
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
+pub struct CryptoKeysRecord {
+ pub id: Guid,
+ pub collection: String,
+ pub default: [String; 2],
+ pub collections: HashMap<String, [String; 2]>,
+}
+
+#[cfg(test)]
+#[test]
+fn test_deserialize_meta_global() {
+ let record = serde_json::json!({
+ "syncID": "abcd1234abcd",
+ "storageVersion": 1,
+ })
+ .to_string();
+ let r = serde_json::from_str::<MetaGlobalRecord>(&record).unwrap();
+ assert_eq!(r.sync_id, "abcd1234abcd");
+ assert_eq!(r.storage_version, 1);
+ assert_eq!(r.engines.len(), 0);
+ assert_eq!(r.declined.len(), 0);
+}
diff --git a/third_party/rust/sync15/src/request.rs b/third_party/rust/sync15/src/request.rs
new file mode 100644
index 0000000000..9e09f53b68
--- /dev/null
+++ b/third_party/rust/sync15/src/request.rs
@@ -0,0 +1,1249 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso_record::EncryptedBso;
+use crate::client::Sync15ClientResponse;
+use crate::error::{self, ErrorKind, Result};
+use crate::util::ServerTimestamp;
+use serde_derive::*;
+use std::collections::HashMap;
+use std::default::Default;
+use std::ops::Deref;
+pub use sync15_traits::{CollectionRequest, RequestOrder};
+use sync_guid::Guid;
+use viaduct::status_codes;
+
+/// Manages a pair of (byte, count) limits for a PostQueue, such as
+/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records).
+#[derive(Debug, Clone)]
+struct LimitTracker {
+ max_bytes: usize,
+ max_records: usize,
+ cur_bytes: usize,
+ cur_records: usize,
+}
+
+impl LimitTracker {
+ pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
+ LimitTracker {
+ max_bytes,
+ max_records,
+ cur_bytes: 0,
+ cur_records: 0,
+ }
+ }
+
+ pub fn clear(&mut self) {
+ self.cur_records = 0;
+ self.cur_bytes = 0;
+ }
+
+ pub fn can_add_record(&self, payload_size: usize) -> bool {
+ // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that
+ // don't have https://github.com/mozilla-services/server-syncstorage/issues/73
+ self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes
+ }
+
+ pub fn can_never_add(&self, record_size: usize) -> bool {
+ record_size >= self.max_bytes
+ }
+
+ pub fn record_added(&mut self, record_size: usize) {
+ assert!(
+ self.can_add_record(record_size),
+ "LimitTracker::record_added caller must check can_add_record"
+ );
+ self.cur_records += 1;
+ self.cur_bytes += record_size;
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct InfoConfiguration {
+ /// The maximum size in bytes of the overall HTTP request body that will be accepted by the
+ /// server.
+ #[serde(default = "default_max_request_bytes")]
+ pub max_request_bytes: usize,
+
+ /// The maximum number of records that can be uploaded to a collection in a single POST request.
+ #[serde(default = "usize::max_value")]
+ pub max_post_records: usize,
+
+ /// The maximum combined size in bytes of the record payloads that can be uploaded to a
+ /// collection in a single POST request.
+ #[serde(default = "usize::max_value")]
+ pub max_post_bytes: usize,
+
+ /// The maximum total number of records that can be uploaded to a collection as part of a
+ /// batched upload.
+ #[serde(default = "usize::max_value")]
+ pub max_total_records: usize,
+
+ /// The maximum total combined size in bytes of the record payloads that can be uploaded to a
+ /// collection as part of a batched upload.
+ #[serde(default = "usize::max_value")]
+ pub max_total_bytes: usize,
+
+ /// The maximum size of an individual BSO payload, in bytes.
+ #[serde(default = "default_max_record_payload_bytes")]
+ pub max_record_payload_bytes: usize,
+}
+
+// This is annoying but seems to be the only way to do it...
+fn default_max_request_bytes() -> usize {
+ 260 * 1024
+}
+fn default_max_record_payload_bytes() -> usize {
+ 256 * 1024
+}
+
+impl Default for InfoConfiguration {
+ #[inline]
+ fn default() -> InfoConfiguration {
+ InfoConfiguration {
+ max_request_bytes: default_max_request_bytes(),
+ max_record_payload_bytes: default_max_record_payload_bytes(),
+ max_post_records: usize::max_value(),
+ max_post_bytes: usize::max_value(),
+ max_total_records: usize::max_value(),
+ max_total_bytes: usize::max_value(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);
+
+impl InfoCollections {
+ pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
+ InfoCollections(collections)
+ }
+}
+
+impl Deref for InfoCollections {
+ type Target = HashMap<String, ServerTimestamp>;
+
+ fn deref(&self) -> &HashMap<String, ServerTimestamp> {
+ &self.0
+ }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct UploadResult {
+ batch: Option<String>,
+ /// Maps record id => why failed
+ #[serde(default = "HashMap::new")]
+ pub failed: HashMap<Guid, String>,
+ /// Vec of ids
+ #[serde(default = "Vec::new")]
+ pub success: Vec<Guid>,
+}
+
+pub type PostResponse = Sync15ClientResponse<UploadResult>;
+
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub enum BatchState {
+ Unsupported,
+ NoBatch,
+ InBatch(String),
+}
+
+#[derive(Debug)]
+pub struct PostQueue<Post, OnResponse> {
+ poster: Post,
+ on_response: OnResponse,
+ post_limits: LimitTracker,
+ batch_limits: LimitTracker,
+ max_payload_bytes: usize,
+ max_request_bytes: usize,
+ queued: Vec<u8>,
+ batch: BatchState,
+ last_modified: ServerTimestamp,
+}
+
+pub trait BatchPoster {
+ /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing
+ /// Important: Poster should not report non-success HTTP statuses as errors!!
+ fn post<P, O>(
+ &self,
+ body: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<P, O>,
+ ) -> Result<PostResponse>;
+}
+
+// We don't just use a FnMut here since we want to override it in mocking for RefCell<TestType>,
+// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this
+// is somewhat better for documentation.
+pub trait PostResponseHandler {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct NormalResponseHandler {
+ pub failed_ids: Vec<Guid>,
+ pub successful_ids: Vec<Guid>,
+ pub allow_failed: bool,
+ pub pending_failed: Vec<Guid>,
+ pub pending_success: Vec<Guid>,
+}
+
+impl NormalResponseHandler {
+ pub fn new(allow_failed: bool) -> NormalResponseHandler {
+ NormalResponseHandler {
+ failed_ids: vec![],
+ successful_ids: vec![],
+ pending_failed: vec![],
+ pending_success: vec![],
+ allow_failed,
+ }
+ }
+}
+
+impl PostResponseHandler for NormalResponseHandler {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
+ match r {
+ Sync15ClientResponse::Success { record, .. } => {
+ if !record.failed.is_empty() && !self.allow_failed {
+ return Err(ErrorKind::RecordUploadFailed.into());
+ }
+ for id in record.success.iter() {
+ self.pending_success.push(id.clone());
+ }
+ for kv in record.failed.iter() {
+ self.pending_failed.push(kv.0.clone());
+ }
+ if !mid_batch {
+ self.successful_ids.append(&mut self.pending_success);
+ self.failed_ids.append(&mut self.pending_failed);
+ }
+ Ok(())
+ }
+ _ => Err(r.create_storage_error().into()),
+ }
+ }
+}
+
+impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
+where
+ Poster: BatchPoster,
+ OnResponse: PostResponseHandler,
+{
+ pub fn new(
+ config: &InfoConfiguration,
+ ts: ServerTimestamp,
+ poster: Poster,
+ on_response: OnResponse,
+ ) -> PostQueue<Poster, OnResponse> {
+ PostQueue {
+ poster,
+ on_response,
+ last_modified: ts,
+ post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
+ batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
+ batch: BatchState::NoBatch,
+ max_payload_bytes: config.max_record_payload_bytes,
+ max_request_bytes: config.max_request_bytes,
+ queued: Vec::new(),
+ }
+ }
+
+ #[inline]
+ fn in_batch(&self) -> bool {
+ !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
+ }
+
+ pub fn enqueue(&mut self, record: &EncryptedBso) -> Result<bool> {
+ let payload_length = record.payload.serialized_len();
+
+ if self.post_limits.can_never_add(payload_length)
+ || self.batch_limits.can_never_add(payload_length)
+ || payload_length >= self.max_payload_bytes
+ {
+ log::warn!(
+ "Single record too large to submit to server ({} b)",
+ payload_length
+ );
+ return Ok(false);
+ }
+
+ // Write directly into `queued` but undo if necessary (the vast majority of the time
+ // it won't be necessary). If we hit a problem we need to undo that, but the only error
+ // case we have to worry about right now is in flush()
+ let item_start = self.queued.len();
+
+ // This is conservative but can't hurt.
+ self.queued.reserve(payload_length + 2);
+
+ // Either the first character in an array, or a comma separating
+ // it from the previous item.
+ let c = if self.queued.is_empty() { b'[' } else { b',' };
+ self.queued.push(c);
+
+ // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object
+ // keys, which is impossible. If you decide to change this part, you *need* to call
+ // `self.queued.truncate(item_start)` here in the failure case!
+ serde_json::to_writer(&mut self.queued, &record).unwrap();
+
+ let item_end = self.queued.len();
+
+ debug_assert!(
+ item_end >= payload_length,
+ "EncryptedPayload::serialized_len is bugged"
+ );
+
+ // The + 1 is only relevant for the final record, which will have a trailing ']'.
+ let item_len = item_end - item_start + 1;
+
+ if item_len >= self.max_request_bytes {
+ self.queued.truncate(item_start);
+ log::warn!(
+ "Single record too large to submit to server ({} b)",
+ item_len
+ );
+ return Ok(false);
+ }
+
+ let can_post_record = self.post_limits.can_add_record(payload_length);
+ let can_batch_record = self.batch_limits.can_add_record(payload_length);
+ let can_send_record = self.queued.len() < self.max_request_bytes;
+
+ if !can_post_record || !can_send_record || !can_batch_record {
+ log::debug!(
+ "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
+ can_post_record,
+ can_send_record,
+ can_batch_record
+ );
+ // "unwrite" the record.
+ self.queued.truncate(item_start);
+ // Flush whatever we have queued.
+ self.flush(!can_batch_record)?;
+ // And write it again.
+ let c = if self.queued.is_empty() { b'[' } else { b',' };
+ self.queued.push(c);
+ serde_json::to_writer(&mut self.queued, &record).unwrap();
+ }
+
+ self.post_limits.record_added(payload_length);
+ self.batch_limits.record_added(payload_length);
+
+ Ok(true)
+ }
+
+ pub fn flush(&mut self, want_commit: bool) -> Result<()> {
+ if self.queued.is_empty() {
+ assert!(
+ !self.in_batch(),
+ "Bug: Somehow we're in a batch but have no queued records"
+ );
+ // Nothing to do!
+ return Ok(());
+ }
+
+ self.queued.push(b']');
+ let batch_id = match &self.batch {
+ // Not the first post and we know we have no batch semantics.
+ BatchState::Unsupported => None,
+ // First commit in possible batch
+ BatchState::NoBatch => Some("true".into()),
+ // In a batch and we have a batch id.
+ BatchState::InBatch(ref s) => Some(s.clone()),
+ };
+
+ log::info!(
+ "Posting {} records of {} bytes",
+ self.post_limits.cur_records,
+ self.queued.len()
+ );
+
+ let is_commit = want_commit && batch_id.is_some();
+ // Weird syntax for calling a function object that is a property.
+ let resp_or_error = self.poster.post(
+ self.queued.clone(),
+ self.last_modified,
+ batch_id,
+ is_commit,
+ self,
+ );
+
+ self.queued.truncate(0);
+
+ if want_commit || self.batch == BatchState::Unsupported {
+ self.batch_limits.clear();
+ }
+ self.post_limits.clear();
+
+ let resp = resp_or_error?;
+
+ let (status, last_modified, record) = match resp {
+ Sync15ClientResponse::Success {
+ status,
+ last_modified,
+ ref record,
+ ..
+ } => (status, last_modified, record),
+ _ => {
+ self.on_response.handle_response(resp, !want_commit)?;
+ // on_response() should always fail!
+ unreachable!();
+ }
+ };
+
+ if want_commit || self.batch == BatchState::Unsupported {
+ self.last_modified = last_modified;
+ }
+
+ if want_commit {
+ log::debug!("Committed batch {:?}", self.batch);
+ self.batch = BatchState::NoBatch;
+ self.on_response.handle_response(resp, false)?;
+ return Ok(());
+ }
+
+ if status != status_codes::ACCEPTED {
+ if self.in_batch() {
+ return Err(ErrorKind::ServerBatchProblem(
+ "Server responded non-202 success code while a batch was in progress",
+ )
+ .into());
+ }
+ self.last_modified = last_modified;
+ self.batch = BatchState::Unsupported;
+ self.batch_limits.clear();
+ self.on_response.handle_response(resp, false)?;
+ return Ok(());
+ }
+
+ let batch_id = record
+ .batch
+ .as_ref()
+ .ok_or_else(|| {
+ ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID")
+ })?
+ .clone();
+
+ match &self.batch {
+ BatchState::Unsupported => {
+ log::warn!("Server changed its mind about supporting batching mid-batch...");
+ }
+
+ BatchState::InBatch(ref cur_id) => {
+ if cur_id != &batch_id {
+ return Err(ErrorKind::ServerBatchProblem(
+ "Invalid server response: 202 without a batch ID",
+ )
+ .into());
+ }
+ }
+ _ => {}
+ }
+
+ // Can't change this in match arms without NLL
+ self.batch = BatchState::InBatch(batch_id);
+ self.last_modified = last_modified;
+
+ self.on_response.handle_response(resp, true)?;
+
+ Ok(())
+ }
+}
+
+#[derive(Clone)]
+pub struct UploadInfo {
+ pub successful_ids: Vec<Guid>,
+ pub failed_ids: Vec<Guid>,
+ pub modified_timestamp: ServerTimestamp,
+}
+
+impl<Poster> PostQueue<Poster, NormalResponseHandler> {
+ // TODO: should take by move
+ pub fn completed_upload_info(&mut self) -> UploadInfo {
+ let mut result = UploadInfo {
+ successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
+ failed_ids: Vec::with_capacity(
+ self.on_response.failed_ids.len()
+ + self.on_response.pending_failed.len()
+ + self.on_response.pending_success.len(),
+ ),
+ modified_timestamp: self.last_modified,
+ };
+
+ result
+ .successful_ids
+ .append(&mut self.on_response.successful_ids);
+
+ result.failed_ids.append(&mut self.on_response.failed_ids);
+ result
+ .failed_ids
+ .append(&mut self.on_response.pending_failed);
+ result
+ .failed_ids
+ .append(&mut self.on_response.pending_success);
+
+ result
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::bso_record::{BsoRecord, EncryptedPayload};
+ use lazy_static::lazy_static;
+ use std::cell::RefCell;
+ use std::collections::VecDeque;
+ use std::rc::Rc;
+ use url::Url;
+ #[test]
+ fn test_url_building() {
+ let base = Url::parse("https://example.com/sync").unwrap();
+ let empty = CollectionRequest::new("foo")
+ .build_url(base.clone())
+ .unwrap();
+ assert_eq!(empty.as_str(), "https://example.com/sync/storage/foo");
+ let batch_start = CollectionRequest::new("bar")
+ .batch(Some("true".into()))
+ .commit(false)
+ .build_url(base.clone())
+ .unwrap();
+ assert_eq!(
+ batch_start.as_str(),
+ "https://example.com/sync/storage/bar?batch=true"
+ );
+ let batch_commit = CollectionRequest::new("asdf")
+ .batch(Some("1234abc".into()))
+ .commit(true)
+ .build_url(base.clone())
+ .unwrap();
+ assert_eq!(
+ batch_commit.as_str(),
+ "https://example.com/sync/storage/asdf?batch=1234abc&commit=true"
+ );
+
+ let idreq = CollectionRequest::new("wutang")
+ .full()
+ .ids(&["rza", "gza"])
+ .build_url(base.clone())
+ .unwrap();
+ assert_eq!(
+ idreq.as_str(),
+ "https://example.com/sync/storage/wutang?full=1&ids=rza%2Cgza"
+ );
+
+ let complex = CollectionRequest::new("specific")
+ .full()
+ .limit(10)
+ .sort_by(RequestOrder::Oldest)
+ .older_than(ServerTimestamp(9_876_540))
+ .newer_than(ServerTimestamp(1_234_560))
+ .build_url(base)
+ .unwrap();
+ assert_eq!(complex.as_str(),
+ "https://example.com/sync/storage/specific?full=1&limit=10&older=9876.54&newer=1234.56&sort=oldest");
+ }
+
+ #[derive(Debug, Clone)]
+ struct PostedData {
+ body: String,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ payload_bytes: usize,
+ records: usize,
+ }
+
+ impl PostedData {
+ fn records_as_json(&self) -> Vec<serde_json::Value> {
+ let values =
+ serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
+ // Check that they actually deserialize as what we want
+ let records_or_err = serde_json::from_value::<Vec<EncryptedBso>>(values.clone());
+ records_or_err.expect("Failed to deserialize data");
+ serde_json::from_value(values).unwrap()
+ }
+ }
+
+ #[derive(Debug, Clone)]
+ struct BatchInfo {
+ id: Option<String>,
+ posts: Vec<PostedData>,
+ bytes: usize,
+ records: usize,
+ }
+
+ #[derive(Debug, Clone)]
+ struct TestPoster {
+ all_posts: Vec<PostedData>,
+ responses: VecDeque<PostResponse>,
+ batches: Vec<BatchInfo>,
+ cur_batch: Option<BatchInfo>,
+ cfg: InfoConfiguration,
+ }
+
+ type TestPosterRef = Rc<RefCell<TestPoster>>;
+ impl TestPoster {
+ pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
+ where
+ T: Into<VecDeque<PostResponse>>,
+ {
+ Rc::new(RefCell::new(TestPoster {
+ all_posts: vec![],
+ responses: responses.into(),
+ batches: vec![],
+ cur_batch: None,
+ cfg: cfg.clone(),
+ }))
+ }
+ // Adds &mut
+ fn do_post<T, O>(
+ &mut self,
+ body: &[u8],
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<T, O>,
+ ) -> Result<PostResponse> {
+ let mut post = PostedData {
+ body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
+ batch: batch.clone(),
+ xius,
+ commit,
+ payload_bytes: 0,
+ records: 0,
+ };
+
+ assert!(body.len() <= self.cfg.max_request_bytes);
+
+ let (num_records, record_payload_bytes) = {
+ let recs = post.records_as_json();
+ assert!(recs.len() <= self.cfg.max_post_records);
+ assert!(recs.len() <= self.cfg.max_total_records);
+ let payload_bytes: usize = recs
+ .iter()
+ .map(|r| {
+ let len = r["payload"]
+ .as_str()
+ .expect("Non string payload property")
+ .len();
+ assert!(len <= self.cfg.max_record_payload_bytes);
+ len
+ })
+ .sum();
+ assert!(payload_bytes <= self.cfg.max_post_bytes);
+ assert!(payload_bytes <= self.cfg.max_total_bytes);
+
+ assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
+ assert_eq!(queue.post_limits.cur_records, recs.len());
+ (recs.len(), payload_bytes)
+ };
+ post.payload_bytes = record_payload_bytes;
+ post.records = num_records;
+
+ self.all_posts.push(post.clone());
+ let response = self.responses.pop_front().unwrap();
+
+ let record = match response {
+ Sync15ClientResponse::Success { ref record, .. } => record,
+ _ => {
+ panic!("only success codes are used in this test");
+ }
+ };
+
+ if self.cur_batch.is_none() {
+ assert!(
+ batch.is_none() || batch == Some("true".into()),
+ "We shouldn't be in a batch now"
+ );
+ self.cur_batch = Some(BatchInfo {
+ id: record.batch.clone(),
+ posts: vec![],
+ records: 0,
+ bytes: 0,
+ });
+ } else {
+ assert_eq!(
+ batch,
+ self.cur_batch.as_ref().unwrap().id,
+ "We're in a batch but got the wrong batch id"
+ );
+ }
+
+ {
+ let batch = self.cur_batch.as_mut().unwrap();
+ batch.posts.push(post);
+ batch.records += num_records;
+ batch.bytes += record_payload_bytes;
+
+ assert!(batch.bytes <= self.cfg.max_total_bytes);
+ assert!(batch.records <= self.cfg.max_total_records);
+
+ assert_eq!(batch.records, queue.batch_limits.cur_records);
+ assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
+ }
+
+ if commit || record.batch.is_none() {
+ let batch = self.cur_batch.take().unwrap();
+ self.batches.push(batch);
+ }
+
+ Ok(response)
+ }
+
+ fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) -> Result<()> {
+ assert_eq!(mid_batch, self.cur_batch.is_some());
+ Ok(())
+ }
+ }
+ impl BatchPoster for TestPosterRef {
+ fn post<T, O>(
+ &self,
+ body: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<T, O>,
+ ) -> Result<PostResponse> {
+ self.borrow_mut().do_post(&body, xius, batch, commit, queue)
+ }
+ }
+
+ impl PostResponseHandler for TestPosterRef {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
+ self.borrow_mut().do_handle_response(r, mid_batch)
+ }
+ }
+
+ type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;
+
+ fn pq_test_setup(
+ cfg: InfoConfiguration,
+ lm: i64,
+ resps: Vec<PostResponse>,
+ ) -> (MockedPostQueue, TestPosterRef) {
+ let tester = TestPoster::new(&cfg, resps);
+ let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
+ (pq, tester)
+ }
+
+ fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
+ assert!(status_codes::is_success_code(status));
+ Sync15ClientResponse::Success {
+ status,
+ last_modified: ServerTimestamp(lm),
+ record: UploadResult {
+ batch: batch.into().map(Into::into),
+ failed: HashMap::new(),
+ success: vec![],
+ },
+ route: "test/path".into(),
+ }
+ }
+
+ lazy_static! {
+ // ~40b
+ static ref PAYLOAD_OVERHEAD: usize = {
+ let payload = EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "".into()
+ };
+ serde_json::to_string(&payload).unwrap().len()
+ };
+ // ~80b
+ static ref TOTAL_RECORD_OVERHEAD: usize = {
+ let val = serde_json::to_value(BsoRecord {
+ id: "".into(),
+ collection: "".into(),
+ modified: ServerTimestamp(0),
+ sortindex: None,
+ ttl: None,
+ payload: EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "".into()
+ },
+ }).unwrap();
+ serde_json::to_string(&val).unwrap().len()
+ };
+ // There's some subtlety in how we calulate this having to do with the fact that
+ // the quotes in the payload are escaped but the escape chars count to the request len
+ // and *not* to the payload len (the payload len check happens after json parsing the
+ // top level object).
+ static ref NON_PAYLOAD_OVERHEAD: usize = {
+ *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
+ };
+ }
+
+ // Actual record size (for max_request_len) will be larger by some amount
+ fn make_record(payload_size: usize) -> EncryptedBso {
+ assert!(payload_size > *PAYLOAD_OVERHEAD);
+ let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
+ BsoRecord {
+ id: "".into(),
+ collection: "".into(),
+ modified: ServerTimestamp(0),
+ sortindex: None,
+ ttl: None,
+ payload: EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "x".repeat(ciphertext_len),
+ },
+ }
+ }
+
+ fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
+ 1 + payloads
+ .iter()
+ .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
+ .sum::<usize>()
+ }
+
+ #[test]
+ fn test_pq_basic() {
+ let cfg = InfoConfiguration {
+ max_request_bytes: 1000,
+ max_record_payload_bytes: 1000,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![fake_response(status_codes::OK, time + 100_000, None)],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 1);
+ assert_eq!(t.batches[0].bytes, 100);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ fn test_pq_max_request_bytes_no_batch() {
+ let cfg = InfoConfiguration {
+ max_request_bytes: 250,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::OK, time + 100_000, None),
+ fake_response(status_codes::OK, time + 200_000, None),
+ ],
+ );
+
+ // Note that the total record overhead is around 85 bytes
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r]
+ pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post.
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 2);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 2);
+ assert_eq!(t.batches[0].bytes, payload_size * 2);
+ assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size])
+ );
+
+ assert_eq!(t.batches[1].posts.len(), 1);
+ assert_eq!(t.batches[1].records, 1);
+ assert_eq!(t.batches[1].bytes, payload_size);
+ // We know at this point that the server does not support batching.
+ assert_eq!(t.batches[1].posts[0].batch, None);
+ assert_eq!(t.batches[1].posts[0].commit, false);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_max_record_payload_bytes_no_batch() {
+ let cfg = InfoConfiguration {
+ max_record_payload_bytes: 150,
+ max_request_bytes: 350,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::OK, time + 100_000, None),
+ fake_response(status_codes::OK, time + 200_000, None),
+ ],
+ );
+
+ // Note that the total record overhead is around 85 bytes
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
+ let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102
+ assert!(!enqueued, "Should not have fit");
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 2);
+ assert_eq!(t.batches[0].bytes, payload_size * 2);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_single_batch() {
+ let cfg = InfoConfiguration::default();
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![fake_response(
+ status_codes::ACCEPTED,
+ time + 100_000,
+ Some("1234"),
+ )],
+ );
+
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 3);
+ assert_eq!(t.batches[0].bytes, payload_size * 3);
+ assert_eq!(t.batches[0].posts[0].commit, true);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_multi_post_batch_bytes() {
+ let cfg = InfoConfiguration {
+ max_post_bytes: 200,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 2);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[0].records, 3);
+ assert_eq!(t.batches[0].bytes, 300);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 2);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
+ assert_eq!(t.batches[0].posts[0].commit, false);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 1);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
+ assert_eq!(t.batches[0].posts[1].commit, true);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ fn test_pq_multi_post_batch_records() {
+ let cfg = InfoConfiguration {
+ max_post_records: 3,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 3);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 3);
+ assert_eq!(t.batches[0].records, 7);
+ assert_eq!(t.batches[0].bytes, 700);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert_eq!(t.batches[0].posts[0].commit, false);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 3);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
+ assert_eq!(t.batches[0].posts[1].commit, false);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[2].records, 1);
+ assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
+ assert_eq!(t.batches[0].posts[2].commit, true);
+ assert_eq!(
+ t.batches[0].posts[2].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ #[allow(clippy::cognitive_complexity)]
+ fn test_pq_multi_post_multi_batch_records() {
+ let cfg = InfoConfiguration {
+ max_post_records: 3,
+ max_total_records: 5,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
+ fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST + COMMIT
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 4);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[1].posts.len(), 2);
+
+ assert_eq!(t.batches[0].records, 5);
+ assert_eq!(t.batches[1].records, 4);
+
+ assert_eq!(t.batches[0].bytes, 500);
+ assert_eq!(t.batches[1].bytes, 400);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert_eq!(t.batches[0].posts[0].commit, false);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 2);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
+ assert_eq!(t.batches[0].posts[1].commit, true);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[1].posts[0].records, 3);
+ assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
+ assert_eq!(t.batches[1].posts[0].commit, false);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
+ assert_eq!(t.batches[1].posts[1].records, 1);
+ assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
+ assert_eq!(t.batches[1].posts[1].commit, true);
+ assert_eq!(
+ t.batches[1].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ #[allow(clippy::cognitive_complexity)]
+ fn test_pq_multi_post_multi_batch_bytes() {
+ let cfg = InfoConfiguration {
+ max_post_bytes: 300,
+ max_total_bytes: 500,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
+ fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time);
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST + COMMIT
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time + 100_000);
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time + 100_000);
+ pq.flush(true).unwrap(); // COMMIT
+
+ assert_eq!(pq.last_modified.0, time + 200_000);
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 4);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[1].posts.len(), 2);
+
+ assert_eq!(t.batches[0].records, 5);
+ assert_eq!(t.batches[1].records, 4);
+
+ assert_eq!(t.batches[0].bytes, 500);
+ assert_eq!(t.batches[1].bytes, 400);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert_eq!(t.batches[0].posts[0].commit, false);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 2);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
+ assert_eq!(t.batches[0].posts[1].commit, true);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[1].posts[0].records, 3);
+ assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
+ assert_eq!(t.batches[1].posts[0].commit, false);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
+ assert_eq!(t.batches[1].posts[1].records, 1);
+ assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
+ assert_eq!(t.batches[1].posts[1].commit, true);
+ assert_eq!(
+ t.batches[1].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ // TODO: Test
+ //
+ // - error cases!!! We don't test our handling of server errors at all!
+ // - mixed bytes/record limits
+ //
+ // A lot of these have good examples in test_postqueue.js on deskftop sync
+}
diff --git a/third_party/rust/sync15/src/state.rs b/third_party/rust/sync15/src/state.rs
new file mode 100644
index 0000000000..e25f72305a
--- /dev/null
+++ b/third_party/rust/sync15/src/state.rs
@@ -0,0 +1,1130 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::collections::{HashMap, HashSet};
+
+use crate::bso_record::EncryptedBso;
+use crate::client::{SetupStorageClient, Sync15ClientResponse};
+use crate::collection_keys::CollectionKeys;
+use crate::error::{self, ErrorKind, ErrorResponse};
+use crate::key_bundle::KeyBundle;
+use crate::record_types::{MetaGlobalEngine, MetaGlobalRecord};
+use crate::request::{InfoCollections, InfoConfiguration};
+use crate::util::ServerTimestamp;
+use interrupt_support::Interruptee;
+use serde_derive::*;
+use sync_guid::Guid;
+
+use self::SetupState::*;
+
+const STORAGE_VERSION: usize = 5;
+
+/// Maps names to storage versions for engines to include in a fresh
+/// `meta/global` record. We include engines that we don't implement
+/// because they'll be disabled on other clients if we omit them
+/// (bug 1479929).
+const DEFAULT_ENGINES: &[(&str, usize)] = &[
+ ("passwords", 1),
+ ("clients", 1),
+ ("addons", 1),
+ ("addresses", 1),
+ ("bookmarks", 2),
+ ("creditcards", 1),
+ ("forms", 1),
+ ("history", 1),
+ ("prefs", 2),
+ ("tabs", 1),
+];
+
+// Declined engines to include in a fresh `meta/global` record.
+const DEFAULT_DECLINED: &[&str] = &[];
+
+/// State that we require the app to persist to storage for us.
+/// It's a little unfortunate we need this, because it's only tracking
+/// "declined engines", and even then, only needed in practice when there's
+/// no meta/global so we need to create one. It's extra unfortunate because we
+/// want to move away from "globally declined" engines anyway, moving towards
+/// allowing engines to be enabled or disabled per client rather than globally.
+///
+/// Apps are expected to treat this as opaque, so we support serializing it.
+/// Note that this structure is *not* used to *change* the declined engines
+/// list - that will be done in the future, but the API exposed for that
+/// purpose will also take a mutable PersistedGlobalState.
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "schema_version")]
+pub enum PersistedGlobalState {
+ /// V1 was when we persisted the entire GlobalState, keys and all!
+
+ /// V2 is just tracking the globally declined list.
+ /// None means "I've no idea" and theoretically should only happen on the
+ /// very first sync for an app.
+ V2 { declined: Option<Vec<String>> },
+}
+
+impl Default for PersistedGlobalState {
+ #[inline]
+ fn default() -> PersistedGlobalState {
+ PersistedGlobalState::V2 { declined: None }
+ }
+}
+
+#[derive(Debug, Default, Clone, PartialEq)]
+pub(crate) struct EngineChangesNeeded {
+ pub local_resets: HashSet<String>,
+ pub remote_wipes: HashSet<String>,
+}
+
+#[derive(Debug, Default, Clone, PartialEq)]
+struct RemoteEngineState {
+ info_collections: HashSet<String>,
+ declined: HashSet<String>,
+}
+
+#[derive(Debug, Default, Clone, PartialEq)]
+struct EngineStateInput {
+ local_declined: HashSet<String>,
+ remote: Option<RemoteEngineState>,
+ user_changes: HashMap<String, bool>,
+}
+
+#[derive(Debug, Default, Clone, PartialEq)]
+struct EngineStateOutput {
+ // The new declined.
+ declined: HashSet<String>,
+ // Which engines need resets or wipes.
+ changes_needed: EngineChangesNeeded,
+}
+
+fn compute_engine_states(input: EngineStateInput) -> EngineStateOutput {
+ use crate::util::*;
+ log::debug!("compute_engine_states: input {:?}", input);
+ let (must_enable, must_disable) = partition_by_value(&input.user_changes);
+ let have_remote = input.remote.is_some();
+ let RemoteEngineState {
+ info_collections,
+ declined: remote_declined,
+ } = input.remote.clone().unwrap_or_default();
+
+ let both_declined_and_remote = set_intersection(&info_collections, &remote_declined);
+ if !both_declined_and_remote.is_empty() {
+ // Should we wipe these too?
+ log::warn!(
+ "Remote state contains engines which are in both info/collections and meta/global's declined: {:?}",
+ both_declined_and_remote,
+ );
+ }
+
+ let most_recent_declined_list = if have_remote {
+ &remote_declined
+ } else {
+ &input.local_declined
+ };
+
+ let result_declined = set_difference(
+ &set_union(most_recent_declined_list, &must_disable),
+ &must_enable,
+ );
+
+ let output = EngineStateOutput {
+ changes_needed: EngineChangesNeeded {
+ // Anything now declined which wasn't in our declined list before gets a reset.
+ local_resets: set_difference(&result_declined, &input.local_declined),
+ // Anything remote that we just declined gets a wipe. In the future
+ // we might want to consider wiping things in both remote declined
+ // and info/collections, but we'll let other clients pick up their
+ // own mess for now.
+ remote_wipes: set_intersection(&info_collections, &must_disable),
+ },
+ declined: result_declined,
+ };
+ // No PII here and this helps debug problems.
+ log::debug!("compute_engine_states: output {:?}", output);
+ output
+}
+
+impl PersistedGlobalState {
+ fn set_declined(&mut self, new_declined: Vec<String>) {
+ match self {
+ Self::V2 { ref mut declined } => *declined = Some(new_declined),
+ }
+ }
+ pub(crate) fn get_declined(&self) -> &[String] {
+ match self {
+ Self::V2 { declined: Some(d) } => &d,
+ Self::V2 { declined: None } => &[],
+ }
+ }
+}
+
+/// Holds global Sync state, including server upload limits, the
+/// last-fetched collection modified times, `meta/global` record, and
+/// encrypted copies of the crypto/keys resourse (which we hold as encrypted
+/// both to avoid keeping them in memory longer than necessary, and guard against
+/// the wrong (ie, a different user's) root key being passed in.
+#[derive(Debug, Clone)]
+pub struct GlobalState {
+ pub config: InfoConfiguration,
+ pub collections: InfoCollections,
+ pub global: MetaGlobalRecord,
+ pub global_timestamp: ServerTimestamp,
+ pub keys: EncryptedBso,
+}
+
+/// Creates a fresh `meta/global` record, using the default engine selections,
+/// and declined engines from our PersistedGlobalState.
+fn new_global(pgs: &PersistedGlobalState) -> error::Result<MetaGlobalRecord> {
+ let sync_id = Guid::random();
+ let mut engines: HashMap<String, _> = HashMap::new();
+ for (name, version) in DEFAULT_ENGINES.iter() {
+ let sync_id = Guid::random();
+ engines.insert(
+ (*name).to_string(),
+ MetaGlobalEngine {
+ version: *version,
+ sync_id,
+ },
+ );
+ }
+ // We only need our PersistedGlobalState to fill out a new meta/global - if
+ // we previously saw a meta/global then we would have updated it with what
+ // it was at the time.
+ let declined = match pgs {
+ PersistedGlobalState::V2 { declined: Some(d) } => d.clone(),
+ _ => DEFAULT_DECLINED.iter().map(ToString::to_string).collect(),
+ };
+
+ Ok(MetaGlobalRecord {
+ sync_id,
+ storage_version: STORAGE_VERSION,
+ engines,
+ declined,
+ })
+}
+
+fn fixup_meta_global(global: &mut MetaGlobalRecord) -> bool {
+ let mut changed_any = false;
+ for &(name, version) in DEFAULT_ENGINES.iter() {
+ let had_engine = global.engines.contains_key(name);
+ let should_have_engine = !global.declined.iter().any(|c| c == name);
+ if had_engine != should_have_engine {
+ if should_have_engine {
+ log::debug!("SyncID for engine {:?} was missing", name);
+ global.engines.insert(
+ name.to_string(),
+ MetaGlobalEngine {
+ version,
+ sync_id: Guid::random(),
+ },
+ );
+ } else {
+ log::debug!("SyncID for engine {:?} was present, but shouldn't be", name);
+ global.engines.remove(name);
+ }
+ changed_any = true;
+ }
+ }
+ changed_any
+}
+
+pub struct SetupStateMachine<'a> {
+ client: &'a dyn SetupStorageClient,
+ root_key: &'a KeyBundle,
+ pgs: &'a mut PersistedGlobalState,
+ // `allowed_states` is designed so that we can arrange for the concept of
+ // a "fast" sync - so we decline to advance if we need to setup from scratch.
+ // The idea is that if we need to sync before going to sleep we should do
+ // it as fast as possible. However, in practice this isn't going to do
+ // what we expect - a "fast sync" that finds lots to do is almost certainly
+ // going to take longer than a "full sync" that finds nothing to do.
+ // We should almost certainly remove this and instead allow for a "time
+ // budget", after which we get interrupted. Later...
+ allowed_states: Vec<&'static str>,
+ sequence: Vec<&'static str>,
+ engine_updates: Option<&'a HashMap<String, bool>>,
+ interruptee: &'a dyn Interruptee,
+ pub(crate) changes_needed: Option<EngineChangesNeeded>,
+}
+
+impl<'a> SetupStateMachine<'a> {
+ /// Creates a state machine for a "classic" Sync 1.5 client that supports
+ /// all states, including uploading a fresh `meta/global` and `crypto/keys`
+ /// after a node reassignment.
+ pub fn for_full_sync(
+ client: &'a dyn SetupStorageClient,
+ root_key: &'a KeyBundle,
+ pgs: &'a mut PersistedGlobalState,
+ engine_updates: Option<&'a HashMap<String, bool>>,
+ interruptee: &'a dyn Interruptee,
+ ) -> SetupStateMachine<'a> {
+ SetupStateMachine::with_allowed_states(
+ client,
+ root_key,
+ pgs,
+ interruptee,
+ engine_updates,
+ vec![
+ "Initial",
+ "InitialWithConfig",
+ "InitialWithInfo",
+ "InitialWithMetaGlobal",
+ "Ready",
+ "FreshStartRequired",
+ "WithPreviousState",
+ ],
+ )
+ }
+
+ /// Creates a state machine for a fast sync, which only uses locally
+ /// cached global state, and bails if `meta/global` or `crypto/keys`
+ /// are missing or out-of-date. This is useful in cases where it's
+ /// important to get to ready as quickly as possible, like syncing before
+ /// sleep, or when conserving time or battery life.
+ pub fn for_fast_sync(
+ client: &'a dyn SetupStorageClient,
+ root_key: &'a KeyBundle,
+ pgs: &'a mut PersistedGlobalState,
+ engine_updates: Option<&'a HashMap<String, bool>>,
+ interruptee: &'a dyn Interruptee,
+ ) -> SetupStateMachine<'a> {
+ SetupStateMachine::with_allowed_states(
+ client,
+ root_key,
+ pgs,
+ interruptee,
+ engine_updates,
+ vec!["Ready", "WithPreviousState"],
+ )
+ }
+
+ /// Creates a state machine for a read-only sync, where the client can't
+ /// upload `meta/global` or `crypto/keys`. Useful for clients that only
+ /// sync specific collections, like Lockbox.
+ pub fn for_readonly_sync(
+ client: &'a dyn SetupStorageClient,
+ root_key: &'a KeyBundle,
+ pgs: &'a mut PersistedGlobalState,
+ interruptee: &'a dyn Interruptee,
+ ) -> SetupStateMachine<'a> {
+ SetupStateMachine::with_allowed_states(
+ client,
+ root_key,
+ pgs,
+ interruptee,
+ // No engine updates for a readonly sync
+ None,
+ // We don't allow a FreshStart in a read-only sync.
+ vec![
+ "Initial",
+ "InitialWithConfig",
+ "InitialWithInfo",
+ "InitialWithMetaGlobal",
+ "Ready",
+ "WithPreviousState",
+ ],
+ )
+ }
+
+ fn with_allowed_states(
+ client: &'a dyn SetupStorageClient,
+ root_key: &'a KeyBundle,
+ pgs: &'a mut PersistedGlobalState,
+ interruptee: &'a dyn Interruptee,
+ engine_updates: Option<&'a HashMap<String, bool>>,
+ allowed_states: Vec<&'static str>,
+ ) -> SetupStateMachine<'a> {
+ SetupStateMachine {
+ client,
+ root_key,
+ pgs,
+ sequence: Vec::new(),
+ allowed_states,
+ engine_updates,
+ interruptee,
+ changes_needed: None,
+ }
+ }
+
+ fn advance(&mut self, from: SetupState) -> error::Result<SetupState> {
+ match from {
+ // Fetch `info/configuration` with current server limits, and
+ // `info/collections` with collection last modified times.
+ Initial => {
+ let config = match self.client.fetch_info_configuration()? {
+ Sync15ClientResponse::Success { record, .. } => record,
+ Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
+ InfoConfiguration::default()
+ }
+ other => return Err(other.create_storage_error().into()),
+ };
+ Ok(InitialWithConfig { config })
+ }
+
+ // XXX - we could consider combining these Initial* states, because we don't
+ // attempt to support filling in "missing" global state - *any* 404 in them
+ // means `FreshStart`.
+ // IOW, in all cases, they either `Err()`, move to `FreshStartRequired`, or
+ // advance to a specific next state.
+ InitialWithConfig { config } => {
+ match self.client.fetch_info_collections()? {
+ Sync15ClientResponse::Success {
+ record: collections,
+ ..
+ } => Ok(InitialWithInfo {
+ config,
+ collections,
+ }),
+ // If the server doesn't have a `crypto/keys`, start over
+ // and reupload our `meta/global` and `crypto/keys`.
+ Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
+ Ok(FreshStartRequired { config })
+ }
+ other => Err(other.create_storage_error().into()),
+ }
+ }
+
+ InitialWithInfo {
+ config,
+ collections,
+ } => {
+ match self.client.fetch_meta_global()? {
+ Sync15ClientResponse::Success {
+ record: mut global,
+ last_modified: mut global_timestamp,
+ ..
+ } => {
+ // If the server has a newer storage version, we can't
+ // sync until our client is updated.
+ if global.storage_version > STORAGE_VERSION {
+ return Err(ErrorKind::ClientUpgradeRequired.into());
+ }
+
+ // If the server has an older storage version, wipe and
+ // reupload.
+ if global.storage_version < STORAGE_VERSION {
+ Ok(FreshStartRequired { config })
+ } else {
+ log::info!("Have info/collections and meta/global. Computing new engine states");
+ let initial_global_declined: HashSet<String> =
+ global.declined.iter().cloned().collect();
+ let result = compute_engine_states(EngineStateInput {
+ local_declined: self.pgs.get_declined().iter().cloned().collect(),
+ user_changes: self.engine_updates.cloned().unwrap_or_default(),
+ remote: Some(RemoteEngineState {
+ declined: initial_global_declined.clone(),
+ info_collections: collections.keys().cloned().collect(),
+ }),
+ });
+ // Persist the new declined.
+ self.pgs
+ .set_declined(result.declined.iter().cloned().collect());
+ // If the declined engines differ from remote, fix that.
+ let fixed_declined = if result.declined != initial_global_declined {
+ global.declined = result.declined.iter().cloned().collect();
+ log::info!(
+ "Uploading new declined {:?} to meta/global with timestamp {:?}",
+ global.declined,
+ global_timestamp,
+ );
+ true
+ } else {
+ false
+ };
+ // If there are missing syncIds, we need to fix those as well
+ let fixed_ids = if fixup_meta_global(&mut global) {
+ log::info!(
+ "Uploading corrected meta/global with timestamp {:?}",
+ global_timestamp,
+ );
+ true
+ } else {
+ false
+ };
+
+ if fixed_declined || fixed_ids {
+ global_timestamp =
+ self.client.put_meta_global(global_timestamp, &global)?;
+ log::debug!("new global_timestamp: {:?}", global_timestamp);
+ }
+ // Update the set of changes needed.
+ if self.changes_needed.is_some() {
+ // Should never happen (we prevent state machine
+ // loops elsewhere) but if it did, the info is stale
+ // anyway.
+ log::warn!("Already have a set of changes needed, Overwriting...");
+ }
+ self.changes_needed = Some(result.changes_needed);
+ Ok(InitialWithMetaGlobal {
+ config,
+ collections,
+ global,
+ global_timestamp,
+ })
+ }
+ }
+ Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
+ Ok(FreshStartRequired { config })
+ }
+ other => Err(other.create_storage_error().into()),
+ }
+ }
+
+ InitialWithMetaGlobal {
+ config,
+ collections,
+ global,
+ global_timestamp,
+ } => {
+ // Now try and get keys etc - if we fresh-start we'll re-use declined.
+ match self.client.fetch_crypto_keys()? {
+ Sync15ClientResponse::Success {
+ record,
+ last_modified,
+ ..
+ } => {
+ // Note that collection/keys is itself a bso, so the
+ // json body also carries the timestamp. If they aren't
+ // identical something has screwed up and we should die.
+ assert_eq!(last_modified, record.modified);
+ let state = GlobalState {
+ config,
+ collections,
+ global,
+ global_timestamp,
+ keys: record,
+ };
+ Ok(Ready { state })
+ }
+ // If the server doesn't have a `crypto/keys`, start over
+ // and reupload our `meta/global` and `crypto/keys`.
+ Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
+ Ok(FreshStartRequired { config })
+ }
+ other => Err(other.create_storage_error().into()),
+ }
+ }
+
+ // We've got old state that's likely to be OK.
+ // We keep things simple here - if there's evidence of a new/missing
+ // meta/global or new/missing keys we just restart from scratch.
+ WithPreviousState { old_state } => match self.client.fetch_info_collections()? {
+ Sync15ClientResponse::Success {
+ record: collections,
+ ..
+ } => Ok(
+ if self.engine_updates.is_none()
+ && is_same_timestamp(old_state.global_timestamp, &collections, "meta")
+ && is_same_timestamp(old_state.keys.modified, &collections, "crypto")
+ {
+ Ready {
+ state: GlobalState {
+ collections,
+ ..old_state
+ },
+ }
+ } else {
+ InitialWithConfig {
+ config: old_state.config,
+ }
+ },
+ ),
+ _ => Ok(InitialWithConfig {
+ config: old_state.config,
+ }),
+ },
+
+ Ready { state } => Ok(Ready { state }),
+
+ FreshStartRequired { config } => {
+ // Wipe the server.
+ log::info!("Fresh start: wiping remote");
+ self.client.wipe_all_remote()?;
+
+ // Upload a fresh `meta/global`...
+ log::info!("Uploading meta/global");
+ let computed = compute_engine_states(EngineStateInput {
+ local_declined: self.pgs.get_declined().iter().cloned().collect(),
+ user_changes: self.engine_updates.cloned().unwrap_or_default(),
+ remote: None,
+ });
+ self.pgs
+ .set_declined(computed.declined.iter().cloned().collect());
+
+ self.changes_needed = Some(computed.changes_needed);
+
+ let new_global = new_global(self.pgs)?;
+
+ self.client
+ .put_meta_global(ServerTimestamp::default(), &new_global)?;
+
+ // ...And a fresh `crypto/keys`.
+ let new_keys = CollectionKeys::new_random()?.to_encrypted_bso(&self.root_key)?;
+ self.client
+ .put_crypto_keys(ServerTimestamp::default(), &new_keys)?;
+
+ // TODO(lina): Can we pass along server timestamps from the PUTs
+ // above, and avoid re-fetching the `m/g` and `c/k` we just
+ // uploaded?
+ // OTOH(mark): restarting the state machine keeps life simple and rare.
+ Ok(InitialWithConfig { config })
+ }
+ }
+ }
+
+ /// Runs through the state machine to the ready state.
+ pub fn run_to_ready(&mut self, state: Option<GlobalState>) -> error::Result<GlobalState> {
+ let mut s = match state {
+ Some(old_state) => WithPreviousState { old_state },
+ None => Initial,
+ };
+ loop {
+ self.interruptee.err_if_interrupted()?;
+ let label = &s.label();
+ log::trace!("global state: {:?}", label);
+ match s {
+ Ready { state } => {
+ self.sequence.push(label);
+ return Ok(state);
+ }
+ // If we already started over once before, we're likely in a
+ // cycle, and should try again later. Intermediate states
+ // aren't a problem, just the initial ones.
+ FreshStartRequired { .. } | WithPreviousState { .. } | Initial => {
+ if self.sequence.contains(&label) {
+ // Is this really the correct error?
+ return Err(ErrorKind::SetupRace.into());
+ }
+ }
+ _ => {
+ if !self.allowed_states.contains(&label) {
+ return Err(ErrorKind::SetupRequired.into());
+ }
+ }
+ };
+ self.sequence.push(label);
+ s = self.advance(s)?;
+ }
+ }
+}
+
+/// States in the remote setup process.
+/// TODO(lina): Add link once #56 is merged.
+#[derive(Debug)]
+#[allow(clippy::large_enum_variant)]
+enum SetupState {
+ // These "Initial" states are only ever used when starting from scratch.
+ Initial,
+ InitialWithConfig {
+ config: InfoConfiguration,
+ },
+ InitialWithInfo {
+ config: InfoConfiguration,
+ collections: InfoCollections,
+ },
+ InitialWithMetaGlobal {
+ config: InfoConfiguration,
+ collections: InfoCollections,
+ global: MetaGlobalRecord,
+ global_timestamp: ServerTimestamp,
+ },
+ WithPreviousState {
+ old_state: GlobalState,
+ },
+ Ready {
+ state: GlobalState,
+ },
+ FreshStartRequired {
+ config: InfoConfiguration,
+ },
+}
+
+impl SetupState {
+ fn label(&self) -> &'static str {
+ match self {
+ Initial { .. } => "Initial",
+ InitialWithConfig { .. } => "InitialWithConfig",
+ InitialWithInfo { .. } => "InitialWithInfo",
+ InitialWithMetaGlobal { .. } => "InitialWithMetaGlobal",
+ Ready { .. } => "Ready",
+ WithPreviousState { .. } => "WithPreviousState",
+ FreshStartRequired { .. } => "FreshStartRequired",
+ }
+ }
+}
+
+/// Whether we should skip fetching an item. Used when we already have timestamps
+/// and want to check if we should reuse our existing state. The state's fairly
+/// cheap to recreate and very bad to use if it is wrong, so we insist on the
+/// *exact* timestamp matching and not a simple "later than" check.
+fn is_same_timestamp(local: ServerTimestamp, collections: &InfoCollections, key: &str) -> bool {
+ collections.get(key).map_or(false, |ts| local == *ts)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use crate::bso_record::{BsoRecord, EncryptedBso, EncryptedPayload, Payload};
+ use crate::record_types::CryptoKeysRecord;
+ use interrupt_support::NeverInterrupts;
+
+ struct InMemoryClient {
+ info_configuration: error::Result<Sync15ClientResponse<InfoConfiguration>>,
+ info_collections: error::Result<Sync15ClientResponse<InfoCollections>>,
+ meta_global: error::Result<Sync15ClientResponse<MetaGlobalRecord>>,
+ crypto_keys: error::Result<Sync15ClientResponse<BsoRecord<EncryptedPayload>>>,
+ }
+
+ impl SetupStorageClient for InMemoryClient {
+ fn fetch_info_configuration(
+ &self,
+ ) -> error::Result<Sync15ClientResponse<InfoConfiguration>> {
+ match &self.info_configuration {
+ Ok(client_response) => Ok(client_response.clone()),
+ Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError {
+ status: 500,
+ route: "test/path".into(),
+ })),
+ }
+ }
+
+ fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> {
+ match &self.info_collections {
+ Ok(collections) => Ok(collections.clone()),
+ Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError {
+ status: 500,
+ route: "test/path".into(),
+ })),
+ }
+ }
+
+ fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> {
+ match &self.meta_global {
+ Ok(global) => Ok(global.clone()),
+ // TODO(lina): Special handling for 404s, we want to ensure we
+ // handle missing keys and other server errors correctly.
+ Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError {
+ status: 500,
+ route: "test/path".into(),
+ })),
+ }
+ }
+
+ fn put_meta_global(
+ &self,
+ xius: ServerTimestamp,
+ global: &MetaGlobalRecord,
+ ) -> error::Result<ServerTimestamp> {
+ // Ensure that the meta/global record we uploaded is "fixed up"
+ assert!(DEFAULT_ENGINES
+ .iter()
+ .filter(|e| e.0 != "logins")
+ .all(|&(k, _v)| global.engines.contains_key(k)));
+ assert!(!global.engines.contains_key("logins"));
+ assert_eq!(global.declined, vec!["logins".to_string()]);
+ // return a different timestamp.
+ Ok(ServerTimestamp(xius.0 + 1))
+ }
+
+ fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<EncryptedBso>> {
+ match &self.crypto_keys {
+ Ok(keys) => Ok(keys.clone()),
+ // TODO(lina): Same as above, for 404s.
+ Err(_) => Ok(Sync15ClientResponse::Error(ErrorResponse::ServerError {
+ status: 500,
+ route: "test/path".into(),
+ })),
+ }
+ }
+
+ fn put_crypto_keys(
+ &self,
+ xius: ServerTimestamp,
+ _keys: &EncryptedBso,
+ ) -> error::Result<()> {
+ assert_eq!(xius, ServerTimestamp(888_800));
+ Err(ErrorKind::StorageHttpError(ErrorResponse::ServerError {
+ status: 500,
+ route: "crypto/keys".to_string(),
+ })
+ .into())
+ }
+
+ fn wipe_all_remote(&self) -> error::Result<()> {
+ Ok(())
+ }
+ }
+
+ fn mocked_success_ts<T>(t: T, ts: i64) -> error::Result<Sync15ClientResponse<T>> {
+ Ok(Sync15ClientResponse::Success {
+ status: 200,
+ record: t,
+ last_modified: ServerTimestamp(ts),
+ route: "test/path".into(),
+ })
+ }
+
+ fn mocked_success<T>(t: T) -> error::Result<Sync15ClientResponse<T>> {
+ mocked_success_ts(t, 0)
+ }
+
+ // for tests, we want a BSO with a specific timestamp, which we never
+ // need in non-test-code as the timestamp comes from the server.
+ impl CollectionKeys {
+ pub fn to_encrypted_bso_with_timestamp(
+ &self,
+ root_key: &KeyBundle,
+ modified: ServerTimestamp,
+ ) -> error::Result<EncryptedBso> {
+ let record = CryptoKeysRecord {
+ id: "keys".into(),
+ collection: "crypto".into(),
+ default: self.default.to_b64_array(),
+ collections: self
+ .collections
+ .iter()
+ .map(|kv| (kv.0.clone(), kv.1.to_b64_array()))
+ .collect(),
+ };
+ let mut bso =
+ crate::CleartextBso::from_payload(Payload::from_record(record)?, "crypto");
+ bso.modified = modified;
+ Ok(bso.encrypt(root_key)?)
+ }
+ }
+
+ #[test]
+ fn test_state_machine_ready_from_empty() {
+ let root_key = KeyBundle::new_random().unwrap();
+ let keys = CollectionKeys {
+ timestamp: ServerTimestamp(123_400),
+ default: KeyBundle::new_random().unwrap(),
+ collections: HashMap::new(),
+ };
+ let mg = MetaGlobalRecord {
+ sync_id: "syncIDAAAAAA".into(),
+ storage_version: 5usize,
+ engines: vec![(
+ "bookmarks",
+ MetaGlobalEngine {
+ version: 1usize,
+ sync_id: "syncIDBBBBBB".into(),
+ },
+ )]
+ .into_iter()
+ .map(|(key, value)| (key.to_owned(), value))
+ .collect(),
+ // We ensure that the record we upload doesn't have a logins record.
+ declined: vec!["logins".to_string()],
+ };
+ let client = InMemoryClient {
+ info_configuration: mocked_success(InfoConfiguration::default()),
+ info_collections: mocked_success(InfoCollections::new(
+ vec![("meta", 123_456), ("crypto", 145_000)]
+ .into_iter()
+ .map(|(key, value)| (key.to_owned(), ServerTimestamp(value)))
+ .collect(),
+ )),
+ meta_global: mocked_success_ts(mg, 999_000),
+ crypto_keys: mocked_success_ts(
+ keys.to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(888_000))
+ .expect("should always work in this test"),
+ 888_000,
+ ),
+ };
+ let mut pgs = PersistedGlobalState::V2 { declined: None };
+
+ let mut state_machine =
+ SetupStateMachine::for_full_sync(&client, &root_key, &mut pgs, None, &NeverInterrupts);
+ assert!(
+ state_machine.run_to_ready(None).is_ok(),
+ "Should drive state machine to ready"
+ );
+ assert_eq!(
+ state_machine.sequence,
+ vec![
+ "Initial",
+ "InitialWithConfig",
+ "InitialWithInfo",
+ "InitialWithMetaGlobal",
+ "Ready",
+ ],
+ "Should cycle through all states"
+ );
+ }
+
+ #[test]
+ fn test_from_previous_state_declined() {
+ let _ = env_logger::try_init();
+ // The state-machine sequence where we didn't use the previous state
+ // (ie, where the state machine restarted)
+ let sm_seq_restarted = vec![
+ "WithPreviousState",
+ "InitialWithConfig",
+ "InitialWithInfo",
+ "InitialWithMetaGlobal",
+ "Ready",
+ ];
+ // The state-machine sequence where we used the previous state.
+ let sm_seq_used_previous = vec!["WithPreviousState", "Ready"];
+
+ // do the actual test.
+ fn do_test(
+ client: &dyn SetupStorageClient,
+ root_key: &KeyBundle,
+ mut pgs: &mut PersistedGlobalState,
+ engine_updates: Option<&HashMap<String, bool>>,
+ old_state: GlobalState,
+ expected_states: &[&str],
+ ) {
+ let mut state_machine = SetupStateMachine::for_full_sync(
+ client,
+ root_key,
+ &mut pgs,
+ engine_updates,
+ &NeverInterrupts,
+ );
+ assert!(
+ state_machine.run_to_ready(Some(old_state)).is_ok(),
+ "Should drive state machine to ready"
+ );
+ assert_eq!(state_machine.sequence, expected_states);
+ }
+
+ // and all the complicated setup...
+ let ts_metaglobal = 123_456;
+ let ts_keys = 145_000;
+ let root_key = KeyBundle::new_random().unwrap();
+ let keys = CollectionKeys {
+ timestamp: ServerTimestamp(ts_keys + 1),
+ default: KeyBundle::new_random().unwrap(),
+ collections: HashMap::new(),
+ };
+ let mg = MetaGlobalRecord {
+ sync_id: "syncIDAAAAAA".into(),
+ storage_version: 5usize,
+ engines: vec![(
+ "bookmarks",
+ MetaGlobalEngine {
+ version: 1usize,
+ sync_id: "syncIDBBBBBB".into(),
+ },
+ )]
+ .into_iter()
+ .map(|(key, value)| (key.to_owned(), value))
+ .collect(),
+ // We ensure that the record we upload doesn't have a logins record.
+ declined: vec!["logins".to_string()],
+ };
+ let collections = InfoCollections::new(
+ vec![("meta", ts_metaglobal), ("crypto", ts_keys)]
+ .into_iter()
+ .map(|(key, value)| (key.to_owned(), ServerTimestamp(value)))
+ .collect(),
+ );
+ let client = InMemoryClient {
+ info_configuration: mocked_success(InfoConfiguration::default()),
+ info_collections: mocked_success(collections.clone()),
+ meta_global: mocked_success_ts(mg.clone(), ts_metaglobal),
+ crypto_keys: mocked_success_ts(
+ keys.to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys))
+ .expect("should always work in this test"),
+ ts_keys,
+ ),
+ };
+
+ // First a test where the "previous" global state is OK to reuse.
+ {
+ let mut pgs = PersistedGlobalState::V2 { declined: None };
+ // A "previous" global state.
+ let old_state = GlobalState {
+ config: InfoConfiguration::default(),
+ collections: collections.clone(),
+ global: mg.clone(),
+ global_timestamp: ServerTimestamp(ts_metaglobal),
+ keys: keys
+ .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys))
+ .expect("should always work in this test"),
+ };
+ do_test(
+ &client,
+ &root_key,
+ &mut pgs,
+ None,
+ old_state,
+ &sm_seq_used_previous,
+ );
+ }
+
+ // Now where the meta/global record on the server is later.
+ {
+ let mut pgs = PersistedGlobalState::V2 { declined: None };
+ // A "previous" global state.
+ let old_state = GlobalState {
+ config: InfoConfiguration::default(),
+ collections: collections.clone(),
+ global: mg.clone(),
+ global_timestamp: ServerTimestamp(999_999),
+ keys: keys
+ .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys))
+ .expect("should always work in this test"),
+ };
+ do_test(
+ &client,
+ &root_key,
+ &mut pgs,
+ None,
+ old_state,
+ &sm_seq_restarted,
+ );
+ }
+
+ // Where keys on the server is later.
+ {
+ let mut pgs = PersistedGlobalState::V2 { declined: None };
+ // A "previous" global state.
+ let old_state = GlobalState {
+ config: InfoConfiguration::default(),
+ collections: collections.clone(),
+ global: mg.clone(),
+ global_timestamp: ServerTimestamp(ts_metaglobal),
+ keys: keys
+ .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(999_999))
+ .expect("should always work in this test"),
+ };
+ do_test(
+ &client,
+ &root_key,
+ &mut pgs,
+ None,
+ old_state,
+ &sm_seq_restarted,
+ );
+ }
+
+ // Where there are engine-state changes.
+ {
+ let mut pgs = PersistedGlobalState::V2 { declined: None };
+ // A "previous" global state.
+ let old_state = GlobalState {
+ config: InfoConfiguration::default(),
+ collections,
+ global: mg,
+ global_timestamp: ServerTimestamp(ts_metaglobal),
+ keys: keys
+ .to_encrypted_bso_with_timestamp(&root_key, ServerTimestamp(ts_keys))
+ .expect("should always work in this test"),
+ };
+ let mut engine_updates = HashMap::<String, bool>::new();
+ engine_updates.insert("logins".to_string(), false);
+ do_test(
+ &client,
+ &root_key,
+ &mut pgs,
+ Some(&engine_updates),
+ old_state,
+ &sm_seq_restarted,
+ );
+ let declined = match pgs {
+ PersistedGlobalState::V2 { declined: d } => d,
+ };
+ // and check we now consider logins as declined.
+ assert_eq!(declined, Some(vec!["logins".to_string()]));
+ }
+ }
+
+ fn string_set(s: &[&str]) -> HashSet<String> {
+ s.iter().map(ToString::to_string).collect()
+ }
+ fn string_map<T: Clone>(s: &[(&str, T)]) -> HashMap<String, T> {
+ s.iter().map(|v| (v.0.to_string(), v.1.clone())).collect()
+ }
+ #[test]
+ fn test_engine_states() {
+ assert_eq!(
+ compute_engine_states(EngineStateInput {
+ local_declined: string_set(&["foo", "bar"]),
+ remote: None,
+ user_changes: Default::default(),
+ }),
+ EngineStateOutput {
+ declined: string_set(&["foo", "bar"]),
+ // No wipes, no resets
+ changes_needed: Default::default(),
+ }
+ );
+ assert_eq!(
+ compute_engine_states(EngineStateInput {
+ local_declined: string_set(&["foo", "bar"]),
+ remote: Some(RemoteEngineState {
+ declined: string_set(&["foo"]),
+ info_collections: string_set(&["bar"])
+ }),
+ user_changes: Default::default(),
+ }),
+ EngineStateOutput {
+ // Now we have `foo`.
+ declined: string_set(&["foo"]),
+ // No wipes, no resets, should just be a local update.
+ changes_needed: Default::default(),
+ }
+ );
+ assert_eq!(
+ compute_engine_states(EngineStateInput {
+ local_declined: string_set(&["foo", "bar"]),
+ remote: Some(RemoteEngineState {
+ declined: string_set(&["foo", "bar", "quux"]),
+ info_collections: string_set(&[])
+ }),
+ user_changes: Default::default(),
+ }),
+ EngineStateOutput {
+ // Now we have `foo`.
+ declined: string_set(&["foo", "bar", "quux"]),
+ changes_needed: EngineChangesNeeded {
+ // Should reset `quux`.
+ local_resets: string_set(&["quux"]),
+ // No wipes, though.
+ remote_wipes: string_set(&[]),
+ }
+ }
+ );
+ assert_eq!(
+ compute_engine_states(EngineStateInput {
+ local_declined: string_set(&["bar", "baz"]),
+ remote: Some(RemoteEngineState {
+ declined: string_set(&["bar", "baz",]),
+ info_collections: string_set(&["quux"])
+ }),
+ // Change a declined engine to undeclined.
+ user_changes: string_map(&[("bar", true)]),
+ }),
+ EngineStateOutput {
+ declined: string_set(&["baz"]),
+ // No wipes, just undecline it.
+ changes_needed: Default::default()
+ }
+ );
+ assert_eq!(
+ compute_engine_states(EngineStateInput {
+ local_declined: string_set(&["bar", "baz"]),
+ remote: Some(RemoteEngineState {
+ declined: string_set(&["bar", "baz"]),
+ info_collections: string_set(&["foo"])
+ }),
+ // Change an engine which exists remotely to declined.
+ user_changes: string_map(&[("foo", false)]),
+ }),
+ EngineStateOutput {
+ declined: string_set(&["baz", "bar", "foo"]),
+ // No wipes, just undecline it.
+ changes_needed: EngineChangesNeeded {
+ // Should reset our local foo
+ local_resets: string_set(&["foo"]),
+ // And wipe the server.
+ remote_wipes: string_set(&["foo"]),
+ }
+ }
+ );
+ }
+}
diff --git a/third_party/rust/sync15/src/status.rs b/third_party/rust/sync15/src/status.rs
new file mode 100644
index 0000000000..b88c710f02
--- /dev/null
+++ b/third_party/rust/sync15/src/status.rs
@@ -0,0 +1,109 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::{Error, ErrorKind, ErrorResponse};
+use crate::telemetry::SyncTelemetryPing;
+use std::collections::HashMap;
+use std::time::{Duration, SystemTime};
+
+/// The general status of sync - should probably be moved to the "sync manager"
+/// once we have one!
+#[derive(Debug, Clone, PartialEq)]
+pub enum ServiceStatus {
+ /// Everything is fine.
+ Ok,
+ /// Some general network issue.
+ NetworkError,
+ /// Some apparent issue with the servers.
+ ServiceError,
+ /// Some external FxA action needs to be taken.
+ AuthenticationError,
+ /// We declined to do anything for backoff or rate-limiting reasons.
+ BackedOff,
+ /// We were interrupted.
+ Interrupted,
+ /// Something else - you need to check the logs for more details. May
+ /// or may not be transient, we really don't know.
+ OtherError,
+}
+
+impl ServiceStatus {
+ // This is a bit naive and probably will not survive in this form in the
+ // SyncManager - eg, we'll want to handle backoff etc.
+ pub fn from_err(err: &Error) -> ServiceStatus {
+ match err.kind() {
+ // HTTP based errors.
+ ErrorKind::TokenserverHttpError(status) => {
+ // bit of a shame the tokenserver is different to storage...
+ if *status == 401 {
+ ServiceStatus::AuthenticationError
+ } else {
+ ServiceStatus::ServiceError
+ }
+ }
+ // BackoffError is also from the tokenserver.
+ ErrorKind::BackoffError(_) => ServiceStatus::ServiceError,
+ ErrorKind::StorageHttpError(ref e) => match e {
+ ErrorResponse::Unauthorized { .. } => ServiceStatus::AuthenticationError,
+ _ => ServiceStatus::ServiceError,
+ },
+
+ // Network errors.
+ ErrorKind::RequestError(_)
+ | ErrorKind::UnexpectedStatus(_)
+ | ErrorKind::HawkError(_) => ServiceStatus::NetworkError,
+
+ ErrorKind::Interrupted(_) => ServiceStatus::Interrupted,
+ _ => ServiceStatus::OtherError,
+ }
+ }
+}
+
+/// The result of a sync request. This too is from the "sync manager", but only
+/// has a fraction of the things it will have when we actually build that.
+#[derive(Debug)]
+pub struct SyncResult {
+ /// The general health.
+ pub service_status: ServiceStatus,
+
+ /// The set of declined engines, if we know them.
+ pub declined: Option<Vec<String>>,
+
+ /// The result of the sync.
+ pub result: Result<(), Error>,
+
+ /// The result for each engine.
+ /// Note that we expect the `String` to be replaced with an enum later.
+ pub engine_results: HashMap<String, Result<(), Error>>,
+
+ pub telemetry: SyncTelemetryPing,
+
+ pub next_sync_after: Option<std::time::SystemTime>,
+}
+
+// If `r` has a BackoffError, then returns the later backoff value.
+fn advance_backoff(cur_best: SystemTime, r: &Result<(), Error>) -> SystemTime {
+ if let Err(e) = r {
+ if let Some(time) = e.get_backoff() {
+ return std::cmp::max(time, cur_best);
+ }
+ }
+ cur_best
+}
+
+impl SyncResult {
+ pub(crate) fn set_sync_after(&mut self, backoff_duration: Duration) {
+ let now = SystemTime::now();
+ let toplevel = advance_backoff(now + backoff_duration, &self.result);
+ let sync_after = self
+ .engine_results
+ .values()
+ .fold(toplevel, |b, r| advance_backoff(b, r));
+ if sync_after <= now {
+ self.next_sync_after = None;
+ } else {
+ self.next_sync_after = Some(sync_after);
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/sync.rs b/third_party/rust/sync15/src/sync.rs
new file mode 100644
index 0000000000..a4030b5440
--- /dev/null
+++ b/third_party/rust/sync15/src/sync.rs
@@ -0,0 +1,127 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::changeset::CollectionUpdate;
+use crate::client::Sync15StorageClient;
+use crate::clients;
+use crate::coll_state::LocalCollStateMachine;
+use crate::error::Error;
+use crate::key_bundle::KeyBundle;
+use crate::state::GlobalState;
+use crate::telemetry;
+use interrupt_support::Interruptee;
+
+pub use sync15_traits::{IncomingChangeset, Store};
+
+pub fn synchronize(
+ client: &Sync15StorageClient,
+ global_state: &GlobalState,
+ root_sync_key: &KeyBundle,
+ store: &dyn Store,
+ fully_atomic: bool,
+ telem_engine: &mut telemetry::Engine,
+ interruptee: &dyn Interruptee,
+) -> Result<(), crate::Error> {
+ synchronize_with_clients_engine(
+ client,
+ global_state,
+ root_sync_key,
+ None,
+ store,
+ fully_atomic,
+ telem_engine,
+ interruptee,
+ )
+}
+
+#[allow(clippy::too_many_arguments)]
+pub fn synchronize_with_clients_engine(
+ client: &Sync15StorageClient,
+ global_state: &GlobalState,
+ root_sync_key: &KeyBundle,
+ clients: Option<&clients::Engine<'_>>,
+ store: &dyn Store,
+ fully_atomic: bool,
+ telem_engine: &mut telemetry::Engine,
+ interruptee: &dyn Interruptee,
+) -> Result<(), Error> {
+ let collection = store.collection_name();
+ log::info!("Syncing collection {}", collection);
+
+ // our global state machine is ready - get the collection machine going.
+ let mut coll_state = match LocalCollStateMachine::get_state(store, global_state, root_sync_key)?
+ {
+ Some(coll_state) => coll_state,
+ None => {
+ // XXX - this is either "error" or "declined".
+ log::warn!(
+ "can't setup for the {} collection - hopefully it works later",
+ collection
+ );
+ return Ok(());
+ }
+ };
+
+ if let Some(clients) = clients {
+ store.prepare_for_sync(&|| clients.get_client_data())?;
+ }
+
+ let collection_requests = store.get_collection_requests(coll_state.last_modified)?;
+ let incoming = if collection_requests.is_empty() {
+ log::info!("skipping incoming for {} - not needed.", collection);
+ vec![IncomingChangeset::new(collection, coll_state.last_modified)]
+ } else {
+ assert_eq!(collection_requests.last().unwrap().collection, collection);
+
+ let count = collection_requests.len();
+ collection_requests
+ .into_iter()
+ .enumerate()
+ .map(|(idx, collection_request)| {
+ interruptee.err_if_interrupted()?;
+ let incoming_changes =
+ crate::changeset::fetch_incoming(client, &mut coll_state, &collection_request)?;
+
+ log::info!(
+ "Downloaded {} remote changes (request {} of {})",
+ incoming_changes.changes.len(),
+ idx,
+ count,
+ );
+ Ok(incoming_changes)
+ })
+ .collect::<Result<Vec<_>, Error>>()?
+ };
+
+ let new_timestamp = incoming.last().expect("must have >= 1").timestamp;
+ let mut outgoing = store.apply_incoming(incoming, telem_engine)?;
+
+ interruptee.err_if_interrupted()?;
+ // Bump the timestamps now just incase the upload fails.
+ // xxx - duplication below smells wrong
+ outgoing.timestamp = new_timestamp;
+ coll_state.last_modified = new_timestamp;
+
+ log::info!("Uploading {} outgoing changes", outgoing.changes.len());
+ let upload_info =
+ CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)?
+ .upload()?;
+
+ log::info!(
+ "Upload success ({} records success, {} records failed)",
+ upload_info.successful_ids.len(),
+ upload_info.failed_ids.len()
+ );
+ // ideally we'd report this per-batch, but for now, let's just report it
+ // as a total.
+ let mut telem_outgoing = telemetry::EngineOutgoing::new();
+ telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
+ telem_outgoing.failed(upload_info.failed_ids.len());
+ telem_engine.outgoing(telem_outgoing);
+
+ store.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?;
+
+ log::info!("Sync finished!");
+ Ok(())
+}
diff --git a/third_party/rust/sync15/src/sync_multiple.rs b/third_party/rust/sync15/src/sync_multiple.rs
new file mode 100644
index 0000000000..84a75a28e1
--- /dev/null
+++ b/third_party/rust/sync15/src/sync_multiple.rs
@@ -0,0 +1,493 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// This helps you perform a sync of multiple stores and helps you manage
+// global and local state between syncs.
+
+use crate::client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
+use crate::clients::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
+use crate::coll_state::StoreSyncAssociation;
+use crate::error::Error;
+use crate::key_bundle::KeyBundle;
+use crate::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
+use crate::status::{ServiceStatus, SyncResult};
+use crate::sync::{self, Store};
+use crate::telemetry;
+use interrupt_support::Interruptee;
+use std::collections::HashMap;
+use std::mem;
+use std::result;
+use std::time::{Duration, SystemTime};
+
+/// Info about the client to use. We reuse the client unless
+/// we discover the client_init has changed, in which case we re-create one.
+#[derive(Debug)]
+struct ClientInfo {
+ // the client_init used to create `client`.
+ client_init: Sync15StorageClientInit,
+ // the client (our tokenserver state machine state, and our http library's state)
+ client: Sync15StorageClient,
+}
+
+impl ClientInfo {
+ fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
+ Ok(Self {
+ client_init: ci.clone(),
+ client: Sync15StorageClient::new(ci.clone())?,
+ })
+ }
+}
+
+/// Info we want callers to store *in memory* for us so that subsequent
+/// syncs are faster. This should never be persisted to storage as it holds
+/// sensitive information, such as the sync decryption keys.
+#[derive(Debug, Default)]
+pub struct MemoryCachedState {
+ last_client_info: Option<ClientInfo>,
+ last_global_state: Option<GlobalState>,
+ // These are just stored in memory, as persisting an invalid value far in the
+ // future has the potential to break sync for good.
+ next_sync_after: Option<SystemTime>,
+ next_client_refresh_after: Option<SystemTime>,
+}
+
+impl MemoryCachedState {
+ // Called we notice the cached state is stale.
+ pub fn clear_sensitive_info(&mut self) {
+ self.last_client_info = None;
+ self.last_global_state = None;
+ // Leave the backoff time, as there's no reason to think it's not still
+ // true.
+ }
+ pub fn get_next_sync_after(&self) -> Option<SystemTime> {
+ self.next_sync_after
+ }
+ pub fn should_refresh_client(&self) -> bool {
+ match self.next_client_refresh_after {
+ Some(t) => SystemTime::now() > t,
+ None => true,
+ }
+ }
+ pub fn note_client_refresh(&mut self) {
+ self.next_client_refresh_after =
+ Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH));
+ }
+}
+
+/// Sync multiple stores
+/// * `stores` - The stores to sync
+/// * `persisted_global_state` - The global state to use, or None if never
+/// before provided. At the end of the sync, and even when the sync fails,
+/// the value in this cell should be persisted to permanent storage and
+/// provided next time the sync is called.
+/// * `last_client_info` - The client state to use, or None if never before
+/// provided. At the end of the sync, the value should be persisted
+/// *in memory only* - it should not be persisted to disk.
+/// * `storage_init` - Information about how the sync http client should be
+/// configured.
+/// * `root_sync_key` - The KeyBundle used for encryption.
+///
+/// Returns a map, keyed by name and holding an error value - if any store
+/// fails, the sync will continue on to other stores, but the error will be
+/// places in this map. The absence of a name in the map implies the store
+/// succeeded.
+pub fn sync_multiple(
+ stores: &[&dyn Store],
+ persisted_global_state: &mut Option<String>,
+ mem_cached_state: &mut MemoryCachedState,
+ storage_init: &Sync15StorageClientInit,
+ root_sync_key: &KeyBundle,
+ interruptee: &dyn Interruptee,
+ req_info: Option<SyncRequestInfo<'_>>,
+) -> SyncResult {
+ sync_multiple_with_command_processor(
+ None,
+ stores,
+ persisted_global_state,
+ mem_cached_state,
+ storage_init,
+ root_sync_key,
+ interruptee,
+ req_info,
+ )
+}
+
+/// Like `sync_multiple`, but specifies an optional command processor to handle
+/// commands from the clients collection. This function is called by the sync
+/// manager, which provides its own processor.
+#[allow(clippy::too_many_arguments)]
+pub fn sync_multiple_with_command_processor(
+ command_processor: Option<&dyn CommandProcessor>,
+ stores: &[&dyn Store],
+ persisted_global_state: &mut Option<String>,
+ mem_cached_state: &mut MemoryCachedState,
+ storage_init: &Sync15StorageClientInit,
+ root_sync_key: &KeyBundle,
+ interruptee: &dyn Interruptee,
+ req_info: Option<SyncRequestInfo<'_>>,
+) -> SyncResult {
+ log::info!("Syncing {} stores", stores.len());
+ let mut sync_result = SyncResult {
+ service_status: ServiceStatus::OtherError,
+ result: Ok(()),
+ declined: None,
+ next_sync_after: None,
+ engine_results: HashMap::with_capacity(stores.len()),
+ telemetry: telemetry::SyncTelemetryPing::new(),
+ };
+ let backoff = crate::client::new_backoff_listener();
+ let req_info = req_info.unwrap_or_default();
+ let driver = SyncMultipleDriver {
+ command_processor,
+ stores,
+ storage_init,
+ interruptee,
+ engines_to_state_change: req_info.engines_to_state_change,
+ backoff: backoff.clone(),
+ root_sync_key,
+ result: &mut sync_result,
+ persisted_global_state,
+ mem_cached_state,
+ saw_auth_error: false,
+ ignore_soft_backoff: req_info.is_user_action,
+ };
+ match driver.sync() {
+ Ok(()) => {
+ log::debug!(
+ "sync was successful, final status={:?}",
+ sync_result.service_status
+ );
+ }
+ Err(e) => {
+ log::warn!(
+ "sync failed: {}, final status={:?}\nBacktrace: {:?}",
+ e,
+ sync_result.service_status,
+ e.backtrace()
+ );
+ sync_result.result = Err(e);
+ }
+ }
+ // Respect `backoff` value when computing the next sync time even if we were
+ // ignoring it during the sync
+ sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
+ mem_cached_state.next_sync_after = sync_result.next_sync_after;
+ log::trace!("Sync result: {:?}", sync_result);
+ sync_result
+}
+
+/// This is essentially a bag of information that the sync manager knows, but
+/// otherwise we won't. It should probably be rethought if it gains many more
+/// fields.
+#[derive(Debug, Default)]
+pub struct SyncRequestInfo<'a> {
+ pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
+ pub is_user_action: bool,
+}
+
+// The sync multiple driver
+struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
+ command_processor: Option<&'info dyn CommandProcessor>,
+ stores: &'info [&'info dyn Store],
+ storage_init: &'info Sync15StorageClientInit,
+ root_sync_key: &'info KeyBundle,
+ interruptee: &'info dyn Interruptee,
+ backoff: BackoffListener,
+ engines_to_state_change: Option<&'info HashMap<String, bool>>,
+ result: &'res mut SyncResult,
+ persisted_global_state: &'pgs mut Option<String>,
+ mem_cached_state: &'mcs mut MemoryCachedState,
+ ignore_soft_backoff: bool,
+ saw_auth_error: bool,
+}
+
+impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
+ /// The actual worker for sync_multiple.
+ fn sync(mut self) -> result::Result<(), Error> {
+ log::info!("Loading/initializing persisted state");
+ let mut pgs = self.prepare_persisted_state();
+
+ log::info!("Preparing client info");
+ let client_info = self.prepare_client_info()?;
+
+ if self.was_interrupted() {
+ return Ok(());
+ }
+
+ log::info!("Entering sync state machine");
+ // Advance the state machine to the point where it can perform a full
+ // sync. This may involve uploading meta/global, crypto/keys etc.
+ let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;
+
+ if self.was_interrupted() {
+ return Ok(());
+ }
+
+ // Set the service status to OK here - we may adjust it based on an individual
+ // store failing.
+ self.result.service_status = ServiceStatus::Ok;
+
+ let clients_engine = if let Some(command_processor) = self.command_processor {
+ log::info!("Synchronizing clients engine");
+ let should_refresh = self.mem_cached_state.should_refresh_client();
+ let mut engine = clients::Engine::new(command_processor, self.interruptee);
+ if let Err(e) = engine.sync(
+ &client_info.client,
+ &global_state,
+ &self.root_sync_key,
+ should_refresh,
+ ) {
+ // Record telemetry with the error just in case...
+ let mut telem_sync = telemetry::SyncTelemetry::new();
+ let mut telem_engine = telemetry::Engine::new("clients");
+ telem_engine.failure(&e);
+ telem_sync.engine(telem_engine);
+ self.result.service_status = ServiceStatus::from_err(&e);
+
+ // ...And bail, because a clients engine sync failure is fatal.
+ return Err(e);
+ }
+ // We don't record telemetry for successful clients engine
+ // syncs, since we only keep client records in memory, we
+ // expect the counts to be the same most times, and a
+ // failure aborts the entire sync.
+ if self.was_interrupted() {
+ return Ok(());
+ }
+ self.mem_cached_state.note_client_refresh();
+ Some(engine)
+ } else {
+ None
+ };
+
+ log::info!("Synchronizing stores");
+
+ let telem_sync = self.sync_stores(&client_info, &mut global_state, clients_engine.as_ref());
+ self.result.telemetry.sync(telem_sync);
+
+ log::info!("Finished syncing stores.");
+
+ if !self.saw_auth_error {
+ log::trace!("Updating persisted global state");
+ self.mem_cached_state.last_client_info = Some(client_info);
+ self.mem_cached_state.last_global_state = Some(global_state);
+ }
+
+ Ok(())
+ }
+
+ fn was_interrupted(&mut self) -> bool {
+ if self.interruptee.was_interrupted() {
+ log::info!("Interrupted, bailing out");
+ self.result.service_status = ServiceStatus::Interrupted;
+ true
+ } else {
+ false
+ }
+ }
+
+ fn sync_stores(
+ &mut self,
+ client_info: &ClientInfo,
+ global_state: &mut GlobalState,
+ clients: Option<&clients::Engine<'_>>,
+ ) -> telemetry::SyncTelemetry {
+ let mut telem_sync = telemetry::SyncTelemetry::new();
+ for store in self.stores {
+ let name = store.collection_name();
+ if self
+ .backoff
+ .get_required_wait(self.ignore_soft_backoff)
+ .is_some()
+ {
+ log::warn!("Got backoff, bailing out of sync early");
+ break;
+ }
+ if global_state.global.declined.iter().any(|e| e == &*name) {
+ log::info!("The {} engine is declined. Skipping", name);
+ continue;
+ }
+ log::info!("Syncing {} engine!", name);
+
+ let mut telem_engine = telemetry::Engine::new(&*name);
+ let result = sync::synchronize_with_clients_engine(
+ &client_info.client,
+ &global_state,
+ self.root_sync_key,
+ clients,
+ *store,
+ true,
+ &mut telem_engine,
+ self.interruptee,
+ );
+
+ match result {
+ Ok(()) => log::info!("Sync of {} was successful!", name),
+ Err(ref e) => {
+ log::warn!("Sync of {} failed! {:?}", name, e);
+ let this_status = ServiceStatus::from_err(&e);
+ // The only error which forces us to discard our state is an
+ // auth error.
+ self.saw_auth_error =
+ self.saw_auth_error || this_status == ServiceStatus::AuthenticationError;
+ telem_engine.failure(e);
+ // If the failure from the store looks like anything other than
+ // a "store error" we don't bother trying the others.
+ if this_status != ServiceStatus::OtherError {
+ telem_sync.engine(telem_engine);
+ self.result.engine_results.insert(name.into(), result);
+ self.result.service_status = this_status;
+ break;
+ }
+ }
+ }
+ telem_sync.engine(telem_engine);
+ self.result.engine_results.insert(name.into(), result);
+ if self.was_interrupted() {
+ break;
+ }
+ }
+ telem_sync
+ }
+
+ fn run_state_machine(
+ &mut self,
+ client_info: &ClientInfo,
+ pgs: &mut PersistedGlobalState,
+ ) -> result::Result<GlobalState, Error> {
+ let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None);
+
+ let mut state_machine = SetupStateMachine::for_full_sync(
+ &client_info.client,
+ &self.root_sync_key,
+ pgs,
+ self.engines_to_state_change,
+ self.interruptee,
+ );
+
+ log::info!("Advancing state machine to ready (full)");
+ let res = state_machine.run_to_ready(last_state);
+ // Grab this now even though we don't need it until later to avoid a
+ // lifetime issue
+ let changes = state_machine.changes_needed.take();
+ // The state machine might have updated our persisted_global_state, so
+ // update the caller's repr of it.
+ *self.persisted_global_state = Some(serde_json::to_string(&pgs)?);
+
+ // Now that we've gone through the state machine, store the declined list in
+ // the sync_result
+ self.result.declined = Some(pgs.get_declined().to_vec());
+ log::debug!(
+ "Declined engines list after state machine set to: {:?}",
+ self.result.declined,
+ );
+
+ if let Some(c) = changes {
+ self.wipe_or_reset_engines(c, &client_info.client)?;
+ }
+ let state = match res {
+ Err(e) => {
+ self.result.service_status = ServiceStatus::from_err(&e);
+ return Err(e);
+ }
+ Ok(state) => state,
+ };
+ self.result.telemetry.uid(client_info.client.hashed_uid()?);
+ // As for client_info, put None back now so we start from scratch on error.
+ self.mem_cached_state.last_global_state = None;
+ Ok(state)
+ }
+
+ fn wipe_or_reset_engines(
+ &mut self,
+ changes: EngineChangesNeeded,
+ client: &Sync15StorageClient,
+ ) -> result::Result<(), Error> {
+ if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() {
+ return Ok(());
+ }
+ for e in &changes.remote_wipes {
+ log::info!("Engine {:?} just got disabled locally, wiping server", e);
+ client.wipe_remote_engine(&e)?;
+ }
+
+ for s in self.stores {
+ let name = s.collection_name();
+ if changes.local_resets.contains(&*name) {
+ log::info!("Resetting engine {}, as it was declined remotely", name);
+ s.reset(&StoreSyncAssociation::Disconnected)?;
+ }
+ }
+
+ Ok(())
+ }
+
+ fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
+ let mut client_info = match mem::replace(&mut self.mem_cached_state.last_client_info, None)
+ {
+ Some(client_info) => {
+ // if our storage_init has changed it probably means the user has
+ // changed, courtesy of the 'kid' in the structure. Thus, we can't
+ // reuse the client or the memory cached state. We do keep the disk
+ // state as currently that's only the declined list.
+ if client_info.client_init != *self.storage_init {
+ log::info!("Discarding all state as the account might have changed");
+ *self.mem_cached_state = MemoryCachedState::default();
+ ClientInfo::new(self.storage_init)?
+ } else {
+ log::debug!("Reusing memory-cached client_info");
+ // we can reuse it (which should be the common path)
+ client_info
+ }
+ }
+ None => {
+ log::debug!("mem_cached_state was stale or missing, need setup");
+ // We almost certainly have no other state here, but to be safe, we
+ // throw away any memory state we do have.
+ self.mem_cached_state.clear_sensitive_info();
+ ClientInfo::new(self.storage_init)?
+ }
+ };
+ // Ensure we use the correct listener here rather than on all the branches
+ // above, since it seems less error prone.
+ client_info.client.backoff = self.backoff.clone();
+ Ok(client_info)
+ }
+
+ fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
+ // Note that any failure to use a persisted state means we also decline
+ // to use our memory cached state, so that we fully rebuild that
+ // persisted state for next time.
+ match self.persisted_global_state {
+ Some(persisted_string) if !persisted_string.is_empty() => {
+ match serde_json::from_str::<PersistedGlobalState>(&persisted_string) {
+ Ok(state) => {
+ log::trace!("Read persisted state: {:?}", state);
+ // Note that we don't set `result.declined` from the
+ // data in state - it remains None, which explicitly
+ // indicates "we don't have updated info".
+ state
+ }
+ _ => {
+ // Don't log the error since it might contain sensitive
+ // info (although currently it only contains the declined engines list)
+ log::error!(
+ "Failed to parse PersistedGlobalState from JSON! Falling back to default"
+ );
+ *self.mem_cached_state = MemoryCachedState::default();
+ PersistedGlobalState::default()
+ }
+ }
+ }
+ _ => {
+ log::info!(
+ "The application didn't give us persisted state - \
+ this is only expected on the very first run for a given user."
+ );
+ *self.mem_cached_state = MemoryCachedState::default();
+ PersistedGlobalState::default()
+ }
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/telemetry.rs b/third_party/rust/sync15/src/telemetry.rs
new file mode 100644
index 0000000000..eeeb558bca
--- /dev/null
+++ b/third_party/rust/sync15/src/telemetry.rs
@@ -0,0 +1,46 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! Note: this mostly just reexports the things from sync15_traits::telemetry.
+
+use crate::error::{Error, ErrorKind, ErrorResponse};
+
+pub use sync15_traits::telemetry::*;
+
+impl<'a> From<&'a Error> for SyncFailure {
+ fn from(e: &Error) -> SyncFailure {
+ match e.kind() {
+ ErrorKind::TokenserverHttpError(status) => {
+ if *status == 401 {
+ SyncFailure::Auth {
+ from: "tokenserver",
+ }
+ } else {
+ SyncFailure::Http { code: *status }
+ }
+ }
+ ErrorKind::BackoffError(_) => SyncFailure::Http { code: 503 },
+ ErrorKind::StorageHttpError(ref e) => match e {
+ ErrorResponse::NotFound { .. } => SyncFailure::Http { code: 404 },
+ ErrorResponse::Unauthorized { .. } => SyncFailure::Auth { from: "storage" },
+ ErrorResponse::PreconditionFailed { .. } => SyncFailure::Http { code: 412 },
+ ErrorResponse::ServerError { status, .. } => SyncFailure::Http { code: *status },
+ ErrorResponse::RequestFailed { status, .. } => SyncFailure::Http { code: *status },
+ },
+ ErrorKind::CryptoError(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ ErrorKind::RequestError(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ ErrorKind::UnexpectedStatus(ref e) => SyncFailure::Http { code: e.status },
+ ErrorKind::Interrupted(ref e) => SyncFailure::Unexpected {
+ error: e.to_string(),
+ },
+ e => SyncFailure::Other {
+ error: e.to_string(),
+ },
+ }
+ }
+}
diff --git a/third_party/rust/sync15/src/token.rs b/third_party/rust/sync15/src/token.rs
new file mode 100644
index 0000000000..2cceb1e34e
--- /dev/null
+++ b/third_party/rust/sync15/src/token.rs
@@ -0,0 +1,617 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::{self, ErrorKind, Result};
+use crate::util::ServerTimestamp;
+use rc_crypto::hawk;
+use serde_derive::*;
+use std::borrow::{Borrow, Cow};
+use std::cell::RefCell;
+use std::fmt;
+use std::time::{Duration, SystemTime};
+use url::Url;
+use viaduct::{header_names, Request};
+
+const RETRY_AFTER_DEFAULT_MS: u64 = 10000;
+
+// The TokenserverToken is the token as received directly from the token server
+// and deserialized from JSON.
+#[derive(Deserialize, Clone, PartialEq, Eq)]
+struct TokenserverToken {
+ id: String,
+ key: String,
+ api_endpoint: String,
+ uid: u64,
+ duration: u64,
+ hashed_fxa_uid: String,
+}
+
+impl std::fmt::Debug for TokenserverToken {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TokenserverToken")
+ .field("api_endpoint", &self.api_endpoint)
+ .field("uid", &self.uid)
+ .field("duration", &self.duration)
+ .field("hashed_fxa_uid", &self.hashed_fxa_uid)
+ .finish()
+ }
+}
+
+// The struct returned by the TokenFetcher - the token itself and the
+// server timestamp.
+struct TokenFetchResult {
+ token: TokenserverToken,
+ server_timestamp: ServerTimestamp,
+}
+
+// The trait for fetching tokens - we'll provide a "real" implementation but
+// tests will re-implement it.
+trait TokenFetcher {
+ fn fetch_token(&self) -> super::Result<TokenFetchResult>;
+ // We allow the trait to tell us what the time is so tests can get funky.
+ fn now(&self) -> SystemTime;
+}
+
+// Our "real" token fetcher, implementing the TokenFetcher trait, which hits
+// the token server
+#[derive(Debug)]
+struct TokenServerFetcher {
+ // The stuff needed to fetch a token.
+ server_url: Url,
+ access_token: String,
+ key_id: String,
+}
+
+fn fixup_server_url(mut url: Url) -> Result<Url> {
+ // The given `url` is the end-point as returned by .well-known/fxa-client-configuration,
+ // or as directly specified by self-hosters. As a result, it may or may not have
+ // the sync 1.5 suffix of "/1.0/sync/1.5", so add it on here if it does not.
+ if url.as_str().ends_with("1.0/sync/1.5") {
+ Ok(url)
+ } else if url.as_str().ends_with("1.0/sync/1.5/") {
+ // Shouldn't ever be Err() here, but the result is `Result<PathSegmentsMut, ()>`
+ // and I don't want to unwrap or add a new error type just for PathSegmentsMut failing.
+ if let Ok(mut path) = url.path_segments_mut() {
+ path.pop();
+ }
+ Ok(url)
+ } else {
+ // We deliberately don't use `.join()` here in order to preserve all path components.
+ // For example, "http://example.com/token" should produce "http://example.com/token/1.0/sync/1.5"
+ // but using `.join()` would produce "http://example.com/1.0/sync/1.5".
+ if let Ok(mut path) = url.path_segments_mut() {
+ path.pop_if_empty();
+ path.extend(&["1.0", "sync", "1.5"]);
+ }
+ Ok(url)
+ }
+}
+
+impl TokenServerFetcher {
+ fn new(base_url: Url, access_token: String, key_id: String) -> Result<TokenServerFetcher> {
+ Ok(TokenServerFetcher {
+ server_url: fixup_server_url(base_url)?,
+ access_token,
+ key_id,
+ })
+ }
+}
+
+impl TokenFetcher for TokenServerFetcher {
+ fn fetch_token(&self) -> Result<TokenFetchResult> {
+ log::debug!("Fetching token from {}", self.server_url);
+ let resp = Request::get(self.server_url.clone())
+ .header(
+ header_names::AUTHORIZATION,
+ format!("Bearer {}", self.access_token),
+ )?
+ .header(header_names::X_KEYID, self.key_id.clone())?
+ .send()?;
+
+ if !resp.is_success() {
+ log::warn!("Non-success status when fetching token: {}", resp.status);
+ // TODO: the body should be JSON and contain a status parameter we might need?
+ log::trace!(" Response body {}", resp.text());
+ // XXX - shouldn't we "chain" these errors - ie, a BackoffError could
+ // have a TokenserverHttpError as its cause?
+ if let Some(res) = resp.headers.get_as::<f64, _>(header_names::RETRY_AFTER) {
+ let ms = res
+ .ok()
+ .map_or(RETRY_AFTER_DEFAULT_MS, |f| (f * 1000f64) as u64);
+ let when = self.now() + Duration::from_millis(ms);
+ return Err(ErrorKind::BackoffError(when).into());
+ }
+ let status = resp.status;
+ return Err(ErrorKind::TokenserverHttpError(status).into());
+ }
+
+ let token: TokenserverToken = resp.json()?;
+ let server_timestamp = resp
+ .headers
+ .try_get::<ServerTimestamp, _>(header_names::X_TIMESTAMP)
+ .ok_or_else(|| ErrorKind::MissingServerTimestamp)?;
+ Ok(TokenFetchResult {
+ token,
+ server_timestamp,
+ })
+ }
+
+ fn now(&self) -> SystemTime {
+ SystemTime::now()
+ }
+}
+
+// The context stored by our TokenProvider when it has a TokenState::Token
+// state.
+struct TokenContext {
+ token: TokenserverToken,
+ credentials: hawk::Credentials,
+ server_timestamp: ServerTimestamp,
+ valid_until: SystemTime,
+}
+
+// hawk::Credentials doesn't implement debug -_-
+impl fmt::Debug for TokenContext {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> ::std::result::Result<(), fmt::Error> {
+ f.debug_struct("TokenContext")
+ .field("token", &self.token)
+ .field("credentials", &"(omitted)")
+ .field("server_timestamp", &self.server_timestamp)
+ .field("valid_until", &self.valid_until)
+ .finish()
+ }
+}
+
+impl TokenContext {
+ fn new(
+ token: TokenserverToken,
+ credentials: hawk::Credentials,
+ server_timestamp: ServerTimestamp,
+ valid_until: SystemTime,
+ ) -> Self {
+ Self {
+ token,
+ credentials,
+ server_timestamp,
+ valid_until,
+ }
+ }
+
+ fn is_valid(&self, now: SystemTime) -> bool {
+ // We could consider making the duration a little shorter - if it
+ // only has 1 second validity there seems a reasonable chance it will
+ // have expired by the time it gets presented to the remote that wants
+ // it.
+ // Either way though, we will eventually need to handle a token being
+ // rejected as a non-fatal error and recover, so maybe we don't care?
+ now < self.valid_until
+ }
+
+ fn authorization(&self, req: &Request) -> Result<String> {
+ let url = &req.url;
+
+ let path_and_query = match url.query() {
+ None => Cow::from(url.path()),
+ Some(qs) => Cow::from(format!("{}?{}", url.path(), qs)),
+ };
+
+ let host = url
+ .host_str()
+ .ok_or_else(|| ErrorKind::UnacceptableUrl("Storage URL has no host".into()))?;
+
+ // Known defaults exist for https? (among others), so this should be impossible
+ let port = url.port_or_known_default().ok_or_else(|| {
+ ErrorKind::UnacceptableUrl(
+ "Storage URL has no port and no default port is known for the protocol".into(),
+ )
+ })?;
+
+ let header =
+ hawk::RequestBuilder::new(req.method.as_str(), host, port, path_and_query.borrow())
+ .request()
+ .make_header(&self.credentials)?;
+
+ Ok(format!("Hawk {}", header))
+ }
+}
+
+// The state our TokenProvider holds to reflect the state of the token.
+#[derive(Debug)]
+enum TokenState {
+ // We've never fetched a token.
+ NoToken,
+ // Have a token and last we checked it remained valid.
+ Token(TokenContext),
+ // We failed to fetch a token. First elt is the error, second elt is
+ // the api_endpoint we had before we failed to fetch a new token (or
+ // None if the very first attempt at fetching a token failed)
+ Failed(Option<error::Error>, Option<String>),
+ // Previously failed and told to back-off for SystemTime duration. Second
+ // elt is the api_endpoint we had before we hit the backoff error.
+ // XXX - should we roll Backoff and Failed together?
+ Backoff(SystemTime, Option<String>),
+ // api_endpoint changed - we are never going to get a token nor move out
+ // of this state.
+ NodeReassigned,
+}
+
+/// The generic TokenProvider implementation - long lived and fetches tokens
+/// on demand (eg, when first needed, or when an existing one expires.)
+#[derive(Debug)]
+struct TokenProviderImpl<TF: TokenFetcher> {
+ fetcher: TF,
+ // Our token state (ie, whether we have a token, and if not, why not)
+ current_state: RefCell<TokenState>,
+}
+
+impl<TF: TokenFetcher> TokenProviderImpl<TF> {
+ fn new(fetcher: TF) -> Self {
+ // We check this at the real entrypoint of the application, but tests
+ // can/do bypass that, so check this here too.
+ rc_crypto::ensure_initialized();
+ TokenProviderImpl {
+ fetcher,
+ current_state: RefCell::new(TokenState::NoToken),
+ }
+ }
+
+ // Uses our fetcher to grab a new token and if successfull, derives other
+ // info from that token into a usable TokenContext.
+ fn fetch_context(&self) -> Result<TokenContext> {
+ let result = self.fetcher.fetch_token()?;
+ let token = result.token;
+ let valid_until = SystemTime::now() + Duration::from_secs(token.duration);
+
+ let credentials = hawk::Credentials {
+ id: token.id.clone(),
+ key: hawk::Key::new(token.key.as_bytes(), hawk::SHA256)?,
+ };
+
+ Ok(TokenContext::new(
+ token,
+ credentials,
+ result.server_timestamp,
+ valid_until,
+ ))
+ }
+
+ // Attempt to fetch a new token and return a new state reflecting that
+ // operation. If it worked a TokenState will be returned, but errors may
+ // cause other states.
+ fn fetch_token(&self, previous_endpoint: Option<&str>) -> TokenState {
+ match self.fetch_context() {
+ Ok(tc) => {
+ // We got a new token - check that the endpoint is the same
+ // as a previous endpoint we saw (if any)
+ match previous_endpoint {
+ Some(prev) => {
+ if prev == tc.token.api_endpoint {
+ TokenState::Token(tc)
+ } else {
+ log::warn!(
+ "api_endpoint changed from {} to {}",
+ prev,
+ tc.token.api_endpoint
+ );
+ TokenState::NodeReassigned
+ }
+ }
+ None => {
+ // Never had an api_endpoint in the past, so this is OK.
+ TokenState::Token(tc)
+ }
+ }
+ }
+ Err(e) => {
+ // Early to avoid nll issues...
+ if let ErrorKind::BackoffError(be) = e.kind() {
+ return TokenState::Backoff(*be, previous_endpoint.map(ToString::to_string));
+ }
+ TokenState::Failed(Some(e), previous_endpoint.map(ToString::to_string))
+ }
+ }
+ }
+
+ // Given the state we are currently in, return a new current state.
+ // Returns None if the current state should be used (eg, if we are
+ // holding a token that remains valid) or Some() if the state has changed
+ // (which may have changed to a state with a token or an error state)
+ fn advance_state(&self, state: &TokenState) -> Option<TokenState> {
+ match state {
+ TokenState::NoToken => Some(self.fetch_token(None)),
+ TokenState::Failed(_, existing_endpoint) => {
+ Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str)))
+ }
+ TokenState::Token(existing_context) => {
+ if existing_context.is_valid(self.fetcher.now()) {
+ None
+ } else {
+ Some(self.fetch_token(Some(existing_context.token.api_endpoint.as_str())))
+ }
+ }
+ TokenState::Backoff(ref until, ref existing_endpoint) => {
+ if let Ok(remaining) = until.duration_since(self.fetcher.now()) {
+ log::debug!("enforcing existing backoff - {:?} remains", remaining);
+ None
+ } else {
+ // backoff period is over
+ Some(self.fetch_token(existing_endpoint.as_ref().map(String::as_str)))
+ }
+ }
+ TokenState::NodeReassigned => {
+ // We never leave this state.
+ None
+ }
+ }
+ }
+
+ fn with_token<T, F>(&self, func: F) -> Result<T>
+ where
+ F: FnOnce(&TokenContext) -> Result<T>,
+ {
+ // first get a mutable ref to our existing state, advance to the
+ // state we will use, then re-stash that state for next time.
+ let state: &mut TokenState = &mut self.current_state.borrow_mut();
+ if let Some(new_state) = self.advance_state(state) {
+ *state = new_state;
+ }
+
+ // Now re-fetch the state we should use for this call - if it's
+ // anything other than TokenState::Token we will fail.
+ match state {
+ TokenState::NoToken => {
+ // it should be impossible to get here.
+ panic!("Can't be in NoToken state after advancing");
+ }
+ TokenState::Token(ref token_context) => {
+ // make the call.
+ func(token_context)
+ }
+ TokenState::Failed(e, _) => {
+ // We swap the error out of the state enum and return it.
+ Err(e.take().unwrap())
+ }
+ TokenState::NodeReassigned => {
+ // this is unrecoverable.
+ Err(ErrorKind::StorageResetError.into())
+ }
+ TokenState::Backoff(ref remaining, _) => {
+ Err(ErrorKind::BackoffError(*remaining).into())
+ }
+ }
+ }
+
+ fn hashed_uid(&self) -> Result<String> {
+ self.with_token(|ctx| Ok(ctx.token.hashed_fxa_uid.clone()))
+ }
+
+ fn authorization(&self, req: &Request) -> Result<String> {
+ self.with_token(|ctx| ctx.authorization(req))
+ }
+
+ fn api_endpoint(&self) -> Result<String> {
+ self.with_token(|ctx| Ok(ctx.token.api_endpoint.clone()))
+ }
+ // TODO: we probably want a "drop_token/context" type method so that when
+ // using a token with some validity fails the caller can force a new one
+ // (in which case the new token request will probably fail with a 401)
+}
+
+// The public concrete object exposed by this module
+#[derive(Debug)]
+pub struct TokenProvider {
+ imp: TokenProviderImpl<TokenServerFetcher>,
+}
+
+impl TokenProvider {
+ pub fn new(url: Url, access_token: String, key_id: String) -> Result<Self> {
+ let fetcher = TokenServerFetcher::new(url, access_token, key_id)?;
+ Ok(Self {
+ imp: TokenProviderImpl::new(fetcher),
+ })
+ }
+
+ pub fn hashed_uid(&self) -> Result<String> {
+ self.imp.hashed_uid()
+ }
+
+ pub fn authorization(&self, req: &Request) -> Result<String> {
+ self.imp.authorization(req)
+ }
+
+ pub fn api_endpoint(&self) -> Result<String> {
+ self.imp.api_endpoint()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::cell::Cell;
+
+ struct TestFetcher<FF, FN>
+ where
+ FF: Fn() -> Result<TokenFetchResult>,
+ FN: Fn() -> SystemTime,
+ {
+ fetch: FF,
+ now: FN,
+ }
+ impl<FF, FN> TokenFetcher for TestFetcher<FF, FN>
+ where
+ FF: Fn() -> Result<TokenFetchResult>,
+ FN: Fn() -> SystemTime,
+ {
+ fn fetch_token(&self) -> Result<TokenFetchResult> {
+ (self.fetch)()
+ }
+ fn now(&self) -> SystemTime {
+ (self.now)()
+ }
+ }
+
+ fn make_tsc<FF, FN>(fetch: FF, now: FN) -> TokenProviderImpl<TestFetcher<FF, FN>>
+ where
+ FF: Fn() -> Result<TokenFetchResult>,
+ FN: Fn() -> SystemTime,
+ {
+ let fetcher: TestFetcher<FF, FN> = TestFetcher { fetch, now };
+ TokenProviderImpl::new(fetcher)
+ }
+
+ #[test]
+ fn test_endpoint() {
+ // Use a cell to avoid the closure having a mutable ref to this scope.
+ let counter: Cell<u32> = Cell::new(0);
+ let fetch = || {
+ counter.set(counter.get() + 1);
+ Ok(TokenFetchResult {
+ token: TokenserverToken {
+ id: "id".to_string(),
+ key: "key".to_string(),
+ api_endpoint: "api_endpoint".to_string(),
+ uid: 1,
+ duration: 1000,
+ hashed_fxa_uid: "hash".to_string(),
+ },
+ server_timestamp: ServerTimestamp(0i64),
+ })
+ };
+
+ let tsc = make_tsc(fetch, SystemTime::now);
+
+ let e = tsc.api_endpoint().expect("should work");
+ assert_eq!(e, "api_endpoint".to_string());
+ assert_eq!(counter.get(), 1);
+
+ let e2 = tsc.api_endpoint().expect("should work");
+ assert_eq!(e2, "api_endpoint".to_string());
+ // should not have re-fetched.
+ assert_eq!(counter.get(), 1);
+ }
+
+ #[test]
+ fn test_backoff() {
+ let counter: Cell<u32> = Cell::new(0);
+ let fetch = || {
+ counter.set(counter.get() + 1);
+ let when = SystemTime::now() + Duration::from_millis(10000);
+ Err(error::Error::from(ErrorKind::BackoffError(when)))
+ };
+ let now: Cell<SystemTime> = Cell::new(SystemTime::now());
+ let tsc = make_tsc(fetch, || now.get());
+
+ tsc.api_endpoint().expect_err("should bail");
+ // XXX - check error type.
+ assert_eq!(counter.get(), 1);
+ // try and get another token - should not re-fetch as backoff is still
+ // in progress.
+ tsc.api_endpoint().expect_err("should bail");
+ assert_eq!(counter.get(), 1);
+
+ // Advance the clock.
+ now.set(now.get() + Duration::new(20, 0));
+
+ // Our token fetch mock is still returning a backoff error, so we
+ // still fail, but should have re-hit the fetch function.
+ tsc.api_endpoint().expect_err("should bail");
+ assert_eq!(counter.get(), 2);
+ }
+
+ #[test]
+ fn test_validity() {
+ let counter: Cell<u32> = Cell::new(0);
+ let fetch = || {
+ counter.set(counter.get() + 1);
+ Ok(TokenFetchResult {
+ token: TokenserverToken {
+ id: "id".to_string(),
+ key: "key".to_string(),
+ api_endpoint: "api_endpoint".to_string(),
+ uid: 1,
+ duration: 10,
+ hashed_fxa_uid: "hash".to_string(),
+ },
+ server_timestamp: ServerTimestamp(0i64),
+ })
+ };
+ let now: Cell<SystemTime> = Cell::new(SystemTime::now());
+ let tsc = make_tsc(fetch, || now.get());
+
+ tsc.api_endpoint().expect("should get a valid token");
+ assert_eq!(counter.get(), 1);
+
+ // try and get another token - should not re-fetch as the old one
+ // remains valid.
+ tsc.api_endpoint().expect("should reuse existing token");
+ assert_eq!(counter.get(), 1);
+
+ // Advance the clock.
+ now.set(now.get() + Duration::new(20, 0));
+
+ // We should discard our token and fetch a new one.
+ tsc.api_endpoint().expect("should re-fetch");
+ assert_eq!(counter.get(), 2);
+ }
+
+ #[test]
+ fn test_server_url() {
+ assert_eq!(
+ fixup_server_url(
+ Url::parse("https://token.services.mozilla.com/1.0/sync/1.5").unwrap()
+ )
+ .unwrap()
+ .as_str(),
+ "https://token.services.mozilla.com/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(
+ Url::parse("https://token.services.mozilla.com/1.0/sync/1.5/").unwrap()
+ )
+ .unwrap()
+ .as_str(),
+ "https://token.services.mozilla.com/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(Url::parse("https://token.services.mozilla.com").unwrap())
+ .unwrap()
+ .as_str(),
+ "https://token.services.mozilla.com/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(Url::parse("https://token.services.mozilla.com/").unwrap())
+ .unwrap()
+ .as_str(),
+ "https://token.services.mozilla.com/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(
+ Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5").unwrap()
+ )
+ .unwrap()
+ .as_str(),
+ "https://selfhosted.example.com/token/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(
+ Url::parse("https://selfhosted.example.com/token/1.0/sync/1.5/").unwrap()
+ )
+ .unwrap()
+ .as_str(),
+ "https://selfhosted.example.com/token/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(Url::parse("https://selfhosted.example.com/token/").unwrap())
+ .unwrap()
+ .as_str(),
+ "https://selfhosted.example.com/token/1.0/sync/1.5"
+ );
+ assert_eq!(
+ fixup_server_url(Url::parse("https://selfhosted.example.com/token").unwrap())
+ .unwrap()
+ .as_str(),
+ "https://selfhosted.example.com/token/1.0/sync/1.5"
+ );
+ }
+}
diff --git a/third_party/rust/sync15/src/util.rs b/third_party/rust/sync15/src/util.rs
new file mode 100644
index 0000000000..9bd28dc4de
--- /dev/null
+++ b/third_party/rust/sync15/src/util.rs
@@ -0,0 +1,104 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use std::collections::{HashMap, HashSet};
+use std::sync::atomic::{AtomicU32, Ordering};
+
+pub use sync15_traits::ServerTimestamp;
+
+/// Finds the maximum of the current value and the argument `val`, and sets the
+/// new value to the result.
+///
+/// Note: `AtomicFoo::fetch_max` is unstable, and can't really be implemented as
+/// a single atomic operation from outside the stdlib ;-;
+pub(crate) fn atomic_update_max(v: &AtomicU32, new: u32) {
+ // For loads (and the compare_exchange_weak second ordering argument) this
+ // is too strong, we could probably get away with Acquire (or maybe Relaxed
+ // because we don't need the result?). In either case, this fn isn't called
+ // from a hot spot so whatever.
+ let mut cur = v.load(Ordering::SeqCst);
+ while cur < new {
+ // we're already handling the failure case so there's no reason not to
+ // use _weak here.
+ match v.compare_exchange_weak(cur, new, Ordering::SeqCst, Ordering::SeqCst) {
+ Ok(_) => {
+ // Success.
+ break;
+ }
+ Err(new_cur) => {
+ // Interrupted, keep trying.
+ cur = new_cur
+ }
+ }
+ }
+}
+
+// Slight wrappers around the builtin methods for doing this.
+pub(crate) fn set_union(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> {
+ a.union(b).cloned().collect()
+}
+
+pub(crate) fn set_difference(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> {
+ a.difference(b).cloned().collect()
+}
+
+pub(crate) fn set_intersection(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> {
+ a.intersection(b).cloned().collect()
+}
+
+pub(crate) fn partition_by_value(v: &HashMap<String, bool>) -> (HashSet<String>, HashSet<String>) {
+ let mut true_: HashSet<String> = HashSet::new();
+ let mut false_: HashSet<String> = HashSet::new();
+ for (s, val) in v {
+ if *val {
+ true_.insert(s.clone());
+ } else {
+ false_.insert(s.clone());
+ }
+ }
+ (true_, false_)
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_set_ops() {
+ fn hash_set(s: &[&str]) -> HashSet<String> {
+ s.iter()
+ .copied()
+ .map(ToOwned::to_owned)
+ .collect::<HashSet<_>>()
+ }
+
+ assert_eq!(
+ set_union(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])),
+ hash_set(&["a", "b", "c", "d"]),
+ );
+
+ assert_eq!(
+ set_difference(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])),
+ hash_set(&["a", "c"]),
+ );
+ assert_eq!(
+ set_intersection(&hash_set(&["a", "b", "c"]), &hash_set(&["b", "d"])),
+ hash_set(&["b"]),
+ );
+ let m: HashMap<String, bool> = [
+ ("foo", true),
+ ("bar", true),
+ ("baz", false),
+ ("quux", false),
+ ]
+ .iter()
+ .copied()
+ .map(|(a, b)| (a.to_owned(), b))
+ .collect();
+ assert_eq!(
+ partition_by_value(&m),
+ (hash_set(&["foo", "bar"]), hash_set(&["baz", "quux"])),
+ );
+ }
+}