diff options
Diffstat (limited to 'third_party/rust/webext-storage/src/sync/outgoing.rs')
-rw-r--r-- | third_party/rust/webext-storage/src/sync/outgoing.rs | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/third_party/rust/webext-storage/src/sync/outgoing.rs b/third_party/rust/webext-storage/src/sync/outgoing.rs new file mode 100644 index 0000000000..c6c9dd64e5 --- /dev/null +++ b/third_party/rust/webext-storage/src/sync/outgoing.rs @@ -0,0 +1,186 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// The "outgoing" part of syncing - building the payloads to upload and +// managing the sync state of the local DB. + +use interrupt_support::Interruptee; +use rusqlite::{Connection, Row, Transaction}; +use sql_support::ConnExt; +use sync15::bso::OutgoingBso; +use sync_guid::Guid as SyncGuid; + +use crate::error::*; + +use super::WebextRecord; + +fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> { + let guid: SyncGuid = row.get("guid")?; + let ext_id: String = row.get("ext_id")?; + let raw_data: Option<String> = row.get("data")?; + Ok(match raw_data { + Some(raw_data) => { + let record = WebextRecord { + guid, + ext_id, + data: raw_data, + }; + OutgoingBso::from_content_with_id(record)? + } + None => OutgoingBso::new_tombstone(guid.into()), + }) +} + +/// Stages info about what should be uploaded in a temp table. This should be +/// called in the same transaction as `apply_actions`. record_uploaded() can be +/// called after the upload is complete and the data in the temp table will be +/// used to update the local store. +pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> { + let sql = " + -- Stage outgoing items. The item may not yet have a GUID (ie, it might + -- not already be in either the mirror nor the incoming staging table), + -- so we generate one if it doesn't exist. + INSERT INTO storage_sync_outgoing_staging + (guid, ext_id, data, sync_change_counter) + SELECT coalesce(m.guid, s.guid, generate_guid()), + l.ext_id, l.data, l.sync_change_counter + FROM storage_sync_data l + -- left joins as one or both may not exist. + LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id + LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id + WHERE sync_change_counter > 0; + + -- At this point, we've merged in all new records, so copy incoming + -- staging into the mirror so that it matches what's on the server. + INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data) + SELECT guid, ext_id, data FROM temp.storage_sync_staging; + + -- And copy any incoming records that we aren't reuploading into the + -- local table. We'll copy the outgoing ones into the mirror and local + -- after we upload them. + INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter) + SELECT ext_id, data, 0 + FROM storage_sync_staging s + WHERE ext_id IS NOT NULL + AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o + WHERE o.guid = s.guid);"; + tx.execute_batch(sql)?; + Ok(()) +} + +/// Returns a vec of the outgoing records which should be uploaded. +pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> { + let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging"; + let elts = conn + .conn() + .query_rows_and_then(sql, [], |row| -> Result<_> { + signal.err_if_interrupted()?; + outgoing_from_row(row) + })?; + + log::debug!("get_outgoing found {} items", elts.len()); + Ok(elts.into_iter().collect()) +} + +/// Record the fact that items were uploaded. This updates the state of the +/// local DB to reflect the state of the server we just updated. +/// Note that this call is almost certainly going to be made in a *different* +/// transaction than the transaction used in `stage_outgoing()`, and it will +/// be called once per batch upload. +pub fn record_uploaded( + tx: &Transaction<'_>, + items: &[SyncGuid], + signal: &dyn Interruptee, +) -> Result<()> { + log::debug!( + "record_uploaded recording that {} items were uploaded", + items.len() + ); + + // Updating the `was_uploaded` column fires the `record_uploaded` trigger, + // which updates the local change counter and writes the uploaded record + // data back to the mirror. + sql_support::each_chunk(items, |chunk, _| -> Result<()> { + signal.err_if_interrupted()?; + let sql = format!( + "UPDATE storage_sync_outgoing_staging SET + was_uploaded = 1 + WHERE guid IN ({})", + sql_support::repeat_sql_vars(chunk.len()), + ); + tx.execute(&sql, rusqlite::params_from_iter(chunk))?; + Ok(()) + })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::test::new_syncable_mem_db; + use super::*; + use interrupt_support::NeverInterrupts; + + #[test] + fn test_simple() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + tx.execute_batch( + r#" + INSERT INTO storage_sync_data (ext_id, data, sync_change_counter) + VALUES + ('ext_no_changes', '{"foo":"bar"}', 0), + ('ext_with_changes', '{"foo":"bar"}', 1); + "#, + )?; + + stage_outgoing(&tx)?; + let changes = get_outgoing(&tx, &NeverInterrupts)?; + assert_eq!(changes.len(), 1); + let record: serde_json::Value = serde_json::from_str(&changes[0].payload).unwrap(); + let ext_id = record.get("extId").unwrap().as_str().unwrap(); + + assert_eq!(ext_id, "ext_with_changes"); + + record_uploaded( + &tx, + changes + .into_iter() + .map(|p| p.envelope.id) + .collect::<Vec<SyncGuid>>() + .as_slice(), + &NeverInterrupts, + )?; + + let counter: i32 = tx.conn().query_one( + "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'", + )?; + assert_eq!(counter, 0); + Ok(()) + } + + #[test] + fn test_payload_serialization() { + let record = WebextRecord { + guid: SyncGuid::new("guid"), + ext_id: "ext-id".to_string(), + data: "{}".to_string(), + }; + + let outgoing = OutgoingBso::from_content_with_id(record).unwrap(); + + // The envelope should have our ID. + assert_eq!(outgoing.envelope.id, "guid"); + + let outgoing_payload = + serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap(); + let outgoing_map = outgoing_payload.as_object().unwrap(); + + assert!(outgoing_map.contains_key("id")); + assert!(outgoing_map.contains_key("data")); + assert!(outgoing_map.contains_key("extId")); + assert_eq!(outgoing_map.len(), 3); + } +} |