diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/webext-storage/src/sync | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/webext-storage/src/sync')
-rw-r--r-- | third_party/rust/webext-storage/src/sync/bridge.rs | 388 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/incoming.rs | 863 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/mod.rs | 429 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/outgoing.rs | 186 | ||||
-rw-r--r-- | third_party/rust/webext-storage/src/sync/sync_tests.rs | 529 |
5 files changed, 2395 insertions, 0 deletions
diff --git a/third_party/rust/webext-storage/src/sync/bridge.rs b/third_party/rust/webext-storage/src/sync/bridge.rs new file mode 100644 index 0000000000..760a0d0e10 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/bridge.rs @@ -0,0 +1,388 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use anyhow::Result; +use rusqlite::Transaction; +use std::sync::{Arc, Weak}; +use sync15::bso::IncomingBso; +use sync15::engine::ApplyResults; +use sync_guid::Guid as SyncGuid; + +use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb}; +use crate::schema; +use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming}; +use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing}; + +const LAST_SYNC_META_KEY: &str = "last_sync_time"; +const SYNC_ID_META_KEY: &str = "sync_id"; + +/// A bridged engine implements all the methods needed to make the +/// `storage.sync` store work with Desktop's Sync implementation. +/// Conceptually, it's similar to `sync15::Store`, which we +/// should eventually rename and unify with this trait (#2841). +/// +/// Unlike most of our other implementation which hold a strong reference +/// to the store, this engine keeps a weak reference in an attempt to keep +/// the desktop semantics as close as possible to what they were when the +/// engines all took lifetime params to ensure they don't outlive the store. +pub struct BridgedEngine { + db: Weak<ThreadSafeStorageDb>, +} + +impl BridgedEngine { + /// Creates a bridged engine for syncing. + pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self { + BridgedEngine { + db: Arc::downgrade(db), + } + } + + fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> { + tx.execute_batch( + "DELETE FROM storage_sync_mirror; + UPDATE storage_sync_data SET sync_change_counter = 1;", + )?; + delete_meta(tx, LAST_SYNC_META_KEY)?; + Ok(()) + } + + fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> { + self.db + .upgrade() + .ok_or_else(|| crate::error::ErrorKind::DatabaseConnectionClosed.into()) + } +} + +impl sync15::engine::BridgedEngine for BridgedEngine { + fn last_sync(&self) -> Result<i64> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0)) + } + + fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?; + Ok(()) + } + + fn sync_id(&self) -> Result<Option<String>> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + Ok(get_meta(&db, SYNC_ID_META_KEY)?) + } + + fn reset_sync_id(&self) -> Result<String> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + let new_id = SyncGuid::random().to_string(); + self.do_reset(&tx)?; + put_meta(&tx, SYNC_ID_META_KEY, &new_id)?; + tx.commit()?; + Ok(new_id) + } + + fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?; + Ok(match current { + Some(current) if current == sync_id => current, + _ => { + let tx = db.unchecked_transaction()?; + self.do_reset(&tx)?; + let result = sync_id.to_string(); + put_meta(&tx, SYNC_ID_META_KEY, &result)?; + tx.commit()?; + result + } + }) + } + + fn sync_started(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + schema::create_empty_sync_temp_tables(&db)?; + Ok(()) + } + + fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + let tx = db.unchecked_transaction()?; + let incoming_content: Vec<_> = incoming_bsos + .into_iter() + .map(IncomingBso::into_content::<super::WebextRecord>) + .collect(); + stage_incoming(&tx, &incoming_content, &signal)?; + tx.commit()?; + Ok(()) + } + + fn apply(&self) -> Result<ApplyResults> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + + let tx = db.unchecked_transaction()?; + let incoming = get_incoming(&tx)?; + let actions = incoming + .into_iter() + .map(|(item, state)| (item, plan_incoming(state))) + .collect(); + apply_actions(&tx, actions, &signal)?; + stage_outgoing(&tx)?; + tx.commit()?; + + Ok(get_outgoing(&db, &signal)?.into()) + } + + fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let signal = db.begin_interrupt_scope()?; + let tx = db.unchecked_transaction()?; + record_uploaded(&tx, ids, &signal)?; + tx.commit()?; + + Ok(()) + } + + fn sync_finished(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + schema::create_empty_sync_temp_tables(&db)?; + Ok(()) + } + + fn reset(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + self.do_reset(&tx)?; + delete_meta(&tx, SYNC_ID_META_KEY)?; + tx.commit()?; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + let shared_db = self.thread_safe_storage_db()?; + let db = shared_db.lock(); + let tx = db.unchecked_transaction()?; + // We assume the meta table is only used by sync. + tx.execute_batch( + "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;", + )?; + tx.commit()?; + Ok(()) + } +} + +impl From<anyhow::Error> for crate::error::Error { + fn from(value: anyhow::Error) -> Self { + crate::error::ErrorKind::SyncError(value.to_string()).into() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::test::new_mem_thread_safe_storage_db; + use crate::db::StorageDb; + use sync15::engine::BridgedEngine; + + fn query_count(conn: &StorageDb, table: &str) -> u32 { + conn.query_row_and_then(&format!("SELECT COUNT(*) FROM {};", table), [], |row| { + row.get::<_, u32>(0) + }) + .expect("should work") + } + + // Sets up mock data for the tests here. + fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> { + { + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + db.execute( + "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter) + VALUES ('ext-a', 'invalid-json', 2)", + [], + )?; + db.execute( + "INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', 'ext-a', '3')", + [], + )?; + } + engine.set_last_sync(1)?; + + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + // and assert we wrote what we think we did. + assert_eq!(query_count(&db, "storage_sync_data"), 1); + assert_eq!(query_count(&db, "storage_sync_mirror"), 1); + assert_eq!(query_count(&db, "meta"), 1); + Ok(()) + } + + // Assuming a DB setup with setup_mock_data, assert it was correctly reset. + fn assert_reset(engine: &super::BridgedEngine) -> Result<()> { + // A reset never wipes data... + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + assert_eq!(query_count(&db, "storage_sync_data"), 1); + + // But did reset the change counter. + let cc = db.query_row_and_then( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(cc, 1); + // But did wipe the mirror... + assert_eq!(query_count(&db, "storage_sync_mirror"), 0); + // And the last_sync should have been wiped. + assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none()); + Ok(()) + } + + // Assuming a DB setup with setup_mock_data, assert it has not been reset. + fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> { + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + assert_eq!(query_count(&db, "storage_sync_data"), 1); + let cc = db.query_row_and_then( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(cc, 2); + assert_eq!(query_count(&db, "storage_sync_mirror"), 1); + // And the last_sync should remain. + assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some()); + Ok(()) + } + + #[test] + fn test_wipe() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + engine.wipe()?; + + let shared = engine.thread_safe_storage_db()?; + let db = shared.lock(); + + assert_eq!(query_count(&db, "storage_sync_data"), 0); + assert_eq!(query_count(&db, "storage_sync_mirror"), 0); + assert_eq!(query_count(&db, "meta"), 0); + Ok(()) + } + + #[test] + fn test_reset() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + engine.reset()?; + assert_reset(&engine)?; + // Only an explicit reset kills the sync-id, so check that here. + assert_eq!( + get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?, + None + ); + + Ok(()) + } + + #[test] + fn test_ensure_missing_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + assert_eq!(engine.sync_id()?, None); + // We don't have a sync ID - so setting one should reset. + engine.ensure_current_sync_id("new-id")?; + // should have cause a reset. + assert_reset(&engine)?; + Ok(()) + } + + #[test] + fn test_ensure_new_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"old-id".to_string(), + )?; + assert_not_reset(&engine)?; + assert_eq!(engine.sync_id()?, Some("old-id".to_string())); + + engine.ensure_current_sync_id("new-id")?; + // should have cause a reset. + assert_reset(&engine)?; + // should have the new id. + assert_eq!(engine.sync_id()?, Some("new-id".to_string())); + Ok(()) + } + + #[test] + fn test_ensure_same_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + assert_not_reset(&engine)?; + + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + engine.ensure_current_sync_id("sync-id")?; + // should not have reset. + assert_not_reset(&engine)?; + Ok(()) + } + + #[test] + fn test_reset_sync_id() -> Result<()> { + let strong = new_mem_thread_safe_storage_db(); + let engine = super::BridgedEngine::new(&strong); + + setup_mock_data(&engine)?; + put_meta( + &engine.thread_safe_storage_db()?.lock(), + SYNC_ID_META_KEY, + &"sync-id".to_string(), + )?; + + assert_eq!(engine.sync_id()?, Some("sync-id".to_string())); + let new_id = engine.reset_sync_id()?; + // should have cause a reset. + assert_reset(&engine)?; + assert_eq!(engine.sync_id()?, Some(new_id)); + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/incoming.rs b/third_party/rust/webext-storage/src/sync/incoming.rs new file mode 100644 index 0000000000..5d00fe8de3 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/incoming.rs @@ -0,0 +1,863 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// The "incoming" part of syncing - handling the incoming rows, staging them, +// working out a plan for them, updating the local data and mirror, etc. + +use interrupt_support::Interruptee; +use rusqlite::{Connection, Row, Transaction}; +use sql_support::ConnExt; +use sync15::bso::{IncomingContent, IncomingKind}; +use sync_guid::Guid as SyncGuid; + +use crate::api::{StorageChanges, StorageValueChange}; +use crate::error::*; + +use super::{merge, remove_matching_keys, JsonMap, WebextRecord}; + +/// The state data can be in. Could be represented as Option<JsonMap>, but this +/// is clearer and independent of how the data is stored. +#[derive(Debug, PartialEq, Eq)] +pub enum DataState { + /// The data was deleted. + Deleted, + /// Data exists, as stored in the map. + Exists(JsonMap), +} + +// A little helper to create a StorageChanges object when we are creating +// a new value with multiple keys that doesn't exist locally. +fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges { + let mut result = StorageChanges::with_capacity(new.len()); + for (key, val) in new.iter() { + result.push(StorageValueChange { + key: key.clone(), + old_value: None, + new_value: Some(val.clone()), + }); + } + result +} + +// This module deals exclusively with the Map inside a JsonValue::Object(). +// This helper reads such a Map from a SQL row, ignoring anything which is +// either invalid JSON or a different JSON type. +fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> { + let s = row.get::<_, Option<String>>(col)?; + Ok(match s { + None => DataState::Deleted, + Some(s) => match serde_json::from_str(&s) { + Ok(serde_json::Value::Object(m)) => DataState::Exists(m), + _ => { + // We don't want invalid json or wrong types to kill syncing. + // It should be impossible as we never write anything which + // could cause it, but we can't really log the bad data as there + // might be PII. Logging just a message without any additional + // clues is going to be unhelpfully noisy, so, silently None. + // XXX - Maybe record telemetry? + DataState::Deleted + } + }, + }) +} + +/// The first thing we do with incoming items is to "stage" them in a temp table. +/// The actual processing is done via this table. +pub fn stage_incoming( + tx: &Transaction<'_>, + incoming_records: &[IncomingContent<WebextRecord>], + signal: &dyn Interruptee, +) -> Result<()> { + sql_support::each_sized_chunk( + incoming_records, + // We bind 3 params per chunk. + sql_support::default_max_variable_number() / 3, + |chunk, _| -> Result<()> { + let mut params = Vec::with_capacity(chunk.len() * 3); + for record in chunk { + signal.err_if_interrupted()?; + match &record.kind { + IncomingKind::Content(r) => { + params.push(Some(record.envelope.id.to_string())); + params.push(Some(r.ext_id.to_string())); + params.push(Some(r.data.clone())); + } + IncomingKind::Tombstone => { + params.push(Some(record.envelope.id.to_string())); + params.push(None); + params.push(None); + } + IncomingKind::Malformed => { + log::error!("Ignoring incoming malformed record: {}", record.envelope.id); + } + } + } + // we might have skipped records + let actual_len = params.len() / 3; + if actual_len != 0 { + let sql = format!( + "INSERT OR REPLACE INTO temp.storage_sync_staging + (guid, ext_id, data) + VALUES {}", + sql_support::repeat_multi_values(actual_len, 3) + ); + tx.execute(&sql, rusqlite::params_from_iter(params))?; + } + Ok(()) + }, + )?; + Ok(()) +} + +/// The "state" we find ourselves in when considering an incoming/staging +/// record. This "state" is the input to calculating the IncomingAction and +/// carries all the data we need to make the required local changes. +#[derive(Debug, PartialEq, Eq)] +pub enum IncomingState { + /// There's an incoming item, but data for that extension doesn't exist + /// either in our local data store or in the local mirror. IOW, this is the + /// very first time we've seen this extension. + IncomingOnlyData { ext_id: String, data: JsonMap }, + + /// An incoming tombstone that doesn't exist locally. Because tombstones + /// don't carry the ext-id, it means it's not in our mirror. We are just + /// going to ignore it, but we track the state for consistency. + IncomingOnlyTombstone, + + /// There's an incoming item and we have data for the same extension in + /// our local store - but not in our mirror. This should be relatively + /// uncommon as it means: + /// * Some other profile has recently installed an extension and synced. + /// * This profile has recently installed the same extension. + /// * This is the first sync for this profile since both those events + /// happened. + HasLocal { + ext_id: String, + incoming: DataState, + local: DataState, + }, + /// There's an incoming item and there's an item for the same extension in + /// the mirror. The addon probably doesn't exist locally, or if it does, + /// the last time we synced we synced the deletion of all data. + NotLocal { + ext_id: String, + incoming: DataState, + mirror: DataState, + }, + /// This will be the most common "incoming" case - there's data incoming, + /// in the mirror and in the local store for an addon. + Everywhere { + ext_id: String, + incoming: DataState, + mirror: DataState, + local: DataState, + }, +} + +/// Get the items we need to process from the staging table. Return details about +/// the item and the state of that item, ready for processing. +pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> { + let sql = " + SELECT + s.guid as guid, + l.ext_id as l_ext_id, + m.ext_id as m_ext_id, + s.ext_id as s_ext_id, + s.data as s_data, m.data as m_data, l.data as l_data, + l.sync_change_counter + FROM temp.storage_sync_staging s + LEFT JOIN storage_sync_mirror m ON m.guid = s.guid + LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);"; + + fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> { + let guid = row.get("guid")?; + // This is complicated because the staging row doesn't hold the ext_id. + // However, both the local table and the mirror do. + let mirror_ext_id: Option<String> = row.get("m_ext_id")?; + let local_ext_id: Option<String> = row.get("l_ext_id")?; + let staged_ext_id: Option<String> = row.get("s_ext_id")?; + let incoming = json_map_from_row(row, "s_data")?; + + // We find the state by examining which tables the ext-id exists in, + // using whether that column is null as a proxy for that. + let state = match (local_ext_id, mirror_ext_id) { + (None, None) => { + match staged_ext_id { + Some(ext_id) => { + let data = match incoming { + // incoming record with missing data that's not a + // tombstone shouldn't happen, but we can cope by + // pretending it was an empty json map. + DataState::Deleted => JsonMap::new(), + DataState::Exists(data) => data, + }; + IncomingState::IncomingOnlyData { ext_id, data } + } + None => IncomingState::IncomingOnlyTombstone, + } + } + (Some(ext_id), None) => IncomingState::HasLocal { + ext_id, + incoming, + local: json_map_from_row(row, "l_data")?, + }, + (None, Some(ext_id)) => IncomingState::NotLocal { + ext_id, + incoming, + mirror: json_map_from_row(row, "m_data")?, + }, + (Some(ext_id), Some(_)) => IncomingState::Everywhere { + ext_id, + incoming, + mirror: json_map_from_row(row, "m_data")?, + local: json_map_from_row(row, "l_data")?, + }, + }; + Ok((guid, state)) + } + + conn.conn().query_rows_and_then(sql, [], from_row) +} + +/// This is the set of actions we know how to take *locally* for incoming +/// records. Which one depends on the IncomingState. +/// Every state which updates also records the set of changes we should notify +#[derive(Debug, PartialEq, Eq)] +pub enum IncomingAction { + /// We should locally delete the data for this record + DeleteLocally { + ext_id: String, + changes: StorageChanges, + }, + /// We will take the remote. + TakeRemote { + ext_id: String, + data: JsonMap, + changes: StorageChanges, + }, + /// We merged this data - this is what we came up with. + Merge { + ext_id: String, + data: JsonMap, + changes: StorageChanges, + }, + /// Entry exists locally and it's the same as the incoming record. + Same { ext_id: String }, + /// Incoming tombstone for an item we've never seen. + Nothing, +} + +/// Takes the state of an item and returns the action we should take for it. +pub fn plan_incoming(s: IncomingState) -> IncomingAction { + match s { + IncomingState::Everywhere { + ext_id, + incoming, + local, + mirror, + } => { + // All records exist - but do they all have data? + match (incoming, local, mirror) { + ( + DataState::Exists(incoming_data), + DataState::Exists(local_data), + DataState::Exists(mirror_data), + ) => { + // all records have data - 3-way merge. + merge(ext_id, incoming_data, local_data, Some(mirror_data)) + } + ( + DataState::Exists(incoming_data), + DataState::Exists(local_data), + DataState::Deleted, + ) => { + // No parent, so first time seeing this remotely - 2-way merge + merge(ext_id, incoming_data, local_data, None) + } + (DataState::Exists(incoming_data), DataState::Deleted, _) => { + // Incoming data, removed locally. Server wins. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&incoming_data), + data: incoming_data, + } + } + (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => { + // Deleted remotely. + // Treat this as a delete of every key that we + // know was present at the time. + let (result, changes) = remove_matching_keys(local_data, &mirror); + if result.is_empty() { + // If there were no more keys left, we can + // delete our version too. + IncomingAction::DeleteLocally { ext_id, changes } + } else { + IncomingAction::Merge { + ext_id, + data: result, + changes, + } + } + } + (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => { + // Perhaps another client created and then deleted + // the whole object for this extension since the + // last time we synced. + // Treat this as a delete of every key that we + // knew was present. Unfortunately, we don't know + // any keys that were present, so we delete no keys. + IncomingAction::Merge { + ext_id, + data: local_data, + changes: StorageChanges::new(), + } + } + (DataState::Deleted, DataState::Deleted, _) => { + // We agree with the remote (regardless of what we + // have mirrored). + IncomingAction::Same { ext_id } + } + } + } + IncomingState::HasLocal { + ext_id, + incoming, + local, + } => { + // So we have a local record and an incoming/staging record, but *not* a + // mirror record. This means some other device has synced this for + // the first time and we are yet to do the same. + match (incoming, local) { + (DataState::Exists(incoming_data), DataState::Exists(local_data)) => { + // This means the extension exists locally and remotely + // but this is the first time we've synced it. That's no problem, it's + // just a 2-way merge... + merge(ext_id, incoming_data, local_data, None) + } + (DataState::Deleted, DataState::Exists(local_data)) => { + // We've data locally, but there's an incoming deletion. + // We would normally remove keys that we knew were + // present on the server, but we don't know what + // was on the server, so we don't remove anything. + IncomingAction::Merge { + ext_id, + data: local_data, + changes: StorageChanges::new(), + } + } + (DataState::Exists(incoming_data), DataState::Deleted) => { + // No data locally, but some is incoming - take it. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&incoming_data), + data: incoming_data, + } + } + (DataState::Deleted, DataState::Deleted) => { + // Nothing anywhere - odd, but OK. + IncomingAction::Same { ext_id } + } + } + } + IncomingState::NotLocal { + ext_id, incoming, .. + } => { + // No local data but there's mirror and an incoming record. + // This means a local deletion is being replaced by, or just re-doing + // the incoming record. + match incoming { + DataState::Exists(data) => IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&data), + data, + }, + DataState::Deleted => IncomingAction::Same { ext_id }, + } + } + IncomingState::IncomingOnlyData { ext_id, data } => { + // Only the staging record exists and it's not a tombstone. + // This means it's the first time we've ever seen it. No + // conflict possible, just take the remote. + IncomingAction::TakeRemote { + ext_id, + changes: changes_for_new_incoming(&data), + data, + } + } + IncomingState::IncomingOnlyTombstone => { + // Only the staging record exists and it is a tombstone - nothing to do. + IncomingAction::Nothing + } + } +} + +fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> { + tx.execute_cached( + "INSERT INTO temp.storage_sync_applied (ext_id, changes) + VALUES (:ext_id, :changes)", + rusqlite::named_params! { + ":ext_id": ext_id, + ":changes": &serde_json::to_string(&changes)?, + }, + )?; + Ok(()) +} + +// Apply the actions necessary to fully process the incoming items. +pub fn apply_actions( + tx: &Transaction<'_>, + actions: Vec<(SyncGuid, IncomingAction)>, + signal: &dyn Interruptee, +) -> Result<()> { + for (item, action) in actions { + signal.err_if_interrupted()?; + + log::trace!("action for '{:?}': {:?}", item, action); + match action { + IncomingAction::DeleteLocally { ext_id, changes } => { + // Can just nuke it entirely. + tx.execute_cached( + "DELETE FROM storage_sync_data WHERE ext_id = :ext_id", + &[(":ext_id", &ext_id)], + )?; + insert_changes(tx, &ext_id, &changes)?; + } + // We want to update the local record with 'data' and after this update the item no longer is considered dirty. + IncomingAction::TakeRemote { + ext_id, + data, + changes, + } => { + tx.execute_cached( + "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter) + VALUES (:ext_id, :data, 0)", + rusqlite::named_params! { + ":ext_id": ext_id, + ":data": serde_json::Value::Object(data), + }, + )?; + insert_changes(tx, &ext_id, &changes)?; + } + + // We merged this data, so need to update locally but still consider + // it dirty because the merged data must be uploaded. + IncomingAction::Merge { + ext_id, + data, + changes, + } => { + tx.execute_cached( + "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id", + rusqlite::named_params! { + ":ext_id": ext_id, + ":data": serde_json::Value::Object(data), + }, + )?; + insert_changes(tx, &ext_id, &changes)?; + } + + // Both local and remote ended up the same - only need to nuke the + // change counter. + IncomingAction::Same { ext_id } => { + tx.execute_cached( + "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id", + &[(":ext_id", &ext_id)], + )?; + // no changes to write + } + // Literally nothing to do! + IncomingAction::Nothing => {} + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::test::new_syncable_mem_db; + use super::*; + use crate::api; + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + use sync15::bso::IncomingBso; + + // select simple int + fn ssi(conn: &Connection, stmt: &str) -> u32 { + conn.try_query_one(stmt, [], true) + .expect("must work") + .unwrap_or_default() + } + + fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> { + let jv = array.as_array_mut().expect("you must pass a json array"); + let mut result = Vec::with_capacity(jv.len()); + for elt in jv { + result.push(IncomingBso::from_test_content(elt.take()).into_content()); + } + result + } + + // Can't find a way to import these from crate::sync::tests... + macro_rules! map { + ($($map:tt)+) => { + json!($($map)+).as_object().unwrap().clone() + }; + } + macro_rules! change { + ($key:literal, None, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: None, + }; + }; + ($key:literal, $old:tt, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: None, + } + }; + ($key:literal, None, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: Some(json!($new)), + }; + }; + ($key:literal, $old:tt, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: Some(json!($new)), + } + }; + } + macro_rules! changes { + ( $( $change:expr ),* ) => { + { + let mut changes = StorageChanges::new(); + $( + changes.push($change); + )* + changes + } + }; + } + + #[test] + fn test_incoming_populates_staging() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + let incoming = json! {[ + { + "id": "guidAAAAAAAA", + "extId": "ext1@example.com", + "data": json!({"foo": "bar"}).to_string(), + } + ]}; + + stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?; + // check staging table + assert_eq!( + ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"), + 1 + ); + Ok(()) + } + + #[test] + fn test_fetch_incoming_state() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + // Start with an item just in staging. + tx.execute( + r#" + INSERT INTO temp.storage_sync_staging (guid, ext_id, data) + VALUES ('guid', 'ext_id', '{"foo":"bar"}') + "#, + [], + )?; + + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].0, SyncGuid::new("guid"),); + assert_eq!( + incoming[0].1, + IncomingState::IncomingOnlyData { + ext_id: "ext_id".to_string(), + data: map!({"foo": "bar"}), + } + ); + + // Add the same item to the mirror. + tx.execute( + r#" + INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', 'ext_id', '{"foo":"new"}') + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + IncomingState::NotLocal { + ext_id: "ext_id".to_string(), + incoming: DataState::Exists(map!({"foo": "bar"})), + mirror: DataState::Exists(map!({"foo": "new"})), + } + ); + + // and finally the data itself - might as use the API here! + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + IncomingState::Everywhere { + ext_id: "ext_id".to_string(), + incoming: DataState::Exists(map!({"foo": "bar"})), + local: DataState::Exists(map!({"foo": "local"})), + mirror: DataState::Exists(map!({"foo": "new"})), + } + ); + Ok(()) + } + + // Like test_fetch_incoming_state, but check NULLs are handled correctly. + #[test] + fn test_fetch_incoming_state_nulls() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + // Start with a tombstone just in staging. + tx.execute( + r#" + INSERT INTO temp.storage_sync_staging (guid, ext_id, data) + VALUES ('guid', NULL, NULL) + "#, + [], + )?; + + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,); + + // Add the same item to the mirror (can't store an ext_id for a + // tombstone in the mirror as incoming tombstones never have it) + tx.execute( + r#" + INSERT INTO storage_sync_mirror (guid, ext_id, data) + VALUES ('guid', NULL, NULL) + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone); + + tx.execute( + r#" + INSERT INTO storage_sync_data (ext_id, data) + VALUES ('ext_id', NULL) + "#, + [], + )?; + let incoming = get_incoming(&tx)?; + assert_eq!(incoming.len(), 1); + assert_eq!( + incoming[0].1, + // IncomingOnly* seems a little odd, but it is because we can't + // tie the tombstones together due to the lack of any ext-id/guid + // mapping in this case. + IncomingState::IncomingOnlyTombstone + ); + Ok(()) + } + + // apply_action tests. + #[derive(Debug, PartialEq)] + struct LocalItem { + data: DataState, + sync_change_counter: i32, + } + + fn get_local_item(conn: &Connection) -> Option<LocalItem> { + conn.try_query_row::<_, Error, _, _>( + "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'", + [], + |row| { + let data = json_map_from_row(row, "data")?; + let sync_change_counter = row.get::<_, i32>(1)?; + Ok(LocalItem { + data, + sync_change_counter, + }) + }, + true, + ) + .expect("query should work") + } + + fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> { + // no custom deserialize for storagechanges and we only need it for + // tests, so do it manually. + conn.try_query_row::<_, Error, _, _>( + "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'", + [], + |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?), + true, + ) + .expect("query should work") + .map(|val: serde_json::Value| { + let ob = val.as_object().expect("should be an object of items"); + let mut result = StorageChanges::with_capacity(ob.len()); + for (key, val) in ob.into_iter() { + let details = val.as_object().expect("elts should be objects"); + result.push(StorageValueChange { + key: key.to_string(), + old_value: details.get("oldValue").cloned(), + new_value: details.get("newValue").cloned(), + }); + } + result + }) + } + + fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) { + let guid = SyncGuid::new("guid"); + apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply"); + } + + #[test] + fn test_apply_actions() -> Result<()> { + let mut db = new_syncable_mem_db(); + + // DeleteLocally - row should be entirely removed. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + let changes = changes![change!("foo", "local", None)]; + do_apply_action( + &tx, + IncomingAction::DeleteLocally { + ext_id: "ext_id".to_string(), + changes: changes.clone(), + }, + ); + assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({})); + // and there should not be a local record at all. + assert!(get_local_item(&tx).is_none()); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // TakeRemote - replace local data with remote and marked as not dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + let changes = changes![change!("foo", "local", "remote")]; + do_apply_action( + &tx, + IncomingAction::TakeRemote { + ext_id: "ext_id".to_string(), + data: map!({"foo": "remote"}), + changes: changes.clone(), + }, + ); + // data should exist locally with the remote data and not be dirty. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "remote"})), + sync_change_counter: 0 + }) + ); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // Merge - like ::TakeRemote, but data remains dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + let changes = changes![change!("foo", "local", "remote")]; + do_apply_action( + &tx, + IncomingAction::Merge { + ext_id: "ext_id".to_string(), + data: map!({"foo": "remote"}), + changes: changes.clone(), + }, + ); + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "remote"})), + sync_change_counter: 2 + }) + ); + assert_eq!(get_applied_item_changes(&tx), Some(changes)); + tx.rollback()?; + + // Same - data stays the same but is marked not dirty. + let tx = db.transaction().expect("transaction should work"); + api::set(&tx, "ext_id", json!({"foo": "local"}))?; + assert_eq!( + api::get(&tx, "ext_id", json!(null))?, + json!({"foo": "local"}) + ); + // data should exist locally with a change recorded. + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 1 + }) + ); + do_apply_action( + &tx, + IncomingAction::Same { + ext_id: "ext_id".to_string(), + }, + ); + assert_eq!( + get_local_item(&tx), + Some(LocalItem { + data: DataState::Exists(map!({"foo": "local"})), + sync_change_counter: 0 + }) + ); + assert_eq!(get_applied_item_changes(&tx), None); + tx.rollback()?; + + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/mod.rs b/third_party/rust/webext-storage/src/sync/mod.rs new file mode 100644 index 0000000000..3144049dca --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/mod.rs @@ -0,0 +1,429 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +mod bridge; +mod incoming; +mod outgoing; + +#[cfg(test)] +mod sync_tests; + +use crate::api::{StorageChanges, StorageValueChange}; +use crate::db::StorageDb; +use crate::error::*; +use serde::Deserialize; +use serde_derive::*; +use sql_support::ConnExt; +use sync_guid::Guid as SyncGuid; + +pub use bridge::BridgedEngine; +use incoming::IncomingAction; + +type JsonMap = serde_json::Map<String, serde_json::Value>; + +pub const STORAGE_VERSION: usize = 1; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WebextRecord { + #[serde(rename = "id")] + guid: SyncGuid, + #[serde(rename = "extId")] + ext_id: String, + data: String, +} + +// Perform a 2-way or 3-way merge, where the incoming value wins on confict. +fn merge( + ext_id: String, + mut other: JsonMap, + mut ours: JsonMap, + parent: Option<JsonMap>, +) -> IncomingAction { + if other == ours { + return IncomingAction::Same { ext_id }; + } + let old_incoming = other.clone(); + // worst case is keys in each are unique. + let mut changes = StorageChanges::with_capacity(other.len() + ours.len()); + if let Some(parent) = parent { + // Perform 3-way merge. First, for every key in parent, + // compare the parent value with the incoming value to compute + // an implicit "diff". + for (key, parent_value) in parent.into_iter() { + if let Some(incoming_value) = other.remove(&key) { + if incoming_value != parent_value { + log::trace!( + "merge: key {} was updated in incoming - copying value locally", + key + ); + let old_value = ours.remove(&key); + let new_value = Some(incoming_value.clone()); + if old_value != new_value { + changes.push(StorageValueChange { + key: key.clone(), + old_value, + new_value, + }); + } + ours.insert(key, incoming_value); + } + } else { + // Key was not present in incoming value. + // Another client must have deleted it. + log::trace!( + "merge: key {} no longer present in incoming - removing it locally", + key + ); + if let Some(old_value) = ours.remove(&key) { + changes.push(StorageValueChange { + key, + old_value: Some(old_value), + new_value: None, + }); + } + } + } + + // Then, go through every remaining key in incoming. These are + // the ones where a corresponding key does not exist in + // parent, so it is a new key, and we need to add it. + for (key, incoming_value) in other.into_iter() { + log::trace!( + "merge: key {} doesn't occur in parent - copying from incoming", + key + ); + changes.push(StorageValueChange { + key: key.clone(), + old_value: None, + new_value: Some(incoming_value.clone()), + }); + ours.insert(key, incoming_value); + } + } else { + // No parent. Server wins. Overwrite every key in ours with + // the corresponding value in other. + log::trace!("merge: no parent - copying all keys from incoming"); + for (key, incoming_value) in other.into_iter() { + let old_value = ours.remove(&key); + let new_value = Some(incoming_value.clone()); + if old_value != new_value { + changes.push(StorageValueChange { + key: key.clone(), + old_value, + new_value, + }); + } + ours.insert(key, incoming_value); + } + } + + if ours == old_incoming { + IncomingAction::TakeRemote { + ext_id, + data: old_incoming, + changes, + } + } else { + IncomingAction::Merge { + ext_id, + data: ours, + changes, + } + } +} + +fn remove_matching_keys(mut ours: JsonMap, keys_to_remove: &JsonMap) -> (JsonMap, StorageChanges) { + let mut changes = StorageChanges::with_capacity(keys_to_remove.len()); + for key in keys_to_remove.keys() { + if let Some(old_value) = ours.remove(key) { + changes.push(StorageValueChange { + key: key.clone(), + old_value: Some(old_value), + new_value: None, + }); + } + } + (ours, changes) +} + +/// Holds a JSON-serialized map of all synced changes for an extension. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SyncedExtensionChange { + /// The extension ID. + pub ext_id: String, + /// The contents of a `StorageChanges` struct, in JSON format. We don't + /// deserialize these because they need to be passed back to the browser + /// as strings anyway. + pub changes: String, +} + +// Fetches the applied changes we stashed in the storage_sync_applied table. +pub fn get_synced_changes(db: &StorageDb) -> Result<Vec<SyncedExtensionChange>> { + let signal = db.begin_interrupt_scope()?; + let sql = "SELECT ext_id, changes FROM temp.storage_sync_applied"; + db.conn().query_rows_and_then(sql, [], |row| -> Result<_> { + signal.err_if_interrupted()?; + Ok(SyncedExtensionChange { + ext_id: row.get("ext_id")?, + changes: row.get("changes")?, + }) + }) +} + +// Helpers for tests +#[cfg(test)] +pub mod test { + use crate::db::{test::new_mem_db, StorageDb}; + use crate::schema::create_empty_sync_temp_tables; + + pub fn new_syncable_mem_db() -> StorageDb { + let _ = env_logger::try_init(); + let db = new_mem_db(); + create_empty_sync_temp_tables(&db).expect("should work"); + db + } +} + +#[cfg(test)] +mod tests { + use super::test::new_syncable_mem_db; + use super::*; + use serde_json::json; + + #[test] + fn test_serde_record_ser() { + assert_eq!( + serde_json::to_string(&WebextRecord { + guid: "guid".into(), + ext_id: "ext_id".to_string(), + data: "data".to_string() + }) + .unwrap(), + r#"{"id":"guid","extId":"ext_id","data":"data"}"# + ); + } + + // a macro for these tests - constructs a serde_json::Value::Object + macro_rules! map { + ($($map:tt)+) => { + json!($($map)+).as_object().unwrap().clone() + }; + } + + macro_rules! change { + ($key:literal, None, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: None, + }; + }; + ($key:literal, $old:tt, None) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: None, + } + }; + ($key:literal, None, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: None, + new_value: Some(json!($new)), + } + }; + ($key:literal, $old:tt, $new:tt) => { + StorageValueChange { + key: $key.to_string(), + old_value: Some(json!($old)), + new_value: Some(json!($new)), + } + }; + } + macro_rules! changes { + ( ) => { + StorageChanges::new() + }; + ( $( $change:expr ),* ) => { + { + let mut changes = StorageChanges::new(); + $( + changes.push($change); + )* + changes + } + }; + } + + #[test] + fn test_3way_merging() { + // No conflict - identical local and remote. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"one": "one", "two": "two"}), + map!({"two": "two", "one": "one"}), + Some(map!({"parent_only": "parent"})), + ), + IncomingAction::Same { + ext_id: "ext-id".to_string() + } + ); + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "common"}), + map!({"ours_only": "ours", "common": "common"}), + Some(map!({"parent_only": "parent", "common": "old_common"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "common"}), + changes: changes![change!("other_only", None, "other")], + } + ); + // Simple conflict - parent value is neither local nor incoming. incoming wins. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "incoming"}), + map!({"ours_only": "ours", "common": "local"}), + Some(map!({"parent_only": "parent", "common": "parent"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "incoming"}), + changes: changes![ + change!("common", "local", "incoming"), + change!("other_only", None, "other") + ], + } + ); + // Local change, no conflict. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other", "common": "old_value"}), + map!({"ours_only": "ours", "common": "new_value"}), + Some(map!({"parent_only": "parent", "common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "ours_only": "ours", "common": "new_value"}), + changes: changes![change!("other_only", None, "other")], + } + ); + // Field was removed remotely. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other"}), + map!({"common": "old_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::TakeRemote { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other"}), + changes: changes![ + change!("common", "old_value", None), + change!("other_only", None, "other") + ], + } + ); + // Field was removed remotely but we added another one. + assert_eq!( + merge( + "ext-id".to_string(), + map!({"other_only": "other"}), + map!({"common": "old_value", "new_key": "new_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"other_only": "other", "new_key": "new_value"}), + changes: changes![ + change!("common", "old_value", None), + change!("other_only", None, "other") + ], + } + ); + // Field was removed both remotely and locally. + assert_eq!( + merge( + "ext-id".to_string(), + map!({}), + map!({"new_key": "new_value"}), + Some(map!({"common": "old_value"})), + ), + IncomingAction::Merge { + ext_id: "ext-id".to_string(), + data: map!({"new_key": "new_value"}), + changes: changes![], + } + ); + } + + #[test] + fn test_remove_matching_keys() { + assert_eq!( + remove_matching_keys( + map!({"key1": "value1", "key2": "value2"}), + &map!({"key1": "ignored", "key3": "ignored"}) + ), + ( + map!({"key2": "value2"}), + changes![change!("key1", "value1", None)] + ) + ); + } + + #[test] + fn test_get_synced_changes() -> Result<()> { + let db = new_syncable_mem_db(); + db.execute_batch(&format!( + r#"INSERT INTO temp.storage_sync_applied (ext_id, changes) + VALUES + ('an-extension', '{change1}'), + ('ext"id', '{change2}') + "#, + change1 = serde_json::to_string(&changes![change!("key1", "old-val", None)])?, + change2 = serde_json::to_string(&changes![change!("key-for-second", None, "new-val")])? + ))?; + let changes = get_synced_changes(&db)?; + assert_eq!(changes[0].ext_id, "an-extension"); + // sanity check it's valid! + let c1: JsonMap = + serde_json::from_str(&changes[0].changes).expect("changes must be an object"); + assert_eq!( + c1.get("key1") + .expect("must exist") + .as_object() + .expect("must be an object") + .get("oldValue"), + Some(&json!("old-val")) + ); + + // phew - do it again to check the string got escaped. + assert_eq!( + changes[1], + SyncedExtensionChange { + ext_id: "ext\"id".into(), + changes: r#"{"key-for-second":{"newValue":"new-val"}}"#.into(), + } + ); + assert_eq!(changes[1].ext_id, "ext\"id"); + let c2: JsonMap = + serde_json::from_str(&changes[1].changes).expect("changes must be an object"); + assert_eq!( + c2.get("key-for-second") + .expect("must exist") + .as_object() + .expect("must be an object") + .get("newValue"), + Some(&json!("new-val")) + ); + Ok(()) + } +} diff --git a/third_party/rust/webext-storage/src/sync/outgoing.rs b/third_party/rust/webext-storage/src/sync/outgoing.rs new file mode 100644 index 0000000000..c6c9dd64e5 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/outgoing.rs @@ -0,0 +1,186 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// The "outgoing" part of syncing - building the payloads to upload and +// managing the sync state of the local DB. + +use interrupt_support::Interruptee; +use rusqlite::{Connection, Row, Transaction}; +use sql_support::ConnExt; +use sync15::bso::OutgoingBso; +use sync_guid::Guid as SyncGuid; + +use crate::error::*; + +use super::WebextRecord; + +fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> { + let guid: SyncGuid = row.get("guid")?; + let ext_id: String = row.get("ext_id")?; + let raw_data: Option<String> = row.get("data")?; + Ok(match raw_data { + Some(raw_data) => { + let record = WebextRecord { + guid, + ext_id, + data: raw_data, + }; + OutgoingBso::from_content_with_id(record)? + } + None => OutgoingBso::new_tombstone(guid.into()), + }) +} + +/// Stages info about what should be uploaded in a temp table. This should be +/// called in the same transaction as `apply_actions`. record_uploaded() can be +/// called after the upload is complete and the data in the temp table will be +/// used to update the local store. +pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> { + let sql = " + -- Stage outgoing items. The item may not yet have a GUID (ie, it might + -- not already be in either the mirror nor the incoming staging table), + -- so we generate one if it doesn't exist. + INSERT INTO storage_sync_outgoing_staging + (guid, ext_id, data, sync_change_counter) + SELECT coalesce(m.guid, s.guid, generate_guid()), + l.ext_id, l.data, l.sync_change_counter + FROM storage_sync_data l + -- left joins as one or both may not exist. + LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id + LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id + WHERE sync_change_counter > 0; + + -- At this point, we've merged in all new records, so copy incoming + -- staging into the mirror so that it matches what's on the server. + INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data) + SELECT guid, ext_id, data FROM temp.storage_sync_staging; + + -- And copy any incoming records that we aren't reuploading into the + -- local table. We'll copy the outgoing ones into the mirror and local + -- after we upload them. + INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter) + SELECT ext_id, data, 0 + FROM storage_sync_staging s + WHERE ext_id IS NOT NULL + AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o + WHERE o.guid = s.guid);"; + tx.execute_batch(sql)?; + Ok(()) +} + +/// Returns a vec of the outgoing records which should be uploaded. +pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> { + let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging"; + let elts = conn + .conn() + .query_rows_and_then(sql, [], |row| -> Result<_> { + signal.err_if_interrupted()?; + outgoing_from_row(row) + })?; + + log::debug!("get_outgoing found {} items", elts.len()); + Ok(elts.into_iter().collect()) +} + +/// Record the fact that items were uploaded. This updates the state of the +/// local DB to reflect the state of the server we just updated. +/// Note that this call is almost certainly going to be made in a *different* +/// transaction than the transaction used in `stage_outgoing()`, and it will +/// be called once per batch upload. +pub fn record_uploaded( + tx: &Transaction<'_>, + items: &[SyncGuid], + signal: &dyn Interruptee, +) -> Result<()> { + log::debug!( + "record_uploaded recording that {} items were uploaded", + items.len() + ); + + // Updating the `was_uploaded` column fires the `record_uploaded` trigger, + // which updates the local change counter and writes the uploaded record + // data back to the mirror. + sql_support::each_chunk(items, |chunk, _| -> Result<()> { + signal.err_if_interrupted()?; + let sql = format!( + "UPDATE storage_sync_outgoing_staging SET + was_uploaded = 1 + WHERE guid IN ({})", + sql_support::repeat_sql_vars(chunk.len()), + ); + tx.execute(&sql, rusqlite::params_from_iter(chunk))?; + Ok(()) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::test::new_syncable_mem_db; + use super::*; + use interrupt_support::NeverInterrupts; + + #[test] + fn test_simple() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + tx.execute_batch( + r#" + INSERT INTO storage_sync_data (ext_id, data, sync_change_counter) + VALUES + ('ext_no_changes', '{"foo":"bar"}', 0), + ('ext_with_changes', '{"foo":"bar"}', 1); + "#, + )?; + + stage_outgoing(&tx)?; + let changes = get_outgoing(&tx, &NeverInterrupts)?; + assert_eq!(changes.len(), 1); + let record: serde_json::Value = serde_json::from_str(&changes[0].payload).unwrap(); + let ext_id = record.get("extId").unwrap().as_str().unwrap(); + + assert_eq!(ext_id, "ext_with_changes"); + + record_uploaded( + &tx, + changes + .into_iter() + .map(|p| p.envelope.id) + .collect::<Vec<SyncGuid>>() + .as_slice(), + &NeverInterrupts, + )?; + + let counter: i32 = tx.conn().query_one( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'", + )?; + assert_eq!(counter, 0); + Ok(()) + } + + #[test] + fn test_payload_serialization() { + let record = WebextRecord { + guid: SyncGuid::new("guid"), + ext_id: "ext-id".to_string(), + data: "{}".to_string(), + }; + + let outgoing = OutgoingBso::from_content_with_id(record).unwrap(); + + // The envelope should have our ID. + assert_eq!(outgoing.envelope.id, "guid"); + + let outgoing_payload = + serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap(); + let outgoing_map = outgoing_payload.as_object().unwrap(); + + assert!(outgoing_map.contains_key("id")); + assert!(outgoing_map.contains_key("data")); + assert!(outgoing_map.contains_key("extId")); + assert_eq!(outgoing_map.len(), 3); + } +} diff --git a/third_party/rust/webext-storage/src/sync/sync_tests.rs b/third_party/rust/webext-storage/src/sync/sync_tests.rs new file mode 100644 index 0000000000..6940be2e59 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/sync_tests.rs @@ -0,0 +1,529 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// This file tries to simulate "full syncs" - ie, from local state changing, to +// fetching incoming items, generating items to upload, then updating the local +// state (including the mirror) as a result. + +use crate::api::{clear, get, set}; +use crate::error::*; +use crate::schema::create_empty_sync_temp_tables; +use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming}; +use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing}; +use crate::sync::test::new_syncable_mem_db; +use crate::sync::WebextRecord; +use interrupt_support::NeverInterrupts; +use rusqlite::{Connection, Row, Transaction}; +use serde_json::json; +use sql_support::ConnExt; +use sync15::bso::{IncomingBso, IncomingContent, OutgoingBso}; +use sync_guid::Guid; + +// Here we try and simulate everything done by a "full sync", just minus the +// engine. Returns the records we uploaded. +fn do_sync( + tx: &Transaction<'_>, + incoming_payloads: &[IncomingContent<WebextRecord>], +) -> Result<Vec<OutgoingBso>> { + log::info!("test do_sync() starting"); + // First we stage the incoming in the temp tables. + stage_incoming(tx, incoming_payloads, &NeverInterrupts)?; + // Then we process them getting a Vec of (item, state), which we turn into + // a Vec of (item, action) + let actions = get_incoming(tx)? + .into_iter() + .map(|(item, state)| (item, plan_incoming(state))) + .collect(); + log::debug!("do_sync applying {:?}", actions); + apply_actions(tx, actions, &NeverInterrupts)?; + // So we've done incoming - do outgoing. + stage_outgoing(tx)?; + let outgoing = get_outgoing(tx, &NeverInterrupts)?; + log::debug!("do_sync has outgoing {:?}", outgoing); + record_uploaded( + tx, + outgoing + .iter() + .map(|p| p.envelope.id.clone()) + .collect::<Vec<Guid>>() + .as_slice(), + &NeverInterrupts, + )?; + create_empty_sync_temp_tables(tx)?; + log::info!("test do_sync() complete"); + Ok(outgoing) +} + +// Check *both* the mirror and local API have ended up with the specified data. +fn check_finished_with(conn: &Connection, ext_id: &str, val: serde_json::Value) -> Result<()> { + let local = get(conn, ext_id, serde_json::Value::Null)?; + assert_eq!(local, val); + let guid = get_mirror_guid(conn, ext_id)?; + let mirror = get_mirror_data(conn, &guid); + assert_eq!(mirror, DbData::Data(val.to_string())); + // and there should be zero items with a change counter. + let count = conn.query_row_and_then( + "SELECT COUNT(*) FROM storage_sync_data WHERE sync_change_counter != 0;", + [], + |row| row.get::<_, u32>(0), + )?; + assert_eq!(count, 0); + Ok(()) +} + +fn get_mirror_guid(conn: &Connection, extid: &str) -> Result<Guid> { + let guid = conn.query_row_and_then( + "SELECT m.guid FROM storage_sync_mirror m WHERE m.ext_id = ?;", + [extid], + |row| row.get::<_, Guid>(0), + )?; + Ok(guid) +} + +#[derive(Debug, PartialEq)] +enum DbData { + NoRow, + NullRow, + Data(String), +} + +impl DbData { + fn has_data(&self) -> bool { + matches!(self, DbData::Data(_)) + } +} + +fn _get(conn: &Connection, id_name: &str, expected_extid: &str, table: &str) -> DbData { + let sql = format!("SELECT {} as id, data FROM {}", id_name, table); + + fn from_row(row: &Row<'_>) -> Result<(String, Option<String>)> { + Ok((row.get("id")?, row.get("data")?)) + } + let mut items = conn + .conn() + .query_rows_and_then(&sql, [], from_row) + .expect("should work"); + if items.is_empty() { + DbData::NoRow + } else { + let item = items.pop().expect("it exists"); + assert_eq!(Guid::new(&item.0), expected_extid); + match item.1 { + None => DbData::NullRow, + Some(v) => DbData::Data(v), + } + } +} + +fn get_mirror_data(conn: &Connection, expected_extid: &str) -> DbData { + _get(conn, "guid", expected_extid, "storage_sync_mirror") +} + +fn get_local_data(conn: &Connection, expected_extid: &str) -> DbData { + _get(conn, "ext_id", expected_extid, "storage_sync_data") +} + +fn make_incoming( + guid: &Guid, + ext_id: &str, + data: &serde_json::Value, +) -> IncomingContent<WebextRecord> { + let content = json!({"id": guid, "extId": ext_id, "data": data.to_string()}); + IncomingBso::from_test_content(content).into_content() +} + +fn make_incoming_tombstone(guid: &Guid) -> IncomingContent<WebextRecord> { + IncomingBso::new_test_tombstone(guid.clone()).into_content() +} + +#[test] +fn test_simple_outgoing_sync() -> Result<()> { + // So we are starting with an empty local store and empty server store. + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_simple_incoming_sync() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + let bridge_record = make_incoming(&Guid::new("guid"), "ext-id", &data); + assert_eq!(do_sync(&tx, &[bridge_record])?.len(), 0); + let key1_from_api = get(&tx, "ext-id", json!("key1"))?; + assert_eq!(key1_from_api, json!({"key1": "key1-value"})); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_outgoing_tombstone() -> Result<()> { + // Tombstones are only kept when the mirror has that record - so first + // test that, then arrange for the mirror to have the record. + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!( + get_local_data(&tx, "ext-id"), + DbData::Data(data.to_string()) + ); + // hasn't synced yet, so clearing shouldn't write a tombstone. + clear(&tx, "ext-id")?; + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + // now set data again and sync and *then* remove. + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert!(get_local_data(&tx, "ext-id").has_data()); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert!(get_mirror_data(&tx, &guid).has_data()); + clear(&tx, "ext-id")?; + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NullRow); + // then after syncing, the tombstone will be in the mirror but the local row + // has been removed. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_incoming_tombstone_exists() -> Result<()> { + // An incoming tombstone for a record we've previously synced (and thus + // have data for) + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data.clone())?; + assert_eq!( + get_local_data(&tx, "ext-id"), + DbData::Data(data.to_string()) + ); + // sync to get data in our mirror. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + assert!(get_local_data(&tx, "ext-id").has_data()); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert!(get_mirror_data(&tx, &guid).has_data()); + // Now an incoming tombstone for it. + let tombstone = make_incoming_tombstone(&guid); + assert_eq!( + do_sync(&tx, &[tombstone])?.len(), + 0, + "expect no outgoing records" + ); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_incoming_tombstone_not_exists() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // An incoming tombstone for something that's not anywhere locally. + let guid = Guid::new("guid"); + let tombstone = make_incoming_tombstone(&guid); + assert_eq!( + do_sync(&tx, &[tombstone])?.len(), + 0, + "expect no outgoing records" + ); + // But we still keep the tombstone in the mirror. + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_reconciled() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload with the same data + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key1": "key1-value"})); + // Should be no outgoing records as we reconciled. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key1": "key1-value"}))?; + Ok(()) +} + +/// Tests that we handle things correctly if we get a payload that is +/// identical to what is in the mirrored table. +#[test] +fn test_reconcile_with_null_payload() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data.clone())?; + // We try to push this change on the next sync. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert_eq!(get_mirror_data(&tx, &guid), DbData::Data(data.to_string())); + // Incoming payload with the same data. + // This could happen if, for example, another client changed the + // key and then put it back the way it was. + let record = make_incoming(&guid, "ext-id", &data); + // Should be no outgoing records as we reconciled. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", data)?; + Ok(()) +} + +#[test] +fn test_accept_incoming_when_local_is_deleted() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // We only record an extension as deleted locally if it has been + // uploaded before being deleted. + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + clear(&tx, "ext-id")?; + // Incoming payload without 'key1'. Because we previously uploaded + // key1, this means another client deleted it. + let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"})); + + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + clear(&tx, "ext-id")?; + + // Use a random guid so that we don't find the mirrored data. + // This test is somewhat bad because deduping might obviate + // the need for it. + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"})); + + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_accept_deleted_key_mirrored() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Incoming payload without 'key1'. Because we previously uploaded + // key1, this means another client deleted it. + let record = make_incoming(&guid, "ext-id", &json!({"key2": "key2-value"})); + // We completely accept the incoming record. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_merged_no_mirror() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload without 'key1' and some data for 'key2'. + // Because we never uploaded 'key1', we merge our local values + // with the remote. + let record = make_incoming(&Guid::new("guid"), "ext-id", &json!({"key2": "key2-value"})); + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-value"}), + )?; + Ok(()) +} + +#[test] +fn test_merged_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let old_data = json!({"key1": "key1-value", "key2": "key2-value", "doomed_key": "deletable"}); + set(&tx, "ext-id", old_data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // We update 'key1' locally. + let local_data = json!({"key1": "key1-new", "key2": "key2-value", "doomed_key": "deletable"}); + set(&tx, "ext-id", local_data)?; + // Incoming payload where another client set 'key2' and removed + // the 'doomed_key'. + // Because we never uploaded our data, we'll merge our + // key1 in, but otherwise keep the server's changes. + let record = make_incoming( + &guid, + "ext-id", + &json!({"key1": "key1-value", "key2": "key2-incoming"}), + ); + // We should send our 'key1' + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-new", "key2": "key2-incoming"}), + )?; + Ok(()) +} + +#[test] +fn test_merged_with_null_payload() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let old_data = json!({"key1": "key1-value"}); + set(&tx, "ext-id", old_data.clone())?; + // Push this change remotely. + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + assert_eq!( + get_mirror_data(&tx, &guid), + DbData::Data(old_data.to_string()) + ); + let local_data = json!({"key1": "key1-new", "key2": "key2-value"}); + set(&tx, "ext-id", local_data.clone())?; + // Incoming payload with the same old data. + let record = make_incoming(&guid, "ext-id", &old_data); + // Three-way-merge will not detect any change in key1, so we + // should keep our entire new value. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", local_data)?; + Ok(()) +} + +#[test] +fn test_deleted_mirrored_object_accept() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Incoming payload with data deleted. + // We synchronize this deletion by deleting the keys we think + // were on the server. + let record = make_incoming_tombstone(&guid); + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + assert_eq!(get_local_data(&tx, "ext-id"), DbData::NoRow); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + Ok(()) +} + +#[test] +fn test_deleted_mirrored_object_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + set(&tx, "ext-id", json!({"key1": "key1-value"}))?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + set( + &tx, + "ext-id", + json!({"key1": "key1-new", "key2": "key2-value"}), + )?; + // Incoming payload with data deleted. + // We synchronize this deletion by deleting the keys we think + // were on the server. + let record = make_incoming_tombstone(&guid); + // This overrides the change to 'key1', but we still upload 'key2'. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +/// Like the above test, but with a mirrored tombstone. +#[test] +fn test_deleted_mirrored_tombstone_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + // Sync some data so we can get the guid for this extension. + set(&tx, "ext-id", json!({"key1": "key1-value"}))?; + assert_eq!(do_sync(&tx, &[])?.len(), 1); + let guid = get_mirror_guid(&tx, "ext-id")?; + // Sync a delete for this data so we have a tombstone in the mirror. + let record = make_incoming_tombstone(&guid); + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + assert_eq!(get_mirror_data(&tx, &guid), DbData::NullRow); + + // Set some data and sync it simultaneously with another incoming delete. + set(&tx, "ext-id", json!({"key2": "key2-value"}))?; + let record = make_incoming_tombstone(&guid); + // We cannot delete any matching keys because there are no + // matching keys. Instead we push our data. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with(&tx, "ext-id", json!({"key2": "key2-value"}))?; + Ok(()) +} + +#[test] +fn test_deleted_not_mirrored_object_merged() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload with data deleted. + let record = make_incoming_tombstone(&Guid::new("guid")); + // We normally delete the keys we think were on the server, but + // here we have no information about what was on the server, so we + // don't delete anything. We merge in all undeleted keys. + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-value"}), + )?; + Ok(()) +} + +#[test] +fn test_conflicting_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let data = json!({"key1": "key1-value", "key2": "key2-value"}); + set(&tx, "ext-id", data)?; + // Incoming payload without 'key1' and conflicting for 'key2'. + // Because we never uploaded either of our keys, we'll merge our + // key1 in, but the server key2 wins. + let record = make_incoming( + &Guid::new("guid"), + "ext-id", + &json!({"key2": "key2-incoming"}), + ); + // We should send our 'key1' + assert_eq!(do_sync(&tx, &[record])?.len(), 1); + check_finished_with( + &tx, + "ext-id", + json!({"key1": "key1-value", "key2": "key2-incoming"}), + )?; + Ok(()) +} + +#[test] +fn test_invalid_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + let json = json!({"id": "id", "payload": json!("").to_string()}); + let bso = serde_json::from_value::<IncomingBso>(json).unwrap(); + let record = bso.into_content(); + + // Should do nothing. + assert_eq!(do_sync(&tx, &[record])?.len(), 0); + Ok(()) +} |