diff options
Diffstat (limited to 'third_party/rust/tabs/src/sync/bridge.rs')
-rw-r--r-- | third_party/rust/tabs/src/sync/bridge.rs | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/third_party/rust/tabs/src/sync/bridge.rs b/third_party/rust/tabs/src/sync/bridge.rs new file mode 100644 index 0000000000..6e37a0cf5c --- /dev/null +++ b/third_party/rust/tabs/src/sync/bridge.rs @@ -0,0 +1,436 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::TabsApiError; +use crate::sync::engine::TabsSyncImpl; +use crate::TabsStore; +use anyhow::Result; +use std::sync::{Arc, Mutex}; +use sync15::bso::{IncomingBso, OutgoingBso}; +use sync15::engine::{ApplyResults, BridgedEngine, CollSyncIds, EngineSyncAssociation}; +use sync15::{ClientData, ServerTimestamp}; +use sync_guid::Guid as SyncGuid; + +impl TabsStore { + // Returns a bridged sync engine for Desktop for this store. + pub fn bridged_engine(self: Arc<Self>) -> Arc<TabsBridgedEngine> { + let bridge_impl = crate::sync::bridge::BridgedEngineImpl::new(&self); + // This is a concrete struct exposed via uniffi. + let concrete = TabsBridgedEngine::new(bridge_impl); + Arc::new(concrete) + } +} + +/// A bridged engine implements all the methods needed to make the +/// `storage.sync` store work with Desktop's Sync implementation. +/// Conceptually it's very similar to our SyncEngine, and once we have SyncEngine +/// and BridgedEngine using the same types we can probably combine them (or at least +/// implement this bridged engine purely in terms of SyncEngine) +/// See also #2841 and #5139 +pub struct BridgedEngineImpl { + sync_impl: Mutex<TabsSyncImpl>, + incoming: Mutex<Vec<IncomingBso>>, +} + +impl BridgedEngineImpl { + /// Creates a bridged engine for syncing. + pub fn new(store: &Arc<TabsStore>) -> Self { + Self { + sync_impl: Mutex::new(TabsSyncImpl::new(store.clone())), + incoming: Mutex::default(), + } + } +} + +impl BridgedEngine for BridgedEngineImpl { + fn last_sync(&self) -> Result<i64> { + Ok(self + .sync_impl + .lock() + .unwrap() + .get_last_sync()? + .unwrap_or_default() + .as_millis()) + } + + fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> { + self.sync_impl + .lock() + .unwrap() + .set_last_sync(ServerTimestamp::from_millis(last_sync_millis))?; + Ok(()) + } + + fn sync_id(&self) -> Result<Option<String>> { + Ok(match self.sync_impl.lock().unwrap().get_sync_assoc()? { + EngineSyncAssociation::Connected(id) => Some(id.coll.to_string()), + EngineSyncAssociation::Disconnected => None, + }) + } + + fn reset_sync_id(&self) -> Result<String> { + let new_id = SyncGuid::random().to_string(); + let new_coll_ids = CollSyncIds { + global: SyncGuid::empty(), + coll: new_id.clone().into(), + }; + self.sync_impl + .lock() + .unwrap() + .reset(&EngineSyncAssociation::Connected(new_coll_ids))?; + Ok(new_id) + } + + fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + let mut sync_impl = self.sync_impl.lock().unwrap(); + let assoc = sync_impl.get_sync_assoc()?; + if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) { + log::debug!("ensure_current_sync_id is current"); + } else { + let new_coll_ids = CollSyncIds { + global: SyncGuid::empty(), + coll: sync_id.into(), + }; + sync_impl.reset(&EngineSyncAssociation::Connected(new_coll_ids))?; + } + Ok(sync_id.to_string()) + } + + fn prepare_for_sync(&self, client_data: &str) -> Result<()> { + let data: ClientData = serde_json::from_str(client_data)?; + Ok(self.sync_impl.lock().unwrap().prepare_for_sync(data)?) + } + + fn sync_started(&self) -> Result<()> { + // This is a no-op for the Tabs Engine + Ok(()) + } + + fn store_incoming(&self, incoming: Vec<IncomingBso>) -> Result<()> { + // Store the incoming payload in memory so we can use it in apply + *(self.incoming.lock().unwrap()) = incoming; + Ok(()) + } + + fn apply(&self) -> Result<ApplyResults> { + let mut incoming = self.incoming.lock().unwrap(); + // We've a reference to a Vec<> but it's owned by the mutex - swap the mutex owned + // value for an empty vec so we can consume the original. + let mut records = Vec::new(); + std::mem::swap(&mut records, &mut *incoming); + let mut telem = sync15::telemetry::Engine::new("tabs"); + + let mut sync_impl = self.sync_impl.lock().unwrap(); + let outgoing = sync_impl.apply_incoming(records, &mut telem)?; + + Ok(ApplyResults { + records: outgoing, + num_reconciled: Some(0), + }) + } + + fn set_uploaded(&self, server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> { + Ok(self + .sync_impl + .lock() + .unwrap() + .sync_finished(ServerTimestamp::from_millis(server_modified_millis), ids)?) + } + + fn sync_finished(&self) -> Result<()> { + *(self.incoming.lock().unwrap()) = Vec::default(); + Ok(()) + } + + fn reset(&self) -> Result<()> { + self.sync_impl + .lock() + .unwrap() + .reset(&EngineSyncAssociation::Disconnected)?; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + self.sync_impl.lock().unwrap().wipe()?; + Ok(()) + } +} + +// This is for uniffi to expose, and does nothing than delegate back to the trait. +pub struct TabsBridgedEngine { + bridge_impl: BridgedEngineImpl, +} + +impl TabsBridgedEngine { + pub fn new(bridge_impl: BridgedEngineImpl) -> Self { + Self { bridge_impl } + } + + pub fn last_sync(&self) -> Result<i64> { + self.bridge_impl.last_sync() + } + + pub fn set_last_sync(&self, last_sync: i64) -> Result<()> { + self.bridge_impl.set_last_sync(last_sync) + } + + pub fn sync_id(&self) -> Result<Option<String>> { + self.bridge_impl.sync_id() + } + + pub fn reset_sync_id(&self) -> Result<String> { + self.bridge_impl.reset_sync_id() + } + + pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + self.bridge_impl.ensure_current_sync_id(sync_id) + } + + pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> { + self.bridge_impl.prepare_for_sync(client_data) + } + + pub fn sync_started(&self) -> Result<()> { + self.bridge_impl.sync_started() + } + + // Decode the JSON-encoded IncomingBso's that UniFFI passes to us + fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> { + let mut bsos = Vec::with_capacity(incoming.len()); + for inc in incoming { + bsos.push(serde_json::from_str::<IncomingBso>(&inc)?); + } + Ok(bsos) + } + + // Encode OutgoingBso's into JSON for UniFFI + fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> { + let mut bsos = Vec::with_capacity(outgoing.len()); + for e in outgoing { + bsos.push(serde_json::to_string(&e)?); + } + Ok(bsos) + } + + pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> { + self.bridge_impl + .store_incoming(self.convert_incoming_bsos(incoming)?) + } + + pub fn apply(&self) -> Result<Vec<String>> { + let apply_results = self.bridge_impl.apply()?; + self.convert_outgoing_bsos(apply_results.records) + } + + pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> { + self.bridge_impl + .set_uploaded(server_modified_millis, &guids) + } + + pub fn sync_finished(&self) -> Result<()> { + self.bridge_impl.sync_finished() + } + + pub fn reset(&self) -> Result<()> { + self.bridge_impl.reset() + } + + pub fn wipe(&self) -> Result<()> { + self.bridge_impl.wipe() + } +} + +impl From<anyhow::Error> for TabsApiError { + fn from(value: anyhow::Error) -> Self { + TabsApiError::UnexpectedTabsError { + reason: value.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::{RemoteTab, TABS_CLIENT_TTL}; + use crate::sync::record::TabsRecordTab; + use serde_json::json; + use std::collections::HashMap; + use sync15::{ClientData, DeviceType, RemoteClient}; + + // A copy of the normal "engine" tests but which go via the bridge + #[test] + fn test_sync_via_bridge() { + env_logger::try_init().ok(); + + let store = Arc::new(TabsStore::new_with_mem_path("test-bridge_incoming")); + + // Set some local tabs for our device. + let my_tabs = vec![ + RemoteTab { + title: "my first tab".to_string(), + url_history: vec!["http://1.com".to_string()], + icon: None, + last_used: 2, + }, + RemoteTab { + title: "my second tab".to_string(), + url_history: vec!["http://2.com".to_string()], + icon: None, + last_used: 1, + }, + ]; + store.set_local_tabs(my_tabs.clone()); + + let bridge = store.bridged_engine(); + + let client_data = ClientData { + local_client_id: "my-device".to_string(), + recent_clients: HashMap::from([ + ( + "my-device".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "my device".to_string(), + device_type: sync15::DeviceType::Unknown, + }, + ), + ( + "device-no-tabs".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "device with no tabs".to_string(), + device_type: DeviceType::Unknown, + }, + ), + ( + "device-with-a-tab".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "device with a tab".to_string(), + device_type: DeviceType::Unknown, + }, + ), + ]), + }; + bridge + .prepare_for_sync(&serde_json::to_string(&client_data).unwrap()) + .expect("should work"); + + let records = vec![ + // my-device should be ignored by sync - here it is acting as what our engine last + // wrote, but the actual tabs in our store we set above are what should be used. + json!({ + "id": "my-device", + "clientName": "my device", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }), + json!({ + "id": "device-no-tabs", + "clientName": "device with no tabs", + "tabs": [], + }), + json!({ + "id": "device-with-a-tab", + "clientName": "device with a tab", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }), + // This has the main payload as OK but the tabs part invalid. + json!({ + "id": "device-with-invalid-tab", + "clientName": "device with a tab", + "tabs": [{ + "foo": "bar", + }] + }), + // We want this to be a valid payload but an invalid tab - so it needs an ID. + json!({ + "id": "invalid-tab", + "foo": "bar" + }), + ]; + + let mut incoming = Vec::new(); + for record in records { + // Annoyingly we can't use `IncomingEnvelope` directly as it intentionally doesn't + // support Serialize - so need to use explicit json. + let envelope = json!({ + "id": record.get("id"), + "modified": 0, + "payload": serde_json::to_string(&record).unwrap(), + }); + incoming.push(serde_json::to_string(&envelope).unwrap()); + } + + bridge.store_incoming(incoming).expect("should store"); + + let out = bridge.apply().expect("should apply"); + + assert_eq!(out.len(), 1); + let ours = serde_json::from_str::<serde_json::Value>(&out[0]).unwrap(); + // As above, can't use `OutgoingEnvelope` as it doesn't Deserialize. + // First, convert my_tabs from the local `RemoteTab` to the Sync specific `TabsRecord` + let expected_tabs: Vec<TabsRecordTab> = + my_tabs.into_iter().map(|t| t.to_record_tab()).collect(); + let expected = json!({ + "id": "my-device".to_string(), + "payload": json!({ + "id": "my-device".to_string(), + "clientName": "my device", + "tabs": serde_json::to_value(expected_tabs).unwrap(), + }).to_string(), + "ttl": TABS_CLIENT_TTL, + }); + + assert_eq!(ours, expected); + bridge.set_uploaded(1234, vec![]).unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 1234); + } + + #[test] + fn test_sync_meta() { + env_logger::try_init().ok(); + + let store = Arc::new(TabsStore::new_with_mem_path("test-meta")); + let bridge = store.bridged_engine(); + + // Should not error or panic + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 3); + + assert!(bridge.sync_id().unwrap().is_none()); + + bridge.ensure_current_sync_id("some_guid").unwrap(); + assert_eq!(bridge.sync_id().unwrap(), Some("some_guid".to_string())); + // changing the sync ID should reset the timestamp + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + + bridge.reset_sync_id().unwrap(); + // should now be a random guid. + assert_ne!(bridge.sync_id().unwrap(), Some("some_guid".to_string())); + // should have reset the last sync timestamp. + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + + // `reset` clears the guid and the timestamp + bridge.reset().unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 0); + assert!(bridge.sync_id().unwrap().is_none()); + } +} |