summaryrefslogtreecommitdiffstats
path: root/third_party/rust/webext-storage/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/webext-storage/src/sync')
-rw-r--r--third_party/rust/webext-storage/src/sync/bridge.rs388
-rw-r--r--third_party/rust/webext-storage/src/sync/incoming.rs863
-rw-r--r--third_party/rust/webext-storage/src/sync/mod.rs429
-rw-r--r--third_party/rust/webext-storage/src/sync/outgoing.rs186
-rw-r--r--third_party/rust/webext-storage/src/sync/sync_tests.rs529
5 files changed, 2395 insertions, 0 deletions
diff --git a/third_party/rust/webext-storage/src/sync/bridge.rs b/third_party/rust/webext-storage/src/sync/bridge.rs
new file mode 100644
index 0000000000..760a0d0e10
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/bridge.rs
@@ -0,0 +1,388 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use anyhow::Result;
+use rusqlite::Transaction;
+use std::sync::{Arc, Weak};
+use sync15::bso::IncomingBso;
+use sync15::engine::ApplyResults;
+use sync_guid::Guid as SyncGuid;
+
+use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb};
+use crate::schema;
+use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
+use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
+
+const LAST_SYNC_META_KEY: &str = "last_sync_time";
+const SYNC_ID_META_KEY: &str = "sync_id";
+
+/// A bridged engine implements all the methods needed to make the
+/// `storage.sync` store work with Desktop's Sync implementation.
+/// Conceptually, it's similar to `sync15::Store`, which we
+/// should eventually rename and unify with this trait (#2841).
+///
+/// Unlike most of our other implementation which hold a strong reference
+/// to the store, this engine keeps a weak reference in an attempt to keep
+/// the desktop semantics as close as possible to what they were when the
+/// engines all took lifetime params to ensure they don't outlive the store.
+pub struct BridgedEngine {
+ db: Weak<ThreadSafeStorageDb>,
+}
+
+impl BridgedEngine {
+ /// Creates a bridged engine for syncing.
+ pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self {
+ BridgedEngine {
+ db: Arc::downgrade(db),
+ }
+ }
+
+ fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> {
+ tx.execute_batch(
+ "DELETE FROM storage_sync_mirror;
+ UPDATE storage_sync_data SET sync_change_counter = 1;",
+ )?;
+ delete_meta(tx, LAST_SYNC_META_KEY)?;
+ Ok(())
+ }
+
+ fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> {
+ self.db
+ .upgrade()
+ .ok_or_else(|| crate::error::ErrorKind::DatabaseConnectionClosed.into())
+ }
+}
+
+impl sync15::engine::BridgedEngine for BridgedEngine {
+ fn last_sync(&self) -> Result<i64> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0))
+ }
+
+ fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?;
+ Ok(())
+ }
+
+ fn sync_id(&self) -> Result<Option<String>> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ Ok(get_meta(&db, SYNC_ID_META_KEY)?)
+ }
+
+ fn reset_sync_id(&self) -> Result<String> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ let new_id = SyncGuid::random().to_string();
+ self.do_reset(&tx)?;
+ put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
+ tx.commit()?;
+ Ok(new_id)
+ }
+
+ fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?;
+ Ok(match current {
+ Some(current) if current == sync_id => current,
+ _ => {
+ let tx = db.unchecked_transaction()?;
+ self.do_reset(&tx)?;
+ let result = sync_id.to_string();
+ put_meta(&tx, SYNC_ID_META_KEY, &result)?;
+ tx.commit()?;
+ result
+ }
+ })
+ }
+
+ fn sync_started(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ schema::create_empty_sync_temp_tables(&db)?;
+ Ok(())
+ }
+
+ fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+ let tx = db.unchecked_transaction()?;
+ let incoming_content: Vec<_> = incoming_bsos
+ .into_iter()
+ .map(IncomingBso::into_content::<super::WebextRecord>)
+ .collect();
+ stage_incoming(&tx, &incoming_content, &signal)?;
+ tx.commit()?;
+ Ok(())
+ }
+
+ fn apply(&self) -> Result<ApplyResults> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+
+ let tx = db.unchecked_transaction()?;
+ let incoming = get_incoming(&tx)?;
+ let actions = incoming
+ .into_iter()
+ .map(|(item, state)| (item, plan_incoming(state)))
+ .collect();
+ apply_actions(&tx, actions, &signal)?;
+ stage_outgoing(&tx)?;
+ tx.commit()?;
+
+ Ok(get_outgoing(&db, &signal)?.into())
+ }
+
+ fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let signal = db.begin_interrupt_scope()?;
+ let tx = db.unchecked_transaction()?;
+ record_uploaded(&tx, ids, &signal)?;
+ tx.commit()?;
+
+ Ok(())
+ }
+
+ fn sync_finished(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ schema::create_empty_sync_temp_tables(&db)?;
+ Ok(())
+ }
+
+ fn reset(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ self.do_reset(&tx)?;
+ delete_meta(&tx, SYNC_ID_META_KEY)?;
+ tx.commit()?;
+ Ok(())
+ }
+
+ fn wipe(&self) -> Result<()> {
+ let shared_db = self.thread_safe_storage_db()?;
+ let db = shared_db.lock();
+ let tx = db.unchecked_transaction()?;
+ // We assume the meta table is only used by sync.
+ tx.execute_batch(
+ "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
+ )?;
+ tx.commit()?;
+ Ok(())
+ }
+}
+
+impl From<anyhow::Error> for crate::error::Error {
+ fn from(value: anyhow::Error) -> Self {
+ crate::error::ErrorKind::SyncError(value.to_string()).into()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::db::test::new_mem_thread_safe_storage_db;
+ use crate::db::StorageDb;
+ use sync15::engine::BridgedEngine;
+
+ fn query_count(conn: &StorageDb, table: &str) -> u32 {
+ conn.query_row_and_then(&format!("SELECT COUNT(*) FROM {};", table), [], |row| {
+ row.get::<_, u32>(0)
+ })
+ .expect("should work")
+ }
+
+ // Sets up mock data for the tests here.
+ fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> {
+ {
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ db.execute(
+ "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
+ VALUES ('ext-a', 'invalid-json', 2)",
+ [],
+ )?;
+ db.execute(
+ "INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', 'ext-a', '3')",
+ [],
+ )?;
+ }
+ engine.set_last_sync(1)?;
+
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ // and assert we wrote what we think we did.
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
+ assert_eq!(query_count(&db, "meta"), 1);
+ Ok(())
+ }
+
+ // Assuming a DB setup with setup_mock_data, assert it was correctly reset.
+ fn assert_reset(engine: &super::BridgedEngine) -> Result<()> {
+ // A reset never wipes data...
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+
+ // But did reset the change counter.
+ let cc = db.query_row_and_then(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(cc, 1);
+ // But did wipe the mirror...
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
+ // And the last_sync should have been wiped.
+ assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none());
+ Ok(())
+ }
+
+ // Assuming a DB setup with setup_mock_data, assert it has not been reset.
+ fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+ assert_eq!(query_count(&db, "storage_sync_data"), 1);
+ let cc = db.query_row_and_then(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(cc, 2);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
+ // And the last_sync should remain.
+ assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some());
+ Ok(())
+ }
+
+ #[test]
+ fn test_wipe() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ engine.wipe()?;
+
+ let shared = engine.thread_safe_storage_db()?;
+ let db = shared.lock();
+
+ assert_eq!(query_count(&db, "storage_sync_data"), 0);
+ assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
+ assert_eq!(query_count(&db, "meta"), 0);
+ Ok(())
+ }
+
+ #[test]
+ fn test_reset() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ engine.reset()?;
+ assert_reset(&engine)?;
+ // Only an explicit reset kills the sync-id, so check that here.
+ assert_eq!(
+ get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?,
+ None
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_missing_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ assert_eq!(engine.sync_id()?, None);
+ // We don't have a sync ID - so setting one should reset.
+ engine.ensure_current_sync_id("new-id")?;
+ // should have cause a reset.
+ assert_reset(&engine)?;
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_new_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"old-id".to_string(),
+ )?;
+ assert_not_reset(&engine)?;
+ assert_eq!(engine.sync_id()?, Some("old-id".to_string()));
+
+ engine.ensure_current_sync_id("new-id")?;
+ // should have cause a reset.
+ assert_reset(&engine)?;
+ // should have the new id.
+ assert_eq!(engine.sync_id()?, Some("new-id".to_string()));
+ Ok(())
+ }
+
+ #[test]
+ fn test_ensure_same_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ assert_not_reset(&engine)?;
+
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ engine.ensure_current_sync_id("sync-id")?;
+ // should not have reset.
+ assert_not_reset(&engine)?;
+ Ok(())
+ }
+
+ #[test]
+ fn test_reset_sync_id() -> Result<()> {
+ let strong = new_mem_thread_safe_storage_db();
+ let engine = super::BridgedEngine::new(&strong);
+
+ setup_mock_data(&engine)?;
+ put_meta(
+ &engine.thread_safe_storage_db()?.lock(),
+ SYNC_ID_META_KEY,
+ &"sync-id".to_string(),
+ )?;
+
+ assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));
+ let new_id = engine.reset_sync_id()?;
+ // should have cause a reset.
+ assert_reset(&engine)?;
+ assert_eq!(engine.sync_id()?, Some(new_id));
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/incoming.rs b/third_party/rust/webext-storage/src/sync/incoming.rs
new file mode 100644
index 0000000000..5d00fe8de3
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/incoming.rs
@@ -0,0 +1,863 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// The "incoming" part of syncing - handling the incoming rows, staging them,
+// working out a plan for them, updating the local data and mirror, etc.
+
+use interrupt_support::Interruptee;
+use rusqlite::{Connection, Row, Transaction};
+use sql_support::ConnExt;
+use sync15::bso::{IncomingContent, IncomingKind};
+use sync_guid::Guid as SyncGuid;
+
+use crate::api::{StorageChanges, StorageValueChange};
+use crate::error::*;
+
+use super::{merge, remove_matching_keys, JsonMap, WebextRecord};
+
+/// The state data can be in. Could be represented as Option<JsonMap>, but this
+/// is clearer and independent of how the data is stored.
+#[derive(Debug, PartialEq, Eq)]
+pub enum DataState {
+ /// The data was deleted.
+ Deleted,
+ /// Data exists, as stored in the map.
+ Exists(JsonMap),
+}
+
+// A little helper to create a StorageChanges object when we are creating
+// a new value with multiple keys that doesn't exist locally.
+fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
+ let mut result = StorageChanges::with_capacity(new.len());
+ for (key, val) in new.iter() {
+ result.push(StorageValueChange {
+ key: key.clone(),
+ old_value: None,
+ new_value: Some(val.clone()),
+ });
+ }
+ result
+}
+
+// This module deals exclusively with the Map inside a JsonValue::Object().
+// This helper reads such a Map from a SQL row, ignoring anything which is
+// either invalid JSON or a different JSON type.
+fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
+ let s = row.get::<_, Option<String>>(col)?;
+ Ok(match s {
+ None => DataState::Deleted,
+ Some(s) => match serde_json::from_str(&s) {
+ Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
+ _ => {
+ // We don't want invalid json or wrong types to kill syncing.
+ // It should be impossible as we never write anything which
+ // could cause it, but we can't really log the bad data as there
+ // might be PII. Logging just a message without any additional
+ // clues is going to be unhelpfully noisy, so, silently None.
+ // XXX - Maybe record telemetry?
+ DataState::Deleted
+ }
+ },
+ })
+}
+
+/// The first thing we do with incoming items is to "stage" them in a temp table.
+/// The actual processing is done via this table.
+pub fn stage_incoming(
+ tx: &Transaction<'_>,
+ incoming_records: &[IncomingContent<WebextRecord>],
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ sql_support::each_sized_chunk(
+ incoming_records,
+ // We bind 3 params per chunk.
+ sql_support::default_max_variable_number() / 3,
+ |chunk, _| -> Result<()> {
+ let mut params = Vec::with_capacity(chunk.len() * 3);
+ for record in chunk {
+ signal.err_if_interrupted()?;
+ match &record.kind {
+ IncomingKind::Content(r) => {
+ params.push(Some(record.envelope.id.to_string()));
+ params.push(Some(r.ext_id.to_string()));
+ params.push(Some(r.data.clone()));
+ }
+ IncomingKind::Tombstone => {
+ params.push(Some(record.envelope.id.to_string()));
+ params.push(None);
+ params.push(None);
+ }
+ IncomingKind::Malformed => {
+ log::error!("Ignoring incoming malformed record: {}", record.envelope.id);
+ }
+ }
+ }
+ // we might have skipped records
+ let actual_len = params.len() / 3;
+ if actual_len != 0 {
+ let sql = format!(
+ "INSERT OR REPLACE INTO temp.storage_sync_staging
+ (guid, ext_id, data)
+ VALUES {}",
+ sql_support::repeat_multi_values(actual_len, 3)
+ );
+ tx.execute(&sql, rusqlite::params_from_iter(params))?;
+ }
+ Ok(())
+ },
+ )?;
+ Ok(())
+}
+
+/// The "state" we find ourselves in when considering an incoming/staging
+/// record. This "state" is the input to calculating the IncomingAction and
+/// carries all the data we need to make the required local changes.
+#[derive(Debug, PartialEq, Eq)]
+pub enum IncomingState {
+ /// There's an incoming item, but data for that extension doesn't exist
+ /// either in our local data store or in the local mirror. IOW, this is the
+ /// very first time we've seen this extension.
+ IncomingOnlyData { ext_id: String, data: JsonMap },
+
+ /// An incoming tombstone that doesn't exist locally. Because tombstones
+ /// don't carry the ext-id, it means it's not in our mirror. We are just
+ /// going to ignore it, but we track the state for consistency.
+ IncomingOnlyTombstone,
+
+ /// There's an incoming item and we have data for the same extension in
+ /// our local store - but not in our mirror. This should be relatively
+ /// uncommon as it means:
+ /// * Some other profile has recently installed an extension and synced.
+ /// * This profile has recently installed the same extension.
+ /// * This is the first sync for this profile since both those events
+ /// happened.
+ HasLocal {
+ ext_id: String,
+ incoming: DataState,
+ local: DataState,
+ },
+ /// There's an incoming item and there's an item for the same extension in
+ /// the mirror. The addon probably doesn't exist locally, or if it does,
+ /// the last time we synced we synced the deletion of all data.
+ NotLocal {
+ ext_id: String,
+ incoming: DataState,
+ mirror: DataState,
+ },
+ /// This will be the most common "incoming" case - there's data incoming,
+ /// in the mirror and in the local store for an addon.
+ Everywhere {
+ ext_id: String,
+ incoming: DataState,
+ mirror: DataState,
+ local: DataState,
+ },
+}
+
+/// Get the items we need to process from the staging table. Return details about
+/// the item and the state of that item, ready for processing.
+pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
+ let sql = "
+ SELECT
+ s.guid as guid,
+ l.ext_id as l_ext_id,
+ m.ext_id as m_ext_id,
+ s.ext_id as s_ext_id,
+ s.data as s_data, m.data as m_data, l.data as l_data,
+ l.sync_change_counter
+ FROM temp.storage_sync_staging s
+ LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
+ LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";
+
+ fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
+ let guid = row.get("guid")?;
+ // This is complicated because the staging row doesn't hold the ext_id.
+ // However, both the local table and the mirror do.
+ let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
+ let local_ext_id: Option<String> = row.get("l_ext_id")?;
+ let staged_ext_id: Option<String> = row.get("s_ext_id")?;
+ let incoming = json_map_from_row(row, "s_data")?;
+
+ // We find the state by examining which tables the ext-id exists in,
+ // using whether that column is null as a proxy for that.
+ let state = match (local_ext_id, mirror_ext_id) {
+ (None, None) => {
+ match staged_ext_id {
+ Some(ext_id) => {
+ let data = match incoming {
+ // incoming record with missing data that's not a
+ // tombstone shouldn't happen, but we can cope by
+ // pretending it was an empty json map.
+ DataState::Deleted => JsonMap::new(),
+ DataState::Exists(data) => data,
+ };
+ IncomingState::IncomingOnlyData { ext_id, data }
+ }
+ None => IncomingState::IncomingOnlyTombstone,
+ }
+ }
+ (Some(ext_id), None) => IncomingState::HasLocal {
+ ext_id,
+ incoming,
+ local: json_map_from_row(row, "l_data")?,
+ },
+ (None, Some(ext_id)) => IncomingState::NotLocal {
+ ext_id,
+ incoming,
+ mirror: json_map_from_row(row, "m_data")?,
+ },
+ (Some(ext_id), Some(_)) => IncomingState::Everywhere {
+ ext_id,
+ incoming,
+ mirror: json_map_from_row(row, "m_data")?,
+ local: json_map_from_row(row, "l_data")?,
+ },
+ };
+ Ok((guid, state))
+ }
+
+ conn.conn().query_rows_and_then(sql, [], from_row)
+}
+
+/// This is the set of actions we know how to take *locally* for incoming
+/// records. Which one depends on the IncomingState.
+/// Every state which updates also records the set of changes we should notify
+#[derive(Debug, PartialEq, Eq)]
+pub enum IncomingAction {
+ /// We should locally delete the data for this record
+ DeleteLocally {
+ ext_id: String,
+ changes: StorageChanges,
+ },
+ /// We will take the remote.
+ TakeRemote {
+ ext_id: String,
+ data: JsonMap,
+ changes: StorageChanges,
+ },
+ /// We merged this data - this is what we came up with.
+ Merge {
+ ext_id: String,
+ data: JsonMap,
+ changes: StorageChanges,
+ },
+ /// Entry exists locally and it's the same as the incoming record.
+ Same { ext_id: String },
+ /// Incoming tombstone for an item we've never seen.
+ Nothing,
+}
+
+/// Takes the state of an item and returns the action we should take for it.
+pub fn plan_incoming(s: IncomingState) -> IncomingAction {
+ match s {
+ IncomingState::Everywhere {
+ ext_id,
+ incoming,
+ local,
+ mirror,
+ } => {
+ // All records exist - but do they all have data?
+ match (incoming, local, mirror) {
+ (
+ DataState::Exists(incoming_data),
+ DataState::Exists(local_data),
+ DataState::Exists(mirror_data),
+ ) => {
+ // all records have data - 3-way merge.
+ merge(ext_id, incoming_data, local_data, Some(mirror_data))
+ }
+ (
+ DataState::Exists(incoming_data),
+ DataState::Exists(local_data),
+ DataState::Deleted,
+ ) => {
+ // No parent, so first time seeing this remotely - 2-way merge
+ merge(ext_id, incoming_data, local_data, None)
+ }
+ (DataState::Exists(incoming_data), DataState::Deleted, _) => {
+ // Incoming data, removed locally. Server wins.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&incoming_data),
+ data: incoming_data,
+ }
+ }
+ (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
+ // Deleted remotely.
+ // Treat this as a delete of every key that we
+ // know was present at the time.
+ let (result, changes) = remove_matching_keys(local_data, &mirror);
+ if result.is_empty() {
+ // If there were no more keys left, we can
+ // delete our version too.
+ IncomingAction::DeleteLocally { ext_id, changes }
+ } else {
+ IncomingAction::Merge {
+ ext_id,
+ data: result,
+ changes,
+ }
+ }
+ }
+ (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
+ // Perhaps another client created and then deleted
+ // the whole object for this extension since the
+ // last time we synced.
+ // Treat this as a delete of every key that we
+ // knew was present. Unfortunately, we don't know
+ // any keys that were present, so we delete no keys.
+ IncomingAction::Merge {
+ ext_id,
+ data: local_data,
+ changes: StorageChanges::new(),
+ }
+ }
+ (DataState::Deleted, DataState::Deleted, _) => {
+ // We agree with the remote (regardless of what we
+ // have mirrored).
+ IncomingAction::Same { ext_id }
+ }
+ }
+ }
+ IncomingState::HasLocal {
+ ext_id,
+ incoming,
+ local,
+ } => {
+ // So we have a local record and an incoming/staging record, but *not* a
+ // mirror record. This means some other device has synced this for
+ // the first time and we are yet to do the same.
+ match (incoming, local) {
+ (DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
+ // This means the extension exists locally and remotely
+ // but this is the first time we've synced it. That's no problem, it's
+ // just a 2-way merge...
+ merge(ext_id, incoming_data, local_data, None)
+ }
+ (DataState::Deleted, DataState::Exists(local_data)) => {
+ // We've data locally, but there's an incoming deletion.
+ // We would normally remove keys that we knew were
+ // present on the server, but we don't know what
+ // was on the server, so we don't remove anything.
+ IncomingAction::Merge {
+ ext_id,
+ data: local_data,
+ changes: StorageChanges::new(),
+ }
+ }
+ (DataState::Exists(incoming_data), DataState::Deleted) => {
+ // No data locally, but some is incoming - take it.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&incoming_data),
+ data: incoming_data,
+ }
+ }
+ (DataState::Deleted, DataState::Deleted) => {
+ // Nothing anywhere - odd, but OK.
+ IncomingAction::Same { ext_id }
+ }
+ }
+ }
+ IncomingState::NotLocal {
+ ext_id, incoming, ..
+ } => {
+ // No local data but there's mirror and an incoming record.
+ // This means a local deletion is being replaced by, or just re-doing
+ // the incoming record.
+ match incoming {
+ DataState::Exists(data) => IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&data),
+ data,
+ },
+ DataState::Deleted => IncomingAction::Same { ext_id },
+ }
+ }
+ IncomingState::IncomingOnlyData { ext_id, data } => {
+ // Only the staging record exists and it's not a tombstone.
+ // This means it's the first time we've ever seen it. No
+ // conflict possible, just take the remote.
+ IncomingAction::TakeRemote {
+ ext_id,
+ changes: changes_for_new_incoming(&data),
+ data,
+ }
+ }
+ IncomingState::IncomingOnlyTombstone => {
+ // Only the staging record exists and it is a tombstone - nothing to do.
+ IncomingAction::Nothing
+ }
+ }
+}
+
+fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
+ tx.execute_cached(
+ "INSERT INTO temp.storage_sync_applied (ext_id, changes)
+ VALUES (:ext_id, :changes)",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":changes": &serde_json::to_string(&changes)?,
+ },
+ )?;
+ Ok(())
+}
+
+// Apply the actions necessary to fully process the incoming items.
+pub fn apply_actions(
+ tx: &Transaction<'_>,
+ actions: Vec<(SyncGuid, IncomingAction)>,
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ for (item, action) in actions {
+ signal.err_if_interrupted()?;
+
+ log::trace!("action for '{:?}': {:?}", item, action);
+ match action {
+ IncomingAction::DeleteLocally { ext_id, changes } => {
+ // Can just nuke it entirely.
+ tx.execute_cached(
+ "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
+ &[(":ext_id", &ext_id)],
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+ // We want to update the local record with 'data' and after this update the item no longer is considered dirty.
+ IncomingAction::TakeRemote {
+ ext_id,
+ data,
+ changes,
+ } => {
+ tx.execute_cached(
+ "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
+ VALUES (:ext_id, :data, 0)",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":data": serde_json::Value::Object(data),
+ },
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+
+ // We merged this data, so need to update locally but still consider
+ // it dirty because the merged data must be uploaded.
+ IncomingAction::Merge {
+ ext_id,
+ data,
+ changes,
+ } => {
+ tx.execute_cached(
+ "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
+ rusqlite::named_params! {
+ ":ext_id": ext_id,
+ ":data": serde_json::Value::Object(data),
+ },
+ )?;
+ insert_changes(tx, &ext_id, &changes)?;
+ }
+
+ // Both local and remote ended up the same - only need to nuke the
+ // change counter.
+ IncomingAction::Same { ext_id } => {
+ tx.execute_cached(
+ "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
+ &[(":ext_id", &ext_id)],
+ )?;
+ // no changes to write
+ }
+ // Literally nothing to do!
+ IncomingAction::Nothing => {}
+ }
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::test::new_syncable_mem_db;
+ use super::*;
+ use crate::api;
+ use interrupt_support::NeverInterrupts;
+ use serde_json::{json, Value};
+ use sync15::bso::IncomingBso;
+
+ // select simple int
+ fn ssi(conn: &Connection, stmt: &str) -> u32 {
+ conn.try_query_one(stmt, [], true)
+ .expect("must work")
+ .unwrap_or_default()
+ }
+
+ fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> {
+ let jv = array.as_array_mut().expect("you must pass a json array");
+ let mut result = Vec::with_capacity(jv.len());
+ for elt in jv {
+ result.push(IncomingBso::from_test_content(elt.take()).into_content());
+ }
+ result
+ }
+
+ // Can't find a way to import these from crate::sync::tests...
+ macro_rules! map {
+ ($($map:tt)+) => {
+ json!($($map)+).as_object().unwrap().clone()
+ };
+ }
+ macro_rules! change {
+ ($key:literal, None, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: None,
+ };
+ };
+ ($key:literal, $old:tt, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: None,
+ }
+ };
+ ($key:literal, None, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: Some(json!($new)),
+ };
+ };
+ ($key:literal, $old:tt, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: Some(json!($new)),
+ }
+ };
+ }
+ macro_rules! changes {
+ ( $( $change:expr ),* ) => {
+ {
+ let mut changes = StorageChanges::new();
+ $(
+ changes.push($change);
+ )*
+ changes
+ }
+ };
+ }
+
+ #[test]
+ fn test_incoming_populates_staging() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ let incoming = json! {[
+ {
+ "id": "guidAAAAAAAA",
+ "extId": "ext1@example.com",
+ "data": json!({"foo": "bar"}).to_string(),
+ }
+ ]};
+
+ stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?;
+ // check staging table
+ assert_eq!(
+ ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
+ 1
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn test_fetch_incoming_state() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ // Start with an item just in staging.
+ tx.execute(
+ r#"
+ INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
+ VALUES ('guid', 'ext_id', '{"foo":"bar"}')
+ "#,
+ [],
+ )?;
+
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::IncomingOnlyData {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "bar"}),
+ }
+ );
+
+ // Add the same item to the mirror.
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', 'ext_id', '{"foo":"new"}')
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::NotLocal {
+ ext_id: "ext_id".to_string(),
+ incoming: DataState::Exists(map!({"foo": "bar"})),
+ mirror: DataState::Exists(map!({"foo": "new"})),
+ }
+ );
+
+ // and finally the data itself - might as use the API here!
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ IncomingState::Everywhere {
+ ext_id: "ext_id".to_string(),
+ incoming: DataState::Exists(map!({"foo": "bar"})),
+ local: DataState::Exists(map!({"foo": "local"})),
+ mirror: DataState::Exists(map!({"foo": "new"})),
+ }
+ );
+ Ok(())
+ }
+
+ // Like test_fetch_incoming_state, but check NULLs are handled correctly.
+ #[test]
+ fn test_fetch_incoming_state_nulls() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ // Start with a tombstone just in staging.
+ tx.execute(
+ r#"
+ INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
+ VALUES ('guid', NULL, NULL)
+ "#,
+ [],
+ )?;
+
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);
+
+ // Add the same item to the mirror (can't store an ext_id for a
+ // tombstone in the mirror as incoming tombstones never have it)
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_mirror (guid, ext_id, data)
+ VALUES ('guid', NULL, NULL)
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);
+
+ tx.execute(
+ r#"
+ INSERT INTO storage_sync_data (ext_id, data)
+ VALUES ('ext_id', NULL)
+ "#,
+ [],
+ )?;
+ let incoming = get_incoming(&tx)?;
+ assert_eq!(incoming.len(), 1);
+ assert_eq!(
+ incoming[0].1,
+ // IncomingOnly* seems a little odd, but it is because we can't
+ // tie the tombstones together due to the lack of any ext-id/guid
+ // mapping in this case.
+ IncomingState::IncomingOnlyTombstone
+ );
+ Ok(())
+ }
+
+ // apply_action tests.
+ #[derive(Debug, PartialEq)]
+ struct LocalItem {
+ data: DataState,
+ sync_change_counter: i32,
+ }
+
+ fn get_local_item(conn: &Connection) -> Option<LocalItem> {
+ conn.try_query_row::<_, Error, _, _>(
+ "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
+ [],
+ |row| {
+ let data = json_map_from_row(row, "data")?;
+ let sync_change_counter = row.get::<_, i32>(1)?;
+ Ok(LocalItem {
+ data,
+ sync_change_counter,
+ })
+ },
+ true,
+ )
+ .expect("query should work")
+ }
+
+ fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
+ // no custom deserialize for storagechanges and we only need it for
+ // tests, so do it manually.
+ conn.try_query_row::<_, Error, _, _>(
+ "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
+ [],
+ |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
+ true,
+ )
+ .expect("query should work")
+ .map(|val: serde_json::Value| {
+ let ob = val.as_object().expect("should be an object of items");
+ let mut result = StorageChanges::with_capacity(ob.len());
+ for (key, val) in ob.into_iter() {
+ let details = val.as_object().expect("elts should be objects");
+ result.push(StorageValueChange {
+ key: key.to_string(),
+ old_value: details.get("oldValue").cloned(),
+ new_value: details.get("newValue").cloned(),
+ });
+ }
+ result
+ })
+ }
+
+ fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
+ let guid = SyncGuid::new("guid");
+ apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
+ }
+
+ #[test]
+ fn test_apply_actions() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+
+ // DeleteLocally - row should be entirely removed.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ let changes = changes![change!("foo", "local", None)];
+ do_apply_action(
+ &tx,
+ IncomingAction::DeleteLocally {
+ ext_id: "ext_id".to_string(),
+ changes: changes.clone(),
+ },
+ );
+ assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
+ // and there should not be a local record at all.
+ assert!(get_local_item(&tx).is_none());
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // TakeRemote - replace local data with remote and marked as not dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ let changes = changes![change!("foo", "local", "remote")];
+ do_apply_action(
+ &tx,
+ IncomingAction::TakeRemote {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "remote"}),
+ changes: changes.clone(),
+ },
+ );
+ // data should exist locally with the remote data and not be dirty.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "remote"})),
+ sync_change_counter: 0
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // Merge - like ::TakeRemote, but data remains dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ let changes = changes![change!("foo", "local", "remote")];
+ do_apply_action(
+ &tx,
+ IncomingAction::Merge {
+ ext_id: "ext_id".to_string(),
+ data: map!({"foo": "remote"}),
+ changes: changes.clone(),
+ },
+ );
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "remote"})),
+ sync_change_counter: 2
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), Some(changes));
+ tx.rollback()?;
+
+ // Same - data stays the same but is marked not dirty.
+ let tx = db.transaction().expect("transaction should work");
+ api::set(&tx, "ext_id", json!({"foo": "local"}))?;
+ assert_eq!(
+ api::get(&tx, "ext_id", json!(null))?,
+ json!({"foo": "local"})
+ );
+ // data should exist locally with a change recorded.
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 1
+ })
+ );
+ do_apply_action(
+ &tx,
+ IncomingAction::Same {
+ ext_id: "ext_id".to_string(),
+ },
+ );
+ assert_eq!(
+ get_local_item(&tx),
+ Some(LocalItem {
+ data: DataState::Exists(map!({"foo": "local"})),
+ sync_change_counter: 0
+ })
+ );
+ assert_eq!(get_applied_item_changes(&tx), None);
+ tx.rollback()?;
+
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/mod.rs b/third_party/rust/webext-storage/src/sync/mod.rs
new file mode 100644
index 0000000000..3144049dca
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/mod.rs
@@ -0,0 +1,429 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+mod bridge;
+mod incoming;
+mod outgoing;
+
+#[cfg(test)]
+mod sync_tests;
+
+use crate::api::{StorageChanges, StorageValueChange};
+use crate::db::StorageDb;
+use crate::error::*;
+use serde::Deserialize;
+use serde_derive::*;
+use sql_support::ConnExt;
+use sync_guid::Guid as SyncGuid;
+
+pub use bridge::BridgedEngine;
+use incoming::IncomingAction;
+
+type JsonMap = serde_json::Map<String, serde_json::Value>;
+
+pub const STORAGE_VERSION: usize = 1;
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WebextRecord {
+ #[serde(rename = "id")]
+ guid: SyncGuid,
+ #[serde(rename = "extId")]
+ ext_id: String,
+ data: String,
+}
+
+// Perform a 2-way or 3-way merge, where the incoming value wins on confict.
+fn merge(
+ ext_id: String,
+ mut other: JsonMap,
+ mut ours: JsonMap,
+ parent: Option<JsonMap>,
+) -> IncomingAction {
+ if other == ours {
+ return IncomingAction::Same { ext_id };
+ }
+ let old_incoming = other.clone();
+ // worst case is keys in each are unique.
+ let mut changes = StorageChanges::with_capacity(other.len() + ours.len());
+ if let Some(parent) = parent {
+ // Perform 3-way merge. First, for every key in parent,
+ // compare the parent value with the incoming value to compute
+ // an implicit "diff".
+ for (key, parent_value) in parent.into_iter() {
+ if let Some(incoming_value) = other.remove(&key) {
+ if incoming_value != parent_value {
+ log::trace!(
+ "merge: key {} was updated in incoming - copying value locally",
+ key
+ );
+ let old_value = ours.remove(&key);
+ let new_value = Some(incoming_value.clone());
+ if old_value != new_value {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value,
+ new_value,
+ });
+ }
+ ours.insert(key, incoming_value);
+ }
+ } else {
+ // Key was not present in incoming value.
+ // Another client must have deleted it.
+ log::trace!(
+ "merge: key {} no longer present in incoming - removing it locally",
+ key
+ );
+ if let Some(old_value) = ours.remove(&key) {
+ changes.push(StorageValueChange {
+ key,
+ old_value: Some(old_value),
+ new_value: None,
+ });
+ }
+ }
+ }
+
+ // Then, go through every remaining key in incoming. These are
+ // the ones where a corresponding key does not exist in
+ // parent, so it is a new key, and we need to add it.
+ for (key, incoming_value) in other.into_iter() {
+ log::trace!(
+ "merge: key {} doesn't occur in parent - copying from incoming",
+ key
+ );
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value: None,
+ new_value: Some(incoming_value.clone()),
+ });
+ ours.insert(key, incoming_value);
+ }
+ } else {
+ // No parent. Server wins. Overwrite every key in ours with
+ // the corresponding value in other.
+ log::trace!("merge: no parent - copying all keys from incoming");
+ for (key, incoming_value) in other.into_iter() {
+ let old_value = ours.remove(&key);
+ let new_value = Some(incoming_value.clone());
+ if old_value != new_value {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value,
+ new_value,
+ });
+ }
+ ours.insert(key, incoming_value);
+ }
+ }
+
+ if ours == old_incoming {
+ IncomingAction::TakeRemote {
+ ext_id,
+ data: old_incoming,
+ changes,
+ }
+ } else {
+ IncomingAction::Merge {
+ ext_id,
+ data: ours,
+ changes,
+ }
+ }
+}
+
+fn remove_matching_keys(mut ours: JsonMap, keys_to_remove: &JsonMap) -> (JsonMap, StorageChanges) {
+ let mut changes = StorageChanges::with_capacity(keys_to_remove.len());
+ for key in keys_to_remove.keys() {
+ if let Some(old_value) = ours.remove(key) {
+ changes.push(StorageValueChange {
+ key: key.clone(),
+ old_value: Some(old_value),
+ new_value: None,
+ });
+ }
+ }
+ (ours, changes)
+}
+
+/// Holds a JSON-serialized map of all synced changes for an extension.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct SyncedExtensionChange {
+ /// The extension ID.
+ pub ext_id: String,
+ /// The contents of a `StorageChanges` struct, in JSON format. We don't
+ /// deserialize these because they need to be passed back to the browser
+ /// as strings anyway.
+ pub changes: String,
+}
+
+// Fetches the applied changes we stashed in the storage_sync_applied table.
+pub fn get_synced_changes(db: &StorageDb) -> Result<Vec<SyncedExtensionChange>> {
+ let signal = db.begin_interrupt_scope()?;
+ let sql = "SELECT ext_id, changes FROM temp.storage_sync_applied";
+ db.conn().query_rows_and_then(sql, [], |row| -> Result<_> {
+ signal.err_if_interrupted()?;
+ Ok(SyncedExtensionChange {
+ ext_id: row.get("ext_id")?,
+ changes: row.get("changes")?,
+ })
+ })
+}
+
+// Helpers for tests
+#[cfg(test)]
+pub mod test {
+ use crate::db::{test::new_mem_db, StorageDb};
+ use crate::schema::create_empty_sync_temp_tables;
+
+ pub fn new_syncable_mem_db() -> StorageDb {
+ let _ = env_logger::try_init();
+ let db = new_mem_db();
+ create_empty_sync_temp_tables(&db).expect("should work");
+ db
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::test::new_syncable_mem_db;
+ use super::*;
+ use serde_json::json;
+
+ #[test]
+ fn test_serde_record_ser() {
+ assert_eq!(
+ serde_json::to_string(&WebextRecord {
+ guid: "guid".into(),
+ ext_id: "ext_id".to_string(),
+ data: "data".to_string()
+ })
+ .unwrap(),
+ r#"{"id":"guid","extId":"ext_id","data":"data"}"#
+ );
+ }
+
+ // a macro for these tests - constructs a serde_json::Value::Object
+ macro_rules! map {
+ ($($map:tt)+) => {
+ json!($($map)+).as_object().unwrap().clone()
+ };
+ }
+
+ macro_rules! change {
+ ($key:literal, None, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: None,
+ };
+ };
+ ($key:literal, $old:tt, None) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: None,
+ }
+ };
+ ($key:literal, None, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: None,
+ new_value: Some(json!($new)),
+ }
+ };
+ ($key:literal, $old:tt, $new:tt) => {
+ StorageValueChange {
+ key: $key.to_string(),
+ old_value: Some(json!($old)),
+ new_value: Some(json!($new)),
+ }
+ };
+ }
+ macro_rules! changes {
+ ( ) => {
+ StorageChanges::new()
+ };
+ ( $( $change:expr ),* ) => {
+ {
+ let mut changes = StorageChanges::new();
+ $(
+ changes.push($change);
+ )*
+ changes
+ }
+ };
+ }
+
+ #[test]
+ fn test_3way_merging() {
+ // No conflict - identical local and remote.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"one": "one", "two": "two"}),
+ map!({"two": "two", "one": "one"}),
+ Some(map!({"parent_only": "parent"})),
+ ),
+ IncomingAction::Same {
+ ext_id: "ext-id".to_string()
+ }
+ );
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "common"}),
+ map!({"ours_only": "ours", "common": "common"}),
+ Some(map!({"parent_only": "parent", "common": "old_common"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "common"}),
+ changes: changes![change!("other_only", None, "other")],
+ }
+ );
+ // Simple conflict - parent value is neither local nor incoming. incoming wins.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "incoming"}),
+ map!({"ours_only": "ours", "common": "local"}),
+ Some(map!({"parent_only": "parent", "common": "parent"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "incoming"}),
+ changes: changes![
+ change!("common", "local", "incoming"),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Local change, no conflict.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other", "common": "old_value"}),
+ map!({"ours_only": "ours", "common": "new_value"}),
+ Some(map!({"parent_only": "parent", "common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "ours_only": "ours", "common": "new_value"}),
+ changes: changes![change!("other_only", None, "other")],
+ }
+ );
+ // Field was removed remotely.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other"}),
+ map!({"common": "old_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::TakeRemote {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other"}),
+ changes: changes![
+ change!("common", "old_value", None),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Field was removed remotely but we added another one.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({"other_only": "other"}),
+ map!({"common": "old_value", "new_key": "new_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"other_only": "other", "new_key": "new_value"}),
+ changes: changes![
+ change!("common", "old_value", None),
+ change!("other_only", None, "other")
+ ],
+ }
+ );
+ // Field was removed both remotely and locally.
+ assert_eq!(
+ merge(
+ "ext-id".to_string(),
+ map!({}),
+ map!({"new_key": "new_value"}),
+ Some(map!({"common": "old_value"})),
+ ),
+ IncomingAction::Merge {
+ ext_id: "ext-id".to_string(),
+ data: map!({"new_key": "new_value"}),
+ changes: changes![],
+ }
+ );
+ }
+
+ #[test]
+ fn test_remove_matching_keys() {
+ assert_eq!(
+ remove_matching_keys(
+ map!({"key1": "value1", "key2": "value2"}),
+ &map!({"key1": "ignored", "key3": "ignored"})
+ ),
+ (
+ map!({"key2": "value2"}),
+ changes![change!("key1", "value1", None)]
+ )
+ );
+ }
+
+ #[test]
+ fn test_get_synced_changes() -> Result<()> {
+ let db = new_syncable_mem_db();
+ db.execute_batch(&format!(
+ r#"INSERT INTO temp.storage_sync_applied (ext_id, changes)
+ VALUES
+ ('an-extension', '{change1}'),
+ ('ext"id', '{change2}')
+ "#,
+ change1 = serde_json::to_string(&changes![change!("key1", "old-val", None)])?,
+ change2 = serde_json::to_string(&changes![change!("key-for-second", None, "new-val")])?
+ ))?;
+ let changes = get_synced_changes(&db)?;
+ assert_eq!(changes[0].ext_id, "an-extension");
+ // sanity check it's valid!
+ let c1: JsonMap =
+ serde_json::from_str(&changes[0].changes).expect("changes must be an object");
+ assert_eq!(
+ c1.get("key1")
+ .expect("must exist")
+ .as_object()
+ .expect("must be an object")
+ .get("oldValue"),
+ Some(&json!("old-val"))
+ );
+
+ // phew - do it again to check the string got escaped.
+ assert_eq!(
+ changes[1],
+ SyncedExtensionChange {
+ ext_id: "ext\"id".into(),
+ changes: r#"{"key-for-second":{"newValue":"new-val"}}"#.into(),
+ }
+ );
+ assert_eq!(changes[1].ext_id, "ext\"id");
+ let c2: JsonMap =
+ serde_json::from_str(&changes[1].changes).expect("changes must be an object");
+ assert_eq!(
+ c2.get("key-for-second")
+ .expect("must exist")
+ .as_object()
+ .expect("must be an object")
+ .get("newValue"),
+ Some(&json!("new-val"))
+ );
+ Ok(())
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/outgoing.rs b/third_party/rust/webext-storage/src/sync/outgoing.rs
new file mode 100644
index 0000000000..c6c9dd64e5
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/outgoing.rs
@@ -0,0 +1,186 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// The "outgoing" part of syncing - building the payloads to upload and
+// managing the sync state of the local DB.
+
+use interrupt_support::Interruptee;
+use rusqlite::{Connection, Row, Transaction};
+use sql_support::ConnExt;
+use sync15::bso::OutgoingBso;
+use sync_guid::Guid as SyncGuid;
+
+use crate::error::*;
+
+use super::WebextRecord;
+
+fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> {
+ let guid: SyncGuid = row.get("guid")?;
+ let ext_id: String = row.get("ext_id")?;
+ let raw_data: Option<String> = row.get("data")?;
+ Ok(match raw_data {
+ Some(raw_data) => {
+ let record = WebextRecord {
+ guid,
+ ext_id,
+ data: raw_data,
+ };
+ OutgoingBso::from_content_with_id(record)?
+ }
+ None => OutgoingBso::new_tombstone(guid.into()),
+ })
+}
+
+/// Stages info about what should be uploaded in a temp table. This should be
+/// called in the same transaction as `apply_actions`. record_uploaded() can be
+/// called after the upload is complete and the data in the temp table will be
+/// used to update the local store.
+pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> {
+ let sql = "
+ -- Stage outgoing items. The item may not yet have a GUID (ie, it might
+ -- not already be in either the mirror nor the incoming staging table),
+ -- so we generate one if it doesn't exist.
+ INSERT INTO storage_sync_outgoing_staging
+ (guid, ext_id, data, sync_change_counter)
+ SELECT coalesce(m.guid, s.guid, generate_guid()),
+ l.ext_id, l.data, l.sync_change_counter
+ FROM storage_sync_data l
+ -- left joins as one or both may not exist.
+ LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id
+ LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id
+ WHERE sync_change_counter > 0;
+
+ -- At this point, we've merged in all new records, so copy incoming
+ -- staging into the mirror so that it matches what's on the server.
+ INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data)
+ SELECT guid, ext_id, data FROM temp.storage_sync_staging;
+
+ -- And copy any incoming records that we aren't reuploading into the
+ -- local table. We'll copy the outgoing ones into the mirror and local
+ -- after we upload them.
+ INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter)
+ SELECT ext_id, data, 0
+ FROM storage_sync_staging s
+ WHERE ext_id IS NOT NULL
+ AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o
+ WHERE o.guid = s.guid);";
+ tx.execute_batch(sql)?;
+ Ok(())
+}
+
+/// Returns a vec of the outgoing records which should be uploaded.
+pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> {
+ let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging";
+ let elts = conn
+ .conn()
+ .query_rows_and_then(sql, [], |row| -> Result<_> {
+ signal.err_if_interrupted()?;
+ outgoing_from_row(row)
+ })?;
+
+ log::debug!("get_outgoing found {} items", elts.len());
+ Ok(elts.into_iter().collect())
+}
+
+/// Record the fact that items were uploaded. This updates the state of the
+/// local DB to reflect the state of the server we just updated.
+/// Note that this call is almost certainly going to be made in a *different*
+/// transaction than the transaction used in `stage_outgoing()`, and it will
+/// be called once per batch upload.
+pub fn record_uploaded(
+ tx: &Transaction<'_>,
+ items: &[SyncGuid],
+ signal: &dyn Interruptee,
+) -> Result<()> {
+ log::debug!(
+ "record_uploaded recording that {} items were uploaded",
+ items.len()
+ );
+
+ // Updating the `was_uploaded` column fires the `record_uploaded` trigger,
+ // which updates the local change counter and writes the uploaded record
+ // data back to the mirror.
+ sql_support::each_chunk(items, |chunk, _| -> Result<()> {
+ signal.err_if_interrupted()?;
+ let sql = format!(
+ "UPDATE storage_sync_outgoing_staging SET
+ was_uploaded = 1
+ WHERE guid IN ({})",
+ sql_support::repeat_sql_vars(chunk.len()),
+ );
+ tx.execute(&sql, rusqlite::params_from_iter(chunk))?;
+ Ok(())
+ })?;
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::super::test::new_syncable_mem_db;
+ use super::*;
+ use interrupt_support::NeverInterrupts;
+
+ #[test]
+ fn test_simple() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+
+ tx.execute_batch(
+ r#"
+ INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
+ VALUES
+ ('ext_no_changes', '{"foo":"bar"}', 0),
+ ('ext_with_changes', '{"foo":"bar"}', 1);
+ "#,
+ )?;
+
+ stage_outgoing(&tx)?;
+ let changes = get_outgoing(&tx, &NeverInterrupts)?;
+ assert_eq!(changes.len(), 1);
+ let record: serde_json::Value = serde_json::from_str(&changes[0].payload).unwrap();
+ let ext_id = record.get("extId").unwrap().as_str().unwrap();
+
+ assert_eq!(ext_id, "ext_with_changes");
+
+ record_uploaded(
+ &tx,
+ changes
+ .into_iter()
+ .map(|p| p.envelope.id)
+ .collect::<Vec<SyncGuid>>()
+ .as_slice(),
+ &NeverInterrupts,
+ )?;
+
+ let counter: i32 = tx.conn().query_one(
+ "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'",
+ )?;
+ assert_eq!(counter, 0);
+ Ok(())
+ }
+
+ #[test]
+ fn test_payload_serialization() {
+ let record = WebextRecord {
+ guid: SyncGuid::new("guid"),
+ ext_id: "ext-id".to_string(),
+ data: "{}".to_string(),
+ };
+
+ let outgoing = OutgoingBso::from_content_with_id(record).unwrap();
+
+ // The envelope should have our ID.
+ assert_eq!(outgoing.envelope.id, "guid");
+
+ let outgoing_payload =
+ serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap();
+ let outgoing_map = outgoing_payload.as_object().unwrap();
+
+ assert!(outgoing_map.contains_key("id"));
+ assert!(outgoing_map.contains_key("data"));
+ assert!(outgoing_map.contains_key("extId"));
+ assert_eq!(outgoing_map.len(), 3);
+ }
+}
diff --git a/third_party/rust/webext-storage/src/sync/sync_tests.rs b/third_party/rust/webext-storage/src/sync/sync_tests.rs
new file mode 100644
index 0000000000..6940be2e59
--- /dev/null
+++ b/third_party/rust/webext-storage/src/sync/sync_tests.rs
@@ -0,0 +1,529 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// This file tries to simulate "full syncs" - ie, from local state changing, to
+// fetching incoming items, generating items to upload, then updating the local
+// state (including the mirror) as a result.
+
+use crate::api::{clear, get, set};
+use crate::error::*;
+use crate::schema::create_empty_sync_temp_tables;
+use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
+use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
+use crate::sync::test::new_syncable_mem_db;
+use crate::sync::WebextRecord;
+use interrupt_support::NeverInterrupts;
+use rusqlite::{Connection, Row, Transaction};
+use serde_json::json;
+use sql_support::ConnExt;
+use sync15::bso::{IncomingBso, IncomingContent, OutgoingBso};
+use sync_guid::Guid;
+
+// Here we try and simulate everything done by a "full sync", just minus the
+// engine. Returns the records we uploaded.
+fn do_sync(
+ tx: &Transaction<'_>,
+ incoming_payloads: &[IncomingContent<WebextRecord>],
+) -> Result<Vec<OutgoingBso>> {
+ log::info!("test do_sync() starting");
+ // First we stage the incoming in the temp tables.
+ stage_incoming(tx, incoming_payloads, &NeverInterrupts)?;
+ // Then we process them getting a Vec of (item, state), which we turn into
+ // a Vec of (item, action)
+ let actions = get_incoming(tx)?
+ .into_iter()
+ .map(|(item, state)| (item, plan_incoming(state)))
+ .collect();
+ log::debug!("do_sync applying {:?}", actions);
+ apply_actions(tx, actions, &NeverInterrupts)?;
+ // So we've done incoming - do outgoing.
+ stage_outgoing(tx)?;
+ let outgoing = get_outgoing(tx, &NeverInterrupts)?;
+ log::debug!("do_sync has outgoing {:?}", outgoing);
+ record_uploaded(
+ tx,
+ outgoing
+ .iter()
+ .map(|p| p.envelope.id.clone())
+ .collect::<Vec<Guid>>()
+ .as_slice(),
+ &NeverInterrupts,
+ )?;
+ create_empty_sync_temp_tables(tx)?;
+ log::info!("test do_sync() complete");
+ Ok(outgoing)
+}
+
+// Check *both* the mirror and local API have ended up with the specified data.
+fn check_finished_with(conn: &Connection, ext_id: &str, val: serde_json::Value) -> Result<()> {
+ let local = get(conn, ext_id, serde_json::Value::Null)?;
+ assert_eq!(local, val);
+ let guid = get_mirror_guid(conn, ext_id)?;
+ let mirror = get_mirror_data(conn, &guid);
+ assert_eq!(mirror, DbData::Data(val.to_string()));
+ // and there should be zero items with a change counter.
+ let count = conn.query_row_and_then(
+ "SELECT COUNT(*) FROM storage_sync_data WHERE sync_change_counter != 0;",
+ [],
+ |row| row.get::<_, u32>(0),
+ )?;
+ assert_eq!(count, 0);
+ Ok(())
+}
+
+fn get_mirror_guid(conn: &Connection, extid: &str) -> Result<Guid> {
+ let guid = conn.query_row_and_then(
+ "SELECT m.guid FROM storage_sync_mirror m WHERE m.ext_id = ?;",
+ [extid],
+ |row| row.get::<_, Guid>(0),
+ )?;
+ Ok(guid)
+}
+
+#[derive(Debug, PartialEq)]
+enum DbData {
+ NoRow,
+ NullRow,
+ Data(String),
+}
+
+impl DbData {
+ fn has_data(&self) -> bool {
+ matches!(self, DbData::Data(_))
+ }
+}
+
+fn _get(conn: &Connection, id_name: &str, expected_extid: &str, table: &str) -> DbData {
+ let sql = format!("SELECT {} as id, data FROM {}", id_name, table);
+
+ fn from_row(row: &Row<'_>) -> Result<(String, Option<String>)> {
+ Ok((row.get("id")?, row.get("data")?))
+ }
+ let mut items = conn
+ .conn()
+ .query_rows_and_then(&sql, [], from_row)
+ .expect("should work");
+ if items.is_empty() {
+ DbData::NoRow
+ } else {
+ let item = items.pop().expect("it exists");
+ assert_eq!(Guid::new(&item.0), expected_extid);
+ match item.1 {
+ None => DbData::NullRow,
+ Some(v) => DbData::Data(v),
+ }
+ }
+}
+
+fn get_mirror_data(conn: &Connection, expected_extid: &str) -> DbData {
+ _get(conn, "guid", expected_extid, "storage_sync_mirror")
+}
+
+fn get_local_data(conn: &Connection, expected_extid: &str) -> DbData {
+ _get(conn, "ext_id", expected_extid, "storage_sync_data")
+}
+
+fn make_incoming(
+ guid: &Guid,
+ ext_id: &str,
+ data: &serde_json::Value,
+) -> IncomingContent<WebextRecord> {
+ let content = json!({"id": guid, "extId": ext_id, "data": data.to_string()});
+ IncomingBso::from_test_content(content).into_content()
+}
+
+fn make_incoming_tombstone(guid: &Guid) -> IncomingContent<WebextRecord> {
+ IncomingBso::new_test_tombstone(guid.clone()).into_content()
+}
+
+#[test]
+fn test_simple_outgoing_sync() -> Result<()> {
+ // So we are starting with an empty local store and empty server store.
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_simple_incoming_sync() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ let bridge_record = make_incoming(&Guid::new("guid"), "ext-id", &data);
+ assert_eq!(do_sync(&tx, &[bridge_record])?.len(), 0);
+ let key1_from_api = get(&tx, "ext-id", json!("key1"))?;
+ assert_eq!(key1_from_api, json!({"key1": "key1-value"}));
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_outgoing_tombstone() -> Result<()> {
+ // Tombstones are only kept when the mirror has that record - so first
+ // test that, then arrange for the mirror to have the record.
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(
+ get_local_data(&tx, "ext-id"),
+ DbData::Data(data.to_string())
+ );
+ // hasn't synced yet, so clearing shouldn't write a tombstone.
+ clear(&tx, "ext-id")?;
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ // now set data again and sync and *then* remove.
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert!(get_local_data(&tx, "ext-id").has_data());
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert!(get_mirror_data(&tx, &guid).has_data());
+ clear(&tx, "ext-id")?;
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NullRow);
+ // then after syncing, the tombstone will be in the mirror but the local row
+ // has been removed.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_incoming_tombstone_exists() -> Result<()> {
+ // An incoming tombstone for a record we've previously synced (and thus
+ // have data for)
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data.clone())?;
+ assert_eq!(
+ get_local_data(&tx, "ext-id"),
+ DbData::Data(data.to_string())
+ );
+ // sync to get data in our mirror.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ assert!(get_local_data(&tx, "ext-id").has_data());
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert!(get_mirror_data(&tx, &guid).has_data());
+ // Now an incoming tombstone for it.
+ let tombstone = make_incoming_tombstone(&guid);
+ assert_eq!(
+ do_sync(&tx, &[tombstone])?.len(),
+ 0,
+ "expect no outgoing records"
+ );
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_incoming_tombstone_not_exists() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // An incoming tombstone for something that's not anywhere locally.
+ let guid = Guid::new("guid");
+ let tombstone = make_incoming_tombstone(&guid);
+ assert_eq!(
+ do_sync(&tx, &[tombstone])?.len(),
+ 0,
+ "expect no outgoing records"
+ );
+ // But we still keep the tombstone in the mirror.
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_reconciled() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload with the same data
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key1": "key1-value"}));
+ // Should be no outgoing records as we reconciled.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ Ok(())
+}
+
+/// Tests that we handle things correctly if we get a payload that is
+/// identical to what is in the mirrored table.
+#[test]
+fn test_reconcile_with_null_payload() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data.clone())?;
+ // We try to push this change on the next sync.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::Data(data.to_string()));
+ // Incoming payload with the same data.
+ // This could happen if, for example, another client changed the
+ // key and then put it back the way it was.
+ let record = make_incoming(&guid, "ext-id", &data);
+ // Should be no outgoing records as we reconciled.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", data)?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_incoming_when_local_is_deleted() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // We only record an extension as deleted locally if it has been
+ // uploaded before being deleted.
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ clear(&tx, "ext-id")?;
+ // Incoming payload without 'key1'. Because we previously uploaded
+ // key1, this means another client deleted it.
+ let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"}));
+
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ clear(&tx, "ext-id")?;
+
+ // Use a random guid so that we don't find the mirrored data.
+ // This test is somewhat bad because deduping might obviate
+ // the need for it.
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"}));
+
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_accept_deleted_key_mirrored() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Incoming payload without 'key1'. Because we previously uploaded
+ // key1, this means another client deleted it.
+ let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"}));
+ // We completely accept the incoming record.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_no_mirror() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload without 'key1' and some data for 'key2'.
+ // Because we never uploaded 'key1', we merge our local values
+ // with the remote.
+ let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"}));
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-value"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let old_data = json!({"key1": "key1-value", "key2": "key2-value", "doomed_key": "deletable"});
+ set(&tx, "ext-id", old_data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // We update 'key1' locally.
+ let local_data = json!({"key1": "key1-new", "key2": "key2-value", "doomed_key": "deletable"});
+ set(&tx, "ext-id", local_data)?;
+ // Incoming payload where another client set 'key2' and removed
+ // the 'doomed_key'.
+ // Because we never uploaded our data, we'll merge our
+ // key1 in, but otherwise keep the server's changes.
+ let record = make_incoming(
+ &guid,
+ "ext-id",
+ &json!({"key1": "key1-value", "key2": "key2-incoming"}),
+ );
+ // We should send our 'key1'
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-new", "key2": "key2-incoming"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_merged_with_null_payload() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let old_data = json!({"key1": "key1-value"});
+ set(&tx, "ext-id", old_data.clone())?;
+ // Push this change remotely.
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ assert_eq!(
+ get_mirror_data(&tx, &guid),
+ DbData::Data(old_data.to_string())
+ );
+ let local_data = json!({"key1": "key1-new", "key2": "key2-value"});
+ set(&tx, "ext-id", local_data.clone())?;
+ // Incoming payload with the same old data.
+ let record = make_incoming(&guid, "ext-id", &old_data);
+ // Three-way-merge will not detect any change in key1, so we
+ // should keep our entire new value.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", local_data)?;
+ Ok(())
+}
+
+#[test]
+fn test_deleted_mirrored_object_accept() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Incoming payload with data deleted.
+ // We synchronize this deletion by deleting the keys we think
+ // were on the server.
+ let record = make_incoming_tombstone(&guid);
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+ Ok(())
+}
+
+#[test]
+fn test_deleted_mirrored_object_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ set(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-new", "key2": "key2-value"}),
+ )?;
+ // Incoming payload with data deleted.
+ // We synchronize this deletion by deleting the keys we think
+ // were on the server.
+ let record = make_incoming_tombstone(&guid);
+ // This overrides the change to 'key1', but we still upload 'key2'.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+/// Like the above test, but with a mirrored tombstone.
+#[test]
+fn test_deleted_mirrored_tombstone_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ // Sync some data so we can get the guid for this extension.
+ set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
+ assert_eq!(do_sync(&tx, &[])?.len(), 1);
+ let guid = get_mirror_guid(&tx, "ext-id")?;
+ // Sync a delete for this data so we have a tombstone in the mirror.
+ let record = make_incoming_tombstone(&guid);
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow);
+
+ // Set some data and sync it simultaneously with another incoming delete.
+ set(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ let record = make_incoming_tombstone(&guid);
+ // We cannot delete any matching keys because there are no
+ // matching keys. Instead we push our data.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?;
+ Ok(())
+}
+
+#[test]
+fn test_deleted_not_mirrored_object_merged() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload with data deleted.
+ let record = make_incoming_tombstone(&Guid::new("guid"));
+ // We normally delete the keys we think were on the server, but
+ // here we have no information about what was on the server, so we
+ // don't delete anything. We merge in all undeleted keys.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-value"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_conflicting_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let data = json!({"key1": "key1-value", "key2": "key2-value"});
+ set(&tx, "ext-id", data)?;
+ // Incoming payload without 'key1' and conflicting for 'key2'.
+ // Because we never uploaded either of our keys, we'll merge our
+ // key1 in, but the server key2 wins.
+ let record = make_incoming(
+ &Guid::new("guid"),
+ "ext-id",
+ &json!({"key2": "key2-incoming"}),
+ );
+ // We should send our 'key1'
+ assert_eq!(do_sync(&tx, &[record])?.len(), 1);
+ check_finished_with(
+ &tx,
+ "ext-id",
+ json!({"key1": "key1-value", "key2": "key2-incoming"}),
+ )?;
+ Ok(())
+}
+
+#[test]
+fn test_invalid_incoming() -> Result<()> {
+ let mut db = new_syncable_mem_db();
+ let tx = db.transaction()?;
+ let json = json!({"id": "id", "payload": json!("").to_string()});
+ let bso = serde_json::from_value::<IncomingBso>(json).unwrap();
+ let record = bso.into_content();
+
+ // Should do nothing.
+ assert_eq!(do_sync(&tx, &[record])?.len(), 0);
+ Ok(())
+}