author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 19:33:14 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 19:33:14 +0000
commit     36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree       105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/webext-storage/src
parent     Initial commit. (diff)
Adding upstream version 115.7.0esr. (upstream/115.7.0esr, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/webext-storage/src')
-rw-r--r--  third_party/rust/webext-storage/src/api.rs             | 745
-rw-r--r--  third_party/rust/webext-storage/src/db.rs              | 301
-rw-r--r--  third_party/rust/webext-storage/src/error.rs           |  72
-rw-r--r--  third_party/rust/webext-storage/src/ffi.rs             |  48
-rw-r--r--  third_party/rust/webext-storage/src/lib.rs             |  26
-rw-r--r--  third_party/rust/webext-storage/src/migration.rs       | 454
-rw-r--r--  third_party/rust/webext-storage/src/schema.rs          | 213
-rw-r--r--  third_party/rust/webext-storage/src/store.rs           | 218
-rw-r--r--  third_party/rust/webext-storage/src/sync/bridge.rs     | 388
-rw-r--r--  third_party/rust/webext-storage/src/sync/incoming.rs   | 863
-rw-r--r--  third_party/rust/webext-storage/src/sync/mod.rs        | 429
-rw-r--r--  third_party/rust/webext-storage/src/sync/outgoing.rs   | 186
-rw-r--r--  third_party/rust/webext-storage/src/sync/sync_tests.rs | 529
13 files changed, 4472 insertions, 0 deletions
diff --git a/third_party/rust/webext-storage/src/api.rs b/third_party/rust/webext-storage/src/api.rs
new file mode 100644
index 0000000000..fe838a075d
--- /dev/null
+++ b/third_party/rust/webext-storage/src/api.rs
@@ -0,0 +1,745 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::*;
+use rusqlite::{Connection, Transaction};
+use serde::{ser::SerializeMap, Serialize, Serializer};
+
+use serde_json::{Map, Value as JsonValue};
+use sql_support::{self, ConnExt};
+
+// These constants are defined by the chrome.storage.sync spec. We export them
+// publicly from this module, then from the crate, so they wind up in the
+// clients.
+// Note the limits for `chrome.storage.sync` and `chrome.storage.local` are
+// different, and these are from `.sync` - we'll have work to do if we end up
+// wanting this to be used for `.local` too!
+pub const SYNC_QUOTA_BYTES: usize = 102_400;
+pub const SYNC_QUOTA_BYTES_PER_ITEM: usize = 8_192;
+pub const SYNC_MAX_ITEMS: usize = 512;
+// Note there are also constants for "operations per minute" etc, which aren't
+// enforced here.
+
+type JsonMap = Map<String, JsonValue>;
+
+enum StorageChangeOp {
+ Clear,
+ Set(JsonValue),
+ SetWithoutQuota(JsonValue),
+}
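+
+// `Set` enforces the total-bytes quota when writing; `SetWithoutQuota` skips
+// that check (it's used by `remove()`, so shrinking an already over-quota
+// store can never fail); `Clear` deletes the row, or leaves a tombstone -
+// see `save_to_db()` below.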
+
+fn get_from_db(conn: &Connection, ext_id: &str) -> Result<Option<JsonMap>> {
+ Ok(
+ match conn.try_query_one::<String, _>(
+ "SELECT data FROM storage_sync_data
+ WHERE ext_id = :ext_id",
+ &[(":ext_id", &ext_id)],
+ true,
+ )? {
+ Some(s) => match serde_json::from_str(&s)? {
+ JsonValue::Object(m) => Some(m),
+ // we could panic here as it's theoretically impossible, but we
+ // might as well treat it as not existing...
+ _ => None,
+ },
+ None => None,
+ },
+ )
+}
+
+fn save_to_db(tx: &Transaction<'_>, ext_id: &str, val: &StorageChangeOp) -> Result<()> {
+ // This function also handles removals. Either an empty map or explicit null
+ // is a removal. If there's a mirror record for this extension ID, then we
+ // must leave a tombstone behind for syncing.
+ let is_delete = match val {
+ StorageChangeOp::Clear => true,
+ StorageChangeOp::Set(JsonValue::Object(v)) => v.is_empty(),
+ StorageChangeOp::SetWithoutQuota(JsonValue::Object(v)) => v.is_empty(),
+ _ => false,
+ };
+ if is_delete {
+ let in_mirror = tx
+ .try_query_one(
+ "SELECT EXISTS(SELECT 1 FROM storage_sync_mirror WHERE ext_id = :ext_id);",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ },
+ true,
+ )?
+ .unwrap_or_default();
+ if in_mirror {
+ log::trace!("saving data for '{}': leaving a tombstone", ext_id);
+ tx.execute_cached(
+ "
+ INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
+ VALUES (:ext_id, NULL, 1)
+ ON CONFLICT (ext_id) DO UPDATE
+ SET data = NULL, sync_change_counter = sync_change_counter + 1",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ },
+ )?;
+ } else {
+ log::trace!("saving data for '{}': removing the row", ext_id);
+ tx.execute_cached(
+ "
+ DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ },
+ )?;
+ }
+ } else {
+ // Convert to bytes so we can enforce the quota if necessary.
+ let sval = match val {
+ StorageChangeOp::Set(v) => {
+ let sv = v.to_string();
+ if sv.len() > SYNC_QUOTA_BYTES {
+ return Err(ErrorKind::QuotaError(QuotaReason::TotalBytes).into());
+ }
+ sv
+ }
+ StorageChangeOp::SetWithoutQuota(v) => v.to_string(),
+ StorageChangeOp::Clear => unreachable!(),
+ };
+
+ log::trace!("saving data for '{}': writing", ext_id);
+ tx.execute_cached(
+ "INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
+ VALUES (:ext_id, :data, 1)
+ ON CONFLICT (ext_id) DO UPDATE
+ set data=:data, sync_change_counter = sync_change_counter + 1",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":data": &sval,
+ },
+ )?;
+ }
+ Ok(())
+}
+
+fn remove_from_db(tx: &Transaction<'_>, ext_id: &str) -> Result<()> {
+ save_to_db(tx, ext_id, &StorageChangeOp::Clear)
+}
+
+// This is a "helper struct" for the callback part of the chrome.storage spec,
+// but shaped in a way to make it more convenient from the rust side of the
+// world.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StorageValueChange {
+ #[serde(skip_serializing)]
+ pub key: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub old_value: Option<JsonValue>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub new_value: Option<JsonValue>,
+}
+
+// This is, largely, a helper so that this serializes correctly as per the
+// chrome.storage.sync spec. If not for custom serialization it should just
+// be a plain vec
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+pub struct StorageChanges {
+ changes: Vec<StorageValueChange>,
+}
+
+impl StorageChanges {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn with_capacity(n: usize) -> Self {
+ Self {
+ changes: Vec::with_capacity(n),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.changes.is_empty()
+ }
+
+ pub fn push(&mut self, change: StorageValueChange) {
+ self.changes.push(change)
+ }
+}
+
+// and it serializes as a map.
+impl Serialize for StorageChanges {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let mut map = serializer.serialize_map(Some(self.changes.len()))?;
+ for change in &self.changes {
+ map.serialize_entry(&change.key, change)?;
+ }
+ map.end()
+ }
+}
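+
+// For example, a `StorageChanges` holding a single change for the key "key"
+// with only an old value serializes as `{"key":{"oldValue":"old"}}` - the
+// per-key object shape described by the chrome.storage.sync spec (see
+// `test_serialize_storage_changes` below).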
+
+// A helper to determine the size of a key/value combination from the
+// perspective of quota and getBytesInUse().
+pub fn get_quota_size_of(key: &str, v: &JsonValue) -> usize {
+ // Reading the chrome docs literally re the quota, the length of the key
+ // is just the string len, but the value is the json val, as bytes.
+ key.len() + v.to_string().len()
+}
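+
+// Worked example of the rule above (the same numbers are asserted in
+// `test_get_bytes_in_use` below): the key counts as its raw length and the
+// value as the length of its JSON serialization, so `"a": "a"` costs
+// 1 + 3 = 4 bytes (the value's quotes are counted) and `"n": 999_999` costs
+// 1 + 6 = 7 bytes. A minimal illustrative check:
+#[cfg(test)]
+#[test]
+fn test_get_quota_size_of_examples() {
+    use serde_json::json;
+    assert_eq!(get_quota_size_of("a", &json!("a")), 4);
+    assert_eq!(get_quota_size_of("n", &json!(999_999)), 7);
+}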
+
+/// The implementation of `storage[.sync].set()`. On success this returns the
+/// StorageChanges defined by the chrome API - it's assumed the caller will
+/// arrange to deliver this to observers as defined in that API.
+pub fn set(tx: &Transaction<'_>, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
+ let val_map = match val {
+ JsonValue::Object(m) => m,
+ // Not clear what the error semantics should be yet. For now, treat it as an empty map.
+ _ => Map::new(),
+ };
+
+ let mut current = get_from_db(tx, ext_id)?.unwrap_or_default();
+
+ let mut changes = StorageChanges::with_capacity(val_map.len());
+
+ // iterate over the value we are adding/updating.
+ for (k, v) in val_map.into_iter() {
+ let old_value = current.remove(&k);
+ if current.len() >= SYNC_MAX_ITEMS {
+ return Err(ErrorKind::QuotaError(QuotaReason::MaxItems).into());
+ }
+ // Reading the chrome docs literally re the quota, the length of the key
+ // is just the string len, but the value is the json val, as bytes
+ if get_quota_size_of(&k, &v) > SYNC_QUOTA_BYTES_PER_ITEM {
+ return Err(ErrorKind::QuotaError(QuotaReason::ItemBytes).into());
+ }
+ let change = StorageValueChange {
+ key: k.clone(),
+ old_value,
+ new_value: Some(v.clone()),
+ };
+ changes.push(change);
+ current.insert(k, v);
+ }
+
+ save_to_db(
+ tx,
+ ext_id,
+ &StorageChangeOp::Set(JsonValue::Object(current)),
+ )?;
+ Ok(changes)
+}
+
+// A helper which takes a param indicating what keys should be returned and
+// converts that to a vec of real strings. Also returns "default" values to
+// be used if no item exists for that key.
+fn get_keys(keys: JsonValue) -> Vec<(String, Option<JsonValue>)> {
+ match keys {
+ JsonValue::String(s) => vec![(s, None)],
+ JsonValue::Array(keys) => {
+ // because nothing with json is ever simple, each key may not be
+ // a string. We ignore any which aren't.
+ keys.iter()
+ .filter_map(|v| v.as_str().map(|s| (s.to_string(), None)))
+ .collect()
+ }
+ JsonValue::Object(m) => m.into_iter().map(|(k, d)| (k, Some(d))).collect(),
+ _ => vec![],
+ }
+}
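+
+// A couple of concrete cases for the helper above (a small illustrative
+// test, not required by the chrome API itself): a bare string names one key
+// with no default, while an object maps each key to a default value.
+#[cfg(test)]
+#[test]
+fn test_get_keys_examples() {
+    use serde_json::json;
+    assert_eq!(
+        get_keys(json!("a")),
+        vec![("a".to_string(), None::<JsonValue>)]
+    );
+    assert_eq!(
+        get_keys(json!({"a": 1})),
+        vec![("a".to_string(), Some(json!(1)))]
+    );
+}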
+
+/// The implementation of `storage[.sync].get()` - on success this always
+/// returns a Json object.
+pub fn get(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
+ // key is optional, or string or array of string or object keys
+ let maybe_existing = get_from_db(conn, ext_id)?;
+ let mut existing = match (maybe_existing, keys.is_object()) {
+ (None, true) => return Ok(keys),
+ (None, false) => return Ok(JsonValue::Object(Map::new())),
+ (Some(v), _) => v,
+ };
+ // take the quick path for null, where we just return the entire object.
+ if keys.is_null() {
+ return Ok(JsonValue::Object(existing));
+ }
+ // OK, so we need to build a list of keys to get.
+ let keys_and_defaults = get_keys(keys);
+ let mut result = Map::with_capacity(keys_and_defaults.len());
+ for (key, maybe_default) in keys_and_defaults {
+ if let Some(v) = existing.remove(&key) {
+ result.insert(key, v);
+ } else if let Some(def) = maybe_default {
+ result.insert(key, def);
+ }
+ // else |keys| is a string/array instead of an object with defaults.
+ // Don't include keys without default values.
+ }
+ Ok(JsonValue::Object(result))
+}
+
+/// The implementation of `storage[.sync].remove()`. On success this returns the
+/// StorageChanges defined by the chrome API - it's assumed the caller will
+/// arrange to deliver this to observers as defined in that API.
+pub fn remove(tx: &Transaction<'_>, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
+ let mut existing = match get_from_db(tx, ext_id)? {
+ None => return Ok(StorageChanges::new()),
+ Some(v) => v,
+ };
+
+ // Note: get_keys parses strings, arrays and objects, but remove()
+ // is expected to only be passed a string or array of strings.
+ let keys_and_defs = get_keys(keys);
+
+ let mut result = StorageChanges::with_capacity(keys_and_defs.len());
+ for (key, _) in keys_and_defs {
+ if let Some(v) = existing.remove(&key) {
+ result.push(StorageValueChange {
+ key,
+ old_value: Some(v),
+ new_value: None,
+ });
+ }
+ }
+ if !result.is_empty() {
+ save_to_db(
+ tx,
+ ext_id,
+ &StorageChangeOp::SetWithoutQuota(JsonValue::Object(existing)),
+ )?;
+ }
+ Ok(result)
+}
+
+/// The implementation of `storage[.sync].clear()`. On success this returns the
+/// StorageChanges defined by the chrome API - it's assumed the caller will
+/// arrange to deliver this to observers as defined in that API.
+pub fn clear(tx: &Transaction<'_>, ext_id: &str) -> Result<StorageChanges> {
+ let existing = match get_from_db(tx, ext_id)? {
+ None => return Ok(StorageChanges::new()),
+ Some(v) => v,
+ };
+ let mut result = StorageChanges::with_capacity(existing.len());
+ for (key, val) in existing.into_iter() {
+ result.push(StorageValueChange {
+ key: key.to_string(),
+ new_value: None,
+ old_value: Some(val),
+ });
+ }
+ remove_from_db(tx, ext_id)?;
+ Ok(result)
+}
+
+/// The implementation of `storage[.sync].getBytesInUse()`.
+pub fn get_bytes_in_use(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<usize> {
+ let maybe_existing = get_from_db(conn, ext_id)?;
+ let existing = match maybe_existing {
+ None => return Ok(0),
+ Some(v) => v,
+ };
+ // Make an array of all the keys we are going to count.
+ let keys: Vec<&str> = match &keys {
+ JsonValue::Null => existing.keys().map(|v| v.as_str()).collect(),
+ JsonValue::String(name) => vec![name.as_str()],
+ JsonValue::Array(names) => names.iter().filter_map(|v| v.as_str()).collect(),
+ // in the spirit of json-based APIs, silently ignore strange things.
+ _ => return Ok(0),
+ };
+ // We must use the same way of counting as our quota enforcement.
+ let mut size = 0;
+ for key in keys.into_iter() {
+ if let Some(v) = existing.get(key) {
+ size += get_quota_size_of(key, v);
+ }
+ }
+ Ok(size)
+}
+
+/// Information about the usage of a single extension.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct UsageInfo {
+ /// The extension id.
+ pub ext_id: String,
+ /// The number of keys the extension uses.
+ pub num_keys: usize,
+ /// The number of bytes used by the extension. This result is somewhat rough
+ /// -- it doesn't bother counting the size of the extension ID, or data in
+ /// the mirror, and favors returning the exact number of bytes used by the
+ /// column (that is, the size of the JSON object) rather than replicating
+ /// the `get_bytes_in_use` return value for all keys.
+ pub num_bytes: usize,
+}
+
+/// Exposes information about per-collection usage for the purpose of telemetry.
+/// (Doesn't map to an actual `chrome.storage.sync` API).
+pub fn usage(db: &Connection) -> Result<Vec<UsageInfo>> {
+ type JsonObject = Map<String, JsonValue>;
+ let sql = "
+ SELECT ext_id, data
+ FROM storage_sync_data
+ WHERE data IS NOT NULL
+ -- for tests and determinism
+ ORDER BY ext_id
+ ";
+ db.query_rows_into(sql, [], |row| {
+ let ext_id: String = row.get("ext_id")?;
+ let data: String = row.get("data")?;
+ let num_bytes = data.len();
+ let num_keys = serde_json::from_str::<JsonObject>(&data)?.len();
+ Ok(UsageInfo {
+ ext_id,
+ num_keys,
+ num_bytes,
+ })
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::db::test::new_mem_db;
+ use serde_json::json;
+
+ #[test]
+ fn test_serialize_storage_changes() -> Result<()> {
+ let c = StorageChanges {
+ changes: vec![StorageValueChange {
+ key: "key".to_string(),
+ old_value: Some(json!("old")),
+ new_value: None,
+ }],
+ };
+ assert_eq!(serde_json::to_string(&c)?, r#"{"key":{"oldValue":"old"}}"#);
+ let c = StorageChanges {
+ changes: vec![StorageValueChange {
+ key: "key".to_string(),
+ old_value: None,
+ new_value: Some(json!({"foo": "bar"})),
+ }],
+ };
+ assert_eq!(
+ serde_json::to_string(&c)?,
+ r#"{"key":{"newValue":{"foo":"bar"}}}"#
+ );
+ Ok(())
+ }
+
+ fn make_changes(changes: &[(&str, Option<JsonValue>, Option<JsonValue>)]) -> StorageChanges {
+ let mut r = StorageChanges::with_capacity(changes.len());
+ for (name, old_value, new_value) in changes {
+ r.push(StorageValueChange {
+ key: (*name).to_string(),
+ old_value: old_value.clone(),
+ new_value: new_value.clone(),
+ });
+ }
+ r
+ }
+
+ #[test]
+ fn test_simple() -> Result<()> {
+ let ext_id = "x";
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+
+ // an empty store.
+ for q in vec![JsonValue::Null, json!("foo"), json!(["foo"])].into_iter() {
+ assert_eq!(get(&tx, ext_id, q)?, json!({}));
+ }
+
+ // Default values in an empty store.
+ for q in vec![json!({ "foo": null }), json!({"foo": "default"})].into_iter() {
+ assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
+ }
+
+ // Single item in the store.
+ set(&tx, ext_id, json!({"foo": "bar" }))?;
+ for q in vec![
+ JsonValue::Null,
+ json!("foo"),
+ json!(["foo"]),
+ json!({ "foo": null }),
+ json!({"foo": "default"}),
+ ]
+ .into_iter()
+ {
+ assert_eq!(get(&tx, ext_id, q)?, json!({"foo": "bar" }));
+ }
+
+ // Default values in a non-empty store.
+ for q in vec![
+ json!({ "non_existing_key": null }),
+ json!({"non_existing_key": 0}),
+ json!({"non_existing_key": false}),
+ json!({"non_existing_key": "default"}),
+ json!({"non_existing_key": ["array"]}),
+ json!({"non_existing_key": {"objectkey": "value"}}),
+ ]
+ .into_iter()
+ {
+ assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
+ }
+
+ // more complex stuff, including changes checking.
+ assert_eq!(
+ set(&tx, ext_id, json!({"foo": "new", "other": "also new" }))?,
+ make_changes(&[
+ ("foo", Some(json!("bar")), Some(json!("new"))),
+ ("other", None, Some(json!("also new")))
+ ])
+ );
+ assert_eq!(
+ get(&tx, ext_id, JsonValue::Null)?,
+ json!({"foo": "new", "other": "also new"})
+ );
+ assert_eq!(get(&tx, ext_id, json!("foo"))?, json!({"foo": "new"}));
+ assert_eq!(
+ get(&tx, ext_id, json!(["foo", "other"]))?,
+ json!({"foo": "new", "other": "also new"})
+ );
+ assert_eq!(
+ get(&tx, ext_id, json!({"foo": null, "default": "yo"}))?,
+ json!({"foo": "new", "default": "yo"})
+ );
+
+ assert_eq!(
+ remove(&tx, ext_id, json!("foo"))?,
+ make_changes(&[("foo", Some(json!("new")), None)]),
+ );
+
+ assert_eq!(
+ set(&tx, ext_id, json!({"foo": {"sub-object": "sub-value"}}))?,
+ make_changes(&[("foo", None, Some(json!({"sub-object": "sub-value"}))),])
+ );
+
+ // XXX - other variants.
+
+ assert_eq!(
+ clear(&tx, ext_id)?,
+ make_changes(&[
+ ("foo", Some(json!({"sub-object": "sub-value"})), None),
+ ("other", Some(json!("also new")), None),
+ ]),
+ );
+ assert_eq!(get(&tx, ext_id, JsonValue::Null)?, json!({}));
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_check_get_impl() -> Result<()> {
+ // This is a port of checkGetImpl in test_ext_storage.js in Desktop.
+ let ext_id = "x";
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+
+ let prop = "test-prop";
+ let value = "test-value";
+
+ set(&tx, ext_id, json!({ prop: value }))?;
+
+ // this is the checkGetImpl part!
+ let mut data = get(&tx, ext_id, json!(null))?;
+ assert_eq!(value, json!(data[prop]), "null getter worked for {}", prop);
+
+ data = get(&tx, ext_id, json!(prop))?;
+ assert_eq!(
+ value,
+ json!(data[prop]),
+ "string getter worked for {}",
+ prop
+ );
+ assert_eq!(
+ data.as_object().unwrap().len(),
+ 1,
+ "string getter should return an object with a single property"
+ );
+
+ data = get(&tx, ext_id, json!([prop]))?;
+ assert_eq!(value, json!(data[prop]), "array getter worked for {}", prop);
+ assert_eq!(
+ data.as_object().unwrap().len(),
+ 1,
+ "array getter with a single key should return an object with a single property"
+ );
+
+ // checkGetImpl() uses `{ [prop]: undefined }` - but json!() can't do that :(
+ // Hopefully it's just testing a simple object, so we use `{ prop: null }`
+ data = get(&tx, ext_id, json!({ prop: null }))?;
+ assert_eq!(
+ value,
+ json!(data[prop]),
+ "object getter worked for {}",
+ prop
+ );
+ assert_eq!(
+ data.as_object().unwrap().len(),
+ 1,
+ "object getter with a single key should return an object with a single property"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_bug_1621162() -> Result<()> {
+ // apparently Firefox, unlike Chrome, will not optimize the changes.
+ // See bug 1621162 for more!
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+ let ext_id = "xyz";
+
+ set(&tx, ext_id, json!({"foo": "bar" }))?;
+
+ assert_eq!(
+ set(&tx, ext_id, json!({"foo": "bar" }))?,
+ make_changes(&[("foo", Some(json!("bar")), Some(json!("bar")))]),
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_quota_maxitems() -> Result<()> {
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+ let ext_id = "xyz";
+ for i in 1..SYNC_MAX_ITEMS + 1 {
+ set(
+ &tx,
+ ext_id,
+ json!({ format!("key-{}", i): format!("value-{}", i) }),
+ )?;
+ }
+ let e = set(&tx, ext_id, json!({"another": "another"})).unwrap_err();
+ match e.kind() {
+ ErrorKind::QuotaError(QuotaReason::MaxItems) => {}
+ _ => panic!("unexpected error type"),
+ };
+ Ok(())
+ }
+
+ #[test]
+ fn test_quota_bytesperitem() -> Result<()> {
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+ let ext_id = "xyz";
+ // A string 5 bytes less than the max. This should be counted as being
+ // 3 bytes less than the max as the quotes are counted. Plus the length
+ // of the key (no quotes) means we should come in 2 bytes under.
+ let val = "x".repeat(SYNC_QUOTA_BYTES_PER_ITEM - 5);
+
+ // Key length doesn't push it over.
+ set(&tx, ext_id, json!({ "x": val }))?;
+ assert_eq!(
+ get_bytes_in_use(&tx, ext_id, json!("x"))?,
+ SYNC_QUOTA_BYTES_PER_ITEM - 2
+ );
+
+ // Key length does push it over.
+ let e = set(&tx, ext_id, json!({ "xxxx": val })).unwrap_err();
+ match e.kind() {
+ ErrorKind::QuotaError(QuotaReason::ItemBytes) => {}
+ _ => panic!("unexpected error type"),
+ };
+ Ok(())
+ }
+
+ #[test]
+ fn test_quota_bytes() -> Result<()> {
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+ let ext_id = "xyz";
+ let val = "x".repeat(SYNC_QUOTA_BYTES + 1);
+
+ // Init an over quota db with a single key.
+ save_to_db(
+ &tx,
+ ext_id,
+ &StorageChangeOp::SetWithoutQuota(json!({ "x": val })),
+ )?;
+
+ // Adding more data fails.
+ let e = set(&tx, ext_id, json!({ "y": "newvalue" })).unwrap_err();
+ match e.kind() {
+ ErrorKind::QuotaError(QuotaReason::TotalBytes) => {}
+ _ => panic!("unexpected error type"),
+ };
+
+ // Removing data does not fail.
+ remove(&tx, ext_id, json!["x"])?;
+
+ // Restore the over quota data.
+ save_to_db(
+ &tx,
+ ext_id,
+ &StorageChangeOp::SetWithoutQuota(json!({ "y": val })),
+ )?;
+
+ // Overwrite with less data does not fail.
+ set(&tx, ext_id, json!({ "y": "lessdata" }))?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_bytes_in_use() -> Result<()> {
+ let mut db = new_mem_db();
+ let tx = db.transaction()?;
+ let ext_id = "xyz";
+
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 0);
+
+ set(&tx, ext_id, json!({ "a": "a" }))?; // should be 4
+ set(&tx, ext_id, json!({ "b": "bb" }))?; // should be 5
+ set(&tx, ext_id, json!({ "c": "ccc" }))?; // should be 6
+ set(&tx, ext_id, json!({ "n": 999_999 }))?; // should be 7
+
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!("x"))?, 0);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!("a"))?, 4);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!("b"))?, 5);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!("c"))?, 6);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!("n"))?, 7);
+
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a"]))?, 4);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "x"]))?, 4);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "b"]))?, 9);
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "c"]))?, 10);
+
+ assert_eq!(
+ get_bytes_in_use(&tx, ext_id, json!(["a", "b", "c", "n"]))?,
+ 22
+ );
+ assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 22);
+ Ok(())
+ }
+
+ #[test]
+ fn test_usage() {
+ let mut db = new_mem_db();
+ let tx = db.transaction().unwrap();
+ // '{"a":"a","b":"bb","c":"ccc","n":999999}': 39 bytes
+ set(&tx, "xyz", json!({ "a": "a" })).unwrap();
+ set(&tx, "xyz", json!({ "b": "bb" })).unwrap();
+ set(&tx, "xyz", json!({ "c": "ccc" })).unwrap();
+ set(&tx, "xyz", json!({ "n": 999_999 })).unwrap();
+
+ // '{"a":"a"}': 9 bytes
+ set(&tx, "abc", json!({ "a": "a" })).unwrap();
+
+ tx.commit().unwrap();
+
+ let usage = usage(&db).unwrap();
+ let expect = [
+ UsageInfo {
+ ext_id: "abc".to_string(),
+ num_keys: 1,
+ num_bytes: 9,
+ },
+ UsageInfo {
+ ext_id: "xyz".to_string(),
+ num_keys: 4,
+ num_bytes: 39,
+ },
+ ];
+ assert_eq!(&usage, &expect);
+ }
+}
diff --git a/third_party/rust/webext-storage/src/db.rs b/third_party/rust/webext-storage/src/db.rs
new file mode 100644
index 0000000000..c88dc99959
--- /dev/null
+++ b/third_party/rust/webext-storage/src/db.rs
@@ -0,0 +1,301 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::*;
+use crate::schema;
+use interrupt_support::{SqlInterruptHandle, SqlInterruptScope};
+use parking_lot::Mutex;
+use rusqlite::types::{FromSql, ToSql};
+use rusqlite::Connection;
+use rusqlite::OpenFlags;
+use sql_support::open_database::open_database_with_flags;
+use sql_support::ConnExt;
+use std::ops::{Deref, DerefMut};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use url::Url;
+
+/// A `StorageDb` wraps a read-write SQLite connection, and handles schema
+/// migrations and recovering from database file corruption. It can be used
+/// anywhere a `rusqlite::Connection` is expected, thanks to its `Deref{Mut}`
+/// implementations.
+///
+/// We only support a single writer connection - so that's the only thing we
+/// store. It's still a bit overkill, but there's only so many yaks in a day.
+pub struct StorageDb {
+ writer: Connection,
+ interrupt_handle: Arc<SqlInterruptHandle>,
+}
+impl StorageDb {
+ /// Create a new, or fetch an already open, StorageDb backed by a file on disk.
+ pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
+ let db_path = normalize_path(db_path)?;
+ Self::new_named(db_path)
+ }
+
+ /// Create a new, or fetch an already open, memory-based StorageDb. You must
+ /// provide a name, but you are still able to have a single writer and many
+ /// reader connections to the same memory DB open.
+ #[cfg(test)]
+ pub fn new_memory(db_path: &str) -> Result<Self> {
+ let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_path));
+ Self::new_named(name)
+ }
+
+ fn new_named(db_path: PathBuf) -> Result<Self> {
+ // We always create the read-write connection for an initial open so
+ // we can create the schema and/or do version upgrades.
+ let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
+ | OpenFlags::SQLITE_OPEN_URI
+ | OpenFlags::SQLITE_OPEN_CREATE
+ | OpenFlags::SQLITE_OPEN_READ_WRITE;
+
+ let conn = open_database_with_flags(db_path, flags, &schema::WebExtMigrationLogin)?;
+ Ok(Self {
+ interrupt_handle: Arc::new(SqlInterruptHandle::new(&conn)),
+ writer: conn,
+ })
+ }
+
+ pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> {
+ Arc::clone(&self.interrupt_handle)
+ }
+
+ #[allow(dead_code)]
+ pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> {
+ Ok(self.interrupt_handle.begin_interrupt_scope()?)
+ }
+
+ /// Closes the database connection. If there are any unfinalized prepared
+ /// statements on the connection, `close` will fail and the `StorageDb` will
+ /// remain open and the connection will be leaked - we used to return the
+ /// underlying connection so the caller can retry but (a) that's very tricky
+ /// in an Arc<Mutex<>> world and (b) we never actually took advantage of
+ /// that retry capability.
+ pub fn close(self) -> Result<()> {
+ self.writer.close().map_err(|(writer, err)| {
+ // In rusqlite 0.28.0 and earlier, if we just let `writer` drop,
+ // the close would panic on failure.
+ // Later rusqlite versions will not panic, but this behavior doesn't
+ // hurt there.
+ std::mem::forget(writer);
+ err.into()
+ })
+ }
+}
+
+impl Deref for StorageDb {
+ type Target = Connection;
+
+ fn deref(&self) -> &Self::Target {
+ &self.writer
+ }
+}
+
+impl DerefMut for StorageDb {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.writer
+ }
+}
+
+// We almost exclusively use this ThreadSafeStorageDb
+pub struct ThreadSafeStorageDb {
+ db: Mutex<StorageDb>,
+ // This "outer" interrupt_handle is not protected by the mutex, so
+ // consumers can interrupt us while the mutex is held - which it always
+ // will be if we are doing anything interruptible!
+ interrupt_handle: Arc<SqlInterruptHandle>,
+}
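+
+// A rough usage sketch of the pattern above (illustrative only - the worker
+// thread and the query inside the lock are hypothetical): because the handle
+// lives outside the mutex, a second thread can interrupt a long-running
+// operation while the first thread still holds the lock.
+//
+//     let db = Arc::new(ThreadSafeStorageDb::new(storage_db));
+//     let handle = db.interrupt_handle();
+//     // worker thread:
+//     //   let guard = db.lock();
+//     //   ... run interruptible queries against *guard ...
+//     // another thread (e.g. at shutdown):
+//     //   handle.interrupt();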
+
+impl ThreadSafeStorageDb {
+ pub fn new(db: StorageDb) -> Self {
+ Self {
+ interrupt_handle: db.interrupt_handle(),
+ db: Mutex::new(db),
+ }
+ }
+
+ pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> {
+ Arc::clone(&self.interrupt_handle)
+ }
+
+ pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> {
+ Ok(self.interrupt_handle.begin_interrupt_scope()?)
+ }
+
+ pub fn into_inner(self) -> StorageDb {
+ self.db.into_inner()
+ }
+}
+
+// Deref to a Mutex<StorageDb>, which is how we will use ThreadSafeStorageDb most of the time
+impl Deref for ThreadSafeStorageDb {
+ type Target = Mutex<StorageDb>;
+
+ #[inline]
+ fn deref(&self) -> &Mutex<StorageDb> {
+ &self.db
+ }
+}
+
+// Also implement AsRef<SqlInterruptHandle> so that we can interrupt this at shutdown
+impl AsRef<SqlInterruptHandle> for ThreadSafeStorageDb {
+ fn as_ref(&self) -> &SqlInterruptHandle {
+ &self.interrupt_handle
+ }
+}
+
+pub(crate) mod sql_fns {
+ use rusqlite::{functions::Context, Result};
+ use sync_guid::Guid as SyncGuid;
+
+ #[inline(never)]
+ pub fn generate_guid(_ctx: &Context<'_>) -> Result<SyncGuid> {
+ Ok(SyncGuid::random())
+ }
+}
+
+// These should be somewhere else...
+pub fn put_meta(db: &Connection, key: &str, value: &dyn ToSql) -> Result<()> {
+ db.conn().execute_cached(
+ "REPLACE INTO meta (key, value) VALUES (:key, :value)",
+ rusqlite::named_params! { ":key": key, ":value": value },
+ )?;
+ Ok(())
+}
+
+pub fn get_meta<T: FromSql>(db: &Connection, key: &str) -> Result<Option<T>> {
+ let res = db.conn().try_query_one(
+ "SELECT value FROM meta WHERE key = :key",
+ &[(":key", &key)],
+ true,
+ )?;
+ Ok(res)
+}
+
+pub fn delete_meta(db: &Connection, key: &str) -> Result<()> {
+ db.conn()
+ .execute_cached("DELETE FROM meta WHERE key = :key", &[(":key", &key)])?;
+ Ok(())
+}
+
+// Utilities for working with paths.
+// (From places_utils - ideally these would be shared, but the use of
+// ErrorKind values makes that non-trivial.)
+
+/// `Path` is basically just a `str` with no validation, and so in practice it
+/// could contain a file URL. Rusqlite takes advantage of this a bit, and says
+/// `AsRef<Path>` but really means "anything sqlite can take as an argument".
+///
+/// Swift loves using file urls (the only support it has for file manipulation
+/// is through file urls), so it's handy to support them if possible.
+fn unurl_path(p: impl AsRef<Path>) -> PathBuf {
+ p.as_ref()
+ .to_str()
+ .and_then(|s| Url::parse(s).ok())
+ .and_then(|u| {
+ if u.scheme() == "file" {
+ u.to_file_path().ok()
+ } else {
+ None
+ }
+ })
+ .unwrap_or_else(|| p.as_ref().to_owned())
+}
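+
+// A couple of illustrative cases for the above (unix-only, since converting
+// a file URL to a path is platform dependent; the paths are arbitrary):
+// a `file:` URL becomes a plain path, anything else passes through untouched.
+#[cfg(all(test, unix))]
+#[test]
+fn test_unurl_path_examples() {
+    assert_eq!(
+        unurl_path("file:///tmp/test.db"),
+        PathBuf::from("/tmp/test.db")
+    );
+    assert_eq!(unurl_path("/tmp/test.db"), PathBuf::from("/tmp/test.db"));
+}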
+
+/// If `p` is a file URL, return it, otherwise try and make it one.
+///
+/// Errors if `p` is a relative non-url path, or if it's a URL path
+/// that isn't a `file:` URL.
+#[allow(dead_code)]
+pub fn ensure_url_path(p: impl AsRef<Path>) -> Result<Url> {
+ if let Some(u) = p.as_ref().to_str().and_then(|s| Url::parse(s).ok()) {
+ if u.scheme() == "file" {
+ Ok(u)
+ } else {
+ Err(ErrorKind::IllegalDatabasePath(p.as_ref().to_owned()).into())
+ }
+ } else {
+ let p = p.as_ref();
+ let u = Url::from_file_path(p).map_err(|_| ErrorKind::IllegalDatabasePath(p.to_owned()))?;
+ Ok(u)
+ }
+}
+
+/// As best as possible, convert `p` into an absolute path, resolving
+/// all symlinks along the way.
+///
+/// If `p` is a file url, it's converted to a path before this.
+fn normalize_path(p: impl AsRef<Path>) -> Result<PathBuf> {
+ let path = unurl_path(p);
+ if let Ok(canonical) = path.canonicalize() {
+ return Ok(canonical);
+ }
+ // It probably doesn't exist yet. This is an error, although it seems to
+ // work on some systems.
+ //
+ // We resolve this by trying to canonicalize the parent directory, and
+ // appending the requested file name onto that. If we can't canonicalize
+ // the parent, we return an error.
+ //
+ // Also, we return errors if the path ends in "..", if there is no
+ // parent directory, etc.
+ let file_name = path
+ .file_name()
+ .ok_or_else(|| ErrorKind::IllegalDatabasePath(path.clone()))?;
+
+ let parent = path
+ .parent()
+ .ok_or_else(|| ErrorKind::IllegalDatabasePath(path.clone()))?;
+
+ let mut canonical = parent.canonicalize()?;
+ canonical.push(file_name);
+ Ok(canonical)
+}
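+
+// For example (illustrative paths): if `/home/user/profile` exists but
+// `storage.db` within it doesn't yet, `/home/user/profile/storage.db` is
+// resolved by canonicalizing the parent directory and appending the file
+// name; a path ending in ".." or with no parent component at all is rejected
+// as IllegalDatabasePath.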
+
+// Helpers for tests
+#[cfg(test)]
+pub mod test {
+ use super::*;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ // A helper for our tests to get their own memory Api.
+ static ATOMIC_COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+ pub fn new_mem_db() -> StorageDb {
+ let _ = env_logger::try_init();
+ let counter = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed);
+ StorageDb::new_memory(&format!("test-api-{}", counter)).expect("should get an API")
+ }
+
+ pub fn new_mem_thread_safe_storage_db() -> Arc<ThreadSafeStorageDb> {
+ Arc::new(ThreadSafeStorageDb::new(new_mem_db()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::test::*;
+ use super::*;
+
+ // Sanity check that we can create a database.
+ #[test]
+ fn test_open() {
+ new_mem_db();
+ // XXX - should we check anything else? Seems a bit pointless, but if
+ // we move the meta functions away from here then it's better than
+ // nothing.
+ }
+
+ #[test]
+ fn test_meta() -> Result<()> {
+ let writer = new_mem_db();
+ assert_eq!(get_meta::<String>(&writer, "foo")?, None);
+ put_meta(&writer, "foo", &"bar".to_string())?;
+ assert_eq!(get_meta(&writer, "foo")?, Some("bar".to_string()));
+ delete_meta(&writer, "foo")?;
+ assert_eq!(get_meta::<String>(&writer, "foo")?, None);
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/error.rs b/third_party/rust/webext-storage/src/error.rs
new file mode 100644
index 0000000000..1a2e650cd0
--- /dev/null
+++ b/third_party/rust/webext-storage/src/error.rs
@@ -0,0 +1,72 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use interrupt_support::Interrupted;
+
+#[derive(Debug)]
+pub enum QuotaReason {
+ TotalBytes,
+ ItemBytes,
+ MaxItems,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ErrorKind {
+ #[error("Quota exceeded: {0:?}")]
+ QuotaError(QuotaReason),
+
+ #[error("Error parsing JSON data: {0}")]
+ JsonError(#[from] serde_json::Error),
+
+ #[error("Error executing SQL: {0}")]
+ SqlError(#[from] rusqlite::Error),
+
+ #[error("A connection of this type is already open")]
+ ConnectionAlreadyOpen,
+
+ #[error("An invalid connection type was specified")]
+ InvalidConnectionType,
+
+ #[error("IO error: {0}")]
+ IoError(#[from] std::io::Error),
+
+ #[error("Operation interrupted")]
+ InterruptedError(#[from] Interrupted),
+
+ #[error("Tried to close connection on wrong StorageApi instance")]
+ WrongApiForClose,
+
+ // This will happen if you provide something absurd like
+ // "/" or "" as your database path. For more subtly broken paths,
+ // we'll likely return an IoError.
+ #[error("Illegal database path: {0:?}")]
+ IllegalDatabasePath(std::path::PathBuf),
+
+ #[error("UTF8 Error: {0}")]
+ Utf8Error(#[from] std::str::Utf8Error),
+
+ #[error("Error opening database: {0}")]
+ OpenDatabaseError(#[from] sql_support::open_database::Error),
+
+ // When trying to close a connection but we aren't the exclusive owner of the containing Arc<>
+ #[error("Other shared references to this connection are alive")]
+ OtherConnectionReferencesExist,
+
+ #[error("The storage database has been closed")]
+ DatabaseConnectionClosed,
+
+ #[error("Sync Error: {0}")]
+ SyncError(String),
+}
+
+error_support::define_error! {
+ ErrorKind {
+ (JsonError, serde_json::Error),
+ (SqlError, rusqlite::Error),
+ (IoError, std::io::Error),
+ (InterruptedError, Interrupted),
+ (Utf8Error, std::str::Utf8Error),
+ (OpenDatabaseError, sql_support::open_database::Error),
+ }
+}
diff --git a/third_party/rust/webext-storage/src/ffi.rs b/third_party/rust/webext-storage/src/ffi.rs
new file mode 100644
index 0000000000..dc4af8ed6e
--- /dev/null
+++ b/third_party/rust/webext-storage/src/ffi.rs
@@ -0,0 +1,48 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/// This module contains glue for the FFI, including error codes and a
+/// `From<Error>` implementation for `ffi_support::ExternError`. These must be
+/// implemented in this crate, and not `webext_storage_ffi`, because of Rust's
+/// orphan rule for implementing traits (`webext_storage_ffi` can't implement
+/// a trait in `ffi_support` for a type in `webext_storage`).
+use ffi_support::{ErrorCode, ExternError};
+
+use crate::error::{Error, ErrorKind, QuotaReason};
+
+mod error_codes {
+ /// An unexpected error occurred which likely cannot be meaningfully handled
+ /// by the application.
+ pub const UNEXPECTED: i32 = 1;
+
+ /// The application passed an invalid JSON string for a storage key or value.
+ pub const INVALID_JSON: i32 = 2;
+
+ /// The total number of bytes stored in the database for this extension,
+ /// counting all key-value pairs serialized to JSON, exceeds the allowed limit.
+ pub const QUOTA_TOTAL_BYTES_EXCEEDED: i32 = 32;
+
+ /// A single key-value pair exceeds the allowed byte limit when serialized
+ /// to JSON.
+ pub const QUOTA_ITEM_BYTES_EXCEEDED: i32 = 32 + 1;
+
+ /// The total number of key-value pairs stored for this extension exceeded the
+ /// allowed limit.
+ pub const QUOTA_MAX_ITEMS_EXCEEDED: i32 = 32 + 2;
+}
+
+impl From<Error> for ExternError {
+ fn from(err: Error) -> ExternError {
+ let code = ErrorCode::new(match err.kind() {
+ ErrorKind::JsonError(_) => error_codes::INVALID_JSON,
+ ErrorKind::QuotaError(QuotaReason::TotalBytes) => {
+ error_codes::QUOTA_TOTAL_BYTES_EXCEEDED
+ }
+ ErrorKind::QuotaError(QuotaReason::ItemBytes) => error_codes::QUOTA_ITEM_BYTES_EXCEEDED,
+ ErrorKind::QuotaError(QuotaReason::MaxItems) => error_codes::QUOTA_MAX_ITEMS_EXCEEDED,
+ _ => error_codes::UNEXPECTED,
+ });
+ ExternError::new_error(code, err.to_string())
+ }
+}
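+
+// So, for example, a `QuotaError(QuotaReason::MaxItems)` surfaces over the
+// FFI as error code 34 (QUOTA_MAX_ITEMS_EXCEEDED), with the error's display
+// string as the message.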
diff --git a/third_party/rust/webext-storage/src/lib.rs b/third_party/rust/webext-storage/src/lib.rs
new file mode 100644
index 0000000000..2e2554781d
--- /dev/null
+++ b/third_party/rust/webext-storage/src/lib.rs
@@ -0,0 +1,26 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#![allow(unknown_lints)]
+#![warn(rust_2018_idioms)]
+
+mod api;
+mod db;
+pub mod error;
+mod ffi;
+mod migration;
+mod schema;
+pub mod store;
+mod sync;
+
+pub use migration::MigrationInfo;
+
+// We publish some constants from non-public modules.
+pub use sync::STORAGE_VERSION;
+
+pub use api::SYNC_MAX_ITEMS;
+pub use api::SYNC_QUOTA_BYTES;
+pub use api::SYNC_QUOTA_BYTES_PER_ITEM;
+
+pub use api::UsageInfo;
diff --git a/third_party/rust/webext-storage/src/migration.rs b/third_party/rust/webext-storage/src/migration.rs
new file mode 100644
index 0000000000..27f230ef47
--- /dev/null
+++ b/third_party/rust/webext-storage/src/migration.rs
@@ -0,0 +1,454 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::*;
+use rusqlite::{named_params, Connection, OpenFlags, Transaction};
+use serde_json::{Map, Value};
+use sql_support::ConnExt;
+use std::collections::HashSet;
+use std::path::Path;
+
+// Simple migration from the "old" kinto-with-sqlite-backing implementation
+// to ours.
+// Could almost be trivially done in JS using the regular public API if not
+// for:
+// * We don't want to enforce the same quotas when migrating.
+// * We'd rather do the entire migration in a single transaction for perf
+// reasons.
+
+// The sqlite database we migrate from has a very simple structure:
+// * table collection_data with columns collection_name, record_id and record
+// * `collection_name` is a string of form "default/{extension_id}"
+// * `record_id` is `key-{key}`
+// * `record` is a string with json, of form: {
+// id: {the record id repeated},
+// key: {the key},
+// data: {the actual data},
+// _status: {sync status},
+// last_modified: {timestamp},
+// }
+// So the info we need is stored somewhat redundantly.
+// Further:
+// * There's a special collection_name "default/storage-sync-crypto" that
+// we don't want to migrate. Its record_id is 'keys' and its json has no
+// `data`
+
+// Note we don't enforce a quota - we migrate everything - even if this means
+// it's too big for the server to store. This is a policy decision - it's better
+// to not lose data than to try and work out what data can be disposed of, as
+// the addon has the ability to determine this.
+
+// Our error strategy is "ignore read errors, propagate write errors" under the
+// assumption that the former tends to mean a damaged DB or file-system and is
+// unlikely to work if we try later (eg, replacing the disk isn't likely to
+// uncorrupt the DB), whereas the latter is likely to be a disk-space or
+// file-system error, but a retry might work (eg, replacing the disk then
+// trying again might make the writes work).
+
+// The struct we read from the DB.
+struct LegacyRow {
+ col_name: String, // collection_name column
+ record: String, // record column
+}
+
+impl LegacyRow {
+ // Parse the 2 columns from the DB into the data we need to insert into
+ // our target database.
+ fn parse(&self) -> Option<Parsed<'_>> {
+ if self.col_name.len() < 8 {
+ log::trace!("collection_name of '{}' is too short", self.col_name);
+ return None;
+ }
+ if &self.col_name[..8] != "default/" {
+ log::trace!("collection_name of '{}' isn't ours", self.col_name);
+ return None;
+ }
+ let ext_id = &self.col_name[8..];
+ let mut record_map = match serde_json::from_str(&self.record) {
+ Ok(Value::Object(m)) => m,
+ Ok(o) => {
+ log::info!("skipping non-json-object 'record' column");
+ log::trace!("record value is json, but not an object: {}", o);
+ return None;
+ }
+ Err(e) => {
+ log::info!("skipping non-json 'record' column");
+ log::trace!("record value isn't json: {}", e);
+ return None;
+ }
+ };
+
+ let key = match record_map.remove("key") {
+ Some(Value::String(s)) if !s.is_empty() => s,
+ Some(o) => {
+ log::trace!("key is json but not a string: {}", o);
+ return None;
+ }
+ _ => {
+ log::trace!("key doesn't exist in the map");
+ return None;
+ }
+ };
+ let data = match record_map.remove("data") {
+ Some(d) => d,
+ _ => {
+ log::trace!("data doesn't exist in the map");
+ return None;
+ }
+ };
+ Some(Parsed { ext_id, key, data })
+ }
+}
+
+// The info we parse from the raw DB strings.
+struct Parsed<'a> {
+ ext_id: &'a str,
+ key: String,
+ data: serde_json::Value,
+}
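+
+// For example, the legacy row
+//   collection_name: 'default/{e7fefcf3-b39c-4f17-5215-ebfe120a7031}'
+//   record: '{"id":"key-userWelcomed","key":"userWelcomed","data":1570659224457,...}'
+// (taken from the happy-path test data below) parses to
+//   Parsed { ext_id: "{e7fefcf3-b39c-4f17-5215-ebfe120a7031}", key: "userWelcomed", data: 1570659224457 }
+// while rows whose collection_name doesn't start with "default/", or whose
+// record lacks a string `key` or a `data` value, are skipped.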
+
+pub fn migrate(tx: &Transaction<'_>, filename: &Path) -> Result<MigrationInfo> {
+ // We do the grouping manually, collecting string values as we go.
+ let mut last_ext_id = "".to_string();
+ let mut curr_values: Vec<(String, serde_json::Value)> = Vec::new();
+ let (rows, mut mi) = read_rows(filename);
+ for row in rows {
+ log::trace!("processing '{}' - '{}'", row.col_name, row.record);
+ let parsed = match row.parse() {
+ Some(p) => p,
+ None => continue,
+ };
+ // Do our "grouping"
+ if parsed.ext_id != last_ext_id {
+ if !last_ext_id.is_empty() && !curr_values.is_empty() {
+ // a different extension id - write what we have to the DB.
+ let entries = do_insert(tx, &last_ext_id, curr_values)?;
+ mi.extensions_successful += 1;
+ mi.entries_successful += entries;
+ }
+ last_ext_id = parsed.ext_id.to_string();
+ curr_values = Vec::new();
+ }
+ // no 'else' here - must also enter this block on ext_id change.
+ if parsed.ext_id == last_ext_id {
+ curr_values.push((parsed.key.to_string(), parsed.data));
+ log::trace!(
+ "extension {} now has {} keys",
+ parsed.ext_id,
+ curr_values.len()
+ );
+ }
+ }
+ // and the last one
+ if !last_ext_id.is_empty() && !curr_values.is_empty() {
+ // flush what we have for the final extension to the DB.
+ let entries = do_insert(tx, &last_ext_id, curr_values)?;
+ mi.extensions_successful += 1;
+ mi.entries_successful += entries;
+ }
+ log::info!("migrated {} extensions: {:?}", mi.extensions_successful, mi);
+ Ok(mi)
+}
+
+fn read_rows(filename: &Path) -> (Vec<LegacyRow>, MigrationInfo) {
+ let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_READ_ONLY;
+ let src_conn = match Connection::open_with_flags(filename, flags) {
+ Ok(conn) => conn,
+ Err(e) => {
+ log::warn!("Failed to open the source DB: {}", e);
+ return (Vec::new(), MigrationInfo::open_failure());
+ }
+ };
+ // Failure to prepare the statement probably just means the source DB is
+ // damaged.
+ let mut stmt = match src_conn.prepare(
+ "SELECT collection_name, record FROM collection_data
+ WHERE collection_name != 'default/storage-sync-crypto'
+ ORDER BY collection_name",
+ ) {
+ Ok(stmt) => stmt,
+ Err(e) => {
+ log::warn!("Failed to prepare the statement: {}", e);
+ return (Vec::new(), MigrationInfo::open_failure());
+ }
+ };
+ let rows = match stmt.query_and_then([], |row| -> Result<LegacyRow> {
+ Ok(LegacyRow {
+ col_name: row.get(0)?,
+ record: row.get(1)?,
+ })
+ }) {
+ Ok(r) => r,
+ Err(e) => {
+ log::warn!("Failed to read any rows from the source DB: {}", e);
+ return (Vec::new(), MigrationInfo::open_failure());
+ }
+ };
+ let all_rows: Vec<Result<LegacyRow>> = rows.collect();
+ let entries = all_rows.len();
+ let successful_rows: Vec<LegacyRow> = all_rows.into_iter().filter_map(Result::ok).collect();
+ let distinct_extensions: HashSet<_> = successful_rows.iter().map(|c| &c.col_name).collect();
+
+ let mi = MigrationInfo {
+ entries,
+ extensions: distinct_extensions.len(),
+ // Populated later.
+ extensions_successful: 0,
+ entries_successful: 0,
+ open_failure: false,
+ };
+
+ (successful_rows, mi)
+}
+
+/// Insert the extension and values. If there are multiple values with the same
+/// key (which shouldn't be possible but who knows, database corruption causes
+/// strange things), chooses an arbitrary one. Returns the number of entries
+/// inserted, which could be different from `vals.len()` if multiple entries in
+/// `vals` have the same key.
+fn do_insert(tx: &Transaction<'_>, ext_id: &str, vals: Vec<(String, Value)>) -> Result<usize> {
+ let mut map = Map::with_capacity(vals.len());
+ for (key, val) in vals {
+ map.insert(key, val);
+ }
+ let num_entries = map.len();
+ tx.execute_cached(
+ "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
+ VALUES (:ext_id, :data, 1)",
+ rusqlite::named_params! {
+ ":ext_id": &ext_id,
+ ":data": &Value::Object(map),
+ },
+ )?;
+ Ok(num_entries)
+}
+
+#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+pub struct MigrationInfo {
+ /// The number of entries (rows in the original table) we attempted to
+ /// migrate. Zero if there was some error in computing this number.
+ ///
+ /// Note that for the original table, a single row stores a single
+ /// preference for one extension. That is, if you view the set of
+ /// preferences for a given extension as a HashMap (as we do), it would be a
+ /// single entry/key-value-pair in the map.
+ pub entries: usize,
+ /// The number of records we successfully migrated (equal to `entries` for
+ /// entirely successful migrations).
+ pub entries_successful: usize,
+ /// The number of extensions (distinct extension ids) in the original
+ /// table.
+ pub extensions: usize,
+ /// The number of extensions we successfully migrated
+ pub extensions_successful: usize,
+ /// True iff we failed to open the source DB at all.
+ pub open_failure: bool,
+}
+
+impl MigrationInfo {
+ /// Returns a MigrationInfo indicating that we failed to read any rows due
+ /// to some error case (e.g. the database open failed, or some other very
+ /// early read error).
+ fn open_failure() -> Self {
+ Self {
+ open_failure: true,
+ ..Self::default()
+ }
+ }
+
+ const META_KEY: &'static str = "migration_info";
+
+ /// Store `self` in the provided database under `Self::META_KEY`.
+ pub(crate) fn store(&self, conn: &Connection) -> Result<()> {
+ let json = serde_json::to_string(self)?;
+ conn.execute(
+ "INSERT OR REPLACE INTO meta(key, value) VALUES (:k, :v)",
+ named_params! {
+ ":k": Self::META_KEY,
+ ":v": &json
+ },
+ )?;
+ Ok(())
+ }
+
+ /// Get the MigrationInfo stored under `Self::META_KEY` (if any) out of the
+ /// DB, and delete it.
+ pub(crate) fn take(tx: &Transaction<'_>) -> Result<Option<Self>> {
+ let s = tx.try_query_one::<String, _>(
+ "SELECT value FROM meta WHERE key = :k",
+ named_params! {
+ ":k": Self::META_KEY,
+ },
+ false,
+ )?;
+ tx.execute(
+ "DELETE FROM meta WHERE key = :k",
+ named_params! {
+ ":k": Self::META_KEY,
+ },
+ )?;
+ if let Some(s) = s {
+ match serde_json::from_str(&s) {
+ Ok(v) => Ok(Some(v)),
+ Err(e) => {
+ // Force test failure, but just log an error otherwise so that
+ // we still commit the transaction that deletes the bad value.
+ debug_assert!(false, "Failed to read migration JSON: {:?}", e);
+ error_support::report_error!(
+ "webext-storage-migration-json",
+ "Failed to read migration JSON: {}",
+ e
+ );
+ Ok(None)
+ }
+ }
+ } else {
+ Ok(None)
+ }
+ }
+}
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::api;
+ use crate::db::{test::new_mem_db, StorageDb};
+ use serde_json::json;
+ use tempfile::tempdir;
+
+ fn init_source_db(path: impl AsRef<Path>, f: impl FnOnce(&Connection)) {
+ let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
+ | OpenFlags::SQLITE_OPEN_CREATE
+ | OpenFlags::SQLITE_OPEN_READ_WRITE;
+ let mut conn = Connection::open_with_flags(path, flags).expect("open should work");
+ let tx = conn.transaction().expect("should be able to get a tx");
+ tx.execute_batch(
+ "CREATE TABLE collection_data (
+ collection_name TEXT,
+ record_id TEXT,
+ record TEXT
+ );",
+ )
+ .expect("create should work");
+ f(&tx);
+ tx.commit().expect("should commit");
+ conn.close().expect("close should work");
+ }
+
+ // Create a test database, populate it via the callback, migrate it, and
+ // return a connection to the new, migrated DB for further checking.
+ fn do_migrate<F>(expect_mi: MigrationInfo, f: F) -> StorageDb
+ where
+ F: FnOnce(&Connection),
+ {
+ let tmpdir = tempdir().unwrap();
+ let path = tmpdir.path().join("source.db");
+ init_source_db(path, f);
+
+ // now migrate
+ let mut db = new_mem_db();
+ let tx = db.transaction().expect("tx should work");
+
+ let mi = migrate(&tx, &tmpdir.path().join("source.db")).expect("migrate should work");
+ tx.commit().expect("should work");
+ assert_eq!(mi, expect_mi);
+ db
+ }
+
+ fn assert_has(c: &Connection, ext_id: &str, expect: Value) {
+ assert_eq!(
+ api::get(c, ext_id, json!(null)).expect("should get"),
+ expect
+ );
+ }
+
+ const HAPPY_PATH_SQL: &str = r#"
+ INSERT INTO collection_data(collection_name, record)
+ VALUES
+ ('default/{e7fefcf3-b39c-4f17-5215-ebfe120a7031}', '{"id":"key-userWelcomed","key":"userWelcomed","data":1570659224457,"_status":"synced","last_modified":1579755940527}'),
+ ('default/{e7fefcf3-b39c-4f17-5215-ebfe120a7031}', '{"id":"key-isWho","key":"isWho","data":"4ec8109f","_status":"synced","last_modified":1579755940497}'),
+ ('default/storage-sync-crypto', '{"id":"keys","keys":{"default":["rQ=","lR="],"collections":{"extension@redux.devtools":["Bd=","ju="]}}}'),
+ ('default/https-everywhere@eff.org', '{"id":"key-userRules","key":"userRules","data":[],"_status":"synced","last_modified":1570079920045}'),
+ ('default/https-everywhere@eff.org', '{"id":"key-ruleActiveStates","key":"ruleActiveStates","data":{},"_status":"synced","last_modified":1570079919993}'),
+ ('default/https-everywhere@eff.org', '{"id":"key-migration_5F_version","key":"migration_version","data":2,"_status":"synced","last_modified":1570079919966}')
+ "#;
+ const HAPPY_PATH_MIGRATION_INFO: MigrationInfo = MigrationInfo {
+ entries: 5,
+ entries_successful: 5,
+ extensions: 2,
+ extensions_successful: 2,
+ open_failure: false,
+ };
+
+ #[allow(clippy::unreadable_literal)]
+ #[test]
+ fn test_happy_paths() {
+ // some real data.
+ let conn = do_migrate(HAPPY_PATH_MIGRATION_INFO, |c| {
+ c.execute_batch(HAPPY_PATH_SQL).expect("should populate")
+ });
+
+ assert_has(
+ &conn,
+ "{e7fefcf3-b39c-4f17-5215-ebfe120a7031}",
+ json!({"userWelcomed": 1570659224457u64, "isWho": "4ec8109f"}),
+ );
+ assert_has(
+ &conn,
+ "https-everywhere@eff.org",
+ json!({"userRules": [], "ruleActiveStates": {}, "migration_version": 2}),
+ );
+ }
+
+ #[test]
+ fn test_sad_paths() {
+ do_migrate(
+ MigrationInfo {
+ entries: 10,
+ entries_successful: 0,
+ extensions: 6,
+ extensions_successful: 0,
+ open_failure: false,
+ },
+ |c| {
+ c.execute_batch(
+ r#"INSERT INTO collection_data(collection_name, record)
+ VALUES
+ ('default/test', '{"key":2,"data":1}'), -- key not a string
+ ('default/test', '{"key":"","data":1}'), -- key empty string
+ ('default/test', '{"xey":"k","data":1}'), -- key missing
+ ('default/test', '{"key":"k","xata":1}'), -- data missing
+ ('default/test', '{"key":"k","data":1'), -- invalid json
+ ('xx/test', '{"key":"k","data":1}'), -- bad key format
+ ('default', '{"key":"k","data":1}'), -- bad key format 2
+ ('default/', '{"key":"k","data":1}'), -- bad key format 3
+ ('defaultx/test', '{"key":"k","data":1}'), -- bad key format 4
+ ('', '') -- empty strings
+ "#,
+ )
+ .expect("should populate");
+ },
+ );
+ }
+
+ #[test]
+ fn test_migration_info_storage() {
+ let tmpdir = tempdir().unwrap();
+ let path = tmpdir.path().join("source.db");
+ init_source_db(&path, |c| {
+ c.execute_batch(HAPPY_PATH_SQL).expect("should populate")
+ });
+
+ // now migrate
+ let db = crate::store::test::new_mem_store();
+ db.migrate(&path).expect("migration should work");
+ let mi = db
+ .take_migration_info()
+ .expect("take failed with info present");
+ assert_eq!(mi, Some(HAPPY_PATH_MIGRATION_INFO));
+ let mi2 = db
+ .take_migration_info()
+ .expect("take failed with info missing");
+ assert_eq!(mi2, None);
+ }
+}
diff --git a/third_party/rust/webext-storage/src/schema.rs b/third_party/rust/webext-storage/src/schema.rs
new file mode 100644
index 0000000000..59efcf495a
--- /dev/null
+++ b/third_party/rust/webext-storage/src/schema.rs
@@ -0,0 +1,213 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::db::sql_fns;
+use crate::error::Result;
+use rusqlite::{Connection, Transaction};
+use sql_support::open_database::{
+ ConnectionInitializer as MigrationLogic, Error as MigrationError, Result as MigrationResult,
+};
+
+const CREATE_SCHEMA_SQL: &str = include_str!("../sql/create_schema.sql");
+const CREATE_SYNC_TEMP_TABLES_SQL: &str = include_str!("../sql/create_sync_temp_tables.sql");
+
+pub struct WebExtMigrationLogin;
+
+impl MigrationLogic for WebExtMigrationLogin {
+ const NAME: &'static str = "webext storage db";
+ const END_VERSION: u32 = 2;
+
+ fn prepare(&self, conn: &Connection, _db_empty: bool) -> MigrationResult<()> {
+ let initial_pragmas = "
+ -- We don't care about temp tables being persisted to disk.
+ PRAGMA temp_store = 2;
+ -- we unconditionally want write-ahead-logging mode
+ PRAGMA journal_mode=WAL;
+ -- foreign keys seem worth enforcing!
+ PRAGMA foreign_keys = ON;
+ ";
+ conn.execute_batch(initial_pragmas)?;
+ define_functions(conn)?;
+ conn.set_prepared_statement_cache_capacity(128);
+ Ok(())
+ }
+
+ fn init(&self, db: &Transaction<'_>) -> MigrationResult<()> {
+ log::debug!("Creating schema");
+ db.execute_batch(CREATE_SCHEMA_SQL)?;
+ Ok(())
+ }
+
+ fn upgrade_from(&self, db: &Transaction<'_>, version: u32) -> MigrationResult<()> {
+ match version {
+ 1 => upgrade_from_1(db),
+ _ => Err(MigrationError::IncompatibleVersion(version)),
+ }
+ }
+}
+
+fn define_functions(c: &Connection) -> MigrationResult<()> {
+ use rusqlite::functions::FunctionFlags;
+ c.create_scalar_function(
+ "generate_guid",
+ 0,
+ FunctionFlags::SQLITE_UTF8,
+ sql_fns::generate_guid,
+ )?;
+ Ok(())
+}
+
+fn upgrade_from_1(db: &Connection) -> MigrationResult<()> {
+ // We changed a not null constraint
+ db.execute_batch("ALTER TABLE storage_sync_mirror RENAME TO old_mirror;")?;
+ // just re-run the full schema commands to recreate the table.
+ db.execute_batch(CREATE_SCHEMA_SQL)?;
+ db.execute_batch(
+ "INSERT OR IGNORE INTO storage_sync_mirror(guid, ext_id, data)
+ SELECT guid, ext_id, data FROM old_mirror;",
+ )?;
+ db.execute_batch("DROP TABLE old_mirror;")?;
+ db.execute_batch("PRAGMA user_version = 2;")?;
+ Ok(())
+}
+
+// Note that we expect this to be called before and after a sync - before to
+// ensure we are syncing with a clean state, after to be good memory citizens
+// given the temp tables are in memory.
+pub fn create_empty_sync_temp_tables(db: &Connection) -> Result<()> {
+ log::debug!("Initializing sync temp tables");
+ db.execute_batch(CREATE_SYNC_TEMP_TABLES_SQL)?;
+ Ok(())
+}
+
+#[cfg(test)]
+pub mod test {
+ use prettytable::{Cell, Row};
+ use rusqlite::Result as RusqliteResult;
+ use rusqlite::{types::Value, Connection};
+
+ // To help debugging tests etc.
+ #[allow(unused)]
+ pub fn print_table(conn: &Connection, table_name: &str) -> RusqliteResult<()> {
+ let mut stmt = conn.prepare(&format!("SELECT * FROM {}", table_name))?;
+ let mut rows = stmt.query([])?;
+ let mut table = prettytable::Table::new();
+ let mut titles = Row::empty();
+ for col in rows.as_ref().expect("must have statement").columns() {
+ titles.add_cell(Cell::new(col.name()));
+ }
+ table.set_titles(titles);
+ while let Some(sql_row) = rows.next()? {
+ let mut table_row = Row::empty();
+ for i in 0..sql_row.as_ref().column_count() {
+ let val = match sql_row.get::<_, Value>(i)? {
+ Value::Null => "null".to_string(),
+ Value::Integer(i) => i.to_string(),
+ Value::Real(f) => f.to_string(),
+ Value::Text(s) => s,
+ Value::Blob(b) => format!("<blob with {} bytes>", b.len()),
+ };
+ table_row.add_cell(Cell::new(&val));
+ }
+ table.add_row(table_row);
+ }
+ table.printstd();
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::db::test::new_mem_db;
+ use rusqlite::Error;
+ use sql_support::open_database::test_utils::MigratedDatabaseFile;
+ use sql_support::ConnExt;
+
+ const CREATE_SCHEMA_V1_SQL: &str = include_str!("../sql/tests/create_schema_v1.sql");
+
+ #[test]
+ fn test_create_schema_twice() {
+ let db = new_mem_db();
+ db.execute_batch(CREATE_SCHEMA_SQL)
+ .expect("should allow running twice");
+ }
+
+ #[test]
+ fn test_create_empty_sync_temp_tables_twice() {
+ let db = new_mem_db();
+ create_empty_sync_temp_tables(&db).expect("should work first time");
+ // insert something into our new temp table and check it's there.
+ db.execute_batch(
+ "INSERT INTO temp.storage_sync_staging
+ (guid, ext_id) VALUES
+ ('guid', 'ext_id');",
+ )
+ .expect("should work once");
+ let count = db
+ .query_row_and_then(
+ "SELECT COUNT(*) FROM temp.storage_sync_staging;",
+ [],
+ |row| row.get::<_, u32>(0),
+ )
+ .expect("query should work");
+ assert_eq!(count, 1, "should be one row");
+
+ // re-execute
+        create_empty_sync_temp_tables(&db).expect("should work the second time");
+ // and it should have deleted existing data.
+ let count = db
+ .query_row_and_then(
+ "SELECT COUNT(*) FROM temp.storage_sync_staging;",
+ [],
+ |row| row.get::<_, u32>(0),
+ )
+ .expect("query should work");
+ assert_eq!(count, 0, "should be no rows");
+ }
+
+ #[test]
+ fn test_all_upgrades() -> Result<()> {
+ let db_file = MigratedDatabaseFile::new(WebExtMigrationLogin, CREATE_SCHEMA_V1_SQL);
+ db_file.run_all_upgrades();
+ let db = db_file.open();
+
+ let get_id_data = |guid: &str| -> Result<(Option<String>, Option<String>)> {
+ let (ext_id, data) = db
+ .try_query_row::<_, Error, _, _>(
+ "SELECT ext_id, data FROM storage_sync_mirror WHERE guid = :guid",
+ &[(":guid", &guid.to_string())],
+ |row| Ok((row.get(0)?, row.get(1)?)),
+ true,
+ )?
+ .expect("row should exist.");
+ Ok((ext_id, data))
+ };
+ assert_eq!(
+ get_id_data("guid-1")?,
+ (Some("ext-id-1".to_string()), Some("data-1".to_string()))
+ );
+ assert_eq!(
+ get_id_data("guid-2")?,
+ (Some("ext-id-2".to_string()), Some("data-2".to_string()))
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_upgrade_2() -> Result<()> {
+ let _ = env_logger::try_init();
+
+ let db_file = MigratedDatabaseFile::new(WebExtMigrationLogin, CREATE_SCHEMA_V1_SQL);
+ db_file.upgrade_to(2);
+ let db = db_file.open();
+
+        // Should be able to insert a new row with a NULL ext_id
+ db.execute_batch(
+ "INSERT INTO storage_sync_mirror(guid, ext_id, data)
+ VALUES ('guid-3', NULL, NULL);",
+ )?;
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/store.rs b/third_party/rust/webext-storage/src/store.rs
new file mode 100644
index 0000000000..043ec5a11c
--- /dev/null
+++ b/third_party/rust/webext-storage/src/store.rs
@@ -0,0 +1,218 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::api::{self, StorageChanges};
+use crate::db::{StorageDb, ThreadSafeStorageDb};
+use crate::error::*;
+use crate::migration::{migrate, MigrationInfo};
+use crate::sync;
+use std::path::Path;
+use std::sync::Arc;
+
+use interrupt_support::SqlInterruptHandle;
+use serde_json::Value as JsonValue;
+
+/// A store is used to access `storage.sync` data. It manages an underlying
+/// database connection, and exposes methods for reading and writing storage
+/// items scoped to an extension ID. Each item is a JSON object, with one or
+/// more string keys, and values of any type that can serialize to JSON.
+///
+/// An application should create only one store, and manage the instance as a
+/// singleton. While this isn't enforced, if you make multiple stores pointing
+/// to the same database file, you are going to have a bad time: each store will
+/// create its own database connection, using up extra memory and CPU cycles,
+/// and causing write contention. For this reason, you should only call
+/// `Store::new()` (or `webext_store_new()`, from the FFI) once.
+///
+/// Note that our Db implementation is behind an Arc<> because we share that
+/// connection with our sync engines - ie, these engines also hold an Arc<>
+/// around the same object.
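+///
+/// A minimal usage sketch (not compiled as a doctest; the database path and
+/// module paths are illustrative, and error handling is simplified):
+///
+/// ```ignore
+/// use serde_json::json;
+///
+/// fn example() -> webext_storage::error::Result<()> {
+///     let store = webext_storage::store::Store::new("/path/to/webext-storage.sqlite")?;
+///     store.set("ext@example.com", json!({"colour": "blue"}))?;
+///     assert_eq!(
+///         store.get("ext@example.com", json!(null))?,
+///         json!({"colour": "blue"})
+///     );
+///     store.close()?;
+///     Ok(())
+/// }
+/// ```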
+pub struct Store {
+ db: Arc<ThreadSafeStorageDb>,
+}
+
+impl Store {
+ /// Creates a store backed by a database at `db_path`. The path can be a
+ /// file path or `file:` URI.
+ pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
+ let db = StorageDb::new(db_path)?;
+ Ok(Self {
+ db: Arc::new(ThreadSafeStorageDb::new(db)),
+ })
+ }
+
+ /// Creates a store backed by an in-memory database.
+ #[cfg(test)]
+ pub fn new_memory(db_path: &str) -> Result<Self> {
+ let db = StorageDb::new_memory(db_path)?;
+ Ok(Self {
+ db: Arc::new(ThreadSafeStorageDb::new(db)),
+ })
+ }
+
+ /// Returns an interrupt handle for this store.
+ pub fn interrupt_handle(&self) -> Arc<SqlInterruptHandle> {
+ self.db.interrupt_handle()
+ }
+
+ /// Sets one or more JSON key-value pairs for an extension ID. Returns a
+ /// list of changes, with existing and new values for each key in `val`.
+ pub fn set(&self, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
+ let db = self.db.lock();
+ let tx = db.unchecked_transaction()?;
+ let result = api::set(&tx, ext_id, val)?;
+ tx.commit()?;
+ Ok(result)
+ }
+
+ /// Returns information about per-extension usage
+ pub fn usage(&self) -> Result<Vec<crate::UsageInfo>> {
+ let db = self.db.lock();
+ api::usage(&db)
+ }
+
+    /// Returns the values for one or more keys. `keys` can be:
+ ///
+ /// - `null`, in which case all key-value pairs for the extension are
+ /// returned, or an empty object if the extension doesn't have any
+ /// stored data.
+ /// - A single string key, in which case an object with only that key
+ /// and its value is returned, or an empty object if the key doesn't
+    ///   exist.
+ /// - An array of string keys, in which case an object with only those
+ /// keys and their values is returned. Any keys that don't exist will be
+ /// omitted.
+ /// - An object where the property names are keys, and each value is the
+ /// default value to return if the key doesn't exist.
+ ///
+ /// This method always returns an object (that is, a
+ /// `serde_json::Value::Object`).
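+    ///
+    /// A rough sketch of the accepted shapes of `keys` (not compiled as a
+    /// doctest; assumes a `store` holding data for "ext@example.com"):
+    ///
+    /// ```ignore
+    /// use serde_json::json;
+    /// // Everything stored for the extension:
+    /// let all = store.get("ext@example.com", json!(null))?;
+    /// // A single key:
+    /// let one = store.get("ext@example.com", json!("colour"))?;
+    /// // Several keys at once:
+    /// let some = store.get("ext@example.com", json!(["colour", "size"]))?;
+    /// // Keys with defaults for anything missing:
+    /// let defaulted = store.get("ext@example.com", json!({"size": 12}))?;
+    /// ```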
+ pub fn get(&self, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
+ // Don't care about transactions here.
+ let db = self.db.lock();
+ api::get(&db, ext_id, keys)
+ }
+
+ /// Deletes the values for one or more keys. As with `get`, `keys` can be
+ /// either a single string key, or an array of string keys. Returns a list
+ /// of changes, where each change contains the old value for each deleted
+ /// key.
+ pub fn remove(&self, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
+ let db = self.db.lock();
+ let tx = db.unchecked_transaction()?;
+ let result = api::remove(&tx, ext_id, keys)?;
+ tx.commit()?;
+ Ok(result)
+ }
+
+ /// Deletes all key-value pairs for the extension. As with `remove`, returns
+ /// a list of changes, where each change contains the old value for each
+ /// deleted key.
+ pub fn clear(&self, ext_id: &str) -> Result<StorageChanges> {
+ let db = self.db.lock();
+ let tx = db.unchecked_transaction()?;
+ let result = api::clear(&tx, ext_id)?;
+ tx.commit()?;
+ Ok(result)
+ }
+
+ /// Returns the bytes in use for the specified items (which can be null,
+ /// a string, or an array)
+ pub fn get_bytes_in_use(&self, ext_id: &str, keys: JsonValue) -> Result<usize> {
+ let db = self.db.lock();
+ api::get_bytes_in_use(&db, ext_id, keys)
+ }
+
+ /// Returns a bridged sync engine for Desktop for this store.
+ pub fn bridged_engine(&self) -> sync::BridgedEngine {
+ sync::BridgedEngine::new(&self.db)
+ }
+
+ /// Closes the store and its database connection. See the docs for
+ /// `StorageDb::close` for more details on when this can fail.
+ pub fn close(self) -> Result<()> {
+ // Even though this consumes `self`, the fact we use an Arc<> means
+ // we can't guarantee we can actually consume the inner DB - so do
+ // the best we can.
+ let shared: ThreadSafeStorageDb = match Arc::try_unwrap(self.db) {
+ Ok(shared) => shared,
+ _ => {
+ // The only way this is possible is if the sync engine has an operation
+ // running - but that shouldn't be possible in practice because desktop
+ // uses a single "task queue" such that the close operation can't possibly
+ // be running concurrently with any sync or storage tasks.
+
+ // If this *could* get hit, rusqlite will attempt to close the DB connection
+ // as it is dropped, and if that close fails, then rusqlite 0.28.0 and earlier
+ // would panic - but even that only happens if prepared statements are
+            // not finalized, which rusqlite also does.
+
+ // tl;dr - this should be impossible. If it was possible, rusqlite might panic,
+            // but in practice we've never seen it panic in other places where we don't close
+ // connections, and the next rusqlite version will not panic anyway.
+ // So this-is-fine.jpg
+ log::warn!("Attempting to close a store while other DB references exist.");
+ return Err(ErrorKind::OtherConnectionReferencesExist.into());
+ }
+ };
+ // consume the mutex and get back the inner.
+ let db = shared.into_inner();
+ db.close()
+ }
+
+ /// Gets the changes which the current sync applied. Should be used
+ /// immediately after the bridged engine is told to apply incoming changes,
+ /// and can be used to notify observers of the StorageArea of the changes
+ /// that were applied.
+ /// The result is a Vec of already JSON stringified changes.
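+    ///
+    /// A sketch of the intended call order (not compiled; `engine` is this
+    /// store's bridged engine and `notify_observers` stands in for whatever
+    /// notification hook the embedder provides):
+    ///
+    /// ```ignore
+    /// engine.apply()?;
+    /// for change in store.get_synced_changes()? {
+    ///     notify_observers(&change.ext_id, &change.changes);
+    /// }
+    /// ```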
+ pub fn get_synced_changes(&self) -> Result<Vec<sync::SyncedExtensionChange>> {
+ let db = self.db.lock();
+ sync::get_synced_changes(&db)
+ }
+
+ /// Migrates data from a database in the format of the "old" kinto
+ /// implementation. Information about how the migration went is stored in
+ /// the database, and can be read using `Self::take_migration_info`.
+ ///
+ /// Note that `filename` isn't normalized or canonicalized.
+ pub fn migrate(&self, filename: impl AsRef<Path>) -> Result<()> {
+ let db = self.db.lock();
+ let tx = db.unchecked_transaction()?;
+ let result = migrate(&tx, filename.as_ref())?;
+ tx.commit()?;
+ // Failing to store this information should not cause migration failure.
+ if let Err(e) = result.store(&db) {
+ debug_assert!(false, "Migration error: {:?}", e);
+ log::warn!("Failed to record migration telmetry: {}", e);
+ }
+ Ok(())
+ }
+
+    /// Read-and-delete (i.e. `take` in Rust parlance, see `Option::take`)
+ /// operation for any MigrationInfo stored in this database.
+ pub fn take_migration_info(&self) -> Result<Option<MigrationInfo>> {
+ let db = self.db.lock();
+ let tx = db.unchecked_transaction()?;
+ let result = MigrationInfo::take(&tx)?;
+ tx.commit()?;
+ Ok(result)
+ }
+}
+
+#[cfg(test)]
+pub mod test {
+ use super::*;
+ #[test]
+ fn test_send() {
+ fn ensure_send<T: Send>() {}
+ // Compile will fail if not send.
+ ensure_send::<Store>();
+ }
+
+ pub fn new_mem_store() -> Store {
+ Store {
+ db: Arc::new(ThreadSafeStorageDb::new(crate::db::test::new_mem_db())),
+ }
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/bridge.rs b/third_party/rust/webext-storage/src/sync/bridge.rs
new file mode 100644
index 0000000000..760a0d0e10
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/bridge.rs
@@ -0,0 +1,388 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use anyhow::Result;
+use rusqlite::Transaction;
+use std::sync::{Arc, Weak};
+use sync15::bso::IncomingBso;
+use sync15::engine::ApplyResults;
+use sync_guid::Guid as SyncGuid;
+
+use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb};
+use crate::schema;
+use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
+use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
+
+const LAST_SYNC_META_KEY: &str = "last_sync_time";
+const SYNC_ID_META_KEY: &str = "sync_id";
+
+/// A bridged engine implements all the methods needed to make the
+/// `storage.sync` store work with Desktop's Sync implementation.
+/// Conceptually, it's similar to `sync15::Store`, which we
+/// should eventually rename and unify with this trait (#2841).
+///
+/// Unlike most of our other implementation which hold a strong reference
+/// to the store, this engine keeps a weak reference in an attempt to keep
+/// the desktop semantics as close as possible to what they were when the
+/// engines all took lifetime params to ensure they don't outlive the store.
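+///
+/// Roughly the order in which Desktop drives these methods during a sync
+/// (a sketch, not compiled; `incoming_bsos`, `server_modified_millis` and
+/// `uploaded_ids` are placeholders supplied by the caller):
+///
+/// ```ignore
+/// use sync15::engine::BridgedEngine as _;
+///
+/// let engine = store.bridged_engine();
+/// engine.sync_started()?;
+/// engine.store_incoming(incoming_bsos)?;
+/// let outgoing = engine.apply()?;
+/// // ...upload `outgoing` to the server, then record what made it up:
+/// engine.set_uploaded(server_modified_millis, &uploaded_ids)?;
+/// engine.sync_finished()?;
+/// ```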
+pub struct BridgedEngine {
+ db: Weak<ThreadSafeStorageDb>,
+}
+
+impl BridgedEngine {
+ /// Creates a bridged engine for syncing.
+ pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self {
+ BridgedEngine {
+ db: Arc::downgrade(db),
+ }
+ }
+
+ fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> {
+ tx.execute_batch(
+ "DELETE FROM storage_sync_mirror;
+ UPDATE storage_sync_data SET sync_change_counter = 1;",
+ )?;
+ delete_meta(tx, LAST_SYNC_META_KEY)?;
+ Ok(())
+ }
+
+ fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> {
+ self.db
+ .upgrade()
+ .ok_or_else(|| crate::error::ErrorKind::DatabaseConnectionClosed.into())
+ }
+}
+
+impl sync15::engine::BridgedEngine for BridgedEngine {
+ fn last_sync(&self) -> Result<i64> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0))
+ }
+
+ fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?;
+ Ok(())
+ }
+
+ fn sync_id(&self) -> Result<Option<String>> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ Ok(get_meta(&db, SYNC_ID_META_KEY)?)
+ }
+
+ fn reset_sync_id(&self) -> Result<String> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ let new_id = SyncGuid::random().to_string();
+ self.do_reset(&tx)?;
+ put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
+ tx.commit()?;
+ Ok(new_id)
+ }
+
+ fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?;
+ Ok(match current {
+ Some(current) if current == sync_id => current,
+ _ => {
+ let tx = db.unchecked_transaction()?;
+ self.do_reset(&tx)?;
+ let result = sync_id.to_string();
+ put_meta(&tx, SYNC_ID_META_KEY, &result)?;
+ tx.commit()?;
+ result
+ }
+ })
+ }
+
+ fn sync_started(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ schema::create_empty_sync_temp_tables(&db)?;
+ Ok(())
+ }
+
+ fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+ let tx = db.unchecked_transaction()?;
+ let incoming_content: Vec<_> = incoming_bsos
+ .into_iter()
+ .map(IncomingBso::into_content::<super::WebextRecord>)
+ .collect();
+ stage_incoming(&tx, &incoming_content, &signal)?;
+ tx.commit()?;
+ Ok(())
+ }
+
+ fn apply(&self) -> Result<ApplyResults> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+
+ let tx = db.unchecked_transaction()?;
+ let incoming = get_incoming(&tx)?;
+ let actions = incoming
+ .into_iter()
+ .map(|(item, state)| (item, plan_incoming(state)))
+ .collect();
+ apply_actions(&tx, actions, &signal)?;
+ stage_outgoing(&tx)?;
+ tx.commit()?;
+
+ Ok(get_outgoing(&db, &signal)?.into())
+ }
+
+ fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+ let tx = db.unchecked_transaction()?;
+ record_uploaded(&tx, ids, &signal)?;
+ tx.commit()?;
+
+ Ok(())
+ }
+
+ fn sync_finished(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ schema::create_empty_sync_temp_tables(&db)?;
+ Ok(())
+ }
+
+ fn reset(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ self.do_reset(&tx)?;
+ delete_meta(&tx, SYNC_ID_META_KEY)?;
+ tx.commit()?;
+ Ok(())
+ }
+
+ fn wipe(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ // We assume the meta table is only used by sync.
+ tx.execute_batch(
+ "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
+ )?;
+ tx.commit()?;
+ Ok(())
+ }
+}
+
+impl From<anyhow::Error> for crate::error::Error {
+ fn from(value: anyhow::Error) -> Self {
+ crate::error::ErrorKind::SyncError(value.to_string()).into()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::db::test::new_mem_thread_safe_storage_db;
+ use crate::db::StorageDb;
+ use sync15::engine::BridgedEngine;
+
+ fn query_count(conn: &StorageDb, table: &str) -> u32 {
+ conn.query_row_and_then(&format!("SELECT COUNT(*) FROM {};", table), [], |row| {
+ row.get::<_, u32>(0)
+ })
+ .expect("should work")
+ }
+
+ // Sets up mock data for the tests here.
+ fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> {
+ {
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ db.execute(
+ "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
+ VALUES ('ext-a', 'invalid-json', 2)",
+ [],
+ )?;
+ db.execute(
+ "INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', 'ext-a', '3')",
+ [],
+ )?;
+ }
+ engine.set_last_sync(1)?;
+
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ // and assert we wrote what we think we did.
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
+ assert_eq!(query_count(&db, "meta"), 1);
+ Ok(())
+ }
+
+ // Assuming a DB setup with setup_mock_data, assert it was correctly reset.
+ fn assert_reset(engine: &super::BridgedEngine) -> Result<()> {
+ // A reset never wipes data...
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+
+ // But did reset the change counter.
+ let cc = db.query_row_and_then(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(cc, 1);
+ // But did wipe the mirror...
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
+ // And the last_sync should have been wiped.
+ assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none());
+ Ok(())
+ }
+
+ // Assuming a DB setup with setup_mock_data, assert it has not been reset.
+ fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+ let cc = db.query_row_and_then(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(cc, 2);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
+ // And the last_sync should remain.
+ assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some());
+ Ok(())
+ }
+
+ #[test]
+ fn test_wipe() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ engine.wipe()?;
+
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+
+ assert_eq!(query_count(&db, "storage_sync_data"), 0);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
+ assert_eq!(query_count(&db, "meta"), 0);
+ Ok(())
+ }
+
+ #[test]
+ fn test_reset() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ engine.reset()?;
+ assert_reset(&engine)?;
+ // Only an explicit reset kills the sync-id, so check that here.
+ assert_eq!(
+ get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?,
+ None
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_missing_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ assert_eq!(engine.sync_id()?, None);
+ // We don't have a sync ID - so setting one should reset.
+ engine.ensure_current_sync_id("new-id")?;
+        // should have caused a reset.
+ assert_reset(&engine)?;
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_new_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"old-id".to_string(),
+ )?;
+ assert_not_reset(&engine)?;
+ assert_eq!(engine.sync_id()?, Some("old-id".to_string()));
+
+ engine.ensure_current_sync_id("new-id")?;
+        // should have caused a reset.
+ assert_reset(&engine)?;
+ // should have the new id.
+ assert_eq!(engine.sync_id()?, Some("new-id".to_string()));
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_same_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ assert_not_reset(&engine)?;
+
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ engine.ensure_current_sync_id("sync-id")?;
+ // should not have reset.
+ assert_not_reset(&engine)?;
+ Ok(())
+ }
+
+ #[test]
+ fn test_reset_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));
+ let new_id = engine.reset_sync_id()?;
+        // should have caused a reset.
+ assert_reset(&engine)?;
+ assert_eq!(engine.sync_id()?, Some(new_id));
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/incoming.rs b/third_party/rust/webext-storage/src/sync/incoming.rs
new file mode 100644
index 0000000000..5d00fe8de3
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/incoming.rs
@@ -0,0 +1,863 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// The "incoming" part of syncing - handling the incoming rows, staging them,
+// working out a plan for them, updating the local data and mirror, etc.
+
+use interrupt_support::Interruptee;
+use rusqlite::{Connection, Row, Transaction};
+use sql_support::ConnExt;
+use sync15::bso::{IncomingContent, IncomingKind};
+use sync_guid::Guid as SyncGuid;
+
+use crate::api::{StorageChanges, StorageValueChange};
+use crate::error::*;
+
+use super::{merge, remove_matching_keys, JsonMap, WebextRecord};
+
+/// The state data can be in. Could be represented as Option<JsonMap>, but this
+/// is clearer and independent of how the data is stored.
+#[derive(Debug, PartialEq, Eq)]
+pub enum DataState {
+ /// The data was deleted.
+ Deleted,
+ /// Data exists, as stored in the map.
+ Exists(JsonMap),
+}
+
+// A little helper to create a StorageChanges object when we are creating
+// a new value with multiple keys that doesn't exist locally.
+fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
+ let mut result = StorageChanges::with_capacity(new.len());
+ for (key, val) in new.iter() {
+ result.push(StorageValueChange {
+ key: key.clone(),
+ old_value: None,
+ new_value: Some(val.clone()),
+ });
+ }
+ result
+}
+
+// This module deals exclusively with the Map inside a JsonValue::Object().
+// This helper reads such a Map from a SQL row, ignoring anything which is
+// either invalid JSON or a different JSON type.
+fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
+ let s = row.get::<_, Option<String>>(col)?;
+ Ok(match s {
+ None => DataState::Deleted,
+ Some(s) => match serde_json::from_str(&s) {
+ Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
+ _ => {
+ // We don't want invalid json or wrong types to kill syncing.
+ // It should be impossible as we never write anything which
+ // could cause it, but we can't really log the bad data as there
+ // might be PII. Logging just a message without any additional
+                // clues is going to be unhelpfully noisy, so we silently
+                // treat it as deleted.
+ // XXX - Maybe record telemetry?
+ DataState::Deleted
+ }
+ },
+ })
+}
+
+/// The first thing we do with incoming items is to "stage" them in a temp table.
+/// The actual processing is done via this table.
+pub fn stage_incoming(
+ tx: &Transaction<'_>,
+ incoming_records: &[IncomingContent<WebextRecord>],
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ sql_support::each_sized_chunk(
+ incoming_records,
+        // We bind 3 params per record.
+ sql_support::default_max_variable_number() / 3,
+ |chunk, _| -> Result<()> {
+ let mut params = Vec::with_capacity(chunk.len() * 3);
+ for record in chunk {
+ signal.err_if_interrupted()?;
+ match &record.kind {
+ IncomingKind::Content(r) => {
+ params.push(Some(record.envelope.id.to_string()));
+ params.push(Some(r.ext_id.to_string()));
+ params.push(Some(r.data.clone()));
+ }
+ IncomingKind::Tombstone => {
+ params.push(Some(record.envelope.id.to_string()));
+ params.push(None);
+ params.push(None);
+ }
+ IncomingKind::Malformed => {
+ log::error!("Ignoring incoming malformed record: {}", record.envelope.id);
+ }
+ }
+ }
+ // we might have skipped records
+ let actual_len = params.len() / 3;
+ if actual_len != 0 {
+ let sql = format!(
+ "INSERT OR REPLACE INTO temp.storage_sync_staging
+ (guid, ext_id, data)
+ VALUES {}",
+ sql_support::repeat_multi_values(actual_len, 3)
+ );
+ tx.execute(&sql, rusqlite::params_from_iter(params))?;
+ }
+ Ok(())
+ },
+ )?;
+ Ok(())
+}
+
+/// The "state" we find ourselves in when considering an incoming/staging
+/// record. This "state" is the input to calculating the IncomingAction and
+/// carries all the data we need to make the required local changes.
+#[derive(Debug, PartialEq, Eq)]
+pub enum IncomingState {
+ /// There's an incoming item, but data for that extension doesn't exist
+ /// either in our local data store or in the local mirror. IOW, this is the
+ /// very first time we've seen this extension.
+ IncomingOnlyData { ext_id: String, data: JsonMap },
+
+ /// An incoming tombstone that doesn't exist locally. Because tombstones
+ /// don't carry the ext-id, it means it's not in our mirror. We are just
+ /// going to ignore it, but we track the state for consistency.
+ IncomingOnlyTombstone,
+
+ /// There's an incoming item and we have data for the same extension in
+ /// our local store - but not in our mirror. This should be relatively
+ /// uncommon as it means:
+ /// * Some other profile has recently installed an extension and synced.
+ /// * This profile has recently installed the same extension.
+ /// * This is the first sync for this profile since both those events
+ /// happened.
+ HasLocal {
+ ext_id: String,
+ incoming: DataState,
+ local: DataState,
+ },
+ /// There's an incoming item and there's an item for the same extension in
+ /// the mirror. The addon probably doesn't exist locally, or if it does,
+ /// the last time we synced we synced the deletion of all data.
+ NotLocal {
+ ext_id: String,
+ incoming: DataState,
+ mirror: DataState,
+ },
+ /// This will be the most common "incoming" case - there's data incoming,
+ /// in the mirror and in the local store for an addon.
+ Everywhere {
+ ext_id: String,
+ incoming: DataState,
+ mirror: DataState,
+ local: DataState,
+ },
+}
+
+/// Get the items we need to process from the staging table. Return details about
+/// the item and the state of that item, ready for processing.
+pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
+ let sql = "
+ SELECT
+ s.guid as guid,
+ l.ext_id as l_ext_id,
+ m.ext_id as m_ext_id,
+ s.ext_id as s_ext_id,
+ s.data as s_data, m.data as m_data, l.data as l_data,
+ l.sync_change_counter
+ FROM temp.storage_sync_staging s
+ LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
+ LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";
+
+ fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
+ let guid = row.get("guid")?;
+ // This is complicated because the staging row doesn't hold the ext_id.
+ // However, both the local table and the mirror do.
+ let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
+ let local_ext_id: Option<String> = row.get("l_ext_id")?;
+ let staged_ext_id: Option<String> = row.get("s_ext_id")?;
+ let incoming = json_map_from_row(row, "s_data")?;
+
+ // We find the state by examining which tables the ext-id exists in,
+ // using whether that column is null as a proxy for that.
+ let state = match (local_ext_id, mirror_ext_id) {
+ (None, None) => {
+ match staged_ext_id {
+ Some(ext_id) => {
+ let data = match incoming {
+ // incoming record with missing data that's not a
+ // tombstone shouldn't happen, but we can cope by
+ // pretending it was an empty json map.
+ DataState::Deleted => JsonMap::new(),
+ DataState::Exists(data) => data,
+ };
+ IncomingState::IncomingOnlyData { ext_id, data }
+ }
+ None => IncomingState::IncomingOnlyTombstone,
+ }
+ }
+ (Some(ext_id), None) => IncomingState::HasLocal {
+ ext_id,
+ incoming,
+ local: json_map_from_row(row, "l_data")?,
+ },
+ (None, Some(ext_id)) => IncomingState::NotLocal {
+ ext_id,
+ incoming,
+ mirror: json_map_from_row(row, "m_data")?,
+ },
+ (Some(ext_id), Some(_)) => IncomingState::Everywhere {
+ ext_id,
+ incoming,
+ mirror: json_map_from_row(row, "m_data")?,
+ local: json_map_from_row(row, "l_data")?,
+ },
+ };
+ Ok((guid, state))
+ }
+
+ conn.conn().query_rows_and_then(sql, [], from_row)
+}
+
+/// This is the set of actions we know how to take *locally* for incoming
+/// records. Which one depends on the IncomingState.
+/// Every state which updates also records the set of changes we should notify
+#[derive(Debug, PartialEq, Eq)]
+pub enum IncomingAction {
+ /// We should locally delete the data for this record
+ DeleteLocally {
+ ext_id: String,
+ changes: StorageChanges,
+ },
+ /// We will take the remote.
+ TakeRemote {
+ ext_id: String,
+ data: JsonMap,
+ changes: StorageChanges,
+ },
+ /// We merged this data - this is what we came up with.
+ Merge {
+ ext_id: String,
+ data: JsonMap,
+ changes: StorageChanges,
+ },
+ /// Entry exists locally and it's the same as the incoming record.
+ Same { ext_id: String },
+ /// Incoming tombstone for an item we've never seen.
+ Nothing,
+}
+
+/// Takes the state of an item and returns the action we should take for it.
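+///
+/// For example, the very first time we see data for an extension there is
+/// nothing to merge, so we simply take the remote value (a sketch, not
+/// compiled as a doctest):
+///
+/// ```ignore
+/// let action = plan_incoming(IncomingState::IncomingOnlyData {
+///     ext_id: "ext@example.com".to_string(),
+///     data: serde_json::json!({"colour": "blue"})
+///         .as_object()
+///         .unwrap()
+///         .clone(),
+/// });
+/// assert!(matches!(action, IncomingAction::TakeRemote { .. }));
+/// ```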
+pub fn plan_incoming(s: IncomingState) -> IncomingAction {
+ match s {
+ IncomingState::Everywhere {
+ ext_id,
+ incoming,
+ local,
+ mirror,
+ } => {
+ // All records exist - but do they all have data?
+ match (incoming, local, mirror) {
+ (
+ DataState::Exists(incoming_data),
+ DataState::Exists(local_data),
+ DataState::Exists(mirror_data),
+ ) => {
+ // all records have data - 3-way merge.
+ merge(ext_id, incoming_data, local_data, Some(mirror_data))
+ }
+ (
+ DataState::Exists(incoming_data),
+ DataState::Exists(local_data),
+ DataState::Deleted,
+ ) => {
+ // No parent, so first time seeing this remotely - 2-way merge
+ merge(ext_id, incoming_data, local_data, None)
+ }
+ (DataState::Exists(incoming_data), DataState::Deleted, _) => {
+ // Incoming data, removed locally. Server wins.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&incoming_data),
+ data: incoming_data,
+ }
+ }
+ (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
+ // Deleted remotely.
+ // Treat this as a delete of every key that we
+ // know was present at the time.
+ let (result, changes) = remove_matching_keys(local_data, &mirror);
+ if result.is_empty() {
+ // If there were no more keys left, we can
+ // delete our version too.
+ IncomingAction::DeleteLocally { ext_id, changes }
+ } else {
+ IncomingAction::Merge {
+ ext_id,
+ data: result,
+ changes,
+ }
+ }
+ }
+ (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
+ // Perhaps another client created and then deleted
+ // the whole object for this extension since the
+ // last time we synced.
+ // Treat this as a delete of every key that we
+ // knew was present. Unfortunately, we don't know
+ // any keys that were present, so we delete no keys.
+ IncomingAction::Merge {
+ ext_id,
+ data: local_data,
+ changes: StorageChanges::new(),
+ }
+ }
+ (DataState::Deleted, DataState::Deleted, _) => {
+ // We agree with the remote (regardless of what we
+ // have mirrored).
+ IncomingAction::Same { ext_id }
+ }
+ }
+ }
+ IncomingState::HasLocal {
+ ext_id,
+ incoming,
+ local,
+ } => {
+ // So we have a local record and an incoming/staging record, but *not* a
+ // mirror record. This means some other device has synced this for
+ // the first time and we are yet to do the same.
+ match (incoming, local) {
+ (DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
+ // This means the extension exists locally and remotely
+ // but this is the first time we've synced it. That's no problem, it's
+ // just a 2-way merge...
+ merge(ext_id, incoming_data, local_data, None)
+ }
+ (DataState::Deleted, DataState::Exists(local_data)) => {
+ // We've data locally, but there's an incoming deletion.
+ // We would normally remove keys that we knew were
+ // present on the server, but we don't know what
+ // was on the server, so we don't remove anything.
+ IncomingAction::Merge {
+ ext_id,
+ data: local_data,
+ changes: StorageChanges::new(),
+ }
+ }
+ (DataState::Exists(incoming_data), DataState::Deleted) => {
+ // No data locally, but some is incoming - take it.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&incoming_data),
+ data: incoming_data,
+ }
+ }
+ (DataState::Deleted, DataState::Deleted) => {
+ // Nothing anywhere - odd, but OK.
+ IncomingAction::Same { ext_id }
+ }
+ }
+ }
+ IncomingState::NotLocal {
+ ext_id, incoming, ..
+ } => {
+ // No local data but there's mirror and an incoming record.
+            // This means a local deletion is being replaced by, or we are
+            // simply re-applying, the incoming record.
+ match incoming {
+ DataState::Exists(data) => IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&data),
+ data,
+ },
+ DataState::Deleted => IncomingAction::Same { ext_id },
+ }
+ }
+ IncomingState::IncomingOnlyData { ext_id, data } => {
+ // Only the staging record exists and it's not a tombstone.
+ // This means it's the first time we've ever seen it. No
+ // conflict possible, just take the remote.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&data),
+ data,
+ }
+ }
+ IncomingState::IncomingOnlyTombstone => {
+ // Only the staging record exists and it is a tombstone - nothing to do.
+ IncomingAction::Nothing
+ }
+ }
+}
+
+fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
+ tx.execute_cached(
+ "INSERT INTO temp.storage_sync_applied (ext_id, changes)
+ VALUES (:ext_id, :changes)",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":changes": &serde_json::to_string(&changes)?,
+ },
+ )?;
+ Ok(())
+}
+
+// Apply the actions necessary to fully process the incoming items.
+pub fn apply_actions(
+ tx: &Transaction<'_>,
+ actions: Vec<(SyncGuid, IncomingAction)>,
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ for (item, action) in actions {
+ signal.err_if_interrupted()?;
+
+ log::trace!("action for '{:?}': {:?}", item, action);
+ match action {
+ IncomingAction::DeleteLocally { ext_id, changes } => {
+ // Can just nuke it entirely.
+ tx.execute_cached(
+ "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
+ &[(":ext_id", &ext_id)],
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+            // We want to update the local record with 'data'; after this
+            // update the item is no longer considered dirty.
+ IncomingAction::TakeRemote {
+ ext_id,
+ data,
+ changes,
+ } => {
+ tx.execute_cached(
+ "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
+ VALUES (:ext_id, :data, 0)",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":data": serde_json::Value::Object(data),
+ },
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+
+ // We merged this data, so need to update locally but still consider
+ // it dirty because the merged data must be uploaded.
+ IncomingAction::Merge {
+ ext_id,
+ data,
+ changes,
+ } => {
+ tx.execute_cached(
+ "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":data": serde_json::Value::Object(data),
+ },
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+
+ // Both local and remote ended up the same - only need to nuke the
+ // change counter.
+ IncomingAction::Same { ext_id } => {
+ tx.execute_cached(
+ "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
+ &[(":ext_id", &ext_id)],
+ )?;
+ // no changes to write
+ }
+ // Literally nothing to do!
+ IncomingAction::Nothing => {}
+ }
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::test::new_syncable_mem_db;
+ use super::*;
+ use crate::api;
+ use interrupt_support::NeverInterrupts;
+ use serde_json::{json, Value};
+ use sync15::bso::IncomingBso;
+
+ // select simple int
+ fn ssi(conn: &Connection, stmt: &str) -> u32 {
+ conn.try_query_one(stmt, [], true)
+ .expect("must work")
+ .unwrap_or_default()
+ }
+
+ fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> {
+ let jv = array.as_array_mut().expect("you must pass a json array");
+ let mut result = Vec::with_capacity(jv.len());
+ for elt in jv {
+ result.push(IncomingBso::from_test_content(elt.take()).into_content());
+ }
+ result
+ }
+
+ // Can't find a way to import these from crate::sync::tests...
+ macro_rules! map {
+ ($($map:tt)+) => {
+ json!($($map)+).as_object().unwrap().clone()
+ };
+ }
+ macro_rules! change {
+ ($key:literal, None, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: None,
+ };
+ };
+ ($key:literal, $old:tt, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: None,
+ }
+ };
+ ($key:literal, None, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: Some(json!($new)),
+ };
+ };
+ ($key:literal, $old:tt, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: Some(json!($new)),
+ }
+ };
+ }
+ macro_rules! changes {
+ ( $( $change:expr ),* ) => {
+ {
+ let mut changes = StorageChanges::new();
+ $(
+ changes.push($change);
+ )*
+ changes
+ }
+ };
+ }
+
+ #[test]
+ fn test_incoming_populates_staging() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ let incoming = json! {[
+ {
+ "id": "guidAAAAAAAA",
+ "extId": "ext1@example.com",
+ "data": json!({"foo": "bar"}).to_string(),
+ }
+ ]};
+
+ stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?;
+ // check staging table
+ assert_eq!(
+ ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
+ 1
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_fetch_incoming_state() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ // Start with an item just in staging.
+ tx.execute(
+ r#"
+ INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
+ VALUES ('guid', 'ext_id', '{"foo":"bar"}')
+ "#,
+ [],
+ )?;
+
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::IncomingOnlyData {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "bar"}),
+ }
+ );
+
+ // Add the same item to the mirror.
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', 'ext_id', '{"foo":"new"}')
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::NotLocal {
+ ext_id: "ext_id".to_string(),
+ incoming: DataState::Exists(map!({"foo": "bar"})),
+ mirror: DataState::Exists(map!({"foo": "new"})),
+ }
+ );
+
+        // and finally the data itself - might as well use the API here!
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::Everywhere {
+ ext_id: "ext_id".to_string(),
+ incoming: DataState::Exists(map!({"foo": "bar"})),
+ local: DataState::Exists(map!({"foo": "local"})),
+ mirror: DataState::Exists(map!({"foo": "new"})),
+ }
+ );
+ Ok(())
+ }
+
+ // Like test_fetch_incoming_state, but check NULLs are handled correctly.
+ #[test]
+ fn test_fetch_incoming_state_nulls() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ // Start with a tombstone just in staging.
+ tx.execute(
+ r#"
+ INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
+ VALUES ('guid', NULL, NULL)
+ "#,
+ [],
+ )?;
+
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);
+
+ // Add the same item to the mirror (can't store an ext_id for a
+ // tombstone in the mirror as incoming tombstones never have it)
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', NULL, NULL)
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);
+
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_data (ext_id, data)
+ VALUES ('ext_id', NULL)
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ // IncomingOnly* seems a little odd, but it is because we can't
+ // tie the tombstones together due to the lack of any ext-id/guid
+ // mapping in this case.
+ IncomingState::IncomingOnlyTombstone
+ );
+ Ok(())
+ }
+
+ // apply_action tests.
+ #[derive(Debug, PartialEq)]
+ struct LocalItem {
+ data: DataState,
+ sync_change_counter: i32,
+ }
+
+ fn get_local_item(conn: &Connection) -> Option<LocalItem> {
+ conn.try_query_row::<_, Error, _, _>(
+ "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
+ [],
+ |row| {
+ let data = json_map_from_row(row, "data")?;
+ let sync_change_counter = row.get::<_, i32>(1)?;
+ Ok(LocalItem {
+ data,
+ sync_change_counter,
+ })
+ },
+ true,
+ )
+ .expect("query should work")
+ }
+
+ fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
+        // no custom deserialize for StorageChanges and we only need it for
+ // tests, so do it manually.
+ conn.try_query_row::<_, Error, _, _>(
+ "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
+ [],
+ |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
+ true,
+ )
+ .expect("query should work")
+ .map(|val: serde_json::Value| {
+ let ob = val.as_object().expect("should be an object of items");
+ let mut result = StorageChanges::with_capacity(ob.len());
+ for (key, val) in ob.into_iter() {
+ let details = val.as_object().expect("elts should be objects");
+ result.push(StorageValueChange {
+ key: key.to_string(),
+ old_value: details.get("oldValue").cloned(),
+ new_value: details.get("newValue").cloned(),
+ });
+ }
+ result
+ })
+ }
+
+ fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
+ let guid = SyncGuid::new("guid");
+ apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
+ }
+
+ #[test]
+ fn test_apply_actions() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+
+ // DeleteLocally - row should be entirely removed.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ let changes = changes![change!("foo", "local", None)];
+ do_apply_action(
+ &tx,
+ IncomingAction::DeleteLocally {
+ ext_id: "ext_id".to_string(),
+ changes: changes.clone(),
+ },
+ );
+ assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
+ // and there should not be a local record at all.
+ assert!(get_local_item(&tx).is_none());
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // TakeRemote - replace local data with remote and marked as not dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ let changes = changes![change!("foo", "local", "remote")];
+ do_apply_action(
+ &tx,
+ IncomingAction::TakeRemote {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "remote"}),
+ changes: changes.clone(),
+ },
+ );
+ // data should exist locally with the remote data and not be dirty.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "remote"})),
+ sync_change_counter: 0
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // Merge - like ::TakeRemote, but data remains dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ let changes = changes![change!("foo", "local", "remote")];
+ do_apply_action(
+ &tx,
+ IncomingAction::Merge {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "remote"}),
+ changes: changes.clone(),
+ },
+ );
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "remote"})),
+ sync_change_counter: 2
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // Same - data stays the same but is marked not dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ do_apply_action(
+ &tx,
+ IncomingAction::Same {
+ ext_id: "ext_id".to_string(),
+ },
+ );
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 0
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), None);
+ tx.rollback()?;
+
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/mod.rs b/third_party/rust/webext-storage/src/sync/mod.rs
new file mode 100644
index 0000000000..3144049dca
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/mod.rs
@@ -0,0 +1,429 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+mod bridge;
+mod incoming;
+mod outgoing;
+
+#[cfg(test)]
+mod sync_tests;
+
+use crate::api::{StorageChanges, StorageValueChange};
+use crate::db::StorageDb;
+use crate::error::*;
+use serde::Deserialize;
+use serde_derive::*;
+use sql_support::ConnExt;
+use sync_guid::Guid as SyncGuid;
+
+pub use bridge::BridgedEngine;
+use incoming::IncomingAction;
+
+type JsonMap = serde_json::Map<String, serde_json::Value>;
+
+pub const STORAGE_VERSION: usize = 1;
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WebextRecord {
+ #[serde(rename = "id")]
+ guid: SyncGuid,
+ #[serde(rename = "extId")]
+ ext_id: String,
+ data: String,
+}
+
+// Perform a 2-way or 3-way merge, where the incoming value wins on conflict.
+fn merge(
+ ext_id: String,
+ mut other: JsonMap,
+ mut ours: JsonMap,
+ parent: Option<JsonMap>,
+) -> IncomingAction {
+ if other == ours {
+ return IncomingAction::Same { ext_id };
+ }
+ let old_incoming = other.clone();
+ // worst case is keys in each are unique.
+ let mut changes = StorageChanges::with_capacity(other.len() + ours.len());
+ if let Some(parent) = parent {
+ // Perform 3-way merge. First, for every key in parent,
+ // compare the parent value with the incoming value to compute
+ // an implicit "diff".
+ for (key, parent_value) in parent.into_iter() {
+ if let Some(incoming_value) = other.remove(&key) {
+ if incoming_value != parent_value {
+ log::trace!(
+ "merge: key {} was updated in incoming - copying value locally",
+ key
+ );
+ let old_value = ours.remove(&key);
+ let new_value = Some(incoming_value.clone());
+ if old_value != new_value {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value,
+ new_value,
+ });
+ }
+ ours.insert(key, incoming_value);
+ }
+ } else {
+ // Key was not present in incoming value.
+ // Another client must have deleted it.
+ log::trace!(
+ "merge: key {} no longer present in incoming - removing it locally",
+ key
+ );
+ if let Some(old_value) = ours.remove(&key) {
+ changes.push(StorageValueChange {
+ key,
+ old_value: Some(old_value),
+ new_value: None,
+ });
+ }
+ }
+ }
+
+ // Then, go through every remaining key in incoming. These are
+ // the ones where a corresponding key does not exist in
+ // parent, so it is a new key, and we need to add it.
+ for (key, incoming_value) in other.into_iter() {
+ log::trace!(
+ "merge: key {} doesn't occur in parent - copying from incoming",
+ key
+ );
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value: None,
+ new_value: Some(incoming_value.clone()),
+ });
+ ours.insert(key, incoming_value);
+ }
+ } else {
+ // No parent. Server wins. Overwrite every key in ours with
+ // the corresponding value in other.
+ log::trace!("merge: no parent - copying all keys from incoming");
+ for (key, incoming_value) in other.into_iter() {
+ let old_value = ours.remove(&key);
+ let new_value = Some(incoming_value.clone());
+ if old_value != new_value {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value,
+ new_value,
+ });
+ }
+ ours.insert(key, incoming_value);
+ }
+ }
+
+ if ours == old_incoming {
+ IncomingAction::TakeRemote {
+ ext_id,
+ data: old_incoming,
+ changes,
+ }
+ } else {
+ IncomingAction::Merge {
+ ext_id,
+ data: ours,
+ changes,
+ }
+ }
+}
+
+fn remove_matching_keys(mut ours: JsonMap, keys_to_remove: &JsonMap) -> (JsonMap, StorageChanges) {
+ let mut changes = StorageChanges::with_capacity(keys_to_remove.len());
+ for key in keys_to_remove.keys() {
+ if let Some(old_value) = ours.remove(key) {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value: Some(old_value),
+ new_value: None,
+ });
+ }
+ }
+ (ours, changes)
+}
+
+/// Holds a JSON-serialized map of all synced changes for an extension.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct SyncedExtensionChange {
+ /// The extension ID.
+ pub ext_id: String,
+ /// The contents of a `StorageChanges` struct, in JSON format. We don't
+ /// deserialize these because they need to be passed back to the browser
+ /// as strings anyway.
+ pub changes: String,
+}
+
+// Fetches the applied changes we stashed in the storage_sync_applied table.
+pub fn get_synced_changes(db: &StorageDb) -> Result<Vec<SyncedExtensionChange>> {
+ let signal = db.begin_interrupt_scope()?;
+ let sql = "SELECT ext_id, changes FROM temp.storage_sync_applied";
+ db.conn().query_rows_and_then(sql, [], |row| -> Result<_> {
+ signal.err_if_interrupted()?;
+ Ok(SyncedExtensionChange {
+ ext_id: row.get("ext_id")?,
+ changes: row.get("changes")?,
+ })
+ })
+}
+
+// Helpers for tests
+#[cfg(test)]
+pub mod test {
+ use crate::db::{test::new_mem_db, StorageDb};
+ use crate::schema::create_empty_sync_temp_tables;
+
+ pub fn new_syncable_mem_db() -> StorageDb {
+ let _ = env_logger::try_init();
+ let db = new_mem_db();
+ create_empty_sync_temp_tables(&db).expect("should work");
+ db
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::test::new_syncable_mem_db;
+ use super::*;
+ use serde_json::json;
+
+ #[test]
+ fn test_serde_record_ser() {
+ assert_eq!(
+ serde_json::to_string(&WebextRecord {
+ guid: "guid".into(),
+ ext_id: "ext_id".to_string(),
+ data: "data".to_string()
+ })
+ .unwrap(),
+ r#"{"id":"guid","extId":"ext_id","data":"data"}"#
+ );
+ }
+
+ // a macro for these tests - constructs a serde_json::Value::Object
+ macro_rules! map {
+ ($($map:tt)+) => {
+ json!($($map)+).as_object().unwrap().clone()
+ };
+ }
+
+ macro_rules! change {
+ ($key:literal, None, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: None,
+ };
+ };
+ ($key:literal, $old:tt, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: None,
+ }
+ };
+ ($key:literal, None, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: Some(json!($new)),
+ }
+ };
+ ($key:literal, $old:tt, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: Some(json!($new)),
+ }
+ };
+ }
+ macro_rules! changes {
+ ( ) => {
+ StorageChanges::new()
+ };
+ ( $( $change:expr ),* ) => {
+ {
+ let mut changes = StorageChanges::new();
+ $(
+ changes.push($change);
+ )*
+ changes
+ }
+ };
+ }
+
+ #[test]
+ fn test_3way_merging() {
+ // No conflict - identical local and remote.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"one": "one", "two": "two"}),
+ map!({"two": "two", "one": "one"}),
+ Some(map!({"parent_only": "parent"})),
+ ),
+ IncomingAction::Same {
+ ext_id: "ext-id".to_string()
+ }
+ );
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "common"}),
+ map!({"ours_only": "ours", "common": "common"}),
+ Some(map!({"parent_only": "parent", "common": "old_common"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "common"}),
+ changes: changes![change!("other_only", None, "other")],
+ }
+ );
+ // Simple conflict - parent value is neither local nor incoming. incoming wins.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "incoming"}),
+ map!({"ours_only": "ours", "common": "local"}),
+ Some(map!({"parent_only": "parent", "common": "parent"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "incoming"}),
+ changes: changes![
+ change!("common", "local", "incoming"),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Local change, no conflict.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "old_value"}),
+ map!({"ours_only": "ours", "common": "new_value"}),
+ Some(map!({"parent_only": "parent", "common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "new_value"}),
+ changes: changes![change!("other_only", None, "other")],
+ }
+ );
+ // Field was removed remotely.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other"}),
+ map!({"common": "old_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::TakeRemote {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other"}),
+ changes: changes![
+ change!("common", "old_value", None),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Field was removed remotely but we added another one.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other"}),
+ map!({"common": "old_value", "new_key": "new_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "new_key": "new_value"}),
+ changes: changes![
+ change!("common", "old_value", None),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Field was removed both remotely and locally.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({}),
+ map!({"new_key": "new_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"new_key": "new_value"}),
+ changes: changes![],
+ }
+ );
+ }
+
+ #[test]
+ fn test_remove_matching_keys() {
+ assert_eq!(
+ remove_matching_keys(
+ map!({"key1": "value1", "key2": "value2"}),
+ &map!({"key1": "ignored", "key3": "ignored"})
+ ),
+ (
+ map!({"key2": "value2"}),
+ changes![change!("key1", "value1", None)]
+ )
+ );
+ }
+
+ #[test]
+ fn test_get_synced_changes() -> Result<()> {
+ let db = new_syncable_mem_db();
+ db.execute_batch(&format!(
+ r#"INSERT INTO temp.storage_sync_applied (ext_id, changes)
+ VALUES
+ ('an-extension', '{change1}'),
+ ('ext"id', '{change2}')
+ "#,
+ change1 = serde_json::to_string(&changes![change!("key1", "old-val", None)])?,
+ change2 = serde_json::to_string(&changes![change!("key-for-second", None, "new-val")])?
+ ))?;
+ let changes = get_synced_changes(&db)?;
+ assert_eq!(changes[0].ext_id, "an-extension");
+ // sanity check it's valid!
+ let c1: JsonMap =
+ serde_json::from_str(&changes[0].changes).expect("changes must be an object");
+ assert_eq!(
+ c1.get("key1")
+ .expect("must exist")
+ .as_object()
+ .expect("must be an object")
+ .get("oldValue"),
+ Some(&json!("old-val"))
+ );
+
+ // phew - do it again to check the string got escaped.
+ assert_eq!(
+ changes[1],
+ SyncedExtensionChange {
+ ext_id: "ext\"id".into(),
+ changes: r#"{"key-for-second":{"newValue":"new-val"}}"#.into(),
+ }
+ );
+ assert_eq!(changes[1].ext_id, "ext\"id");
+ let c2: JsonMap =
+ serde_json::from_str(&changes[1].changes).expect("changes must be an object");
+ assert_eq!(
+ c2.get("key-for-second")
+ .expect("must exist")
+ .as_object()
+ .expect("must be an object")
+ .get("newValue"),
+ Some(&json!("new-val"))
+ );
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/outgoing.rs b/third_party/rust/webext-storage/src/sync/outgoing.rs
new file mode 100644
index 0000000000..c6c9dd64e5
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/outgoing.rs
@@ -0,0 +1,186 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// The "outgoing" part of syncing - building the payloads to upload and
+// managing the sync state of the local DB.
+
+use interrupt_support::Interruptee;
+use rusqlite::{Connection, Row, Transaction};
+use sql_support::ConnExt;
+use sync15::bso::OutgoingBso;
+use sync_guid::Guid as SyncGuid;
+
+use crate::error::*;
+
+use super::WebextRecord;
+
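+// Builds the outgoing BSO for one staged row - a row with NULL `data` means
+// the extension data was removed locally, so it becomes a tombstone.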
+fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> {
+ let guid: SyncGuid = row.get("guid")?;
+ let ext_id: String = row.get("ext_id")?;
+ let raw_data: Option<String> = row.get("data")?;
+ Ok(match raw_data {
+ Some(raw_data) => {
+ let record = WebextRecord {
+ guid,
+ ext_id,
+ data: raw_data,
+ };
+ OutgoingBso::from_content_with_id(record)?
+ }
+ None => OutgoingBso::new_tombstone(guid.into()),
+ })
+}
+
+/// Stages info about what should be uploaded into a temp table. This should be
+/// called in the same transaction as `apply_actions`. Once the upload is
+/// complete, `record_uploaded()` can be called and the data in the temp table
+/// will be used to update the local store.
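+///
+/// A rough sketch of the sequence (illustrative only - in practice the sync
+/// engine drives this, `record_uploaded` typically runs in a later
+/// transaction, a real `Interruptee` is passed rather than the test-only
+/// `NeverInterrupts`, and `uploaded_guids` is whatever the upload step reports):
+///
+/// ```ignore
+/// stage_outgoing(&tx)?;
+/// let outgoing = get_outgoing(&tx, &NeverInterrupts)?;
+/// // ... upload `outgoing` to the server ...
+/// record_uploaded(&tx, &uploaded_guids, &NeverInterrupts)?;
+/// ```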
+pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> {
+ let sql = "
+ -- Stage outgoing items. The item may not yet have a GUID (ie, it might
+ -- not already be in either the mirror or the incoming staging table),
+ -- so we generate one if it doesn't exist.
+ INSERT INTO storage_sync_outgoing_staging
+ (guid, ext_id, data, sync_change_counter)
+ SELECT coalesce(m.guid, s.guid, generate_guid()),
+ l.ext_id, l.data, l.sync_change_counter
+ FROM storage_sync_data l
+ -- left joins as one or both may not exist.
+ LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id
+ LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id
+ WHERE sync_change_counter > 0;
+
+ -- At this point, we've merged in all new records, so copy incoming
+ -- staging into the mirror so that it matches what's on the server.
+ INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data)
+ SELECT guid, ext_id, data FROM temp.storage_sync_staging;
+
+ -- And copy any incoming records that we aren't reuploading into the
+ -- local table. We'll copy the outgoing ones into the mirror and local
+ -- after we upload them.
+ INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter)
+ SELECT ext_id, data, 0
+ FROM storage_sync_staging s
+ WHERE ext_id IS NOT NULL
+ AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o
+ WHERE o.guid = s.guid);";
+ tx.execute_batch(sql)?;
+ Ok(())
+}
+
+/// Returns a vec of the outgoing records which should be uploaded.
+pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> {
+ let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging";
+ let elts = conn
+ .conn()
+ .query_rows_and_then(sql, [], |row| -> Result<_> {
+ signal.err_if_interrupted()?;
+ outgoing_from_row(row)
+ })?;
+
+ log::debug!("get_outgoing found {} items", elts.len());
+ Ok(elts.into_iter().collect())
+}
+
+/// Record the fact that items were uploaded. This updates the state of the
+/// local DB to reflect the state of the server we just updated.
+/// Note that this call is almost certainly going to be made in a *different*
+/// transaction than the transaction used in `stage_outgoing()`, and it will
+/// be called once per batch upload.
+pub fn record_uploaded(
+ tx: &Transaction<'_>,
+ items: &[SyncGuid],
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ log::debug!(
+ "record_uploaded recording that {} items were uploaded",
+ items.len()
+ );
+
+ // Updating the `was_uploaded` column fires the `record_uploaded` trigger,
+ // which updates the local change counter and writes the uploaded record
+ // data back to the mirror.
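+ // `each_chunk` splits the guid list so each UPDATE stays within SQLite's
+ // bound-parameter limit.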
+ sql_support::each_chunk(items, |chunk, _| -> Result<()> {
+ signal.err_if_interrupted()?;
+ let sql = format!(
+ "UPDATE storage_sync_outgoing_staging SET
+ was_uploaded = 1
+ WHERE guid IN ({})",
+ sql_support::repeat_sql_vars(chunk.len()),
+ );
+ tx.execute(&sql, rusqlite::params_from_iter(chunk))?;
+ Ok(())
+ })?;
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::test::new_syncable_mem_db;
+ use super::*;
+ use interrupt_support::NeverInterrupts;
+
+ #[test]
+ fn test_simple() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ tx.execute_batch(
+ r#"
+ INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
+ VALUES
+ ('ext_no_changes', '{"foo":"bar"}', 0),
+ ('ext_with_changes', '{"foo":"bar"}', 1);
+ "#,
+ )?;
+
+ stage_outgoing(&tx)?;
+ let changes = get_outgoing(&tx, &NeverInterrupts)?;
+ assert_eq!(changes.len(), 1);
+ let record: serde_json::Value = serde_json::from_str(&changes[0].payload).unwrap();
+ let ext_id = record.get("extId").unwrap().as_str().unwrap();
+
+ assert_eq!(ext_id, "ext_with_changes");
+
+ record_uploaded(
+ &tx,
+ changes
+ .into_iter()
+ .map(|p| p.envelope.id)
+ .collect::<Vec<SyncGuid>>()
+ .as_slice(),
+ &NeverInterrupts,
+ )?;
+
+ let counter: i32 = tx.conn().query_one(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'",
+ )?;
+ assert_eq!(counter, 0);
+ Ok(())
+ }
+
+ #[test]
+ fn test_payload_serialization() {
+ let record = WebextRecord {
+ guid: SyncGuid::new("guid"),
+ ext_id: "ext-id".to_string(),
+ data: "{}".to_string(),
+ };
+
+ let outgoing = OutgoingBso::from_content_with_id(record).unwrap();
+
+ // The envelope should have our ID.
+ assert_eq!(outgoing.envelope.id, "guid");
+
+ let outgoing_payload =
+ serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap();
+ let outgoing_map = outgoing_payload.as_object().unwrap();
+
+ assert!(outgoing_map.contains_key("id"));
+ assert!(outgoing_map.contains_key("data"));
+ assert!(outgoing_map.contains_key("extId"));
+ assert_eq!(outgoing_map.len(), 3);
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/sync_tests.rs b/third_party/rust/webext-storage/src/sync/sync_tests.rs
new file mode 100644
index 0000000000..6940be2e59
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/sync_tests.rs
@@ -0,0 +1,529 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// This file tries to simulate "full syncs" - ie, from local state changing, to
+// fetching incoming items, generating items to upload, then updating the local
+// state (including the mirror) as a result.
+
+use crate::api::{clear, get, set};
+use crate::error::*;
+use crate::schema::create_empty_sync_temp_tables;
+use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
+use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
+use crate::sync::test::new_syncable_mem_db;
+use crate::sync::WebextRecord;
+use interrupt_support::NeverInterrupts;
+use rusqlite::{Connection, Row, Transaction};
+use serde_json::json;
+use sql_support::ConnExt;
+use sync15::bso::{IncomingBso, IncomingContent, OutgoingBso};
+use sync_guid::Guid;
+
+// Here we try to simulate everything done by a "full sync", just minus the
+// engine. Returns the records we uploaded.
+fn do_sync(
+ tx: &Transaction<'_>,
+ incoming_payloads: &[IncomingContent<WebextRecord>],
+) -> Result<Vec<OutgoingBso>> {
+ log::info!("test do_sync() starting");
+ // First we stage the incoming in the temp tables.
+ stage_incoming(tx, incoming_payloads, &NeverInterrupts)?;
+ // Then we process them getting a Vec of (item, state), which we turn into
+ // a Vec of (item, action)
+ let actions = get_incoming(tx)?
+ .into_iter()
+ .map(|(item, state)| (item, plan_incoming(state)))
+ .collect();
+ log::debug!("do_sync applying {:?}", actions);
+ apply_actions(tx, actions, &NeverInterrupts)?;
+ // So we've done incoming - do outgoing.
+ stage_outgoing(tx)?;
+ let outgoing = get_outgoing(tx, &NeverInterrupts)?;
+ log::debug!("do_sync has outgoing {:?}", outgoing);
+ record_uploaded(
+ tx,
+ outgoing
+ .iter()
+ .map(|p| p.envelope.id.clone())
+ .collect::<Vec<Guid>>()
+ .as_slice(),
+ &NeverInterrupts,
+ )?;
+ create_empty_sync_temp_tables(tx)?;
+ log::info!("test do_sync() complete");
+ Ok(outgoing)
+}
+
+// Check *both* the mirror and local API have ended up with the specified data.
+fn check_finished_with(conn: &Connection, ext_id: &str, val: serde_json::Value) -> Result<()> {
+ let local = get(conn, ext_id, serde_json::Value::Null)?;
+ assert_eq!(local, val);
+ let guid = get_mirror_guid(conn, ext_id)?;
+ let mirror = get_mirror_data(conn, &guid);
+ assert_eq!(mirror, DbData::Data(val.to_string()));
+ // and there should be zero items with a change counter.
+ let count = conn.query_row_and_then(
+ "SELECT COUNT(*) FROM storage_sync_data WHERE sync_change_counter != 0;",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(count, 0);
+ Ok(())
+}
+
+fn get_mirror_guid(conn: &Connection, extid: &str) -> Result<Guid> {
+ let guid = conn.query_row_and_then(
+ "SELECT m.guid FROM storage_sync_mirror m WHERE m.ext_id = ?;",
+ [extid],
+ |row| row.get::<_, Guid>(0),
+ )?;
+ Ok(guid)
+}
+
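+// How a row for a given id can look in a table: no row at all, a row with
+// NULL data (a tombstone), or a row with JSON data.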
+#[derive(Debug, PartialEq)]
+enum DbData {
+ NoRow,
+ NullRow,
+ Data(String),
+}
+
+impl DbData {
+ fn has_data(&self) -> bool {
+ matches!(self, DbData::Data(_))
+ }
+}
+
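+// Fetches the row (if any) from `table`, returning its `data` column and
+// asserting that its `id_name` column matches `expected_extid`.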
+fn _get(conn: &Connection, id_name: &str, expected_extid: &str, table: &str) -> DbData {
+ let sql = format!("SELECT {} as id, data FROM {}", id_name, table);
+
+ fn from_row(row: &Row<'_>) -> Result<(String, Option<String>)> {
+ Ok((row.get("id")?, row.get("data")?))
+ }
+ let mut items = conn
+ .conn()
+ .query_rows_and_then(&sql, [], from_row)
+ .expect("should work");
+ if items.is_empty() {
+ DbData::NoRow
+ } else {
+ let item = items.pop().expect("it exists");
+ assert_eq!(Guid::new(&item.0), expected_extid);
+ match item.1 {
+ None => DbData::NullRow,
+ Some(v) => DbData::Data(v),
+ }
+ }
+}
+
+fn get_mirror_data(conn: &Connection, expected_extid: &str) -> DbData {
+ _get(conn, "guid", expected_extid, "storage_sync_mirror")
+}
+
+fn get_local_data(conn: &Connection, expected_extid: &str) -> DbData {
+ _get(conn, "ext_id", expected_extid, "storage_sync_data")
+}
+
+fn make_incoming(
+ guid: &Guid,
+ ext_id: &str,
+ data: &serde_json::Value,
+) -> IncomingContent<WebextRecord> {
+ let content = json!({"id": guid, "extId": ext_id, "data": data.to_string()});
+ IncomingBso::from_test_content(content).into_content()
+}
+
+fn make_incoming_tombstone(guid: &Guid) -> IncomingContent<WebextRecord> {
+ IncomingBso::new_test_tombstone(guid.clone()).into_content()
+}
+
+#[test]
+fn test_simple_outgoing_sync() -> Result<()> {
+ // So we are starting with an empty local store and empty server store.
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_simple_incoming_sync() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ let bridge_record = make_incoming(&Guid::new("guid"), "ext-id", &data);
+ assert_eq!(do_sync(&tx, &[bridge_record])?.len(), 0);
+ let key1_from_api = get(&tx, "ext-id", json!("key1"))?;
+ assert_eq!(key1_from_api, json!({"key1": "key1-value"}));
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_outgoing_tombstone() -> Result<()> {
+ // Tombstones are only kept when the mirror has that record - so first
+ // test that, then arrange for the mirror to have the record.
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(
+ get_local_data(&tx, "ext-id"),
+ DbData::Data(data.to_string())
+ );
+ // hasn't synced yet, so clearing shouldn't write a tombstone.
+ clear(&tx, "ext-id")?;
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ // now set data again and sync and *then* remove.
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert!(get_local_data(&tx, "ext-id").has_data());
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert!(get_mirror_data(&tx, &guid).has_data());
+ clear(&tx, "ext-id")?;
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NullRow);
+ // then after syncing, the tombstone will be in the mirror but the local row
+ // has been removed.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_incoming_tombstone_exists() -> Result<()> {
+ // An incoming tombstone for a record we've previously synced (and thus
+ // have data for)
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(
+ get_local_data(&tx, "ext-id"),
+ DbData::Data(data.to_string())
+ );
+ // sync to get data in our mirror.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert!(get_local_data(&tx, "ext-id").has_data());
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert!(get_mirror_data(&tx, &guid).has_data());
+ // Now an incoming tombstone for it.
+ let tombstone = make_incoming_tombstone(&guid);
+ assert_eq!(
+ do_sync(&tx, &[tombstone])?.len(),
+ 0,
+ "expect no outgoing records"
+ );
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_incoming_tombstone_not_exists() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // An incoming tombstone for something that's not anywhere locally.
+ let guid = Guid::new("guid");
+ let tombstone = make_incoming_tombstone(&guid);
+ assert_eq!(
+ do_sync(&tx, &[tombstone])?.len(),
+ 0,
+ "expect no outgoing records"
+ );
+ // But we still keep the tombstone in the mirror.
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_reconciled() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload with the same data
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key1": "key1-value"}));
+ // Should be no outgoing records as we reconciled.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ Ok(())
+}
+
+/// Tests that we handle things correctly if we get a payload that is
+/// identical to what is in the mirrored table.
+#[test]
+fn test_reconcile_with_null_payload() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data.clone())?;
+ // We try to push this change on the next sync.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::Data(data.to_string()));
+ // Incoming payload with the same data.
+ // This could happen if, for example, another client changed the
+ // key and then put it back the way it was.
+ let record = make_incoming(&guid, "ext-id", &data);
+ // Should be no outgoing records as we reconciled.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_incoming_when_local_is_deleted() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // We only record an extension as deleted locally if it has been
+ // uploaded before being deleted.
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ clear(&tx, "ext-id")?;
+ // Incoming payload without 'key1'. Because we previously uploaded
+ // key1, this means another client deleted it.
+ let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"}));
+
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ clear(&tx, "ext-id")?;
+
+ // Use a random guid so that we don't find the mirrored data.
+ // This test is somewhat bad because deduping might obviate
+ // the need for it.
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"}));
+
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_deleted_key_mirrored() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Incoming payload without 'key1'. Because we previously uploaded
+ // key1, this means another client deleted it.
+ let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"}));
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_no_mirror() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload without 'key1' and some data for 'key2'.
+ // Because we never uploaded 'key1', we merge our local values
+ // with the remote.
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"}));
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-value"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let old_data = json!({"key1": "key1-value", "key2": "key2-value", "doomed_key": "deletable"});
+ set(&tx, "ext-id", old_data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // We update 'key1' locally.
+ let local_data = json!({"key1": "key1-new", "key2": "key2-value", "doomed_key": "deletable"});
+ set(&tx, "ext-id", local_data)?;
+ // Incoming payload where another client set 'key2' and removed
+ // the 'doomed_key'.
+ // Because we never uploaded our data, we'll merge our
+ // key1 in, but otherwise keep the server's changes.
+ let record = make_incoming(
+ &guid,
+ "ext-id",
+ &json!({"key1": "key1-value", "key2": "key2-incoming"}),
+ );
+ // We should send our 'key1'
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-new", "key2": "key2-incoming"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_with_null_payload() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let old_data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", old_data.clone())?;
+ // Push this change remotely.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert_eq!(
+ get_mirror_data(&tx, &guid),
+ DbData::Data(old_data.to_string())
+ );
+ let local_data = json!({"key1": "key1-new", "key2": "key2-value"});
+ set(&tx, "ext-id", local_data.clone())?;
+ // Incoming payload with the same old data.
+ let record = make_incoming(&guid, "ext-id", &old_data);
+ // Three-way-merge will not detect any change in key1, so we
+ // should keep our entire new value.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", local_data)?;
+ Ok(())
+}
+
+#[test]
+fn test_deleted_mirrored_object_accept() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Incoming payload with data deleted.
+ // We synchronize this deletion by deleting the keys we think
+ // were on the server.
+ let record = make_incoming_tombstone(&guid);
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_deleted_mirrored_object_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ set(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-new", "key2": "key2-value"}),
+ )?;
+ // Incoming payload with data deleted.
+ // We synchronize this deletion by deleting the keys we think
+ // were on the server.
+ let record = make_incoming_tombstone(&guid);
+ // This overrides the change to 'key1', but we still upload 'key2'.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+/// Like the above test, but with a mirrored tombstone.
+#[test]
+fn test_deleted_mirrored_tombstone_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // Sync some data so we can get the guid for this extension.
+ set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Sync a delete for this data so we have a tombstone in the mirror.
+ let record = make_incoming_tombstone(&guid);
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+
+ // Set some data and sync it simultaneously with another incoming delete.
+ set(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ let record = make_incoming_tombstone(&guid);
+ // We cannot delete any matching keys because there are no
+ // matching keys. Instead we push our data.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_deleted_not_mirrored_object_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload with data deleted.
+ let record = make_incoming_tombstone(&Guid::new("guid"));
+ // We normally delete the keys we think were on the server, but
+ // here we have no information about what was on the server, so we
+ // don't delete anything. We merge in all undeleted keys.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-value"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_conflicting_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload without 'key1' and conflicting for 'key2'.
+ // Because we never uploaded either of our keys, we'll merge our
+ // key1 in, but the server key2 wins.
+ let record = make_incoming(
+ &Guid::new("guid"),
+ "ext-id",
+ &json!({"key2": "key2-incoming"}),
+ );
+ // We should send our 'key1'
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-incoming"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_invalid_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let json = json!({"id": "id", "payload": json!("").to_string()});
+ let bso = serde_json::from_value::<IncomingBso>(json).unwrap();
+ let record = bso.into_content();
+
+ // Should do nothing.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ Ok(())
+}