diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/webext-storage/src | |
parent | Initial commit. (diff) | |
download | firefox-esr-upstream.tar.xz firefox-esr-upstream.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/webext-storage/src')
-rw-r--r-- | third_party/rust/webext-storage/src/api.rs | 745 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/db.rs | 301 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/error.rs | 72 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/ffi.rs | 48 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/lib.rs | 26 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/migration.rs | 454 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/schema.rs | 213 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/store.rs | 218 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/bridge.rs | 388 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/incoming.rs | 863 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/mod.rs | 429 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/outgoing.rs | 186 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/sync_tests.rs | 529 |
13 files changed, 4472 insertions, 0 deletions
diff --git a/third_party/rust/webext-storage/src/api.rs b/third_party/rust/webext-storage/src/api.rs new file mode 100644 index 0000000000..fe838a075d --- /dev/null +++ b/third_party/rust/webext-storage/src/api.rs @@ -0,0 +1,745 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::*; +use rusqlite::{Connection, Transaction}; +use serde::{ser::SerializeMap, Serialize, Serializer}; + +use serde_json::{Map, Value as JsonValue}; +use sql_support::{self, ConnExt}; + +// These constants are defined by the chrome.storage.sync spec. We export them +// publicly from this module, then from the crate, so they wind up in the +// clients. +// Note the limits for `chrome.storage.sync` and `chrome.storage.local` are +// different, and these are from `.sync` - we'll have work to do if we end up +// wanting this to be used for `.local` too! +pub const SYNC_QUOTA_BYTES: usize = 102_400; +pub const SYNC_QUOTA_BYTES_PER_ITEM: usize = 8_192; +pub const SYNC_MAX_ITEMS: usize = 512; +// Note there are also constants for "operations per minute" etc, which aren't +// enforced here. + +type JsonMap = Map<String, JsonValue>; + +enum StorageChangeOp { + Clear, + Set(JsonValue), + SetWithoutQuota(JsonValue), +} + +fn get_from_db(conn: &Connection, ext_id: &str) -> Result<Option<JsonMap>> { + Ok( + match conn.try_query_one::<String, _>( + "SELECT data FROM storage_sync_data + WHERE ext_id = :ext_id", + &[(":ext_id", &ext_id)], + true, + )? { + Some(s) => match serde_json::from_str(&s)? { + JsonValue::Object(m) => Some(m), + // we could panic here as it's theoretically impossible, but we + // might as well treat it as not existing... + _ => None, + }, + None => None, + }, + ) +} + +fn save_to_db(tx: &Transaction<'_>, ext_id: &str, val: &StorageChangeOp) -> Result<()> { + // This function also handles removals. Either an empty map or explicit null + // is a removal. If there's a mirror record for this extension ID, then we + // must leave a tombstone behind for syncing. + let is_delete = match val { + StorageChangeOp::Clear => true, + StorageChangeOp::Set(JsonValue::Object(v)) => v.is_empty(), + StorageChangeOp::SetWithoutQuota(JsonValue::Object(v)) => v.is_empty(), + _ => false, + }; + if is_delete { + let in_mirror = tx + .try_query_one( + "SELECT EXISTS(SELECT 1 FROM storage_sync_mirror WHERE ext_id = :ext_id);", + rusqlite::named_params! { + ":ext_id": ext_id, + }, + true, + )? + .unwrap_or_default(); + if in_mirror { + log::trace!("saving data for '{}': leaving a tombstone", ext_id); + tx.execute_cached( + " + INSERT INTO storage_sync_data(ext_id, data, sync_change_counter) + VALUES (:ext_id, NULL, 1) + ON CONFLICT (ext_id) DO UPDATE + SET data = NULL, sync_change_counter = sync_change_counter + 1", + rusqlite::named_params! { + ":ext_id": ext_id, + }, + )?; + } else { + log::trace!("saving data for '{}': removing the row", ext_id); + tx.execute_cached( + " + DELETE FROM storage_sync_data WHERE ext_id = :ext_id", + rusqlite::named_params! { + ":ext_id": ext_id, + }, + )?; + } + } else { + // Convert to bytes so we can enforce the quota if necessary. + let sval = match val { + StorageChangeOp::Set(v) => { + let sv = v.to_string(); + if sv.len() > SYNC_QUOTA_BYTES { + return Err(ErrorKind::QuotaError(QuotaReason::TotalBytes).into()); + } + sv + } + StorageChangeOp::SetWithoutQuota(v) => v.to_string(), + StorageChangeOp::Clear => unreachable!(), + }; + + log::trace!("saving data for '{}': writing", ext_id); + tx.execute_cached( + "INSERT INTO storage_sync_data(ext_id, data, sync_change_counter) + VALUES (:ext_id, :data, 1) + ON CONFLICT (ext_id) DO UPDATE + set data=:data, sync_change_counter = sync_change_counter + 1", + rusqlite::named_params! { + ":ext_id": ext_id, + ":data": &sval, + }, + )?; + } + Ok(()) +} + +fn remove_from_db(tx: &Transaction<'_>, ext_id: &str) -> Result<()> { + save_to_db(tx, ext_id, &StorageChangeOp::Clear) +} + +// This is a "helper struct" for the callback part of the chrome.storage spec, +// but shaped in a way to make it more convenient from the rust side of the +// world. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StorageValueChange { + #[serde(skip_serializing)] + pub key: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub old_value: Option<JsonValue>, + #[serde(skip_serializing_if = "Option::is_none")] + pub new_value: Option<JsonValue>, +} + +// This is, largely, a helper so that this serializes correctly as per the +// chrome.storage.sync spec. If not for custom serialization it should just +// be a plain vec +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct StorageChanges { + changes: Vec<StorageValueChange>, +} + +impl StorageChanges { + pub fn new() -> Self { + Self::default() + } + + pub fn with_capacity(n: usize) -> Self { + Self { + changes: Vec::with_capacity(n), + } + } + + pub fn is_empty(&self) -> bool { + self.changes.is_empty() + } + + pub fn push(&mut self, change: StorageValueChange) { + self.changes.push(change) + } +} + +// and it serializes as a map. +impl Serialize for StorageChanges { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut map = serializer.serialize_map(Some(self.changes.len()))?; + for change in &self.changes { + map.serialize_entry(&change.key, change)?; + } + map.end() + } +} + +// A helper to determine the size of a key/value combination from the +// perspective of quota and getBytesInUse(). +pub fn get_quota_size_of(key: &str, v: &JsonValue) -> usize { + // Reading the chrome docs literally re the quota, the length of the key + // is just the string len, but the value is the json val, as bytes. + key.len() + v.to_string().len() +} + +/// The implementation of `storage[.sync].set()`. On success this returns the +/// StorageChanges defined by the chrome API - it's assumed the caller will +/// arrange to deliver this to observers as defined in that API. +pub fn set(tx: &Transaction<'_>, ext_id: &str, val: JsonValue) -> Result<StorageChanges> { + let val_map = match val { + JsonValue::Object(m) => m, + // Not clear what the error semantics should be yet. For now, pretend an empty map. + _ => Map::new(), + }; + + let mut current = get_from_db(tx, ext_id)?.unwrap_or_default(); + + let mut changes = StorageChanges::with_capacity(val_map.len()); + + // iterate over the value we are adding/updating. + for (k, v) in val_map.into_iter() { + let old_value = current.remove(&k); + if current.len() >= SYNC_MAX_ITEMS { + return Err(ErrorKind::QuotaError(QuotaReason::MaxItems).into()); + } + // Reading the chrome docs literally re the quota, the length of the key + // is just the string len, but the value is the json val, as bytes + if get_quota_size_of(&k, &v) > SYNC_QUOTA_BYTES_PER_ITEM { + return Err(ErrorKind::QuotaError(QuotaReason::ItemBytes).into()); + } + let change = StorageValueChange { + key: k.clone(), + old_value, + new_value: Some(v.clone()), + }; + changes.push(change); + current.insert(k, v); + } + + save_to_db( + tx, + ext_id, + &StorageChangeOp::Set(JsonValue::Object(current)), + )?; + Ok(changes) +} + +// A helper which takes a param indicating what keys should be returned and +// converts that to a vec of real strings. Also returns "default" values to +// be used if no item exists for that key. +fn get_keys(keys: JsonValue) -> Vec<(String, Option<JsonValue>)> { + match keys { + JsonValue::String(s) => vec![(s, None)], + JsonValue::Array(keys) => { + // because nothing with json is ever simple, each key may not be + // a string. We ignore any which aren't. + keys.iter() + .filter_map(|v| v.as_str().map(|s| (s.to_string(), None))) + .collect() + } + JsonValue::Object(m) => m.into_iter().map(|(k, d)| (k, Some(d))).collect(), + _ => vec![], + } +} + +/// The implementation of `storage[.sync].get()` - on success this always +/// returns a Json object. +pub fn get(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<JsonValue> { + // key is optional, or string or array of string or object keys + let maybe_existing = get_from_db(conn, ext_id)?; + let mut existing = match (maybe_existing, keys.is_object()) { + (None, true) => return Ok(keys), + (None, false) => return Ok(JsonValue::Object(Map::new())), + (Some(v), _) => v, + }; + // take the quick path for null, where we just return the entire object. + if keys.is_null() { + return Ok(JsonValue::Object(existing)); + } + // OK, so we need to build a list of keys to get. + let keys_and_defaults = get_keys(keys); + let mut result = Map::with_capacity(keys_and_defaults.len()); + for (key, maybe_default) in keys_and_defaults { + if let Some(v) = existing.remove(&key) { + result.insert(key, v); + } else if let Some(def) = maybe_default { + result.insert(key, def); + } + // else |keys| is a string/array instead of an object with defaults. + // Don't include keys without default values. + } + Ok(JsonValue::Object(result)) +} + +/// The implementation of `storage[.sync].remove()`. On success this returns the +/// StorageChanges defined by the chrome API - it's assumed the caller will +/// arrange to deliver this to observers as defined in that API. +pub fn remove(tx: &Transaction<'_>, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> { + let mut existing = match get_from_db(tx, ext_id)? { + None => return Ok(StorageChanges::new()), + Some(v) => v, + }; + + // Note: get_keys parses strings, arrays and objects, but remove() + // is expected to only be passed a string or array of strings. + let keys_and_defs = get_keys(keys); + + let mut result = StorageChanges::with_capacity(keys_and_defs.len()); + for (key, _) in keys_and_defs { + if let Some(v) = existing.remove(&key) { + result.push(StorageValueChange { + key, + old_value: Some(v), + new_value: None, + }); + } + } + if !result.is_empty() { + save_to_db( + tx, + ext_id, + &StorageChangeOp::SetWithoutQuota(JsonValue::Object(existing)), + )?; + } + Ok(result) +} + +/// The implementation of `storage[.sync].clear()`. On success this returns the +/// StorageChanges defined by the chrome API - it's assumed the caller will +/// arrange to deliver this to observers as defined in that API. +pub fn clear(tx: &Transaction<'_>, ext_id: &str) -> Result<StorageChanges> { + let existing = match get_from_db(tx, ext_id)? { + None => return Ok(StorageChanges::new()), + Some(v) => v, + }; + let mut result = StorageChanges::with_capacity(existing.len()); + for (key, val) in existing.into_iter() { + result.push(StorageValueChange { + key: key.to_string(), + new_value: None, + old_value: Some(val), + }); + } + remove_from_db(tx, ext_id)?; + Ok(result) +} + +/// The implementation of `storage[.sync].getBytesInUse()`. +pub fn get_bytes_in_use(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<usize> { + let maybe_existing = get_from_db(conn, ext_id)?; + let existing = match maybe_existing { + None => return Ok(0), + Some(v) => v, + }; + // Make an array of all the keys we we are going to count. + let keys: Vec<&str> = match &keys { + JsonValue::Null => existing.keys().map(|v| v.as_str()).collect(), + JsonValue::String(name) => vec![name.as_str()], + JsonValue::Array(names) => names.iter().filter_map(|v| v.as_str()).collect(), + // in the spirit of json-based APIs, silently ignore strange things. + _ => return Ok(0), + }; + // We must use the same way of counting as our quota enforcement. + let mut size = 0; + for key in keys.into_iter() { + if let Some(v) = existing.get(key) { + size += get_quota_size_of(key, v); + } + } + Ok(size) +} + +/// Information about the usage of a single extension. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UsageInfo { + /// The extension id. + pub ext_id: String, + /// The number of keys the extension uses. + pub num_keys: usize, + /// The number of bytes used by the extension. This result is somewhat rough + /// -- it doesn't bother counting the size of the extension ID, or data in + /// the mirror, and favors returning the exact number of bytes used by the + /// column (that is, the size of the JSON object) rather than replicating + /// the `get_bytes_in_use` return value for all keys. + pub num_bytes: usize, +} + +/// Exposes information about per-collection usage for the purpose of telemetry. +/// (Doesn't map to an actual `chrome.storage.sync` API). +pub fn usage(db: &Connection) -> Result<Vec<UsageInfo>> { + type JsonObject = Map<String, JsonValue>; + let sql = " + SELECT ext_id, data + FROM storage_sync_data + WHERE data IS NOT NULL + -- for tests and determinism + ORDER BY ext_id + "; + db.query_rows_into(sql, [], |row| { + let ext_id: String = row.get("ext_id")?; + let data: String = row.get("data")?; + let num_bytes = data.len(); + let num_keys = serde_json::from_str::<JsonObject>(&data)?.len(); + Ok(UsageInfo { + ext_id, + num_keys, + num_bytes, + }) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::test::new_mem_db; + use serde_json::json; + + #[test] + fn test_serialize_storage_changes() -> Result<()> { + let c = StorageChanges { + changes: vec![StorageValueChange { + key: "key".to_string(), + old_value: Some(json!("old")), + new_value: None, + }], + }; + assert_eq!(serde_json::to_string(&c)?, r#"{"key":{"oldValue":"old"}}"#); + let c = StorageChanges { + changes: vec![StorageValueChange { + key: "key".to_string(), + old_value: None, + new_value: Some(json!({"foo": "bar"})), + }], + }; + assert_eq!( + serde_json::to_string(&c)?, + r#"{"key":{"newValue":{"foo":"bar"}}}"# + ); + Ok(()) + } + + fn make_changes(changes: &[(&str, Option<JsonValue>, Option<JsonValue>)]) -> StorageChanges { + let mut r = StorageChanges::with_capacity(changes.len()); + for (name, old_value, new_value) in changes { + r.push(StorageValueChange { + key: (*name).to_string(), + old_value: old_value.clone(), + new_value: new_value.clone(), + }); + } + r + } + + #[test] + fn test_simple() -> Result<()> { + let ext_id = "x"; + let mut db = new_mem_db(); + let tx = db.transaction()?; + + // an empty store. + for q in vec![JsonValue::Null, json!("foo"), json!(["foo"])].into_iter() { + assert_eq!(get(&tx, ext_id, q)?, json!({})); + } + + // Default values in an empty store. + for q in vec![json!({ "foo": null }), json!({"foo": "default"})].into_iter() { + assert_eq!(get(&tx, ext_id, q.clone())?, q.clone()); + } + + // Single item in the store. + set(&tx, ext_id, json!({"foo": "bar" }))?; + for q in vec![ + JsonValue::Null, + json!("foo"), + json!(["foo"]), + json!({ "foo": null }), + json!({"foo": "default"}), + ] + .into_iter() + { + assert_eq!(get(&tx, ext_id, q)?, json!({"foo": "bar" })); + } + + // Default values in a non-empty store. + for q in vec![ + json!({ "non_existing_key": null }), + json!({"non_existing_key": 0}), + json!({"non_existing_key": false}), + json!({"non_existing_key": "default"}), + json!({"non_existing_key": ["array"]}), + json!({"non_existing_key": {"objectkey": "value"}}), + ] + .into_iter() + { + assert_eq!(get(&tx, ext_id, q.clone())?, q.clone()); + } + + // more complex stuff, including changes checking. + assert_eq!( + set(&tx, ext_id, json!({"foo": "new", "other": "also new" }))?, + make_changes(&[ + ("foo", Some(json!("bar")), Some(json!("new"))), + ("other", None, Some(json!("also new"))) + ]) + ); + assert_eq!( + get(&tx, ext_id, JsonValue::Null)?, + json!({"foo": "new", "other": "also new"}) + ); + assert_eq!(get(&tx, ext_id, json!("foo"))?, json!({"foo": "new"})); + assert_eq!( + get(&tx, ext_id, json!(["foo", "other"]))?, + json!({"foo": "new", "other": "also new"}) + ); + assert_eq!( + get(&tx, ext_id, json!({"foo": null, "default": "yo"}))?, + json!({"foo": "new", "default": "yo"}) + ); + + assert_eq!( + remove(&tx, ext_id, json!("foo"))?, + make_changes(&[("foo", Some(json!("new")), None)]), + ); + + assert_eq!( + set(&tx, ext_id, json!({"foo": {"sub-object": "sub-value"}}))?, + make_changes(&[("foo", None, Some(json!({"sub-object": "sub-value"}))),]) + ); + + // XXX - other variants. + + assert_eq!( + clear(&tx, ext_id)?, + make_changes(&[ + ("foo", Some(json!({"sub-object": "sub-value"})), None), + ("other", Some(json!("also new")), None), + ]), + ); + assert_eq!(get(&tx, ext_id, JsonValue::Null)?, json!({})); + + Ok(()) + } + + #[test] + fn test_check_get_impl() -> Result<()> { + // This is a port of checkGetImpl in test_ext_storage.js in Desktop. + let ext_id = "x"; + let mut db = new_mem_db(); + let tx = db.transaction()?; + + let prop = "test-prop"; + let value = "test-value"; + + set(&tx, ext_id, json!({ prop: value }))?; + + // this is the checkGetImpl part! + let mut data = get(&tx, ext_id, json!(null))?; + assert_eq!(value, json!(data[prop]), "null getter worked for {}", prop); + + data = get(&tx, ext_id, json!(prop))?; + assert_eq!( + value, + json!(data[prop]), + "string getter worked for {}", + prop + ); + assert_eq!( + data.as_object().unwrap().len(), + 1, + "string getter should return an object with a single property" + ); + + data = get(&tx, ext_id, json!([prop]))?; + assert_eq!(value, json!(data[prop]), "array getter worked for {}", prop); + assert_eq!( + data.as_object().unwrap().len(), + 1, + "array getter with a single key should return an object with a single property" + ); + + // checkGetImpl() uses `{ [prop]: undefined }` - but json!() can't do that :( + // Hopefully it's just testing a simple object, so we use `{ prop: null }` + data = get(&tx, ext_id, json!({ prop: null }))?; + assert_eq!( + value, + json!(data[prop]), + "object getter worked for {}", + prop + ); + assert_eq!( + data.as_object().unwrap().len(), + 1, + "object getter with a single key should return an object with a single property" + ); + + Ok(()) + } + + #[test] + fn test_bug_1621162() -> Result<()> { + // apparently Firefox, unlike Chrome, will not optimize the changes. + // See bug 1621162 for more! + let mut db = new_mem_db(); + let tx = db.transaction()?; + let ext_id = "xyz"; + + set(&tx, ext_id, json!({"foo": "bar" }))?; + + assert_eq!( + set(&tx, ext_id, json!({"foo": "bar" }))?, + make_changes(&[("foo", Some(json!("bar")), Some(json!("bar")))]), + ); + Ok(()) + } + + #[test] + fn test_quota_maxitems() -> Result<()> { + let mut db = new_mem_db(); + let tx = db.transaction()?; + let ext_id = "xyz"; + for i in 1..SYNC_MAX_ITEMS + 1 { + set( + &tx, + ext_id, + json!({ format!("key-{}", i): format!("value-{}", i) }), + )?; + } + let e = set(&tx, ext_id, json!({"another": "another"})).unwrap_err(); + match e.kind() { + ErrorKind::QuotaError(QuotaReason::MaxItems) => {} + _ => panic!("unexpected error type"), + }; + Ok(()) + } + + #[test] + fn test_quota_bytesperitem() -> Result<()> { + let mut db = new_mem_db(); + let tx = db.transaction()?; + let ext_id = "xyz"; + // A string 5 bytes less than the max. This should be counted as being + // 3 bytes less than the max as the quotes are counted. Plus the length + // of the key (no quotes) means we should come in 2 bytes under. + let val = "x".repeat(SYNC_QUOTA_BYTES_PER_ITEM - 5); + + // Key length doesn't push it over. + set(&tx, ext_id, json!({ "x": val }))?; + assert_eq!( + get_bytes_in_use(&tx, ext_id, json!("x"))?, + SYNC_QUOTA_BYTES_PER_ITEM - 2 + ); + + // Key length does push it over. + let e = set(&tx, ext_id, json!({ "xxxx": val })).unwrap_err(); + match e.kind() { + ErrorKind::QuotaError(QuotaReason::ItemBytes) => {} + _ => panic!("unexpected error type"), + }; + Ok(()) + } + + #[test] + fn test_quota_bytes() -> Result<()> { + let mut db = new_mem_db(); + let tx = db.transaction()?; + let ext_id = "xyz"; + let val = "x".repeat(SYNC_QUOTA_BYTES + 1); + + // Init an over quota db with a single key. + save_to_db( + &tx, + ext_id, + &StorageChangeOp::SetWithoutQuota(json!({ "x": val })), + )?; + + // Adding more data fails. + let e = set(&tx, ext_id, json!({ "y": "newvalue" })).unwrap_err(); + match e.kind() { + ErrorKind::QuotaError(QuotaReason::TotalBytes) => {} + _ => panic!("unexpected error type"), + }; + + // Remove data does not fails. + remove(&tx, ext_id, json!["x"])?; + + // Restore the over quota data. + save_to_db( + &tx, + ext_id, + &StorageChangeOp::SetWithoutQuota(json!({ "y": val })), + )?; + + // Overwrite with less data does not fail. + set(&tx, ext_id, json!({ "y": "lessdata" }))?; + + Ok(()) + } + + #[test] + fn test_get_bytes_in_use() -> Result<()> { + let mut db = new_mem_db(); + let tx = db.transaction()?; + let ext_id = "xyz"; + + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 0); + + set(&tx, ext_id, json!({ "a": "a" }))?; // should be 4 + set(&tx, ext_id, json!({ "b": "bb" }))?; // should be 5 + set(&tx, ext_id, json!({ "c": "ccc" }))?; // should be 6 + set(&tx, ext_id, json!({ "n": 999_999 }))?; // should be 7 + + assert_eq!(get_bytes_in_use(&tx, ext_id, json!("x"))?, 0); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!("a"))?, 4); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!("b"))?, 5); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!("c"))?, 6); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!("n"))?, 7); + + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a"]))?, 4); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "x"]))?, 4); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "b"]))?, 9); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "c"]))?, 10); + + assert_eq!( + get_bytes_in_use(&tx, ext_id, json!(["a", "b", "c", "n"]))?, + 22 + ); + assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 22); + Ok(()) + } + + #[test] + fn test_usage() { + let mut db = new_mem_db(); + let tx = db.transaction().unwrap(); + // '{"a":"a","b":"bb","c":"ccc","n":999999}': 39 bytes + set(&tx, "xyz", json!({ "a": "a" })).unwrap(); + set(&tx, "xyz", json!({ "b": "bb" })).unwrap(); + set(&tx, "xyz", json!({ "c": "ccc" })).unwrap(); + set(&tx, "xyz", json!({ "n": 999_999 })).unwrap(); + + // '{"a":"a"}': 9 bytes + set(&tx, "abc", json!({ "a": "a" })).unwrap(); + + tx.commit().unwrap(); + + let usage = usage(&db).unwrap(); + let expect = [ + UsageInfo { + ext_id: "abc".to_string(), + num_keys: 1, + num_bytes: 9, + }, + UsageInfo { + ext_id: "xyz".to_string(), + num_keys: 4, + num_bytes: 39, + }, + ]; + assert_eq!(&usage, &expect); + } +} diff --git a/third_party/rust/webext-storage/src/db.rs b/third_party/rust/webext-storage/src/db.rs new file mode 100644 index 0000000000..c88dc99959 --- /dev/null +++ b/third_party/rust/webext-storage/src/db.rs @@ -0,0 +1,301 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::*; +use crate::schema; +use interrupt_support::{SqlInterruptHandle, SqlInterruptScope}; +use parking_lot::Mutex; +use rusqlite::types::{FromSql, ToSql}; +use rusqlite::Connection; +use rusqlite::OpenFlags; +use sql_support::open_database::open_database_with_flags; +use sql_support::ConnExt; +use std::ops::{Deref, DerefMut}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use url::Url; + +/// A `StorageDb` wraps a read-write SQLite connection, and handles schema +/// migrations and recovering from database file corruption. It can be used +/// anywhere a `rusqlite::Connection` is expected, thanks to its `Deref{Mut}` +/// implementations. +/// +/// We only support a single writer connection - so that's the only thing we +/// store. It's still a bit overkill, but there's only so many yaks in a day. +pub struct StorageDb { + writer: Connection, + interrupt_handle: Arc<SqlInterruptHandle>, +} +impl StorageDb { + /// Create a new, or fetch an already open, StorageDb backed by a file on disk. + pub fn new(db_path: impl AsRef<Path>) -> Result<Self> { + let db_path = normalize_path(db_path)?; + Self::new_named(db_path) + } + + /// Create a new, or fetch an already open, memory-based StorageDb. You must + /// provide a name, but you are still able to have a single writer and many + /// reader connections to the same memory DB open. + #[cfg(test)] + pub fn new_memory(db_path: &str) -> Result<Self> { + let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_path)); + Self::new_named(name) + } + + fn new_named(db_path: PathBuf) -> Result<Self> { + // We always create the read-write connection for an initial open so + // we can create the schema and/or do version upgrades. + let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_READ_WRITE; + + let conn = open_database_with_flags(db_path, flags, &schema::WebExtMigrationLogin)?; + Ok(Self { + interrupt_handle: Arc::new(SqlInterruptHandle::new(&conn)), + writer: conn, + }) + } + + pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> { + Arc::clone(&self.interrupt_handle) + } + + #[allow(dead_code)] + pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> { + Ok(self.interrupt_handle.begin_interrupt_scope()?) + } + + /// Closes the database connection. If there are any unfinalized prepared + /// statements on the connection, `close` will fail and the `StorageDb` will + /// remain open and the connection will be leaked - we used to return the + /// underlying connection so the caller can retry but (a) that's very tricky + /// in an Arc<Mutex<>> world and (b) we never actually took advantage of + /// that retry capability. + pub fn close(self) -> Result<()> { + self.writer.close().map_err(|(writer, err)| { + // In rusqlite 0.28.0 and earlier, if we just let `writer` drop, + // the close would panic on failure. + // Later rusqlite versions will not panic, but this behavior doesn't + // hurt there. + std::mem::forget(writer); + err.into() + }) + } +} + +impl Deref for StorageDb { + type Target = Connection; + + fn deref(&self) -> &Self::Target { + &self.writer + } +} + +impl DerefMut for StorageDb { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.writer + } +} + +// We almost exclusively use this ThreadSafeStorageDb +pub struct ThreadSafeStorageDb { + db: Mutex<StorageDb>, + // This "outer" interrupt_handle not protected by the mutex means + // consumers can interrupt us when the mutex is held - which it always will + // be if we are doing anything interruptable! + interrupt_handle: Arc<SqlInterruptHandle>, +} + +impl ThreadSafeStorageDb { + pub fn new(db: StorageDb) -> Self { + Self { + interrupt_handle: db.interrupt_handle(), + db: Mutex::new(db), + } + } + + pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> { + Arc::clone(&self.interrupt_handle) + } + + pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> { + Ok(self.interrupt_handle.begin_interrupt_scope()?) + } + + pub fn into_inner(self) -> StorageDb { + self.db.into_inner() + } +} + +// Deref to a Mutex<StorageDb>, which is how we will use ThreadSafeStorageDb most of the time +impl Deref for ThreadSafeStorageDb { + type Target = Mutex<StorageDb>; + + #[inline] + fn deref(&self) -> &Mutex<StorageDb> { + &self.db + } +} + +// Also implement AsRef<SqlInterruptHandle> so that we can interrupt this at shutdown +impl AsRef<SqlInterruptHandle> for ThreadSafeStorageDb { + fn as_ref(&self) -> &SqlInterruptHandle { + &self.interrupt_handle + } +} + +pub(crate) mod sql_fns { + use rusqlite::{functions::Context, Result}; + use sync_guid::Guid as SyncGuid; + + #[inline(never)] + pub fn generate_guid(_ctx: &Context<'_>) -> Result<SyncGuid> { + Ok(SyncGuid::random()) + } +} + +// These should be somewhere else... +pub fn put_meta(db: &Connection, key: &str, value: &dyn ToSql) -> Result<()> { + db.conn().execute_cached( + "REPLACE INTO meta (key, value) VALUES (:key, :value)", + rusqlite::named_params! { ":key": key, ":value": value }, + )?; + Ok(()) +} + +pub fn get_meta<T: FromSql>(db: &Connection, key: &str) -> Result<Option<T>> { + let res = db.conn().try_query_one( + "SELECT value FROM meta WHERE key = :key", + &[(":key", &key)], + true, + )?; + Ok(res) +} + +pub fn delete_meta(db: &Connection, key: &str) -> Result<()> { + db.conn() + .execute_cached("DELETE FROM meta WHERE key = :key", &[(":key", &key)])?; + Ok(()) +} + +// Utilities for working with paths. +// (From places_utils - ideally these would be shared, but the use of +// ErrorKind values makes that non-trivial. + +/// `Path` is basically just a `str` with no validation, and so in practice it +/// could contain a file URL. Rusqlite takes advantage of this a bit, and says +/// `AsRef<Path>` but really means "anything sqlite can take as an argument". +/// +/// Swift loves using file urls (the only support it has for file manipulation +/// is through file urls), so it's handy to support them if possible. +fn unurl_path(p: impl AsRef<Path>) -> PathBuf { + p.as_ref() + .to_str() + .and_then(|s| Url::parse(s).ok()) + .and_then(|u| { + if u.scheme() == "file" { + u.to_file_path().ok() + } else { + None + } + }) + .unwrap_or_else(|| p.as_ref().to_owned()) +} + +/// If `p` is a file URL, return it, otherwise try and make it one. +/// +/// Errors if `p` is a relative non-url path, or if it's a URL path +/// that's isn't a `file:` URL. +#[allow(dead_code)] +pub fn ensure_url_path(p: impl AsRef<Path>) -> Result<Url> { + if let Some(u) = p.as_ref().to_str().and_then(|s| Url::parse(s).ok()) { + if u.scheme() == "file" { + Ok(u) + } else { + Err(ErrorKind::IllegalDatabasePath(p.as_ref().to_owned()).into()) + } + } else { + let p = p.as_ref(); + let u = Url::from_file_path(p).map_err(|_| ErrorKind::IllegalDatabasePath(p.to_owned()))?; + Ok(u) + } +} + +/// As best as possible, convert `p` into an absolute path, resolving +/// all symlinks along the way. +/// +/// If `p` is a file url, it's converted to a path before this. +fn normalize_path(p: impl AsRef<Path>) -> Result<PathBuf> { + let path = unurl_path(p); + if let Ok(canonical) = path.canonicalize() { + return Ok(canonical); + } + // It probably doesn't exist yet. This is an error, although it seems to + // work on some systems. + // + // We resolve this by trying to canonicalize the parent directory, and + // appending the requested file name onto that. If we can't canonicalize + // the parent, we return an error. + // + // Also, we return errors if the path ends in "..", if there is no + // parent directory, etc. + let file_name = path + .file_name() + .ok_or_else(|| ErrorKind::IllegalDatabasePath(path.clone()))?; + + let parent = path + .parent() + .ok_or_else(|| ErrorKind::IllegalDatabasePath(path.clone()))?; + + let mut canonical = parent.canonicalize()?; + canonical.push(file_name); + Ok(canonical) +} + +// Helpers for tests +#[cfg(test)] +pub mod test { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + // A helper for our tests to get their own memory Api. + static ATOMIC_COUNTER: AtomicUsize = AtomicUsize::new(0); + + pub fn new_mem_db() -> StorageDb { + let _ = env_logger::try_init(); + let counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed); + StorageDb::new_memory(&format!("test-api-{}", counter)).expect("should get an API") + } + + pub fn new_mem_thread_safe_storage_db() -> Arc<ThreadSafeStorageDb> { + Arc::new(ThreadSafeStorageDb::new(new_mem_db())) + } +} + +#[cfg(test)] +mod tests { + use super::test::*; + use super::*; + + // Sanity check that we can create a database. + #[test] + fn test_open() { + new_mem_db(); + // XXX - should we check anything else? Seems a bit pointless, but if + // we move the meta functions away from here then it's better than + // nothing. + } + + #[test] + fn test_meta() -> Result<()> { + let writer = new_mem_db(); + assert_eq!(get_meta::<String>(&writer, "foo")?, None); + put_meta(&writer, "foo", &"bar".to_string())?; + assert_eq!(get_meta(&writer, "foo")?, Some("bar".to_string())); + delete_meta(&writer, "foo")?; + assert_eq!(get_meta::<String>(&writer, "foo")?, None); + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/error.rs b/third_party/rust/webext-storage/src/error.rs new file mode 100644 index 0000000000..1a2e650cd0 --- /dev/null +++ b/third_party/rust/webext-storage/src/error.rs @@ -0,0 +1,72 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use interrupt_support::Interrupted; + +#[derive(Debug)] +pub enum QuotaReason { + TotalBytes, + ItemBytes, + MaxItems, +} + +#[derive(Debug, thiserror::Error)] +pub enum ErrorKind { + #[error("Quota exceeded: {0:?}")] + QuotaError(QuotaReason), + + #[error("Error parsing JSON data: {0}")] + JsonError(#[from] serde_json::Error), + + #[error("Error executing SQL: {0}")] + SqlError(#[from] rusqlite::Error), + + #[error("A connection of this type is already open")] + ConnectionAlreadyOpen, + + #[error("An invalid connection type was specified")] + InvalidConnectionType, + + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Operation interrupted")] + InterruptedError(#[from] Interrupted), + + #[error("Tried to close connection on wrong StorageApi instance")] + WrongApiForClose, + + // This will happen if you provide something absurd like + // "/" or "" as your database path. For more subtley broken paths, + // we'll likely return an IoError. + #[error("Illegal database path: {0:?}")] + IllegalDatabasePath(std::path::PathBuf), + + #[error("UTF8 Error: {0}")] + Utf8Error(#[from] std::str::Utf8Error), + + #[error("Error opening database: {0}")] + OpenDatabaseError(#[from] sql_support::open_database::Error), + + // When trying to close a connection but we aren't the exclusive owner of the containing Arc<> + #[error("Other shared references to this connection are alive")] + OtherConnectionReferencesExist, + + #[error("The storage database has been closed")] + DatabaseConnectionClosed, + + #[error("Sync Error: {0}")] + SyncError(String), +} + +error_support::define_error! { + ErrorKind { + (JsonError, serde_json::Error), + (SqlError, rusqlite::Error), + (IoError, std::io::Error), + (InterruptedError, Interrupted), + (Utf8Error, std::str::Utf8Error), + (OpenDatabaseError, sql_support::open_database::Error), + } +} diff --git a/third_party/rust/webext-storage/src/ffi.rs b/third_party/rust/webext-storage/src/ffi.rs new file mode 100644 index 0000000000..dc4af8ed6e --- /dev/null +++ b/third_party/rust/webext-storage/src/ffi.rs @@ -0,0 +1,48 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/// This module contains glue for the FFI, including error codes and a +/// `From<Error>` implementation for `ffi_support::ExternError`. These must be +/// implemented in this crate, and not `webext_storage_ffi`, because of Rust's +/// orphan rule for implementing traits (`webext_storage_ffi` can't implement +/// a trait in `ffi_support` for a type in `webext_storage`). +use ffi_support::{ErrorCode, ExternError}; + +use crate::error::{Error, ErrorKind, QuotaReason}; + +mod error_codes { + /// An unexpected error occurred which likely cannot be meaningfully handled + /// by the application. + pub const UNEXPECTED: i32 = 1; + + /// The application passed an invalid JSON string for a storage key or value. + pub const INVALID_JSON: i32 = 2; + + /// The total number of bytes stored in the database for this extension, + /// counting all key-value pairs serialized to JSON, exceeds the allowed limit. + pub const QUOTA_TOTAL_BYTES_EXCEEDED: i32 = 32; + + /// A single key-value pair exceeds the allowed byte limit when serialized + /// to JSON. + pub const QUOTA_ITEM_BYTES_EXCEEDED: i32 = 32 + 1; + + /// The total number of key-value pairs stored for this extension exceeded the + /// allowed limit. + pub const QUOTA_MAX_ITEMS_EXCEEDED: i32 = 32 + 2; +} + +impl From<Error> for ExternError { + fn from(err: Error) -> ExternError { + let code = ErrorCode::new(match err.kind() { + ErrorKind::JsonError(_) => error_codes::INVALID_JSON, + ErrorKind::QuotaError(QuotaReason::TotalBytes) => { + error_codes::QUOTA_TOTAL_BYTES_EXCEEDED + } + ErrorKind::QuotaError(QuotaReason::ItemBytes) => error_codes::QUOTA_ITEM_BYTES_EXCEEDED, + ErrorKind::QuotaError(QuotaReason::MaxItems) => error_codes::QUOTA_MAX_ITEMS_EXCEEDED, + _ => error_codes::UNEXPECTED, + }); + ExternError::new_error(code, err.to_string()) + } +} diff --git a/third_party/rust/webext-storage/src/lib.rs b/third_party/rust/webext-storage/src/lib.rs new file mode 100644 index 0000000000..2e2554781d --- /dev/null +++ b/third_party/rust/webext-storage/src/lib.rs @@ -0,0 +1,26 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![allow(unknown_lints)] +#![warn(rust_2018_idioms)] + +mod api; +mod db; +pub mod error; +mod ffi; +mod migration; +mod schema; +pub mod store; +mod sync; + +pub use migration::MigrationInfo; + +// We publish some constants from non-public modules. +pub use sync::STORAGE_VERSION; + +pub use api::SYNC_MAX_ITEMS; +pub use api::SYNC_QUOTA_BYTES; +pub use api::SYNC_QUOTA_BYTES_PER_ITEM; + +pub use api::UsageInfo; diff --git a/third_party/rust/webext-storage/src/migration.rs b/third_party/rust/webext-storage/src/migration.rs new file mode 100644 index 0000000000..27f230ef47 --- /dev/null +++ b/third_party/rust/webext-storage/src/migration.rs @@ -0,0 +1,454 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::*; +use rusqlite::{named_params, Connection, OpenFlags, Transaction}; +use serde_json::{Map, Value}; +use sql_support::ConnExt; +use std::collections::HashSet; +use std::path::Path; + +// Simple migration from the "old" kinto-with-sqlite-backing implementation +// to ours. +// Could almost be trivially done in JS using the regular public API if not +// for: +// * We don't want to enforce the same quotas when migrating. +// * We'd rather do the entire migration in a single transaction for perf +// reasons. + +// The sqlite database we migrate from has a very simple structure: +// * table collection_data with columns collection_name, record_id and record +// * `collection_name` is a string of form "default/{extension_id}" +// * `record_id` is `key-{key}` +// * `record` is a string with json, of form: { +// id: {the record id repeated}, +// key: {the key}, +// data: {the actual data}, +// _status: {sync status}, +// last_modified: {timestamp}, +// } +// So the info we need is stored somewhat redundantly. +// Further: +// * There's a special collection_name "default/storage-sync-crypto" that +// we don't want to migrate. Its record_id is 'keys' and its json has no +// `data` + +// Note we don't enforce a quota - we migrate everything - even if this means +// it's too big for the server to store. This is a policy decision - it's better +// to not lose data than to try and work out what data can be disposed of, as +// the addon has the ability to determine this. + +// Our error strategy is "ignore read errors, propagate write errors" under the +// assumption that the former tends to mean a damaged DB or file-system and is +// unlikely to work if we try later (eg, replacing the disk isn't likely to +// uncorrupt the DB), where the latter is likely to be disk-space or file-system +// error, but retry might work (eg, replacing the disk then trying again might +// make the writes work) + +// The struct we read from the DB. +struct LegacyRow { + col_name: String, // collection_name column + record: String, // record column +} + +impl LegacyRow { + // Parse the 2 columns from the DB into the data we need to insert into + // our target database. + fn parse(&self) -> Option<Parsed<'_>> { + if self.col_name.len() < 8 { + log::trace!("collection_name of '{}' is too short", self.col_name); + return None; + } + if &self.col_name[..8] != "default/" { + log::trace!("collection_name of '{}' isn't ours", self.col_name); + return None; + } + let ext_id = &self.col_name[8..]; + let mut record_map = match serde_json::from_str(&self.record) { + Ok(Value::Object(m)) => m, + Ok(o) => { + log::info!("skipping non-json-object 'record' column"); + log::trace!("record value is json, but not an object: {}", o); + return None; + } + Err(e) => { + log::info!("skipping non-json 'record' column"); + log::trace!("record value isn't json: {}", e); + return None; + } + }; + + let key = match record_map.remove("key") { + Some(Value::String(s)) if !s.is_empty() => s, + Some(o) => { + log::trace!("key is json but not a string: {}", o); + return None; + } + _ => { + log::trace!("key doesn't exist in the map"); + return None; + } + }; + let data = match record_map.remove("data") { + Some(d) => d, + _ => { + log::trace!("data doesn't exist in the map"); + return None; + } + }; + Some(Parsed { ext_id, key, data }) + } +} + +// The info we parse from the raw DB strings. +struct Parsed<'a> { + ext_id: &'a str, + key: String, + data: serde_json::Value, +} + +pub fn migrate(tx: &Transaction<'_>, filename: &Path) -> Result<MigrationInfo> { + // We do the grouping manually, collecting string values as we go. + let mut last_ext_id = "".to_string(); + let mut curr_values: Vec<(String, serde_json::Value)> = Vec::new(); + let (rows, mut mi) = read_rows(filename); + for row in rows { + log::trace!("processing '{}' - '{}'", row.col_name, row.record); + let parsed = match row.parse() { + Some(p) => p, + None => continue, + }; + // Do our "grouping" + if parsed.ext_id != last_ext_id { + if !last_ext_id.is_empty() && !curr_values.is_empty() { + // a different extension id - write what we have to the DB. + let entries = do_insert(tx, &last_ext_id, curr_values)?; + mi.extensions_successful += 1; + mi.entries_successful += entries; + } + last_ext_id = parsed.ext_id.to_string(); + curr_values = Vec::new(); + } + // no 'else' here - must also enter this block on ext_id change. + if parsed.ext_id == last_ext_id { + curr_values.push((parsed.key.to_string(), parsed.data)); + log::trace!( + "extension {} now has {} keys", + parsed.ext_id, + curr_values.len() + ); + } + } + // and the last one + if !last_ext_id.is_empty() && !curr_values.is_empty() { + // a different extension id - write what we have to the DB. + let entries = do_insert(tx, &last_ext_id, curr_values)?; + mi.extensions_successful += 1; + mi.entries_successful += entries; + } + log::info!("migrated {} extensions: {:?}", mi.extensions_successful, mi); + Ok(mi) +} + +fn read_rows(filename: &Path) -> (Vec<LegacyRow>, MigrationInfo) { + let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_READ_ONLY; + let src_conn = match Connection::open_with_flags(filename, flags) { + Ok(conn) => conn, + Err(e) => { + log::warn!("Failed to open the source DB: {}", e); + return (Vec::new(), MigrationInfo::open_failure()); + } + }; + // Failure to prepare the statement probably just means the source DB is + // damaged. + let mut stmt = match src_conn.prepare( + "SELECT collection_name, record FROM collection_data + WHERE collection_name != 'default/storage-sync-crypto' + ORDER BY collection_name", + ) { + Ok(stmt) => stmt, + Err(e) => { + log::warn!("Failed to prepare the statement: {}", e); + return (Vec::new(), MigrationInfo::open_failure()); + } + }; + let rows = match stmt.query_and_then([], |row| -> Result<LegacyRow> { + Ok(LegacyRow { + col_name: row.get(0)?, + record: row.get(1)?, + }) + }) { + Ok(r) => r, + Err(e) => { + log::warn!("Failed to read any rows from the source DB: {}", e); + return (Vec::new(), MigrationInfo::open_failure()); + } + }; + let all_rows: Vec<Result<LegacyRow>> = rows.collect(); + let entries = all_rows.len(); + let successful_rows: Vec<LegacyRow> = all_rows.into_iter().filter_map(Result::ok).collect(); + let distinct_extensions: HashSet<_> = successful_rows.iter().map(|c| &c.col_name).collect(); + + let mi = MigrationInfo { + entries, + extensions: distinct_extensions.len(), + // Populated later. + extensions_successful: 0, + entries_successful: 0, + open_failure: false, + }; + + (successful_rows, mi) +} + +/// Insert the extension and values. If there are multiple values with the same +/// key (which shouldn't be possible but who knows, database corruption causes +/// strange things), chooses an arbitrary one. Returns the number of entries +/// inserted, which could be different from `vals.len()` if multiple entries in +/// `vals` have the same key. +fn do_insert(tx: &Transaction<'_>, ext_id: &str, vals: Vec<(String, Value)>) -> Result<usize> { + let mut map = Map::with_capacity(vals.len()); + for (key, val) in vals { + map.insert(key, val); + } + let num_entries = map.len(); + tx.execute_cached( + "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter) + VALUES (:ext_id, :data, 1)", + rusqlite::named_params! { + ":ext_id": &ext_id, + ":data": &Value::Object(map), + }, + )?; + Ok(num_entries) +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct MigrationInfo { + /// The number of entries (rows in the original table) we attempted to + /// migrate. Zero if there was some error in computing this number. + /// + /// Note that for the original table, a single row stores a single + /// preference for one extension. That is, if you view the set of + /// preferences for a given extension as a HashMap (as we do), it would be a + /// single entry/key-value-pair in the map. + pub entries: usize, + /// The number of records we successfully migrated (equal to `entries` for + /// entirely successful migrations). + pub entries_successful: usize, + /// The number of extensions (distinct extension ids) in the original + /// table. + pub extensions: usize, + /// The number of extensions we successfully migrated + pub extensions_successful: usize, + /// True iff we failed to open the source DB at all. + pub open_failure: bool, +} + +impl MigrationInfo { + /// Returns a MigrationInfo indicating that we failed to read any rows due + /// to some error case (e.g. the database open failed, or some other very + /// early read error). + fn open_failure() -> Self { + Self { + open_failure: true, + ..Self::default() + } + } + + const META_KEY: &'static str = "migration_info"; + + /// Store `self` in the provided database under `Self::META_KEY`. + pub(crate) fn store(&self, conn: &Connection) -> Result<()> { + let json = serde_json::to_string(self)?; + conn.execute( + "INSERT OR REPLACE INTO meta(key, value) VALUES (:k, :v)", + named_params! { + ":k": Self::META_KEY, + ":v": &json + }, + )?; + Ok(()) + } + + /// Get the MigrationInfo stored under `Self::META_KEY` (if any) out of the + /// DB, and delete it. + pub(crate) fn take(tx: &Transaction<'_>) -> Result<Option<Self>> { + let s = tx.try_query_one::<String, _>( + "SELECT value FROM meta WHERE key = :k", + named_params! { + ":k": Self::META_KEY, + }, + false, + )?; + tx.execute( + "DELETE FROM meta WHERE key = :k", + named_params! { + ":k": Self::META_KEY, + }, + )?; + if let Some(s) = s { + match serde_json::from_str(&s) { + Ok(v) => Ok(Some(v)), + Err(e) => { + // Force test failure, but just log an error otherwise so that + // we commit the transaction that wil. + debug_assert!(false, "Failed to read migration JSON: {:?}", e); + error_support::report_error!( + "webext-storage-migration-json", + "Failed to read migration JSON: {}", + e + ); + Ok(None) + } + } + } else { + Ok(None) + } + } +} +#[cfg(test)] +mod tests { + use super::*; + use crate::api; + use crate::db::{test::new_mem_db, StorageDb}; + use serde_json::json; + use tempfile::tempdir; + + fn init_source_db(path: impl AsRef<Path>, f: impl FnOnce(&Connection)) { + let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_READ_WRITE; + let mut conn = Connection::open_with_flags(path, flags).expect("open should work"); + let tx = conn.transaction().expect("should be able to get a tx"); + tx.execute_batch( + "CREATE TABLE collection_data ( + collection_name TEXT, + record_id TEXT, + record TEXT + );", + ) + .expect("create should work"); + f(&tx); + tx.commit().expect("should commit"); + conn.close().expect("close should work"); + } + + // Create a test database, populate it via the callback, migrate it, and + // return a connection to the new, migrated DB for further checking. + fn do_migrate<F>(expect_mi: MigrationInfo, f: F) -> StorageDb + where + F: FnOnce(&Connection), + { + let tmpdir = tempdir().unwrap(); + let path = tmpdir.path().join("source.db"); + init_source_db(path, f); + + // now migrate + let mut db = new_mem_db(); + let tx = db.transaction().expect("tx should work"); + + let mi = migrate(&tx, &tmpdir.path().join("source.db")).expect("migrate should work"); + tx.commit().expect("should work"); + assert_eq!(mi, expect_mi); + db + } + + fn assert_has(c: &Connection, ext_id: &str, expect: Value) { + assert_eq!( + api::get(c, ext_id, json!(null)).expect("should get"), + expect + ); + } + + const HAPPY_PATH_SQL: &str = r#" + INSERT INTO collection_data(collection_name, record) + VALUES + ('default/{e7fefcf3-b39c-4f17-5215-ebfe120a7031}', '{"id":"key-userWelcomed","key":"userWelcomed","data":1570659224457,"_status":"synced","last_modified":1579755940527}'), + ('default/{e7fefcf3-b39c-4f17-5215-ebfe120a7031}', '{"id":"key-isWho","key":"isWho","data":"4ec8109f","_status":"synced","last_modified":1579755940497}'), + ('default/storage-sync-crypto', '{"id":"keys","keys":{"default":["rQ=","lR="],"collections":{"extension@redux.devtools":["Bd=","ju="]}}}'), + ('default/https-everywhere@eff.org', '{"id":"key-userRules","key":"userRules","data":[],"_status":"synced","last_modified":1570079920045}'), + ('default/https-everywhere@eff.org', '{"id":"key-ruleActiveStates","key":"ruleActiveStates","data":{},"_status":"synced","last_modified":1570079919993}'), + ('default/https-everywhere@eff.org', '{"id":"key-migration_5F_version","key":"migration_version","data":2,"_status":"synced","last_modified":1570079919966}') + "#; + const HAPPY_PATH_MIGRATION_INFO: MigrationInfo = MigrationInfo { + entries: 5, + entries_successful: 5, + extensions: 2, + extensions_successful: 2, + open_failure: false, + }; + + #[allow(clippy::unreadable_literal)] + #[test] + fn test_happy_paths() { + // some real data. + let conn = do_migrate(HAPPY_PATH_MIGRATION_INFO, |c| { + c.execute_batch(HAPPY_PATH_SQL).expect("should populate") + }); + + assert_has( + &conn, + "{e7fefcf3-b39c-4f17-5215-ebfe120a7031}", + json!({"userWelcomed": 1570659224457u64, "isWho": "4ec8109f"}), + ); + assert_has( + &conn, + "https-everywhere@eff.org", + json!({"userRules": [], "ruleActiveStates": {}, "migration_version": 2}), + ); + } + + #[test] + fn test_sad_paths() { + do_migrate( + MigrationInfo { + entries: 10, + entries_successful: 0, + extensions: 6, + extensions_successful: 0, + open_failure: false, + }, + |c| { + c.execute_batch( + r#"INSERT INTO collection_data(collection_name, record) + VALUES + ('default/test', '{"key":2,"data":1}'), -- key not a string + ('default/test', '{"key":"","data":1}'), -- key empty string + ('default/test', '{"xey":"k","data":1}'), -- key missing + ('default/test', '{"key":"k","xata":1}'), -- data missing + ('default/test', '{"key":"k","data":1'), -- invalid json + ('xx/test', '{"key":"k","data":1}'), -- bad key format + ('default', '{"key":"k","data":1}'), -- bad key format 2 + ('default/', '{"key":"k","data":1}'), -- bad key format 3 + ('defaultx/test', '{"key":"k","data":1}'), -- bad key format 4 + ('', '') -- empty strings + "#, + ) + .expect("should populate"); + }, + ); + } + + #[test] + fn test_migration_info_storage() { + let tmpdir = tempdir().unwrap(); + let path = tmpdir.path().join("source.db"); + init_source_db(&path, |c| { + c.execute_batch(HAPPY_PATH_SQL).expect("should populate") + }); + + // now migrate + let db = crate::store::test::new_mem_store(); + db.migrate(&path).expect("migration should work"); + let mi = db + .take_migration_info() + .expect("take failed with info present"); + assert_eq!(mi, Some(HAPPY_PATH_MIGRATION_INFO)); + let mi2 = db + .take_migration_info() + .expect("take failed with info missing"); + assert_eq!(mi2, None); + } +} diff --git a/third_party/rust/webext-storage/src/schema.rs b/third_party/rust/webext-storage/src/schema.rs new file mode 100644 index 0000000000..59efcf495a --- /dev/null +++ b/third_party/rust/webext-storage/src/schema.rs @@ -0,0 +1,213 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::db::sql_fns; +use crate::error::Result; +use rusqlite::{Connection, Transaction}; +use sql_support::open_database::{ + ConnectionInitializer as MigrationLogic, Error as MigrationError, Result as MigrationResult, +}; + +const CREATE_SCHEMA_SQL: &str = include_str!("../sql/create_schema.sql"); +const CREATE_SYNC_TEMP_TABLES_SQL: &str = include_str!("../sql/create_sync_temp_tables.sql"); + +pub struct WebExtMigrationLogin; + +impl MigrationLogic for WebExtMigrationLogin { + const NAME: &'static str = "webext storage db"; + const END_VERSION: u32 = 2; + + fn prepare(&self, conn: &Connection, _db_empty: bool) -> MigrationResult<()> { + let initial_pragmas = " + -- We don't care about temp tables being persisted to disk. + PRAGMA temp_store = 2; + -- we unconditionally want write-ahead-logging mode + PRAGMA journal_mode=WAL; + -- foreign keys seem worth enforcing! + PRAGMA foreign_keys = ON; + "; + conn.execute_batch(initial_pragmas)?; + define_functions(conn)?; + conn.set_prepared_statement_cache_capacity(128); + Ok(()) + } + + fn init(&self, db: &Transaction<'_>) -> MigrationResult<()> { + log::debug!("Creating schema"); + db.execute_batch(CREATE_SCHEMA_SQL)?; + Ok(()) + } + + fn upgrade_from(&self, db: &Transaction<'_>, version: u32) -> MigrationResult<()> { + match version { + 1 => upgrade_from_1(db), + _ => Err(MigrationError::IncompatibleVersion(version)), + } + } +} + +fn define_functions(c: &Connection) -> MigrationResult<()> { + use rusqlite::functions::FunctionFlags; + c.create_scalar_function( + "generate_guid", + 0, + FunctionFlags::SQLITE_UTF8, + sql_fns::generate_guid, + )?; + Ok(()) +} + +fn upgrade_from_1(db: &Connection) -> MigrationResult<()> { + // We changed a not null constraint + db.execute_batch("ALTER TABLE storage_sync_mirror RENAME TO old_mirror;")?; + // just re-run the full schema commands to recreate the able. + db.execute_batch(CREATE_SCHEMA_SQL)?; + db.execute_batch( + "INSERT OR IGNORE INTO storage_sync_mirror(guid, ext_id, data) + SELECT guid, ext_id, data FROM old_mirror;", + )?; + db.execute_batch("DROP TABLE old_mirror;")?; + db.execute_batch("PRAGMA user_version = 2;")?; + Ok(()) +} + +// Note that we expect this to be called before and after a sync - before to +// ensure we are syncing with a clean state, after to be good memory citizens +// given the temp tables are in memory. +pub fn create_empty_sync_temp_tables(db: &Connection) -> Result<()> { + log::debug!("Initializing sync temp tables"); + db.execute_batch(CREATE_SYNC_TEMP_TABLES_SQL)?; + Ok(()) +} + +#[cfg(test)] +pub mod test { + use prettytable::{Cell, Row}; + use rusqlite::Result as RusqliteResult; + use rusqlite::{types::Value, Connection}; + + // To help debugging tests etc. + #[allow(unused)] + pub fn print_table(conn: &Connection, table_name: &str) -> RusqliteResult<()> { + let mut stmt = conn.prepare(&format!("SELECT * FROM {}", table_name))?; + let mut rows = stmt.query([])?; + let mut table = prettytable::Table::new(); + let mut titles = Row::empty(); + for col in rows.as_ref().expect("must have statement").columns() { + titles.add_cell(Cell::new(col.name())); + } + table.set_titles(titles); + while let Some(sql_row) = rows.next()? { + let mut table_row = Row::empty(); + for i in 0..sql_row.as_ref().column_count() { + let val = match sql_row.get::<_, Value>(i)? { + Value::Null => "null".to_string(), + Value::Integer(i) => i.to_string(), + Value::Real(f) => f.to_string(), + Value::Text(s) => s, + Value::Blob(b) => format!("<blob with {} bytes>", b.len()), + }; + table_row.add_cell(Cell::new(&val)); + } + table.add_row(table_row); + } + table.printstd(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::test::new_mem_db; + use rusqlite::Error; + use sql_support::open_database::test_utils::MigratedDatabaseFile; + use sql_support::ConnExt; + + const CREATE_SCHEMA_V1_SQL: &str = include_str!("../sql/tests/create_schema_v1.sql"); + + #[test] + fn test_create_schema_twice() { + let db = new_mem_db(); + db.execute_batch(CREATE_SCHEMA_SQL) + .expect("should allow running twice"); + } + + #[test] + fn test_create_empty_sync_temp_tables_twice() { + let db = new_mem_db(); + create_empty_sync_temp_tables(&db).expect("should work first time"); + // insert something into our new temp table and check it's there. + db.execute_batch( + "INSERT INTO temp.storage_sync_staging + (guid, ext_id) VALUES + ('guid', 'ext_id');", + ) + .expect("should work once"); + let count = db + .query_row_and_then( + "SELECT COUNT(*) FROM temp.storage_sync_staging;", + [], + |row| row.get::<_, u32>(0), + ) + .expect("query should work"); + assert_eq!(count, 1, "should be one row"); + + // re-execute + create_empty_sync_temp_tables(&db).expect("should second first time"); + // and it should have deleted existing data. + let count = db + .query_row_and_then( + "SELECT COUNT(*) FROM temp.storage_sync_staging;", + [], + |row| row.get::<_, u32>(0), + ) + .expect("query should work"); + assert_eq!(count, 0, "should be no rows"); + } + + #[test] + fn test_all_upgrades() -> Result<()> { + let db_file = MigratedDatabaseFile::new(WebExtMigrationLogin, CREATE_SCHEMA_V1_SQL); + db_file.run_all_upgrades(); + let db = db_file.open(); + + let get_id_data = |guid: &str| -> Result<(Option<String>, Option<String>)> { + let (ext_id, data) = db + .try_query_row::<_, Error, _, _>( + "SELECT ext_id, data FROM storage_sync_mirror WHERE guid = :guid", + &[(":guid", &guid.to_string())], + |row| Ok((row.get(0)?, row.get(1)?)), + true, + )? + .expect("row should exist."); + Ok((ext_id, data)) + }; + assert_eq!( + get_id_data("guid-1")?, + (Some("ext-id-1".to_string()), Some("data-1".to_string())) + ); + assert_eq!( + get_id_data("guid-2")?, + (Some("ext-id-2".to_string()), Some("data-2".to_string())) + ); + Ok(()) + } + + #[test] + fn test_upgrade_2() -> Result<()> { + let _ = env_logger::try_init(); + + let db_file = MigratedDatabaseFile::new(WebExtMigrationLogin, CREATE_SCHEMA_V1_SQL); + db_file.upgrade_to(2); + let db = db_file.open(); + + // Should be able to insert a new with a NULL ext_id + db.execute_batch( + "INSERT INTO storage_sync_mirror(guid, ext_id, data) + VALUES ('guid-3', NULL, NULL);", + )?; + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/store.rs b/third_party/rust/webext-storage/src/store.rs new file mode 100644 index 0000000000..043ec5a11c --- /dev/null +++ b/third_party/rust/webext-storage/src/store.rs @@ -0,0 +1,218 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::api::{self, StorageChanges}; +use crate::db::{StorageDb, ThreadSafeStorageDb}; +use crate::error::*; +use crate::migration::{migrate, MigrationInfo}; +use crate::sync; +use std::path::Path; +use std::sync::Arc; + +use interrupt_support::SqlInterruptHandle; +use serde_json::Value as JsonValue; + +/// A store is used to access `storage.sync` data. It manages an underlying +/// database connection, and exposes methods for reading and writing storage +/// items scoped to an extension ID. Each item is a JSON object, with one or +/// more string keys, and values of any type that can serialize to JSON. +/// +/// An application should create only one store, and manage the instance as a +/// singleton. While this isn't enforced, if you make multiple stores pointing +/// to the same database file, you are going to have a bad time: each store will +/// create its own database connection, using up extra memory and CPU cycles, +/// and causing write contention. For this reason, you should only call +/// `Store::new()` (or `webext_store_new()`, from the FFI) once. +/// +/// Note that our Db implementation is behind an Arc<> because we share that +/// connection with our sync engines - ie, these engines also hold an Arc<> +/// around the same object. +pub struct Store { + db: Arc<ThreadSafeStorageDb>, +} + +impl Store { + /// Creates a store backed by a database at `db_path`. The path can be a + /// file path or `file:` URI. + pub fn new(db_path: impl AsRef<Path>) -> Result<Self> { + let db = StorageDb::new(db_path)?; + Ok(Self { + db: Arc::new(ThreadSafeStorageDb::new(db)), + }) + } + + /// Creates a store backed by an in-memory database. + #[cfg(test)] + pub fn new_memory(db_path: &str) -> Result<Self> { + let db = StorageDb::new_memory(db_path)?; + Ok(Self { + db: Arc::new(ThreadSafeStorageDb::new(db)), + }) + } + + /// Returns an interrupt handle for this store. + pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> { + self.db.interrupt_handle() + } + + /// Sets one or more JSON key-value pairs for an extension ID. Returns a + /// list of changes, with existing and new values for each key in `val`. + pub fn set(&self, ext_id: &str, val: JsonValue) -> Result<StorageChanges> { + let db = self.db.lock(); + let tx = db.unchecked_transaction()?; + let result = api::set(&tx, ext_id, val)?; + tx.commit()?; + Ok(result) + } + + /// Returns information about per-extension usage + pub fn usage(&self) -> Result<Vec<crate::UsageInfo>> { + let db = self.db.lock(); + api::usage(&db) + } + + /// Returns the values for one or more keys `keys` can be: + /// + /// - `null`, in which case all key-value pairs for the extension are + /// returned, or an empty object if the extension doesn't have any + /// stored data. + /// - A single string key, in which case an object with only that key + /// and its value is returned, or an empty object if the key doesn't + // exist. + /// - An array of string keys, in which case an object with only those + /// keys and their values is returned. Any keys that don't exist will be + /// omitted. + /// - An object where the property names are keys, and each value is the + /// default value to return if the key doesn't exist. + /// + /// This method always returns an object (that is, a + /// `serde_json::Value::Object`). + pub fn get(&self, ext_id: &str, keys: JsonValue) -> Result<JsonValue> { + // Don't care about transactions here. + let db = self.db.lock(); + api::get(&db, ext_id, keys) + } + + /// Deletes the values for one or more keys. As with `get`, `keys` can be + /// either a single string key, or an array of string keys. Returns a list + /// of changes, where each change contains the old value for each deleted + /// key. + pub fn remove(&self, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> { + let db = self.db.lock(); + let tx = db.unchecked_transaction()?; + let result = api::remove(&tx, ext_id, keys)?; + tx.commit()?; + Ok(result) + } + + /// Deletes all key-value pairs for the extension. As with `remove`, returns + /// a list of changes, where each change contains the old value for each + /// deleted key. + pub fn clear(&self, ext_id: &str) -> Result<StorageChanges> { + let db = self.db.lock(); + let tx = db.unchecked_transaction()?; + let result = api::clear(&tx, ext_id)?; + tx.commit()?; + Ok(result) + } + + /// Returns the bytes in use for the specified items (which can be null, + /// a string, or an array) + pub fn get_bytes_in_use(&self, ext_id: &str, keys: JsonValue) -> Result<usize> { + let db = self.db.lock(); + api::get_bytes_in_use(&db, ext_id, keys) + } + + /// Returns a bridged sync engine for Desktop for this store. + pub fn bridged_engine(&self) -> sync::BridgedEngine { + sync::BridgedEngine::new(&self.db) + } + + /// Closes the store and its database connection. See the docs for + /// `StorageDb::close` for more details on when this can fail. + pub fn close(self) -> Result<()> { + // Even though this consumes `self`, the fact we use an Arc<> means + // we can't guarantee we can actually consume the inner DB - so do + // the best we can. + let shared: ThreadSafeStorageDb = match Arc::try_unwrap(self.db) { + Ok(shared) => shared, + _ => { + // The only way this is possible is if the sync engine has an operation + // running - but that shouldn't be possible in practice because desktop + // uses a single "task queue" such that the close operation can't possibly + // be running concurrently with any sync or storage tasks. + + // If this *could* get hit, rusqlite will attempt to close the DB connection + // as it is dropped, and if that close fails, then rusqlite 0.28.0 and earlier + // would panic - but even that only happens if prepared statements are + // not finalized, which ruqlite also does. + + // tl;dr - this should be impossible. If it was possible, rusqlite might panic, + // but we've never seen it panic in practice other places we don't close + // connections, and the next rusqlite version will not panic anyway. + // So this-is-fine.jpg + log::warn!("Attempting to close a store while other DB references exist."); + return Err(ErrorKind::OtherConnectionReferencesExist.into()); + } + }; + // consume the mutex and get back the inner. + let db = shared.into_inner(); + db.close() + } + + /// Gets the changes which the current sync applied. Should be used + /// immediately after the bridged engine is told to apply incoming changes, + /// and can be used to notify observers of the StorageArea of the changes + /// that were applied. + /// The result is a Vec of already JSON stringified changes. + pub fn get_synced_changes(&self) -> Result<Vec<sync::SyncedExtensionChange>> { + let db = self.db.lock(); + sync::get_synced_changes(&db) + } + + /// Migrates data from a database in the format of the "old" kinto + /// implementation. Information about how the migration went is stored in + /// the database, and can be read using `Self::take_migration_info`. + /// + /// Note that `filename` isn't normalized or canonicalized. + pub fn migrate(&self, filename: impl AsRef<Path>) -> Result<()> { + let db = self.db.lock(); + let tx = db.unchecked_transaction()?; + let result = migrate(&tx, filename.as_ref())?; + tx.commit()?; + // Failing to store this information should not cause migration failure. + if let Err(e) = result.store(&db) { + debug_assert!(false, "Migration error: {:?}", e); + log::warn!("Failed to record migration telmetry: {}", e); + } + Ok(()) + } + + /// Read-and-delete (e.g. `take` in rust parlance, see Option::take) + /// operation for any MigrationInfo stored in this database. + pub fn take_migration_info(&self) -> Result<Option<MigrationInfo>> { + let db = self.db.lock(); + let tx = db.unchecked_transaction()?; + let result = MigrationInfo::take(&tx)?; + tx.commit()?; + Ok(result) + } +} + +#[cfg(test)] +pub mod test { + use super::*; + #[test] + fn test_send() { + fn ensure_send<T: Send>() {} + // Compile will fail if not send. + ensure_send::<Store>(); + } + + pub fn new_mem_store() -> Store { + Store { + db: Arc::new(ThreadSafeStorageDb::new(crate::db::test::new_mem_db())), + } + } +} diff --git a/third_party/rust/webext-storage/src/sync/bridge.rs b/third_party/rust/webext-storage/src/sync/bridge.rs new file mode 100644 index 0000000000..760a0d0e10 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/bridge.rs @@ -0,0 +1,388 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use anyhow::Result; +use rusqlite::Transaction; +use std::sync::{Arc, Weak}; +use sync15::bso::IncomingBso; +use sync15::engine::ApplyResults; +use sync_guid::Guid as SyncGuid; + +use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb}; +use crate::schema; +use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming}; +use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing}; + +const LAST_SYNC_META_KEY: &str = "last_sync_time"; +const SYNC_ID_META_KEY: &str = "sync_id"; + +/// A bridged engine implements all the methods needed to make the +/// `storage.sync` store work with Desktop's Sync implementation. +/// Conceptually, it's similar to `sync15::Store`, which we +/// should eventually rename and unify with this trait (#2841). +/// +/// Unlike most of our other implementation which hold a strong reference +/// to the store, this engine keeps a weak reference in an attempt to keep +/// the desktop semantics as close as possible to what they were when the +/// engines all took lifetime params to ensure they don't outlive the store. +pub struct BridgedEngine { + db: Weak<ThreadSafeStorageDb>, +} + +impl BridgedEngine { + /// Creates a bridged engine for syncing. + pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self { + BridgedEngine { + db: Arc::downgrade(db), + } + } + + fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> { + tx.execute_batch( + "DELETE FROM storage_sync_mirror; + UPDATE storage_sync_data SET sync_change_counter = 1;", + )?; + delete_meta(tx, LAST_SYNC_META_KEY)?; + Ok(()) + } + + fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> { + self.db + .upgrade() + .ok_or_else(|| crate::error::ErrorKind::DatabaseConnectionClosed.into()) + } +} + +impl sync15::engine::BridgedEngine for BridgedEngine { + fn last_sync(&self) -> Result<i64> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0)) + } + + fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?; + Ok(()) + } + + fn sync_id(&self) -> Result<Option<String>> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + Ok(get_meta(&db, SYNC_ID_META_KEY)?) + } + + fn reset_sync_id(&self) -> Result<String> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + let new_id = SyncGuid::random().to_string(); + self.do_reset(&tx)?; + put_meta(&tx, SYNC_ID_META_KEY, &new_id)?; + tx.commit()?; + Ok(new_id) + } + + fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?; + Ok(match current { + Some(current) if current == sync_id => current, + _ => { + let tx = db.unchecked_transaction()?; + self.do_reset(&tx)?; + let result = sync_id.to_string(); + put_meta(&tx, SYNC_ID_META_KEY, &result)?; + tx.commit()?; + result + } + }) + } + + fn sync_started(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + schema::create_empty_sync_temp_tables(&db)?; + Ok(()) + } + + fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + let tx = db.unchecked_transaction()?; + let incoming_content: Vec<_> = incoming_bsos + .into_iter() + .map(IncomingBso::into_content::<super::WebextRecord>) + .collect(); + stage_incoming(&tx, &incoming_content, &signal)?; + tx.commit()?; + Ok(()) + } + + fn apply(&self) -> Result<ApplyResults> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + + let tx = db.unchecked_transaction()?; + let incoming = get_incoming(&tx)?; + let actions = incoming + .into_iter() + .map(|(item, state)| (item, plan_incoming(state))) + .collect(); + apply_actions(&tx, actions, &signal)?; + stage_outgoing(&tx)?; + tx.commit()?; + + Ok(get_outgoing(&db, &signal)?.into()) + } + + fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + let tx = db.unchecked_transaction()?; + record_uploaded(&tx, ids, &signal)?; + tx.commit()?; + + Ok(()) + } + + fn sync_finished(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + schema::create_empty_sync_temp_tables(&db)?; + Ok(()) + } + + fn reset(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + self.do_reset(&tx)?; + delete_meta(&tx, SYNC_ID_META_KEY)?; + tx.commit()?; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + // We assume the meta table is only used by sync. + tx.execute_batch( + "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;", + )?; + tx.commit()?; + Ok(()) + } +} + +impl From<anyhow::Error> for crate::error::Error { + fn from(value: anyhow::Error) -> Self { + crate::error::ErrorKind::SyncError(value.to_string()).into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::test::new_mem_thread_safe_storage_db; + use crate::db::StorageDb; + use sync15::engine::BridgedEngine; + + fn query_count(conn: &StorageDb, table: &str) -> u32 { + conn.query_row_and_then(&format!("SELECT COUNT(*) FROM {};", table), [], |row| { + row.get::<_, u32>(0) + }) + .expect("should work") + } + + // Sets up mock data for the tests here. + fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> { + { + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + db.execute( + "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter) + VALUES ('ext-a', 'invalid-json', 2)", + [], + )?; + db.execute( + "INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', 'ext-a', '3')", + [], + )?; + } + engine.set_last_sync(1)?; + + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + // and assert we wrote what we think we did. + assert_eq!(query_count(&db, "storage_sync_data"), 1); + assert_eq!(query_count(&db, "storage_sync_mirror"), 1); + assert_eq!(query_count(&db, "meta"), 1); + Ok(()) + } + + // Assuming a DB setup with setup_mock_data, assert it was correctly reset. + fn assert_reset(engine: &super::BridgedEngine) -> Result<()> { + // A reset never wipes data... + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + assert_eq!(query_count(&db, "storage_sync_data"), 1); + + // But did reset the change counter. + let cc = db.query_row_and_then( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(cc, 1); + // But did wipe the mirror... + assert_eq!(query_count(&db, "storage_sync_mirror"), 0); + // And the last_sync should have been wiped. + assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none()); + Ok(()) + } + + // Assuming a DB setup with setup_mock_data, assert it has not been reset. + fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> { + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + assert_eq!(query_count(&db, "storage_sync_data"), 1); + let cc = db.query_row_and_then( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(cc, 2); + assert_eq!(query_count(&db, "storage_sync_mirror"), 1); + // And the last_sync should remain. + assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some()); + Ok(()) + } + + #[test] + fn test_wipe() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + engine.wipe()?; + + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + + assert_eq!(query_count(&db, "storage_sync_data"), 0); + assert_eq!(query_count(&db, "storage_sync_mirror"), 0); + assert_eq!(query_count(&db, "meta"), 0); + Ok(()) + } + + #[test] + fn test_reset() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + engine.reset()?; + assert_reset(&engine)?; + // Only an explicit reset kills the sync-id, so check that here. + assert_eq!( + get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?, + None + ); + + Ok(()) + } + + #[test] + fn test_ensure_missing_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + assert_eq!(engine.sync_id()?, None); + // We don't have a sync ID - so setting one should reset. + engine.ensure_current_sync_id("new-id")?; + // should have cause a reset. + assert_reset(&engine)?; + Ok(()) + } + + #[test] + fn test_ensure_new_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"old-id".to_string(), + )?; + assert_not_reset(&engine)?; + assert_eq!(engine.sync_id()?, Some("old-id".to_string())); + + engine.ensure_current_sync_id("new-id")?; + // should have cause a reset. + assert_reset(&engine)?; + // should have the new id. + assert_eq!(engine.sync_id()?, Some("new-id".to_string())); + Ok(()) + } + + #[test] + fn test_ensure_same_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + assert_not_reset(&engine)?; + + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + engine.ensure_current_sync_id("sync-id")?; + // should not have reset. + assert_not_reset(&engine)?; + Ok(()) + } + + #[test] + fn test_reset_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + assert_eq!(engine.sync_id()?, Some("sync-id".to_string())); + let new_id = engine.reset_sync_id()?; + // should have cause a reset. + assert_reset(&engine)?; + assert_eq!(engine.sync_id()?, Some(new_id)); + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/incoming.rs b/third_party/rust/webext-storage/src/sync/incoming.rs new file mode 100644 index 0000000000..5d00fe8de3 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/incoming.rs @@ -0,0 +1,863 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// The "incoming" part of syncing - handling the incoming rows, staging them, +// working out a plan for them, updating the local data and mirror, etc. + +use interrupt_support::Interruptee; +use rusqlite::{Connection, Row, Transaction}; +use sql_support::ConnExt; +use sync15::bso::{IncomingContent, IncomingKind}; +use sync_guid::Guid as SyncGuid; + +use crate::api::{StorageChanges, StorageValueChange}; +use crate::error::*; + +use super::{merge, remove_matching_keys, JsonMap, WebextRecord}; + +/// The state data can be in. Could be represented as Option<JsonMap>, but this +/// is clearer and independent of how the data is stored. +#[derive(Debug, PartialEq, Eq)] +pub enum DataState { + /// The data was deleted. + Deleted, + /// Data exists, as stored in the map. + Exists(JsonMap), +} + +// A little helper to create a StorageChanges object when we are creating +// a new value with multiple keys that doesn't exist locally. +fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges { + let mut result = StorageChanges::with_capacity(new.len()); + for (key, val) in new.iter() { + result.push(StorageValueChange { + key: key.clone(), + old_value: None, + new_value: Some(val.clone()), + }); + } + result +} + +// This module deals exclusively with the Map inside a JsonValue::Object(). +// This helper reads such a Map from a SQL row, ignoring anything which is +// either invalid JSON or a different JSON type. +fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> { + let s = row.get::<_, Option<String>>(col)?; + Ok(match s { + None => DataState::Deleted, + Some(s) => match serde_json::from_str(&s) { + Ok(serde_json::Value::Object(m)) => DataState::Exists(m), + _ => { + // We don't want invalid json or wrong types to kill syncing. + // It should be impossible as we never write anything which + // could cause it, but we can't really log the bad data as there + // might be PII. Logging just a message without any additional + // clues is going to be unhelpfully noisy, so, silently None. + // XXX - Maybe record telemetry? + DataState::Deleted + } + }, + }) +} + +/// The first thing we do with incoming items is to "stage" them in a temp table. +/// The actual processing is done via this table. +pub fn stage_incoming( + tx: &Transaction<'_>, + incoming_records: &[IncomingContent<WebextRecord>], + signal: &dyn Interruptee, +) -> Result<()> { + sql_support::each_sized_chunk( + incoming_records, + // We bind 3 params per chunk. + sql_support::default_max_variable_number() / 3, + |chunk, _| -> Result<()> { + let mut params = Vec::with_capacity(chunk.len() * 3); + for record in chunk { + signal.err_if_interrupted()?; + match &record.kind { + IncomingKind::Content(r) => { + params.push(Some(record.envelope.id.to_string())); + params.push(Some(r.ext_id.to_string())); + params.push(Some(r.data.clone())); + } + IncomingKind::Tombstone => { + params.push(Some(record.envelope.id.to_string())); + params.push(None); + params.push(None); + } + IncomingKind::Malformed => { + log::error!("Ignoring incoming malformed record: {}", record.envelope.id); + } + } + } + // we might have skipped records + let actual_len = params.len() / 3; + if actual_len != 0 { + let sql = format!( + "INSERT OR REPLACE INTO temp.storage_sync_staging + (guid, ext_id, data) + VALUES {}", + sql_support::repeat_multi_values(actual_len, 3) + ); + tx.execute(&sql, rusqlite::params_from_iter(params))?; + } + Ok(()) + }, + )?; + Ok(()) +} + +/// The "state" we find ourselves in when considering an incoming/staging +/// record. This "state" is the input to calculating the IncomingAction and +/// carries all the data we need to make the required local changes. +#[derive(Debug, PartialEq, Eq)] +pub enum IncomingState { + /// There's an incoming item, but data for that extension doesn't exist + /// either in our local data store or in the local mirror. IOW, this is the + /// very first time we've seen this extension. + IncomingOnlyData { ext_id: String, data: JsonMap }, + + /// An incoming tombstone that doesn't exist locally. Because tombstones + /// don't carry the ext-id, it means it's not in our mirror. We are just + /// going to ignore it, but we track the state for consistency. + IncomingOnlyTombstone, + + /// There's an incoming item and we have data for the same extension in + /// our local store - but not in our mirror. This should be relatively + /// uncommon as it means: + /// * Some other profile has recently installed an extension and synced. + /// * This profile has recently installed the same extension. + /// * This is the first sync for this profile since both those events + /// happened. + HasLocal { + ext_id: String, + incoming: DataState, + local: DataState, + }, + /// There's an incoming item and there's an item for the same extension in + /// the mirror. The addon probably doesn't exist locally, or if it does, + /// the last time we synced we synced the deletion of all data. + NotLocal { + ext_id: String, + incoming: DataState, + mirror: DataState, + }, + /// This will be the most common "incoming" case - there's data incoming, + /// in the mirror and in the local store for an addon. + Everywhere { + ext_id: String, + incoming: DataState, + mirror: DataState, + local: DataState, + }, +} + +/// Get the items we need to process from the staging table. Return details about +/// the item and the state of that item, ready for processing. +pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> { + let sql = " + SELECT + s.guid as guid, + l.ext_id as l_ext_id, + m.ext_id as m_ext_id, + s.ext_id as s_ext_id, + s.data as s_data, m.data as m_data, l.data as l_data, + l.sync_change_counter + FROM temp.storage_sync_staging s + LEFT JOIN storage_sync_mirror m ON m.guid = s.guid + LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);"; + + fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> { + let guid = row.get("guid")?; + // This is complicated because the staging row doesn't hold the ext_id. + // However, both the local table and the mirror do. + let mirror_ext_id: Option<String> = row.get("m_ext_id")?; + let local_ext_id: Option<String> = row.get("l_ext_id")?; + let staged_ext_id: Option<String> = row.get("s_ext_id")?; + let incoming = json_map_from_row(row, "s_data")?; + + // We find the state by examining which tables the ext-id exists in, + // using whether that column is null as a proxy for that. + let state = match (local_ext_id, mirror_ext_id) { + (None, None) => { + match staged_ext_id { + Some(ext_id) => { + let data = match incoming { + // incoming record with missing data that's not a + // tombstone shouldn't happen, but we can cope by + // pretending it was an empty json map. + DataState::Deleted => JsonMap::new(), + DataState::Exists(data) => data, + }; + IncomingState::IncomingOnlyData { ext_id, data } + } + None => IncomingState::IncomingOnlyTombstone, + } + } + (Some(ext_id), None) => IncomingState::HasLocal { + ext_id, + incoming, + local: json_map_from_row(row, "l_data")?, + }, + (None, Some(ext_id)) => IncomingState::NotLocal { + ext_id, + incoming, + mirror: json_map_from_row(row, "m_data")?, + }, + (Some(ext_id), Some(_)) => IncomingState::Everywhere { + ext_id, + incoming, + mirror: json_map_from_row(row, "m_data")?, + local: json_map_from_row(row, "l_data")?, + }, + }; + Ok((guid, state)) + } + + conn.conn().query_rows_and_then(sql, [], from_row) +} + +/// This is the set of actions we know how to take *locally* for incoming +/// records. Which one depends on the IncomingState. +/// Every state which updates also records the set of changes we should notify +#[derive(Debug, PartialEq, Eq)] +pub enum IncomingAction { + /// We should locally delete the data for this record + DeleteLocally { + ext_id: String, + changes: StorageChanges, + }, + /// We will take the remote. + TakeRemote { + ext_id: String, + data: JsonMap, + changes: StorageChanges, + }, + /// We merged this data - this is what we came up with. + Merge { + ext_id: String, + data: JsonMap, + changes: StorageChanges, + }, + /// Entry exists locally and it's the same as the incoming record. + Same { ext_id: String }, + /// Incoming tombstone for an item we've never seen. + Nothing, +} + +/// Takes the state of an item and returns the action we should take for it. +pub fn plan_incoming(s: IncomingState) -> IncomingAction { + match s { + IncomingState::Everywhere { + ext_id, + incoming, + local, + mirror, + } => { + // All records exist - but do they all have data? + match (incoming, local, mirror) { + ( + DataState::Exists(incoming_data), + DataState::Exists(local_data), + DataState::Exists(mirror_data), + ) => { + // all records have data - 3-way merge. + merge(ext_id, incoming_data, local_data, Some(mirror_data)) + } + ( + DataState::Exists(incoming_data), + DataState::Exists(local_data), + DataState::Deleted, + ) => { + // No parent, so first time seeing this remotely - 2-way merge + merge(ext_id, incoming_data, local_data, None) + } + (DataState::Exists(incoming_data), DataState::Deleted, _) => { + // Incoming data, removed locally. Server wins. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&incoming_data), + data: incoming_data, + } + } + (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => { + // Deleted remotely. + // Treat this as a delete of every key that we + // know was present at the time. + let (result, changes) = remove_matching_keys(local_data, &mirror); + if result.is_empty() { + // If there were no more keys left, we can + // delete our version too. + IncomingAction::DeleteLocally { ext_id, changes } + } else { + IncomingAction::Merge { + ext_id, + data: result, + changes, + } + } + } + (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => { + // Perhaps another client created and then deleted + // the whole object for this extension since the + // last time we synced. + // Treat this as a delete of every key that we + // knew was present. Unfortunately, we don't know + // any keys that were present, so we delete no keys. + IncomingAction::Merge { + ext_id, + data: local_data, + changes: StorageChanges::new(), + } + } + (DataState::Deleted, DataState::Deleted, _) => { + // We agree with the remote (regardless of what we + // have mirrored). + IncomingAction::Same { ext_id } + } + } + } + IncomingState::HasLocal { + ext_id, + incoming, + local, + } => { + // So we have a local record and an incoming/staging record, but *not* a + // mirror record. This means some other device has synced this for + // the first time and we are yet to do the same. + match (incoming, local) { + (DataState::Exists(incoming_data), DataState::Exists(local_data)) => { + // This means the extension exists locally and remotely + // but this is the first time we've synced it. That's no problem, it's + // just a 2-way merge... + merge(ext_id, incoming_data, local_data, None) + } + (DataState::Deleted, DataState::Exists(local_data)) => { + // We've data locally, but there's an incoming deletion. + // We would normally remove keys that we knew were + // present on the server, but we don't know what + // was on the server, so we don't remove anything. + IncomingAction::Merge { + ext_id, + data: local_data, + changes: StorageChanges::new(), + } + } + (DataState::Exists(incoming_data), DataState::Deleted) => { + // No data locally, but some is incoming - take it. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&incoming_data), + data: incoming_data, + } + } + (DataState::Deleted, DataState::Deleted) => { + // Nothing anywhere - odd, but OK. + IncomingAction::Same { ext_id } + } + } + } + IncomingState::NotLocal { + ext_id, incoming, .. + } => { + // No local data but there's mirror and an incoming record. + // This means a local deletion is being replaced by, or just re-doing + // the incoming record. + match incoming { + DataState::Exists(data) => IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&data), + data, + }, + DataState::Deleted => IncomingAction::Same { ext_id }, + } + } + IncomingState::IncomingOnlyData { ext_id, data } => { + // Only the staging record exists and it's not a tombstone. + // This means it's the first time we've ever seen it. No + // conflict possible, just take the remote. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&data), + data, + } + } + IncomingState::IncomingOnlyTombstone => { + // Only the staging record exists and it is a tombstone - nothing to do. + IncomingAction::Nothing + } + } +} + +fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> { + tx.execute_cached( + "INSERT INTO temp.storage_sync_applied (ext_id, changes) + VALUES (:ext_id, :changes)", + rusqlite::named_params! { + ":ext_id": ext_id, + ":changes": &serde_json::to_string(&changes)?, + }, + )?; + Ok(()) +} + +// Apply the actions necessary to fully process the incoming items. +pub fn apply_actions( + tx: &Transaction<'_>, + actions: Vec<(SyncGuid, IncomingAction)>, + signal: &dyn Interruptee, +) -> Result<()> { + for (item, action) in actions { + signal.err_if_interrupted()?; + + log::trace!("action for '{:?}': {:?}", item, action); + match action { + IncomingAction::DeleteLocally { ext_id, changes } => { + // Can just nuke it entirely. + tx.execute_cached( + "DELETE FROM storage_sync_data WHERE ext_id = :ext_id", + &[(":ext_id", &ext_id)], + )?; + insert_changes(tx, &ext_id, &changes)?; + } + // We want to update the local record with 'data' and after this update the item no longer is considered dirty. + IncomingAction::TakeRemote { + ext_id, + data, + changes, + } => { + tx.execute_cached( + "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter) + VALUES (:ext_id, :data, 0)", + rusqlite::named_params! { + ":ext_id": ext_id, + ":data": serde_json::Value::Object(data), + }, + )?; + insert_changes(tx, &ext_id, &changes)?; + } + + // We merged this data, so need to update locally but still consider + // it dirty because the merged data must be uploaded. + IncomingAction::Merge { + ext_id, + data, + changes, + } => { + tx.execute_cached( + "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id", + rusqlite::named_params! { + ":ext_id": ext_id, + ":data": serde_json::Value::Object(data), + }, + )?; + insert_changes(tx, &ext_id, &changes)?; + } + + // Both local and remote ended up the same - only need to nuke the + // change counter. + IncomingAction::Same { ext_id } => { + tx.execute_cached( + "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id", + &[(":ext_id", &ext_id)], + )?; + // no changes to write + } + // Literally nothing to do! + IncomingAction::Nothing => {} + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::test::new_syncable_mem_db; + use super::*; + use crate::api; + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + use sync15::bso::IncomingBso; + + // select simple int + fn ssi(conn: &Connection, stmt: &str) -> u32 { + conn.try_query_one(stmt, [], true) + .expect("must work") + .unwrap_or_default() + } + + fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> { + let jv = array.as_array_mut().expect("you must pass a json array"); + let mut result = Vec::with_capacity(jv.len()); + for elt in jv { + result.push(IncomingBso::from_test_content(elt.take()).into_content()); + } + result + } + + // Can't find a way to import these from crate::sync::tests... + macro_rules! map { + ($($map:tt)+) => { + json!($($map)+).as_object().unwrap().clone() + }; + } + macro_rules! change { + ($key:literal, None, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: None, + }; + }; + ($key:literal, $old:tt, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: None, + } + }; + ($key:literal, None, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: Some(json!($new)), + }; + }; + ($key:literal, $old:tt, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: Some(json!($new)), + } + }; + } + macro_rules! changes { + ( $( $change:expr ),* ) => { + { + let mut changes = StorageChanges::new(); + $( + changes.push($change); + )* + changes + } + }; + } + + #[test] + fn test_incoming_populates_staging() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + let incoming = json! {[ + { + "id": "guidAAAAAAAA", + "extId": "ext1@example.com", + "data": json!({"foo": "bar"}).to_string(), + } + ]}; + + stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?; + // check staging table + assert_eq!( + ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"), + 1 + ); + Ok(()) + } + + #[test] + fn test_fetch_incoming_state() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + // Start with an item just in staging. + tx.execute( + r#" + INSERT INTO temp.storage_sync_staging (guid, ext_id, data) + VALUES ('guid', 'ext_id', '{"foo":"bar"}') + "#, + [], + )?; + + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].0, SyncGuid::new("guid"),); + assert_eq!( + incoming[0].1, + IncomingState::IncomingOnlyData { + ext_id: "ext_id".to_string(), + data: map!({"foo": "bar"}), + } + ); + + // Add the same item to the mirror. + tx.execute( + r#" + INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', 'ext_id', '{"foo":"new"}') + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + IncomingState::NotLocal { + ext_id: "ext_id".to_string(), + incoming: DataState::Exists(map!({"foo": "bar"})), + mirror: DataState::Exists(map!({"foo": "new"})), + } + ); + + // and finally the data itself - might as use the API here! + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + IncomingState::Everywhere { + ext_id: "ext_id".to_string(), + incoming: DataState::Exists(map!({"foo": "bar"})), + local: DataState::Exists(map!({"foo": "local"})), + mirror: DataState::Exists(map!({"foo": "new"})), + } + ); + Ok(()) + } + + // Like test_fetch_incoming_state, but check NULLs are handled correctly. + #[test] + fn test_fetch_incoming_state_nulls() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + // Start with a tombstone just in staging. + tx.execute( + r#" + INSERT INTO temp.storage_sync_staging (guid, ext_id, data) + VALUES ('guid', NULL, NULL) + "#, + [], + )?; + + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,); + + // Add the same item to the mirror (can't store an ext_id for a + // tombstone in the mirror as incoming tombstones never have it) + tx.execute( + r#" + INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', NULL, NULL) + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone); + + tx.execute( + r#" + INSERT INTO storage_sync_data (ext_id, data) + VALUES ('ext_id', NULL) + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + // IncomingOnly* seems a little odd, but it is because we can't + // tie the tombstones together due to the lack of any ext-id/guid + // mapping in this case. + IncomingState::IncomingOnlyTombstone + ); + Ok(()) + } + + // apply_action tests. + #[derive(Debug, PartialEq)] + struct LocalItem { + data: DataState, + sync_change_counter: i32, + } + + fn get_local_item(conn: &Connection) -> Option<LocalItem> { + conn.try_query_row::<_, Error, _, _>( + "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'", + [], + |row| { + let data = json_map_from_row(row, "data")?; + let sync_change_counter = row.get::<_, i32>(1)?; + Ok(LocalItem { + data, + sync_change_counter, + }) + }, + true, + ) + .expect("query should work") + } + + fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> { + // no custom deserialize for storagechanges and we only need it for + // tests, so do it manually. + conn.try_query_row::<_, Error, _, _>( + "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'", + [], + |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?), + true, + ) + .expect("query should work") + .map(|val: serde_json::Value| { + let ob = val.as_object().expect("should be an object of items"); + let mut result = StorageChanges::with_capacity(ob.len()); + for (key, val) in ob.into_iter() { + let details = val.as_object().expect("elts should be objects"); + result.push(StorageValueChange { + key: key.to_string(), + old_value: details.get("oldValue").cloned(), + new_value: details.get("newValue").cloned(), + }); + } + result + }) + } + + fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) { + let guid = SyncGuid::new("guid"); + apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply"); + } + + #[test] + fn test_apply_actions() -> Result<()> { + let mut db = new_syncable_mem_db(); + + // DeleteLocally - row should be entirely removed. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + let changes = changes![change!("foo", "local", None)]; + do_apply_action( + &tx, + IncomingAction::DeleteLocally { + ext_id: "ext_id".to_string(), + changes: changes.clone(), + }, + ); + assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({})); + // and there should not be a local record at all. + assert!(get_local_item(&tx).is_none()); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // TakeRemote - replace local data with remote and marked as not dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + let changes = changes![change!("foo", "local", "remote")]; + do_apply_action( + &tx, + IncomingAction::TakeRemote { + ext_id: "ext_id".to_string(), + data: map!({"foo": "remote"}), + changes: changes.clone(), + }, + ); + // data should exist locally with the remote data and not be dirty. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "remote"})), + sync_change_counter: 0 + }) + ); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // Merge - like ::TakeRemote, but data remains dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + let changes = changes![change!("foo", "local", "remote")]; + do_apply_action( + &tx, + IncomingAction::Merge { + ext_id: "ext_id".to_string(), + data: map!({"foo": "remote"}), + changes: changes.clone(), + }, + ); + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "remote"})), + sync_change_counter: 2 + }) + ); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // Same - data stays the same but is marked not dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + do_apply_action( + &tx, + IncomingAction::Same { + ext_id: "ext_id".to_string(), + }, + ); + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 0 + }) + ); + assert_eq!(get_applied_item_changes(&tx), None); + tx.rollback()?; + + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/mod.rs b/third_party/rust/webext-storage/src/sync/mod.rs new file mode 100644 index 0000000000..3144049dca --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/mod.rs @@ -0,0 +1,429 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +mod bridge; +mod incoming; +mod outgoing; + +#[cfg(test)] +mod sync_tests; + +use crate::api::{StorageChanges, StorageValueChange}; +use crate::db::StorageDb; +use crate::error::*; +use serde::Deserialize; +use serde_derive::*; +use sql_support::ConnExt; +use sync_guid::Guid as SyncGuid; + +pub use bridge::BridgedEngine; +use incoming::IncomingAction; + +type JsonMap = serde_json::Map<String, serde_json::Value>; + +pub const STORAGE_VERSION: usize = 1; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WebextRecord { + #[serde(rename = "id")] + guid: SyncGuid, + #[serde(rename = "extId")] + ext_id: String, + data: String, +} + +// Perform a 2-way or 3-way merge, where the incoming value wins on confict. +fn merge( + ext_id: String, + mut other: JsonMap, + mut ours: JsonMap, + parent: Option<JsonMap>, +) -> IncomingAction { + if other == ours { + return IncomingAction::Same { ext_id }; + } + let old_incoming = other.clone(); + // worst case is keys in each are unique. + let mut changes = StorageChanges::with_capacity(other.len() + ours.len()); + if let Some(parent) = parent { + // Perform 3-way merge. First, for every key in parent, + // compare the parent value with the incoming value to compute + // an implicit "diff". + for (key, parent_value) in parent.into_iter() { + if let Some(incoming_value) = other.remove(&key) { + if incoming_value != parent_value { + log::trace!( + "merge: key {} was updated in incoming - copying value locally", + key + ); + let old_value = ours.remove(&key); + let new_value = Some(incoming_value.clone()); + if old_value != new_value { + changes.push(StorageValueChange { + key: key.clone(), + old_value, + new_value, + }); + } + ours.insert(key, incoming_value); + } + } else { + // Key was not present in incoming value. + // Another client must have deleted it. + log::trace!( + "merge: key {} no longer present in incoming - removing it locally", + key + ); + if let Some(old_value) = ours.remove(&key) { + changes.push(StorageValueChange { + key, + old_value: Some(old_value), + new_value: None, + }); + } + } + } + + // Then, go through every remaining key in incoming. These are + // the ones where a corresponding key does not exist in + // parent, so it is a new key, and we need to add it. + for (key, incoming_value) in other.into_iter() { + log::trace!( + "merge: key {} doesn't occur in parent - copying from incoming", + key + ); + changes.push(StorageValueChange { + key: key.clone(), + old_value: None, + new_value: Some(incoming_value.clone()), + }); + ours.insert(key, incoming_value); + } + } else { + // No parent. Server wins. Overwrite every key in ours with + // the corresponding value in other. + log::trace!("merge: no parent - copying all keys from incoming"); + for (key, incoming_value) in other.into_iter() { + let old_value = ours.remove(&key); + let new_value = Some(incoming_value.clone()); + if old_value != new_value { + changes.push(StorageValueChange { + key: key.clone(), + old_value, + new_value, + }); + } + ours.insert(key, incoming_value); + } + } + + if ours == old_incoming { + IncomingAction::TakeRemote { + ext_id, + data: old_incoming, + changes, + } + } else { + IncomingAction::Merge { + ext_id, + data: ours, + changes, + } + } +} + +fn remove_matching_keys(mut ours: JsonMap, keys_to_remove: &JsonMap) -> (JsonMap, StorageChanges) { + let mut changes = StorageChanges::with_capacity(keys_to_remove.len()); + for key in keys_to_remove.keys() { + if let Some(old_value) = ours.remove(key) { + changes.push(StorageValueChange { + key: key.clone(), + old_value: Some(old_value), + new_value: None, + }); + } + } + (ours, changes) +} + +/// Holds a JSON-serialized map of all synced changes for an extension. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SyncedExtensionChange { + /// The extension ID. + pub ext_id: String, + /// The contents of a `StorageChanges` struct, in JSON format. We don't + /// deserialize these because they need to be passed back to the browser + /// as strings anyway. + pub changes: String, +} + +// Fetches the applied changes we stashed in the storage_sync_applied table. +pub fn get_synced_changes(db: &StorageDb) -> Result<Vec<SyncedExtensionChange>> { + let signal = db.begin_interrupt_scope()?; + let sql = "SELECT ext_id, changes FROM temp.storage_sync_applied"; + db.conn().query_rows_and_then(sql, [], |row| -> Result<_> { + signal.err_if_interrupted()?; + Ok(SyncedExtensionChange { + ext_id: row.get("ext_id")?, + changes: row.get("changes")?, + }) + }) +} + +// Helpers for tests +#[cfg(test)] +pub mod test { + use crate::db::{test::new_mem_db, StorageDb}; + use crate::schema::create_empty_sync_temp_tables; + + pub fn new_syncable_mem_db() -> StorageDb { + let _ = env_logger::try_init(); + let db = new_mem_db(); + create_empty_sync_temp_tables(&db).expect("should work"); + db + } +} + +#[cfg(test)] +mod tests { + use super::test::new_syncable_mem_db; + use super::*; + use serde_json::json; + + #[test] + fn test_serde_record_ser() { + assert_eq!( + serde_json::to_string(&WebextRecord { + guid: "guid".into(), + ext_id: "ext_id".to_string(), + data: "data".to_string() + }) + .unwrap(), + r#"{"id":"guid","extId":"ext_id","data":"data"}"# + ); + } + + // a macro for these tests - constructs a serde_json::Value::Object + macro_rules! map { + ($($map:tt)+) => { + json!($($map)+).as_object().unwrap().clone() + }; + } + + macro_rules! change { + ($key:literal, None, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: None, + }; + }; + ($key:literal, $old:tt, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: None, + } + }; + ($key:literal, None, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: Some(json!($new)), + } + }; + ($key:literal, $old:tt, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: Some(json!($new)), + } + }; + } + macro_rules! changes { + ( ) => { + StorageChanges::new() + }; + ( $( $change:expr ),* ) => { + { + let mut changes = StorageChanges::new(); + $( + changes.push($change); + )* + changes + } + }; + } + + #[test] + fn test_3way_merging() { + // No conflict - identical local and remote. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"one": "one", "two": "two"}), + map!({"two": "two", "one": "one"}), + Some(map!({"parent_only": "parent"})), + ), + IncomingAction::Same { + ext_id: "ext-id".to_string() + } + ); + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "common"}), + map!({"ours_only": "ours", "common": "common"}), + Some(map!({"parent_only": "parent", "common": "old_common"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "common"}), + changes: changes![change!("other_only", None, "other")], + } + ); + // Simple conflict - parent value is neither local nor incoming. incoming wins. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "incoming"}), + map!({"ours_only": "ours", "common": "local"}), + Some(map!({"parent_only": "parent", "common": "parent"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "incoming"}), + changes: changes![ + change!("common", "local", "incoming"), + change!("other_only", None, "other") + ], + } + ); + // Local change, no conflict. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "old_value"}), + map!({"ours_only": "ours", "common": "new_value"}), + Some(map!({"parent_only": "parent", "common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "new_value"}), + changes: changes![change!("other_only", None, "other")], + } + ); + // Field was removed remotely. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other"}), + map!({"common": "old_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::TakeRemote { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other"}), + changes: changes![ + change!("common", "old_value", None), + change!("other_only", None, "other") + ], + } + ); + // Field was removed remotely but we added another one. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other"}), + map!({"common": "old_value", "new_key": "new_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "new_key": "new_value"}), + changes: changes![ + change!("common", "old_value", None), + change!("other_only", None, "other") + ], + } + ); + // Field was removed both remotely and locally. + assert_eq!( + merge( + "ext-id".to_string(), + map!({}), + map!({"new_key": "new_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"new_key": "new_value"}), + changes: changes![], + } + ); + } + + #[test] + fn test_remove_matching_keys() { + assert_eq!( + remove_matching_keys( + map!({"key1": "value1", "key2": "value2"}), + &map!({"key1": "ignored", "key3": "ignored"}) + ), + ( + map!({"key2": "value2"}), + changes![change!("key1", "value1", None)] + ) + ); + } + + #[test] + fn test_get_synced_changes() -> Result<()> { + let db = new_syncable_mem_db(); + db.execute_batch(&format!( + r#"INSERT INTO temp.storage_sync_applied (ext_id, changes) + VALUES + ('an-extension', '{change1}'), + ('ext"id', '{change2}') + "#, + change1 = serde_json::to_string(&changes![change!("key1", "old-val", None)])?, + change2 = serde_json::to_string(&changes![change!("key-for-second", None, "new-val")])? + ))?; + let changes = get_synced_changes(&db)?; + assert_eq!(changes[0].ext_id, "an-extension"); + // sanity check it's valid! + let c1: JsonMap = + serde_json::from_str(&changes[0].changes).expect("changes must be an object"); + assert_eq!( + c1.get("key1") + .expect("must exist") + .as_object() + .expect("must be an object") + .get("oldValue"), + Some(&json!("old-val")) + ); + + // phew - do it again to check the string got escaped. + assert_eq!( + changes[1], + SyncedExtensionChange { + ext_id: "ext\"id".into(), + changes: r#"{"key-for-second":{"newValue":"new-val"}}"#.into(), + } + ); + assert_eq!(changes[1].ext_id, "ext\"id"); + let c2: JsonMap = + serde_json::from_str(&changes[1].changes).expect("changes must be an object"); + assert_eq!( + c2.get("key-for-second") + .expect("must exist") + .as_object() + .expect("must be an object") + .get("newValue"), + Some(&json!("new-val")) + ); + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/outgoing.rs b/third_party/rust/webext-storage/src/sync/outgoing.rs new file mode 100644 index 0000000000..c6c9dd64e5 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/outgoing.rs @@ -0,0 +1,186 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// The "outgoing" part of syncing - building the payloads to upload and +// managing the sync state of the local DB. + +use interrupt_support::Interruptee; +use rusqlite::{Connection, Row, Transaction}; +use sql_support::ConnExt; +use sync15::bso::OutgoingBso; +use sync_guid::Guid as SyncGuid; + +use crate::error::*; + +use super::WebextRecord; + +fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> { + let guid: SyncGuid = row.get("guid")?; + let ext_id: String = row.get("ext_id")?; + let raw_data: Option<String> = row.get("data")?; + Ok(match raw_data { + Some(raw_data) => { + let record = WebextRecord { + guid, + ext_id, + data: raw_data, + }; + OutgoingBso::from_content_with_id(record)? + } + None => OutgoingBso::new_tombstone(guid.into()), + }) +} + +/// Stages info about what should be uploaded in a temp table. This should be +/// called in the same transaction as `apply_actions`. record_uploaded() can be +/// called after the upload is complete and the data in the temp table will be +/// used to update the local store. +pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> { + let sql = " + -- Stage outgoing items. The item may not yet have a GUID (ie, it might + -- not already be in either the mirror nor the incoming staging table), + -- so we generate one if it doesn't exist. + INSERT INTO storage_sync_outgoing_staging + (guid, ext_id, data, sync_change_counter) + SELECT coalesce(m.guid, s.guid, generate_guid()), + l.ext_id, l.data, l.sync_change_counter + FROM storage_sync_data l + -- left joins as one or both may not exist. + LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id + LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id + WHERE sync_change_counter > 0; + + -- At this point, we've merged in all new records, so copy incoming + -- staging into the mirror so that it matches what's on the server. + INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data) + SELECT guid, ext_id, data FROM temp.storage_sync_staging; + + -- And copy any incoming records that we aren't reuploading into the + -- local table. We'll copy the outgoing ones into the mirror and local + -- after we upload them. + INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter) + SELECT ext_id, data, 0 + FROM storage_sync_staging s + WHERE ext_id IS NOT NULL + AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o + WHERE o.guid = s.guid);"; + tx.execute_batch(sql)?; + Ok(()) +} + +/// Returns a vec of the outgoing records which should be uploaded. +pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> { + let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging"; + let elts = conn + .conn() + .query_rows_and_then(sql, [], |row| -> Result<_> { + signal.err_if_interrupted()?; + outgoing_from_row(row) + })?; + + log::debug!("get_outgoing found {} items", elts.len()); + Ok(elts.into_iter().collect()) +} + +/// Record the fact that items were uploaded. This updates the state of the +/// local DB to reflect the state of the server we just updated. +/// Note that this call is almost certainly going to be made in a *different* +/// transaction than the transaction used in `stage_outgoing()`, and it will +/// be called once per batch upload. +pub fn record_uploaded( + tx: &Transaction<'_>, + items: &[SyncGuid], + signal: &dyn Interruptee, +) -> Result<()> { + log::debug!( + "record_uploaded recording that {} items were uploaded", + items.len() + ); + + // Updating the `was_uploaded` column fires the `record_uploaded` trigger, + // which updates the local change counter and writes the uploaded record + // data back to the mirror. + sql_support::each_chunk(items, |chunk, _| -> Result<()> { + signal.err_if_interrupted()?; + let sql = format!( + "UPDATE storage_sync_outgoing_staging SET + was_uploaded = 1 + WHERE guid IN ({})", + sql_support::repeat_sql_vars(chunk.len()), + ); + tx.execute(&sql, rusqlite::params_from_iter(chunk))?; + Ok(()) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::test::new_syncable_mem_db; + use super::*; + use interrupt_support::NeverInterrupts; + + #[test] + fn test_simple() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + tx.execute_batch( + r#" + INSERT INTO storage_sync_data (ext_id, data, sync_change_counter) + VALUES + ('ext_no_changes', '{"foo":"bar"}', 0), + ('ext_with_changes', '{"foo":"bar"}', 1); + "#, + )?; + + stage_outgoing(&tx)?; + let changes = get_outgoing(&tx, &NeverInterrupts)?; + assert_eq!(changes.len(), 1); + let record: serde_json::Value = serde_json::from_str(&changes[0].payload).unwrap(); + let ext_id = record.get("extId").unwrap().as_str().unwrap(); + + assert_eq!(ext_id, "ext_with_changes"); + + record_uploaded( + &tx, + changes + .into_iter() + .map(|p| p.envelope.id) + .collect::<Vec<SyncGuid>>() + .as_slice(), + &NeverInterrupts, + )?; + + let counter: i32 = tx.conn().query_one( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'", + )?; + assert_eq!(counter, 0); + Ok(()) + } + + #[test] + fn test_payload_serialization() { + let record = WebextRecord { + guid: SyncGuid::new("guid"), + ext_id: "ext-id".to_string(), + data: "{}".to_string(), + }; + + let outgoing = OutgoingBso::from_content_with_id(record).unwrap(); + + // The envelope should have our ID. + assert_eq!(outgoing.envelope.id, "guid"); + + let outgoing_payload = + serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap(); + let outgoing_map = outgoing_payload.as_object().unwrap(); + + assert!(outgoing_map.contains_key("id")); + assert!(outgoing_map.contains_key("data")); + assert!(outgoing_map.contains_key("extId")); + assert_eq!(outgoing_map.len(), 3); + } +} diff --git a/third_party/rust/webext-storage/src/sync/sync_tests.rs b/third_party/rust/webext-storage/src/sync/sync_tests.rs new file mode 100644 index 0000000000..6940be2e59 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/sync_tests.rs @@ -0,0 +1,529 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// This file tries to simulate "full syncs" - ie, from local state changing, to +// fetching incoming items, generating items to upload, then updating the local +// state (including the mirror) as a result. + +use crate::api::{clear, get, set}; +use crate::error::*; +use crate::schema::create_empty_sync_temp_tables; +use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming}; +use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing}; +use crate::sync::test::new_syncable_mem_db; +use crate::sync::WebextRecord; +use interrupt_support::NeverInterrupts; +use rusqlite::{Connection, Row, Transaction}; +use serde_json::json; +use sql_support::ConnExt; +use sync15::bso::{IncomingBso, IncomingContent, OutgoingBso}; +use sync_guid::Guid; + +// Here we try and simulate everything done by a "full sync", just minus the +// engine. Returns the records we uploaded. +fn do_sync( + tx: &Transaction<'_>, + incoming_payloads: &[IncomingContent<WebextRecord>], +) -> Result<Vec<OutgoingBso>> { + log::info!("test do_sync() starting"); + // First we stage the incoming in the temp tables. + stage_incoming(tx, incoming_payloads, &NeverInterrupts)?; + // Then we process them getting a Vec of (item, state), which we turn into + // a Vec of (item, action) + let actions = get_incoming(tx)? + .into_iter() + .map(|(item, state)| (item, plan_incoming(state))) + .collect(); + log::debug!("do_sync applying {:?}", actions); + apply_actions(tx, actions, &NeverInterrupts)?; + // So we've done incoming - do outgoing. + stage_outgoing(tx)?; + let outgoing = get_outgoing(tx, &NeverInterrupts)?; + log::debug!("do_sync has outgoing {:?}", outgoing); + record_uploaded( + tx, + outgoing + .iter() + .map(|p| p.envelope.id.clone()) + .collect::<Vec<Guid>>() + .as_slice(), + &NeverInterrupts, + )?; + create_empty_sync_temp_tables(tx)?; + log::info!("test do_sync() complete"); + Ok(outgoing) +} + +// Check *both* the mirror and local API have ended up with the specified data. +fn check_finished_with(conn: &Connection, ext_id: &str, val: serde_json::Value) -> Result<()> { + let local = get(conn, ext_id, serde_json::Value::Null)?; + assert_eq!(local, val); + let guid = get_mirror_guid(conn, ext_id)?; + let mirror = get_mirror_data(conn, &guid); + assert_eq!(mirror, DbData::Data(val.to_string())); + // and there should be zero items with a change counter. + let count = conn.query_row_and_then( + "SELECT COUNT(*) FROM storage_sync_data WHERE sync_change_counter != 0;", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(count, 0); + Ok(()) +} + +fn get_mirror_guid(conn: &Connection, extid: &str) -> Result<Guid> { + let guid = conn.query_row_and_then( + "SELECT m.guid FROM storage_sync_mirror m WHERE m.ext_id = ?;", + [extid], + |row| row.get::<_, Guid>(0), + )?; + Ok(guid) +} + +#[derive(Debug, PartialEq)] +enum DbData { + NoRow, + NullRow, + Data(String), +} + +impl DbData { + fn has_data(&self) -> bool { + matches!(self, DbData::Data(_)) + } +} + +fn _get(conn: &Connection, id_name: &str, expected_extid: &str, table: &str) -> DbData { + let sql = format!("SELECT {} as id, data FROM {}", id_name, table); + + fn from_row(row: &Row<'_>) -> Result<(String, Option<String>)> { + Ok((row.get("id")?, row.get("data")?)) + } + let mut items = conn + .conn() + .query_rows_and_then(&sql, [], from_row) + .expect("should work"); + if items.is_empty() { + DbData::NoRow + } else { + let item = items.pop().expect("it exists"); + assert_eq!(Guid::new(&item.0), expected_extid); + match item.1 { + None => DbData::NullRow, + Some(v) => DbData::Data(v), + } + } +} + +fn get_mirror_data(conn: &Connection, expected_extid: &str) -> DbData { + _get(conn, "guid", expected_extid, "storage_sync_mirror") +} + +fn get_local_data(conn: &Connection, expected_extid: &str) -> DbData { + _get(conn, "ext_id", expected_extid, "storage_sync_data") +} + +fn make_incoming( + guid: &Guid, + ext_id: &str, + data: &serde_json::Value, +) -> IncomingContent<WebextRecord> { + let content = json!({"id": guid, "extId": ext_id, "data": data.to_string()}); + IncomingBso::from_test_content(content).into_content() +} + +fn make_incoming_tombstone(guid: &Guid) -> IncomingContent<WebextRecord> { + IncomingBso::new_test_tombstone(guid.clone()).into_content() +} + +#[test] +fn test_simple_outgoing_sync() -> Result<()> { + // So we are starting with an empty local store and empty server store. + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_simple_incoming_sync() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + let bridge_record = make_incoming(&Guid::new("guid"), "ext-id", &data); + assert_eq!(do_sync(&tx, &[bridge_record])?.len(), 0); + let key1_from_api = get(&tx, "ext-id", json!("key1"))?; + assert_eq!(key1_from_api, json!({"key1": "key1-value"})); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_outgoing_tombstone() -> Result<()> { + // Tombstones are only kept when the mirror has that record - so first + // test that, then arrange for the mirror to have the record. + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!( + get_local_data(&tx, "ext-id"), + DbData::Data(data.to_string()) + ); + // hasn't synced yet, so clearing shouldn't write a tombstone. + clear(&tx, "ext-id")?; + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + // now set data again and sync and *then* remove. + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert!(get_local_data(&tx, "ext-id").has_data()); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert!(get_mirror_data(&tx, &guid).has_data()); + clear(&tx, "ext-id")?; + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NullRow); + // then after syncing, the tombstone will be in the mirror but the local row + // has been removed. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_incoming_tombstone_exists() -> Result<()> { + // An incoming tombstone for a record we've previously synced (and thus + // have data for) + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!( + get_local_data(&tx, "ext-id"), + DbData::Data(data.to_string()) + ); + // sync to get data in our mirror. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert!(get_local_data(&tx, "ext-id").has_data()); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert!(get_mirror_data(&tx, &guid).has_data()); + // Now an incoming tombstone for it. + let tombstone = make_incoming_tombstone(&guid); + assert_eq!( + do_sync(&tx, &[tombstone])?.len(), + 0, + "expect no outgoing records" + ); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_incoming_tombstone_not_exists() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // An incoming tombstone for something that's not anywhere locally. + let guid = Guid::new("guid"); + let tombstone = make_incoming_tombstone(&guid); + assert_eq!( + do_sync(&tx, &[tombstone])?.len(), + 0, + "expect no outgoing records" + ); + // But we still keep the tombstone in the mirror. + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_reconciled() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload with the same data + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key1": "key1-value"})); + // Should be no outgoing records as we reconciled. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key1": "key1-value"}))?; + Ok(()) +} + +/// Tests that we handle things correctly if we get a payload that is +/// identical to what is in the mirrored table. +#[test] +fn test_reconcile_with_null_payload() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data.clone())?; + // We try to push this change on the next sync. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert_eq!(get_mirror_data(&tx, &guid), DbData::Data(data.to_string())); + // Incoming payload with the same data. + // This could happen if, for example, another client changed the + // key and then put it back the way it was. + let record = make_incoming(&guid, "ext-id", &data); + // Should be no outgoing records as we reconciled. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_accept_incoming_when_local_is_deleted() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // We only record an extension as deleted locally if it has been + // uploaded before being deleted. + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + clear(&tx, "ext-id")?; + // Incoming payload without 'key1'. Because we previously uploaded + // key1, this means another client deleted it. + let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"})); + + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + clear(&tx, "ext-id")?; + + // Use a random guid so that we don't find the mirrored data. + // This test is somewhat bad because deduping might obviate + // the need for it. + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"})); + + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_accept_deleted_key_mirrored() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Incoming payload without 'key1'. Because we previously uploaded + // key1, this means another client deleted it. + let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"})); + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_merged_no_mirror() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload without 'key1' and some data for 'key2'. + // Because we never uploaded 'key1', we merge our local values + // with the remote. + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"})); + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-value"}), + )?; + Ok(()) +} + +#[test] +fn test_merged_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let old_data = json!({"key1": "key1-value", "key2": "key2-value", "doomed_key": "deletable"}); + set(&tx, "ext-id", old_data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // We update 'key1' locally. + let local_data = json!({"key1": "key1-new", "key2": "key2-value", "doomed_key": "deletable"}); + set(&tx, "ext-id", local_data)?; + // Incoming payload where another client set 'key2' and removed + // the 'doomed_key'. + // Because we never uploaded our data, we'll merge our + // key1 in, but otherwise keep the server's changes. + let record = make_incoming( + &guid, + "ext-id", + &json!({"key1": "key1-value", "key2": "key2-incoming"}), + ); + // We should send our 'key1' + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-new", "key2": "key2-incoming"}), + )?; + Ok(()) +} + +#[test] +fn test_merged_with_null_payload() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let old_data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", old_data.clone())?; + // Push this change remotely. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert_eq!( + get_mirror_data(&tx, &guid), + DbData::Data(old_data.to_string()) + ); + let local_data = json!({"key1": "key1-new", "key2": "key2-value"}); + set(&tx, "ext-id", local_data.clone())?; + // Incoming payload with the same old data. + let record = make_incoming(&guid, "ext-id", &old_data); + // Three-way-merge will not detect any change in key1, so we + // should keep our entire new value. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", local_data)?; + Ok(()) +} + +#[test] +fn test_deleted_mirrored_object_accept() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Incoming payload with data deleted. + // We synchronize this deletion by deleting the keys we think + // were on the server. + let record = make_incoming_tombstone(&guid); + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_deleted_mirrored_object_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + set(&tx, "ext-id", json!({"key1": "key1-value"}))?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + set( + &tx, + "ext-id", + json!({"key1": "key1-new", "key2": "key2-value"}), + )?; + // Incoming payload with data deleted. + // We synchronize this deletion by deleting the keys we think + // were on the server. + let record = make_incoming_tombstone(&guid); + // This overrides the change to 'key1', but we still upload 'key2'. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +/// Like the above test, but with a mirrored tombstone. +#[test] +fn test_deleted_mirrored_tombstone_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // Sync some data so we can get the guid for this extension. + set(&tx, "ext-id", json!({"key1": "key1-value"}))?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Sync a delete for this data so we have a tombstone in the mirror. + let record = make_incoming_tombstone(&guid); + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + + // Set some data and sync it simultaneously with another incoming delete. + set(&tx, "ext-id", json!({"key2": "key2-value"}))?; + let record = make_incoming_tombstone(&guid); + // We cannot delete any matching keys because there are no + // matching keys. Instead we push our data. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_deleted_not_mirrored_object_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload with data deleted. + let record = make_incoming_tombstone(&Guid::new("guid")); + // We normally delete the keys we think were on the server, but + // here we have no information about what was on the server, so we + // don't delete anything. We merge in all undeleted keys. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-value"}), + )?; + Ok(()) +} + +#[test] +fn test_conflicting_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload without 'key1' and conflicting for 'key2'. + // Because we never uploaded either of our keys, we'll merge our + // key1 in, but the server key2 wins. + let record = make_incoming( + &Guid::new("guid"), + "ext-id", + &json!({"key2": "key2-incoming"}), + ); + // We should send our 'key1' + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-incoming"}), + )?; + Ok(()) +} + +#[test] +fn test_invalid_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let json = json!({"id": "id", "payload": json!("").to_string()}); + let bso = serde_json::from_value::<IncomingBso>(json).unwrap(); + let record = bso.into_content(); + + // Should do nothing. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + Ok(()) +} |