diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tabs/src | |
parent | Initial commit. (diff) | |
download | firefox-esr-upstream.tar.xz firefox-esr-upstream.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tabs/src')
-rw-r--r-- | third_party/rust/tabs/src/error.rs | 90 | ||||
-rw-r--r-- | third_party/rust/tabs/src/lib.rs | 39 | ||||
-rw-r--r-- | third_party/rust/tabs/src/schema.rs | 150 | ||||
-rw-r--r-- | third_party/rust/tabs/src/storage.rs | 727 | ||||
-rw-r--r-- | third_party/rust/tabs/src/store.rs | 41 | ||||
-rw-r--r-- | third_party/rust/tabs/src/sync/bridge.rs | 436 | ||||
-rw-r--r-- | third_party/rust/tabs/src/sync/engine.rs | 543 | ||||
-rw-r--r-- | third_party/rust/tabs/src/sync/full_sync.rs | 70 | ||||
-rw-r--r-- | third_party/rust/tabs/src/sync/mod.rs | 35 | ||||
-rw-r--r-- | third_party/rust/tabs/src/sync/record.rs | 97 | ||||
-rw-r--r-- | third_party/rust/tabs/src/tabs.udl | 108 |
11 files changed, 2336 insertions, 0 deletions
diff --git a/third_party/rust/tabs/src/error.rs b/third_party/rust/tabs/src/error.rs new file mode 100644 index 0000000000..748c21f656 --- /dev/null +++ b/third_party/rust/tabs/src/error.rs @@ -0,0 +1,90 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use error_support::{ErrorHandling, GetErrorHandling}; + +/// Result enum for the public interface +pub type ApiResult<T> = std::result::Result<T, TabsApiError>; +/// Result enum for internal functions +pub type Result<T> = std::result::Result<T, Error>; + +// Errors we return via the public interface. +#[derive(Debug, thiserror::Error)] +pub enum TabsApiError { + #[error("SyncError: {reason}")] + SyncError { reason: String }, + + #[error("SqlError: {reason}")] + SqlError { reason: String }, + + #[error("Unexpected tabs error: {reason}")] + UnexpectedTabsError { reason: String }, +} + +// Error we use internally +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[cfg(feature = "full-sync")] + #[error("Error synchronizing: {0}")] + SyncAdapterError(#[from] sync15::Error), + + // Note we are abusing this as a kind of "mis-matched feature" error. + // This works because when `full-sync` isn't enabled we don't actually + // handle any sync15 errors as the bridged-engine never returns them. + #[cfg(not(feature = "full-sync"))] + #[error("Sync feature is disabled: {0}")] + SyncAdapterError(String), + + #[error("Error parsing JSON data: {0}")] + JsonError(#[from] serde_json::Error), + + #[error("Missing SyncUnlockInfo Local ID")] + MissingLocalIdError, + + #[error("Error parsing URL: {0}")] + UrlParseError(#[from] url::ParseError), + + #[error("Error executing SQL: {0}")] + SqlError(#[from] rusqlite::Error), + + #[error("Error opening database: {0}")] + OpenDatabaseError(#[from] sql_support::open_database::Error), +} + +// Define how our internal errors are handled and converted to external errors +// See `support/error/README.md` for how this works, especially the warning about PII. +impl GetErrorHandling for Error { + type ExternalError = TabsApiError; + + fn get_error_handling(&self) -> ErrorHandling<Self::ExternalError> { + match self { + Self::SyncAdapterError(e) => ErrorHandling::convert(TabsApiError::SyncError { + reason: e.to_string(), + }) + .report_error("tabs-sync-error"), + Self::JsonError(e) => ErrorHandling::convert(TabsApiError::UnexpectedTabsError { + reason: e.to_string(), + }) + .report_error("tabs-json-error"), + Self::MissingLocalIdError => { + ErrorHandling::convert(TabsApiError::UnexpectedTabsError { + reason: "MissingLocalId".to_string(), + }) + .report_error("tabs-missing-local-id-error") + } + Self::UrlParseError(e) => ErrorHandling::convert(TabsApiError::UnexpectedTabsError { + reason: e.to_string(), + }) + .report_error("tabs-url-parse-error"), + Self::SqlError(e) => ErrorHandling::convert(TabsApiError::SqlError { + reason: e.to_string(), + }) + .report_error("tabs-sql-error"), + Self::OpenDatabaseError(e) => ErrorHandling::convert(TabsApiError::SqlError { + reason: e.to_string(), + }) + .report_error("tabs-open-database-error"), + } + } +} diff --git a/third_party/rust/tabs/src/lib.rs b/third_party/rust/tabs/src/lib.rs new file mode 100644 index 0000000000..58f32ea5e0 --- /dev/null +++ b/third_party/rust/tabs/src/lib.rs @@ -0,0 +1,39 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#![allow(unknown_lints)] +#![warn(rust_2018_idioms)] + +#[macro_use] +pub mod error; +mod schema; +mod storage; +mod store; +mod sync; + +uniffi::include_scaffolding!("tabs"); + +// Our UDL uses a `Guid` type. +use sync_guid::Guid as TabsGuid; +impl UniffiCustomTypeConverter for TabsGuid { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result<TabsGuid> { + Ok(TabsGuid::new(val.as_str())) + } + + fn from_custom(obj: Self) -> Self::Builtin { + obj.into() + } +} + +pub use crate::storage::{ClientRemoteTabs, RemoteTabRecord, TabsDeviceType}; +pub use crate::store::TabsStore; +pub use error::{ApiResult, Error, Result, TabsApiError}; +use sync15::DeviceType; + +pub use crate::sync::engine::get_registered_sync_engine; + +pub use crate::sync::bridge::TabsBridgedEngine; +pub use crate::sync::engine::TabsEngine; diff --git a/third_party/rust/tabs/src/schema.rs b/third_party/rust/tabs/src/schema.rs new file mode 100644 index 0000000000..ad8713b487 --- /dev/null +++ b/third_party/rust/tabs/src/schema.rs @@ -0,0 +1,150 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// Tabs is a bit special - it's a trivial SQL schema and is only used as a persistent +// cache, and the semantics of the "tabs" collection means there's no need for +// syncChangeCounter/syncStatus nor a mirror etc. + +use rusqlite::{Connection, Transaction}; +use sql_support::{ + open_database::{ + ConnectionInitializer as MigrationLogic, Error as MigrationError, Result as MigrationResult, + }, + ConnExt, +}; + +// The record is the TabsRecord struct in json and this module doesn't need to deserialize, so we just +// store each client as its own row. +const CREATE_SCHEMA_SQL: &str = " + CREATE TABLE IF NOT EXISTS tabs ( + guid TEXT NOT NULL PRIMARY KEY, + record TEXT NOT NULL, + last_modified INTEGER NOT NULL + ); +"; + +const CREATE_META_TABLE_SQL: &str = " + CREATE TABLE IF NOT EXISTS moz_meta ( + key TEXT PRIMARY KEY, + value NOT NULL + ) +"; + +pub(crate) static LAST_SYNC_META_KEY: &str = "last_sync_time"; +pub(crate) static GLOBAL_SYNCID_META_KEY: &str = "global_sync_id"; +pub(crate) static COLLECTION_SYNCID_META_KEY: &str = "tabs_sync_id"; +// Tabs stores this in the meta table due to a unique requirement that we only know the list +// of connected clients when syncing, however getting the list of tabs could be called at anytime +// so we store it so we can translate from the tabs sync record ID to the FxA device id for the client +pub(crate) static REMOTE_CLIENTS_KEY: &str = "remote_clients"; + +pub struct TabsMigrationLogic; + +impl MigrationLogic for TabsMigrationLogic { + const NAME: &'static str = "tabs storage db"; + const END_VERSION: u32 = 2; + + fn prepare(&self, conn: &Connection, _db_empty: bool) -> MigrationResult<()> { + let initial_pragmas = " + -- We don't care about temp tables being persisted to disk. + PRAGMA temp_store = 2; + -- we unconditionally want write-ahead-logging mode. + PRAGMA journal_mode=WAL; + -- foreign keys seem worth enforcing (and again, we don't care in practice) + PRAGMA foreign_keys = ON; + "; + conn.execute_batch(initial_pragmas)?; + // This is where we'd define our sql functions if we had any! + conn.set_prepared_statement_cache_capacity(128); + Ok(()) + } + + fn init(&self, db: &Transaction<'_>) -> MigrationResult<()> { + log::debug!("Creating schemas"); + db.execute_all(&[CREATE_SCHEMA_SQL, CREATE_META_TABLE_SQL])?; + Ok(()) + } + + fn upgrade_from(&self, db: &Transaction<'_>, version: u32) -> MigrationResult<()> { + match version { + 1 => upgrade_from_v1(db), + _ => Err(MigrationError::IncompatibleVersion(version)), + } + } +} + +fn upgrade_from_v1(db: &Connection) -> MigrationResult<()> { + // The previous version stored the entire payload in one row + // and cleared on each sync -- it's fine to just drop it + db.execute_batch("DROP TABLE tabs;")?; + // Recreate the world + db.execute_all(&[CREATE_SCHEMA_SQL, CREATE_META_TABLE_SQL])?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::TabsStorage; + use rusqlite::OptionalExtension; + use serde_json::json; + use sql_support::open_database::test_utils::MigratedDatabaseFile; + + const CREATE_V1_SCHEMA_SQL: &str = " + CREATE TABLE IF NOT EXISTS tabs ( + payload TEXT NOT NULL + ); + PRAGMA user_version=1; + "; + + #[test] + fn test_create_schema_twice() { + let mut db = TabsStorage::new_with_mem_path("test"); + let conn = db.open_or_create().unwrap(); + conn.execute_batch(CREATE_SCHEMA_SQL) + .expect("should allow running twice"); + } + + #[test] + fn test_tabs_db_upgrade_from_v1() { + let db_file = MigratedDatabaseFile::new(TabsMigrationLogic, CREATE_V1_SCHEMA_SQL); + db_file.run_all_upgrades(); + // Verify we can open the DB just fine, since migration is essentially a drop + // we don't need to check any data integrity + let mut storage = TabsStorage::new(db_file.path); + storage.open_or_create().unwrap(); + assert!(storage.open_if_exists().unwrap().is_some()); + + let test_payload = json!({ + "id": "device-with-a-tab", + "clientName": "device with a tab", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207, + }] + }); + let db = storage.open_if_exists().unwrap().unwrap(); + // We should be able to insert without a SQL error after upgrade + db.execute( + "INSERT INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);", + rusqlite::named_params! { + ":guid": "my-device", + ":record": serde_json::to_string(&test_payload).unwrap(), + ":last_modified": "1643764207" + }, + ) + .unwrap(); + + let row: Option<String> = db + .query_row("SELECT guid FROM tabs;", [], |row| row.get(0)) + .optional() + .unwrap(); + // Verify we can query for a valid guid now + assert_eq!(row.unwrap(), "my-device"); + } +} diff --git a/third_party/rust/tabs/src/storage.rs b/third_party/rust/tabs/src/storage.rs new file mode 100644 index 0000000000..eaccbc3446 --- /dev/null +++ b/third_party/rust/tabs/src/storage.rs @@ -0,0 +1,727 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// From https://searchfox.org/mozilla-central/rev/ea63a0888d406fae720cf24f4727d87569a8cab5/services/sync/modules/constants.js#75 +const URI_LENGTH_MAX: usize = 65536; +// https://searchfox.org/mozilla-central/rev/ea63a0888d406fae720cf24f4727d87569a8cab5/services/sync/modules/engines/tabs.js#8 +const TAB_ENTRIES_LIMIT: usize = 5; + +use crate::error::*; +use crate::schema; +use crate::sync::record::TabsRecord; +use crate::DeviceType; +use rusqlite::{ + types::{FromSql, ToSql}, + Connection, OpenFlags, +}; +use serde_derive::{Deserialize, Serialize}; +use sql_support::open_database::{self, open_database_with_flags}; +use sql_support::ConnExt; +use std::cell::RefCell; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use sync15::{RemoteClient, ServerTimestamp}; +pub type TabsDeviceType = crate::DeviceType; +pub type RemoteTabRecord = RemoteTab; + +pub(crate) const TABS_CLIENT_TTL: u32 = 15_552_000; // 180 days, same as CLIENTS_TTL +const FAR_FUTURE: i64 = 4_102_405_200_000; // 2100/01/01 +const MAX_PAYLOAD_SIZE: usize = 512 * 1024; // Twice as big as desktop, still smaller than server max (2MB) +const MAX_TITLE_CHAR_LENGTH: usize = 512; // We put an upper limit on title sizes for tabs to reduce memory + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RemoteTab { + pub title: String, + pub url_history: Vec<String>, + pub icon: Option<String>, + pub last_used: i64, // In ms. +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ClientRemoteTabs { + // The fxa_device_id of the client. *Should not* come from the id in the `clients` collection, + // because that may or may not be the fxa_device_id (currently, it will not be for desktop + // records.) + pub client_id: String, + pub client_name: String, + #[serde( + default = "devicetype_default_deser", + skip_serializing_if = "devicetype_is_unknown" + )] + pub device_type: DeviceType, + // serde default so we can read old rows that didn't persist this. + #[serde(default)] + pub last_modified: i64, + pub remote_tabs: Vec<RemoteTab>, +} + +fn devicetype_default_deser() -> DeviceType { + // replace with `DeviceType::default_deser` once #4861 lands. + DeviceType::Unknown +} + +// Unlike most other uses-cases, here we do allow serializing ::Unknown, but skip it. +fn devicetype_is_unknown(val: &DeviceType) -> bool { + matches!(val, DeviceType::Unknown) +} + +// Tabs has unique requirements for storage: +// * The "local_tabs" exist only so we can sync them out. There's no facility to +// query "local tabs", so there's no need to store these persistently - ie, they +// are write-only. +// * The "remote_tabs" exist purely for incoming items via sync - there's no facility +// to set them locally - they are read-only. +// Note that this means a database is only actually needed after Sync fetches remote tabs, +// and because sync users are in the minority, the use of a database here is purely +// optional and created on demand. The implication here is that asking for the "remote tabs" +// when no database exists is considered a normal situation and just implies no remote tabs exist. +// (Note however we don't attempt to remove the database when no remote tabs exist, so having +// no remote tabs in an existing DB is also a normal situation) +pub struct TabsStorage { + local_tabs: RefCell<Option<Vec<RemoteTab>>>, + db_path: PathBuf, + db_connection: Option<Connection>, +} + +impl TabsStorage { + pub fn new(db_path: impl AsRef<Path>) -> Self { + Self { + local_tabs: RefCell::default(), + db_path: db_path.as_ref().to_path_buf(), + db_connection: None, + } + } + + /// Arrange for a new memory-based TabsStorage. As per other DB semantics, creating + /// this isn't enough to actually create the db! + pub fn new_with_mem_path(db_path: &str) -> Self { + let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_path)); + Self::new(name) + } + + /// If a DB file exists, open and return it. + pub fn open_if_exists(&mut self) -> Result<Option<&Connection>> { + if let Some(ref existing) = self.db_connection { + return Ok(Some(existing)); + } + let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_READ_WRITE; + match open_database_with_flags( + self.db_path.clone(), + flags, + &crate::schema::TabsMigrationLogic, + ) { + Ok(conn) => { + self.db_connection = Some(conn); + Ok(self.db_connection.as_ref()) + } + Err(open_database::Error::SqlError(rusqlite::Error::SqliteFailure(code, _))) + if code.code == rusqlite::ErrorCode::CannotOpen => + { + Ok(None) + } + Err(e) => Err(e.into()), + } + } + + /// Open and return the DB, creating it if necessary. + pub fn open_or_create(&mut self) -> Result<&Connection> { + if let Some(ref existing) = self.db_connection { + return Ok(existing); + } + let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE; + let conn = open_database_with_flags( + self.db_path.clone(), + flags, + &crate::schema::TabsMigrationLogic, + )?; + self.db_connection = Some(conn); + Ok(self.db_connection.as_ref().unwrap()) + } + + pub fn update_local_state(&mut self, local_state: Vec<RemoteTab>) { + self.local_tabs.borrow_mut().replace(local_state); + } + + // We try our best to fit as many tabs in a payload as possible, this includes + // limiting the url history entries, title character count and finally drop enough tabs + // until we have small enough payload that the server will accept + pub fn prepare_local_tabs_for_upload(&self) -> Option<Vec<RemoteTab>> { + if let Some(local_tabs) = self.local_tabs.borrow().as_ref() { + let mut sanitized_tabs: Vec<RemoteTab> = local_tabs + .iter() + .cloned() + .filter_map(|mut tab| { + if tab.url_history.is_empty() || !is_url_syncable(&tab.url_history[0]) { + return None; + } + let mut sanitized_history = Vec::with_capacity(TAB_ENTRIES_LIMIT); + for url in tab.url_history { + if sanitized_history.len() == TAB_ENTRIES_LIMIT { + break; + } + if is_url_syncable(&url) { + sanitized_history.push(url); + } + } + + tab.url_history = sanitized_history; + // Potentially truncate the title to some limit + tab.title = slice_up_to(tab.title, MAX_TITLE_CHAR_LENGTH); + Some(tab) + }) + .collect(); + // Sort the tabs so when we trim tabs it's the oldest tabs + sanitized_tabs.sort_by(|a, b| b.last_used.cmp(&a.last_used)); + // If trimming the tab length failed for some reason, just return the untrimmed tabs + trim_tabs_length(&mut sanitized_tabs, MAX_PAYLOAD_SIZE); + return Some(sanitized_tabs); + } + None + } + + pub fn get_remote_tabs(&mut self) -> Option<Vec<ClientRemoteTabs>> { + let conn = match self.open_if_exists() { + Err(e) => { + error_support::report_error!( + "tabs-read-remote", + "Failed to read remote tabs: {}", + e + ); + return None; + } + Ok(None) => return None, + Ok(Some(conn)) => conn, + }; + + let records: Vec<(TabsRecord, ServerTimestamp)> = match conn.query_rows_and_then_cached( + "SELECT record, last_modified FROM tabs", + [], + |row| -> Result<_> { + Ok(( + serde_json::from_str(&row.get::<_, String>(0)?)?, + ServerTimestamp(row.get::<_, i64>(1)?), + )) + }, + ) { + Ok(records) => records, + Err(e) => { + error_support::report_error!("tabs-read-remote", "Failed to read database: {}", e); + return None; + } + }; + let mut crts: Vec<ClientRemoteTabs> = Vec::new(); + let remote_clients: HashMap<String, RemoteClient> = + match self.get_meta::<String>(schema::REMOTE_CLIENTS_KEY) { + Err(e) => { + error_support::report_error!( + "tabs-read-remote", + "Failed to get remote clients: {}", + e + ); + return None; + } + // We don't return early here since we still store tabs even if we don't + // "know" about the client it's associated with (incase it becomes available later) + Ok(None) => HashMap::default(), + Ok(Some(json)) => serde_json::from_str(&json).unwrap(), + }; + for (record, last_modified) in records { + let id = record.id.clone(); + let crt = if let Some(remote_client) = remote_clients.get(&id) { + ClientRemoteTabs::from_record_with_remote_client( + remote_client + .fxa_device_id + .as_ref() + .unwrap_or(&id) + .to_owned(), + last_modified, + remote_client, + record, + ) + } else { + // A record with a device that's not in our remote clients seems unlikely, but + // could happen - in most cases though, it will be due to a disconnected client - + // so we really should consider just dropping it? (Sadly though, it does seem + // possible it's actually a very recently connected client, so we keep it) + // We should get rid of this eventually - https://github.com/mozilla/application-services/issues/5199 + log::info!( + "Storing tabs from a client that doesn't appear in the devices list: {}", + id, + ); + ClientRemoteTabs::from_record(id, last_modified, record) + }; + crts.push(crt); + } + Some(crts) + } + + // Keep DB from growing infinitely since we only ask for records since our last sync + // and may or may not know about the client it's associated with -- but we could at some point + // and should start returning those tabs immediately. If that client hasn't been seen in 3 weeks, + // we remove it until it reconnects + pub fn remove_stale_clients(&mut self) -> Result<()> { + let last_sync = self.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?; + if let Some(conn) = self.open_if_exists()? { + if let Some(last_sync) = last_sync { + let client_ttl_ms = (TABS_CLIENT_TTL as i64) * 1000; + // On desktop, a quick write temporarily sets the last_sync to FAR_FUTURE + // but if it doesn't set it back to the original (crash, etc) it + // means we'll most likely trash all our records (as it's more than any TTL we'd ever do) + // so we need to detect this for now until we have native quick write support + if last_sync - client_ttl_ms >= 0 && last_sync != (FAR_FUTURE * 1000) { + let tx = conn.unchecked_transaction()?; + let num_removed = tx.execute_cached( + "DELETE FROM tabs WHERE last_modified <= :last_sync - :ttl", + rusqlite::named_params! { + ":last_sync": last_sync, + ":ttl": client_ttl_ms, + }, + )?; + log::info!( + "removed {} stale clients (threshold was {})", + num_removed, + last_sync - client_ttl_ms + ); + tx.commit()?; + } + } + } + Ok(()) + } +} + +impl TabsStorage { + pub(crate) fn replace_remote_tabs( + &mut self, + // This is a tuple because we need to know what the server reports + // as the last time a record was modified + new_remote_tabs: Vec<(TabsRecord, ServerTimestamp)>, + ) -> Result<()> { + let connection = self.open_or_create()?; + let tx = connection.unchecked_transaction()?; + + // For tabs it's fine if we override the existing tabs for a remote + // there can only ever be one record for each client + for remote_tab in new_remote_tabs { + let record = remote_tab.0; + let last_modified = remote_tab.1; + log::info!( + "inserting tab for device {}, last modified at {}", + record.id, + last_modified.as_millis() + ); + tx.execute_cached( + "INSERT OR REPLACE INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);", + rusqlite::named_params! { + ":guid": &record.id, + ":record": serde_json::to_string(&record).expect("tabs don't fail to serialize"), + ":last_modified": last_modified.as_millis() + }, + )?; + } + tx.commit()?; + Ok(()) + } + + pub(crate) fn wipe_remote_tabs(&mut self) -> Result<()> { + if let Some(db) = self.open_if_exists()? { + db.execute_batch("DELETE FROM tabs")?; + } + Ok(()) + } + + pub(crate) fn wipe_local_tabs(&self) { + self.local_tabs.replace(None); + } + + pub(crate) fn put_meta(&mut self, key: &str, value: &dyn ToSql) -> Result<()> { + let db = self.open_or_create()?; + db.execute_cached( + "REPLACE INTO moz_meta (key, value) VALUES (:key, :value)", + &[(":key", &key as &dyn ToSql), (":value", value)], + )?; + Ok(()) + } + + pub(crate) fn get_meta<T: FromSql>(&mut self, key: &str) -> Result<Option<T>> { + match self.open_if_exists() { + Ok(Some(db)) => { + let res = db.try_query_one( + "SELECT value FROM moz_meta WHERE key = :key", + &[(":key", &key)], + true, + )?; + Ok(res) + } + Err(e) => Err(e), + Ok(None) => Ok(None), + } + } + + pub(crate) fn delete_meta(&mut self, key: &str) -> Result<()> { + if let Some(db) = self.open_if_exists()? { + db.execute_cached("DELETE FROM moz_meta WHERE key = :key", &[(":key", &key)])?; + } + Ok(()) + } +} + +// Trim the amount of tabs in a list to fit the specified memory size +fn trim_tabs_length(tabs: &mut Vec<RemoteTab>, payload_size_max_bytes: usize) { + // Ported from https://searchfox.org/mozilla-central/rev/84fb1c4511312a0b9187f647d90059e3a6dd27f8/services/sync/modules/util.sys.mjs#422 + // See bug 535326 comment 8 for an explanation of the estimation + let max_serialized_size = (payload_size_max_bytes / 4) * 3 - 1500; + let size = compute_serialized_size(tabs); + if size > max_serialized_size { + // Estimate a little more than the direct fraction to maximize packing + let cutoff = (tabs.len() * max_serialized_size) / size; + tabs.truncate(cutoff); + + // Keep dropping off the last entry until the data fits. + while compute_serialized_size(tabs) > max_serialized_size { + tabs.pop(); + } + } +} + +fn compute_serialized_size(v: &Vec<RemoteTab>) -> usize { + serde_json::to_string(v).unwrap_or_default().len() +} + +// Similar to places/utils.js +// This method ensures we safely truncate a string up to a certain max_len while +// respecting char bounds to prevent rust panics. If we do end up truncating, we +// append an ellipsis to the string +pub fn slice_up_to(s: String, max_len: usize) -> String { + if max_len >= s.len() { + return s; + } + + let ellipsis = '\u{2026}'; + // Ensure we leave space for the ellipsis while still being under the max + let mut idx = max_len - ellipsis.len_utf8(); + while !s.is_char_boundary(idx) { + idx -= 1; + } + let mut new_str = s[..idx].to_string(); + new_str.push(ellipsis); + new_str +} + +// Try to keep in sync with https://searchfox.org/mozilla-central/rev/2ad13433da20a0749e1e9a10ec0ab49b987c2c8e/modules/libpref/init/all.js#3927 +fn is_url_syncable(url: &str) -> bool { + url.len() <= URI_LENGTH_MAX + && !(url.starts_with("about:") + || url.starts_with("resource:") + || url.starts_with("chrome:") + || url.starts_with("wyciwyg:") + || url.starts_with("blob:") + || url.starts_with("file:") + || url.starts_with("moz-extension:") + || url.starts_with("data:")) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sync::record::TabsRecordTab; + + #[test] + fn test_is_url_syncable() { + assert!(is_url_syncable("https://bobo.com")); + assert!(is_url_syncable("ftp://bobo.com")); + assert!(!is_url_syncable("about:blank")); + // XXX - this smells wrong - we should insist on a valid complete URL? + assert!(is_url_syncable("aboutbobo.com")); + assert!(!is_url_syncable("file:///Users/eoger/bobo")); + } + + #[test] + fn test_open_if_exists_no_file() { + let dir = tempfile::tempdir().unwrap(); + let db_name = dir.path().join("test_open_for_read_no_file.db"); + let mut storage = TabsStorage::new(db_name.clone()); + assert!(storage.open_if_exists().unwrap().is_none()); + storage.open_or_create().unwrap(); // will have created it. + // make a new storage, but leave the file alone. + let mut storage = TabsStorage::new(db_name); + // db file exists, so opening for read should open it. + assert!(storage.open_if_exists().unwrap().is_some()); + } + + #[test] + fn test_tabs_meta() { + let dir = tempfile::tempdir().unwrap(); + let db_name = dir.path().join("test_tabs_meta.db"); + let mut db = TabsStorage::new(db_name); + let test_key = "TEST KEY A"; + let test_value = "TEST VALUE A"; + let test_key2 = "TEST KEY B"; + let test_value2 = "TEST VALUE B"; + + // should automatically make the DB if one doesn't exist + db.put_meta(test_key, &test_value).unwrap(); + db.put_meta(test_key2, &test_value2).unwrap(); + + let retrieved_value: String = db.get_meta(test_key).unwrap().expect("test value"); + let retrieved_value2: String = db.get_meta(test_key2).unwrap().expect("test value 2"); + + assert_eq!(retrieved_value, test_value); + assert_eq!(retrieved_value2, test_value2); + + // check that the value of an existing key can be updated + let test_value3 = "TEST VALUE C"; + db.put_meta(test_key, &test_value3).unwrap(); + + let retrieved_value3: String = db.get_meta(test_key).unwrap().expect("test value 3"); + + assert_eq!(retrieved_value3, test_value3); + + // check that a deleted key is not retrieved + db.delete_meta(test_key).unwrap(); + let retrieved_value4: Option<String> = db.get_meta(test_key).unwrap(); + assert!(retrieved_value4.is_none()); + } + + #[test] + fn test_prepare_local_tabs_for_upload() { + let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload"); + assert_eq!(storage.prepare_local_tabs_for_upload(), None); + storage.update_local_state(vec![ + RemoteTab { + title: "".to_owned(), + url_history: vec!["about:blank".to_owned(), "https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }, + RemoteTab { + title: "".to_owned(), + url_history: vec![ + "https://foo.bar".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + "about:blank".to_owned(), + ], + icon: None, + last_used: 0, + }, + RemoteTab { + title: "".to_owned(), + url_history: vec![ + "https://foo.bar".to_owned(), + "about:blank".to_owned(), + "https://foo2.bar".to_owned(), + "https://foo3.bar".to_owned(), + "https://foo4.bar".to_owned(), + "https://foo5.bar".to_owned(), + "https://foo6.bar".to_owned(), + ], + icon: None, + last_used: 0, + }, + RemoteTab { + title: "".to_owned(), + url_history: vec![], + icon: None, + last_used: 0, + }, + ]); + assert_eq!( + storage.prepare_local_tabs_for_upload(), + Some(vec![ + RemoteTab { + title: "".to_owned(), + url_history: vec!["https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }, + RemoteTab { + title: "".to_owned(), + url_history: vec![ + "https://foo.bar".to_owned(), + "https://foo2.bar".to_owned(), + "https://foo3.bar".to_owned(), + "https://foo4.bar".to_owned(), + "https://foo5.bar".to_owned() + ], + icon: None, + last_used: 0, + }, + ]) + ); + } + #[test] + fn test_trimming_tab_title() { + let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload"); + assert_eq!(storage.prepare_local_tabs_for_upload(), None); + storage.update_local_state(vec![RemoteTab { + title: "a".repeat(MAX_TITLE_CHAR_LENGTH + 10), // Fill a string more than max + url_history: vec!["https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }]); + let ellipsis_char = '\u{2026}'; + let mut truncated_title = "a".repeat(MAX_TITLE_CHAR_LENGTH - ellipsis_char.len_utf8()); + truncated_title.push(ellipsis_char); + assert_eq!( + storage.prepare_local_tabs_for_upload(), + Some(vec![ + // title trimmed to 50 characters + RemoteTab { + title: truncated_title, // title was trimmed to only max char length + url_history: vec!["https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }, + ]) + ); + } + #[test] + fn test_utf8_safe_title_trim() { + let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload"); + assert_eq!(storage.prepare_local_tabs_for_upload(), None); + storage.update_local_state(vec![ + RemoteTab { + title: "😍".repeat(MAX_TITLE_CHAR_LENGTH + 10), // Fill a string more than max + url_history: vec!["https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }, + RemoteTab { + title: "を".repeat(MAX_TITLE_CHAR_LENGTH + 5), // Fill a string more than max + url_history: vec!["https://foo_jp.bar".to_owned()], + icon: None, + last_used: 0, + }, + ]); + let ellipsis_char = '\u{2026}'; + // (MAX_TITLE_CHAR_LENGTH - ellipsis / "😍" bytes) + let mut truncated_title = "😍".repeat(127); + // (MAX_TITLE_CHAR_LENGTH - ellipsis / "を" bytes) + let mut truncated_jp_title = "を".repeat(169); + truncated_title.push(ellipsis_char); + truncated_jp_title.push(ellipsis_char); + let remote_tabs = storage.prepare_local_tabs_for_upload().unwrap(); + assert_eq!( + remote_tabs, + vec![ + RemoteTab { + title: truncated_title, // title was trimmed to only max char length + url_history: vec!["https://foo.bar".to_owned()], + icon: None, + last_used: 0, + }, + RemoteTab { + title: truncated_jp_title, // title was trimmed to only max char length + url_history: vec!["https://foo_jp.bar".to_owned()], + icon: None, + last_used: 0, + }, + ] + ); + // We should be less than max + assert!(remote_tabs[0].title.chars().count() <= MAX_TITLE_CHAR_LENGTH); + assert!(remote_tabs[1].title.chars().count() <= MAX_TITLE_CHAR_LENGTH); + } + #[test] + fn test_trim_tabs_length() { + let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload"); + assert_eq!(storage.prepare_local_tabs_for_upload(), None); + let mut too_many_tabs: Vec<RemoteTab> = Vec::new(); + for n in 1..5000 { + too_many_tabs.push(RemoteTab { + title: "aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa" //50 characters + .to_owned(), + url_history: vec![format!("https://foo{}.bar", n)], + icon: None, + last_used: 0, + }); + } + let tabs_mem_size = compute_serialized_size(&too_many_tabs); + // ensure we are definitely over the payload limit + assert!(tabs_mem_size > MAX_PAYLOAD_SIZE); + // Add our over-the-limit tabs to the local state + storage.update_local_state(too_many_tabs.clone()); + // prepare_local_tabs_for_upload did the trimming we needed to get under payload size + let tabs_to_upload = &storage.prepare_local_tabs_for_upload().unwrap(); + assert!(compute_serialized_size(tabs_to_upload) <= MAX_PAYLOAD_SIZE); + } + // Helper struct to model what's stored in the DB + struct TabsSQLRecord { + guid: String, + record: TabsRecord, + last_modified: i64, + } + #[test] + fn test_remove_stale_clients() { + let dir = tempfile::tempdir().unwrap(); + let db_name = dir.path().join("test_remove_stale_clients.db"); + let mut storage = TabsStorage::new(db_name); + storage.open_or_create().unwrap(); + assert!(storage.open_if_exists().unwrap().is_some()); + + let records = vec![ + TabsSQLRecord { + guid: "device-1".to_string(), + record: TabsRecord { + id: "device-1".to_string(), + client_name: "Device #1".to_string(), + tabs: vec![TabsRecordTab { + title: "the title".to_string(), + url_history: vec!["https://mozilla.org/".to_string()], + icon: Some("https://mozilla.org/icon".to_string()), + last_used: 1643764207000, + }], + }, + last_modified: 1643764207000, + }, + TabsSQLRecord { + guid: "device-outdated".to_string(), + record: TabsRecord { + id: "device-outdated".to_string(), + client_name: "Device outdated".to_string(), + tabs: vec![TabsRecordTab { + title: "the title".to_string(), + url_history: vec!["https://mozilla.org/".to_string()], + icon: Some("https://mozilla.org/icon".to_string()), + last_used: 1643764207000, + }], + }, + last_modified: 1443764207000, // old + }, + ]; + let db = storage.open_if_exists().unwrap().unwrap(); + for record in records { + db.execute( + "INSERT INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);", + rusqlite::named_params! { + ":guid": &record.guid, + ":record": serde_json::to_string(&record.record).unwrap(), + ":last_modified": &record.last_modified, + }, + ).unwrap(); + } + // pretend we just synced + let last_synced = 1643764207000_i64; + storage + .put_meta(schema::LAST_SYNC_META_KEY, &last_synced) + .unwrap(); + storage.remove_stale_clients().unwrap(); + + let remote_tabs = storage.get_remote_tabs().unwrap(); + // We should've removed the outdated device + assert_eq!(remote_tabs.len(), 1); + // Assert the correct record is still being returned + assert_eq!(remote_tabs[0].client_id, "device-1"); + } +} diff --git a/third_party/rust/tabs/src/store.rs b/third_party/rust/tabs/src/store.rs new file mode 100644 index 0000000000..67bae498f7 --- /dev/null +++ b/third_party/rust/tabs/src/store.rs @@ -0,0 +1,41 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::storage::{ClientRemoteTabs, RemoteTab, TabsStorage}; +use std::path::Path; +use std::sync::Mutex; + +pub struct TabsStore { + pub storage: Mutex<TabsStorage>, +} + +impl TabsStore { + pub fn new(db_path: impl AsRef<Path>) -> Self { + Self { + storage: Mutex::new(TabsStorage::new(db_path)), + } + } + + pub fn new_with_mem_path(db_path: &str) -> Self { + Self { + storage: Mutex::new(TabsStorage::new_with_mem_path(db_path)), + } + } + + pub fn set_local_tabs(&self, local_state: Vec<RemoteTab>) { + self.storage.lock().unwrap().update_local_state(local_state); + } + + // like remote_tabs, but serves the uniffi layer + pub fn get_all(&self) -> Vec<ClientRemoteTabs> { + match self.remote_tabs() { + Some(list) => list, + None => vec![], + } + } + + pub fn remote_tabs(&self) -> Option<Vec<ClientRemoteTabs>> { + self.storage.lock().unwrap().get_remote_tabs() + } +} diff --git a/third_party/rust/tabs/src/sync/bridge.rs b/third_party/rust/tabs/src/sync/bridge.rs new file mode 100644 index 0000000000..6e37a0cf5c --- /dev/null +++ b/third_party/rust/tabs/src/sync/bridge.rs @@ -0,0 +1,436 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::error::TabsApiError; +use crate::sync::engine::TabsSyncImpl; +use crate::TabsStore; +use anyhow::Result; +use std::sync::{Arc, Mutex}; +use sync15::bso::{IncomingBso, OutgoingBso}; +use sync15::engine::{ApplyResults, BridgedEngine, CollSyncIds, EngineSyncAssociation}; +use sync15::{ClientData, ServerTimestamp}; +use sync_guid::Guid as SyncGuid; + +impl TabsStore { + // Returns a bridged sync engine for Desktop for this store. + pub fn bridged_engine(self: Arc<Self>) -> Arc<TabsBridgedEngine> { + let bridge_impl = crate::sync::bridge::BridgedEngineImpl::new(&self); + // This is a concrete struct exposed via uniffi. + let concrete = TabsBridgedEngine::new(bridge_impl); + Arc::new(concrete) + } +} + +/// A bridged engine implements all the methods needed to make the +/// `storage.sync` store work with Desktop's Sync implementation. +/// Conceptually it's very similar to our SyncEngine, and once we have SyncEngine +/// and BridgedEngine using the same types we can probably combine them (or at least +/// implement this bridged engine purely in terms of SyncEngine) +/// See also #2841 and #5139 +pub struct BridgedEngineImpl { + sync_impl: Mutex<TabsSyncImpl>, + incoming: Mutex<Vec<IncomingBso>>, +} + +impl BridgedEngineImpl { + /// Creates a bridged engine for syncing. + pub fn new(store: &Arc<TabsStore>) -> Self { + Self { + sync_impl: Mutex::new(TabsSyncImpl::new(store.clone())), + incoming: Mutex::default(), + } + } +} + +impl BridgedEngine for BridgedEngineImpl { + fn last_sync(&self) -> Result<i64> { + Ok(self + .sync_impl + .lock() + .unwrap() + .get_last_sync()? + .unwrap_or_default() + .as_millis()) + } + + fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> { + self.sync_impl + .lock() + .unwrap() + .set_last_sync(ServerTimestamp::from_millis(last_sync_millis))?; + Ok(()) + } + + fn sync_id(&self) -> Result<Option<String>> { + Ok(match self.sync_impl.lock().unwrap().get_sync_assoc()? { + EngineSyncAssociation::Connected(id) => Some(id.coll.to_string()), + EngineSyncAssociation::Disconnected => None, + }) + } + + fn reset_sync_id(&self) -> Result<String> { + let new_id = SyncGuid::random().to_string(); + let new_coll_ids = CollSyncIds { + global: SyncGuid::empty(), + coll: new_id.clone().into(), + }; + self.sync_impl + .lock() + .unwrap() + .reset(&EngineSyncAssociation::Connected(new_coll_ids))?; + Ok(new_id) + } + + fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + let mut sync_impl = self.sync_impl.lock().unwrap(); + let assoc = sync_impl.get_sync_assoc()?; + if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) { + log::debug!("ensure_current_sync_id is current"); + } else { + let new_coll_ids = CollSyncIds { + global: SyncGuid::empty(), + coll: sync_id.into(), + }; + sync_impl.reset(&EngineSyncAssociation::Connected(new_coll_ids))?; + } + Ok(sync_id.to_string()) + } + + fn prepare_for_sync(&self, client_data: &str) -> Result<()> { + let data: ClientData = serde_json::from_str(client_data)?; + Ok(self.sync_impl.lock().unwrap().prepare_for_sync(data)?) + } + + fn sync_started(&self) -> Result<()> { + // This is a no-op for the Tabs Engine + Ok(()) + } + + fn store_incoming(&self, incoming: Vec<IncomingBso>) -> Result<()> { + // Store the incoming payload in memory so we can use it in apply + *(self.incoming.lock().unwrap()) = incoming; + Ok(()) + } + + fn apply(&self) -> Result<ApplyResults> { + let mut incoming = self.incoming.lock().unwrap(); + // We've a reference to a Vec<> but it's owned by the mutex - swap the mutex owned + // value for an empty vec so we can consume the original. + let mut records = Vec::new(); + std::mem::swap(&mut records, &mut *incoming); + let mut telem = sync15::telemetry::Engine::new("tabs"); + + let mut sync_impl = self.sync_impl.lock().unwrap(); + let outgoing = sync_impl.apply_incoming(records, &mut telem)?; + + Ok(ApplyResults { + records: outgoing, + num_reconciled: Some(0), + }) + } + + fn set_uploaded(&self, server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> { + Ok(self + .sync_impl + .lock() + .unwrap() + .sync_finished(ServerTimestamp::from_millis(server_modified_millis), ids)?) + } + + fn sync_finished(&self) -> Result<()> { + *(self.incoming.lock().unwrap()) = Vec::default(); + Ok(()) + } + + fn reset(&self) -> Result<()> { + self.sync_impl + .lock() + .unwrap() + .reset(&EngineSyncAssociation::Disconnected)?; + Ok(()) + } + + fn wipe(&self) -> Result<()> { + self.sync_impl.lock().unwrap().wipe()?; + Ok(()) + } +} + +// This is for uniffi to expose, and does nothing than delegate back to the trait. +pub struct TabsBridgedEngine { + bridge_impl: BridgedEngineImpl, +} + +impl TabsBridgedEngine { + pub fn new(bridge_impl: BridgedEngineImpl) -> Self { + Self { bridge_impl } + } + + pub fn last_sync(&self) -> Result<i64> { + self.bridge_impl.last_sync() + } + + pub fn set_last_sync(&self, last_sync: i64) -> Result<()> { + self.bridge_impl.set_last_sync(last_sync) + } + + pub fn sync_id(&self) -> Result<Option<String>> { + self.bridge_impl.sync_id() + } + + pub fn reset_sync_id(&self) -> Result<String> { + self.bridge_impl.reset_sync_id() + } + + pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> { + self.bridge_impl.ensure_current_sync_id(sync_id) + } + + pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> { + self.bridge_impl.prepare_for_sync(client_data) + } + + pub fn sync_started(&self) -> Result<()> { + self.bridge_impl.sync_started() + } + + // Decode the JSON-encoded IncomingBso's that UniFFI passes to us + fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> { + let mut bsos = Vec::with_capacity(incoming.len()); + for inc in incoming { + bsos.push(serde_json::from_str::<IncomingBso>(&inc)?); + } + Ok(bsos) + } + + // Encode OutgoingBso's into JSON for UniFFI + fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> { + let mut bsos = Vec::with_capacity(outgoing.len()); + for e in outgoing { + bsos.push(serde_json::to_string(&e)?); + } + Ok(bsos) + } + + pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> { + self.bridge_impl + .store_incoming(self.convert_incoming_bsos(incoming)?) + } + + pub fn apply(&self) -> Result<Vec<String>> { + let apply_results = self.bridge_impl.apply()?; + self.convert_outgoing_bsos(apply_results.records) + } + + pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> { + self.bridge_impl + .set_uploaded(server_modified_millis, &guids) + } + + pub fn sync_finished(&self) -> Result<()> { + self.bridge_impl.sync_finished() + } + + pub fn reset(&self) -> Result<()> { + self.bridge_impl.reset() + } + + pub fn wipe(&self) -> Result<()> { + self.bridge_impl.wipe() + } +} + +impl From<anyhow::Error> for TabsApiError { + fn from(value: anyhow::Error) -> Self { + TabsApiError::UnexpectedTabsError { + reason: value.to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::{RemoteTab, TABS_CLIENT_TTL}; + use crate::sync::record::TabsRecordTab; + use serde_json::json; + use std::collections::HashMap; + use sync15::{ClientData, DeviceType, RemoteClient}; + + // A copy of the normal "engine" tests but which go via the bridge + #[test] + fn test_sync_via_bridge() { + env_logger::try_init().ok(); + + let store = Arc::new(TabsStore::new_with_mem_path("test-bridge_incoming")); + + // Set some local tabs for our device. + let my_tabs = vec![ + RemoteTab { + title: "my first tab".to_string(), + url_history: vec!["http://1.com".to_string()], + icon: None, + last_used: 2, + }, + RemoteTab { + title: "my second tab".to_string(), + url_history: vec!["http://2.com".to_string()], + icon: None, + last_used: 1, + }, + ]; + store.set_local_tabs(my_tabs.clone()); + + let bridge = store.bridged_engine(); + + let client_data = ClientData { + local_client_id: "my-device".to_string(), + recent_clients: HashMap::from([ + ( + "my-device".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "my device".to_string(), + device_type: sync15::DeviceType::Unknown, + }, + ), + ( + "device-no-tabs".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "device with no tabs".to_string(), + device_type: DeviceType::Unknown, + }, + ), + ( + "device-with-a-tab".to_string(), + RemoteClient { + fxa_device_id: None, + device_name: "device with a tab".to_string(), + device_type: DeviceType::Unknown, + }, + ), + ]), + }; + bridge + .prepare_for_sync(&serde_json::to_string(&client_data).unwrap()) + .expect("should work"); + + let records = vec![ + // my-device should be ignored by sync - here it is acting as what our engine last + // wrote, but the actual tabs in our store we set above are what should be used. + json!({ + "id": "my-device", + "clientName": "my device", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }), + json!({ + "id": "device-no-tabs", + "clientName": "device with no tabs", + "tabs": [], + }), + json!({ + "id": "device-with-a-tab", + "clientName": "device with a tab", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }), + // This has the main payload as OK but the tabs part invalid. + json!({ + "id": "device-with-invalid-tab", + "clientName": "device with a tab", + "tabs": [{ + "foo": "bar", + }] + }), + // We want this to be a valid payload but an invalid tab - so it needs an ID. + json!({ + "id": "invalid-tab", + "foo": "bar" + }), + ]; + + let mut incoming = Vec::new(); + for record in records { + // Annoyingly we can't use `IncomingEnvelope` directly as it intentionally doesn't + // support Serialize - so need to use explicit json. + let envelope = json!({ + "id": record.get("id"), + "modified": 0, + "payload": serde_json::to_string(&record).unwrap(), + }); + incoming.push(serde_json::to_string(&envelope).unwrap()); + } + + bridge.store_incoming(incoming).expect("should store"); + + let out = bridge.apply().expect("should apply"); + + assert_eq!(out.len(), 1); + let ours = serde_json::from_str::<serde_json::Value>(&out[0]).unwrap(); + // As above, can't use `OutgoingEnvelope` as it doesn't Deserialize. + // First, convert my_tabs from the local `RemoteTab` to the Sync specific `TabsRecord` + let expected_tabs: Vec<TabsRecordTab> = + my_tabs.into_iter().map(|t| t.to_record_tab()).collect(); + let expected = json!({ + "id": "my-device".to_string(), + "payload": json!({ + "id": "my-device".to_string(), + "clientName": "my device", + "tabs": serde_json::to_value(expected_tabs).unwrap(), + }).to_string(), + "ttl": TABS_CLIENT_TTL, + }); + + assert_eq!(ours, expected); + bridge.set_uploaded(1234, vec![]).unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 1234); + } + + #[test] + fn test_sync_meta() { + env_logger::try_init().ok(); + + let store = Arc::new(TabsStore::new_with_mem_path("test-meta")); + let bridge = store.bridged_engine(); + + // Should not error or panic + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 3); + + assert!(bridge.sync_id().unwrap().is_none()); + + bridge.ensure_current_sync_id("some_guid").unwrap(); + assert_eq!(bridge.sync_id().unwrap(), Some("some_guid".to_string())); + // changing the sync ID should reset the timestamp + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + + bridge.reset_sync_id().unwrap(); + // should now be a random guid. + assert_ne!(bridge.sync_id().unwrap(), Some("some_guid".to_string())); + // should have reset the last sync timestamp. + assert_eq!(bridge.last_sync().unwrap(), 0); + bridge.set_last_sync(3).unwrap(); + + // `reset` clears the guid and the timestamp + bridge.reset().unwrap(); + assert_eq!(bridge.last_sync().unwrap(), 0); + assert!(bridge.sync_id().unwrap().is_none()); + } +} diff --git a/third_party/rust/tabs/src/sync/engine.rs b/third_party/rust/tabs/src/sync/engine.rs new file mode 100644 index 0000000000..bc0978cbf2 --- /dev/null +++ b/third_party/rust/tabs/src/sync/engine.rs @@ -0,0 +1,543 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::schema; +use crate::storage::{ClientRemoteTabs, RemoteTab, TABS_CLIENT_TTL}; +use crate::store::TabsStore; +use crate::sync::record::{TabsRecord, TabsRecordTab}; +use crate::Result; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, Weak}; +use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope}; +use sync15::engine::{ + CollSyncIds, CollectionRequest, EngineSyncAssociation, IncomingChangeset, OutgoingChangeset, + SyncEngine, SyncEngineId, +}; +use sync15::{telemetry, ClientData, DeviceType, RemoteClient, ServerTimestamp}; +use sync_guid::Guid; + +// Our "sync manager" will use whatever is stashed here. +lazy_static::lazy_static! { + // Mutex: just taken long enough to update the inner stuff + static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new()); +} + +/// Called by the sync manager to get a sync engine via the store previously +/// registered with the sync manager. +pub fn get_registered_sync_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> { + let weak = STORE_FOR_MANAGER.lock().unwrap(); + match weak.upgrade() { + None => None, + Some(store) => match engine_id { + SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))), + // panicing here seems reasonable - it's a static error if this + // it hit, not something that runtime conditions can influence. + _ => unreachable!("can't provide unknown engine: {}", engine_id), + }, + } +} + +impl ClientRemoteTabs { + pub(crate) fn from_record_with_remote_client( + client_id: String, + last_modified: ServerTimestamp, + remote_client: &RemoteClient, + record: TabsRecord, + ) -> Self { + Self { + client_id, + client_name: remote_client.device_name.clone(), + device_type: remote_client.device_type, + last_modified: last_modified.as_millis(), + remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(), + } + } + + // Note that this should die as part of https://github.com/mozilla/application-services/issues/5199 + // If we don't have a `RemoteClient` record, then we don't know whether the ID passed here is + // the fxa_device_id (which is must be) or the client_id (which it will be if this ends up being + // called for desktop records, where client_id != fxa_device_id) + pub(crate) fn from_record( + client_id: String, + last_modified: ServerTimestamp, + record: TabsRecord, + ) -> Self { + Self { + client_id, + client_name: record.client_name, + device_type: DeviceType::Unknown, + last_modified: last_modified.as_millis(), + remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(), + } + } + fn to_record(&self) -> TabsRecord { + TabsRecord { + id: self.client_id.clone(), + client_name: self.client_name.clone(), + tabs: self + .remote_tabs + .iter() + .map(RemoteTab::to_record_tab) + .collect(), + } + } +} + +impl RemoteTab { + pub(crate) fn from_record_tab(tab: &TabsRecordTab) -> Self { + Self { + title: tab.title.clone(), + url_history: tab.url_history.clone(), + icon: tab.icon.clone(), + last_used: tab.last_used.checked_mul(1000).unwrap_or_default(), + } + } + pub(super) fn to_record_tab(&self) -> TabsRecordTab { + TabsRecordTab { + title: self.title.clone(), + url_history: self.url_history.clone(), + icon: self.icon.clone(), + last_used: self.last_used.checked_div(1000).unwrap_or_default(), + } + } +} + +// This is the implementation of syncing, which is used by the 2 different "sync engines" +// (We hope to get these 2 engines even closer in the future, but for now, we suck this up) +pub struct TabsSyncImpl { + pub(super) store: Arc<TabsStore>, + pub(super) local_id: String, +} + +impl TabsSyncImpl { + pub fn new(store: Arc<TabsStore>) -> Self { + Self { + store, + local_id: Default::default(), + } + } + + pub fn prepare_for_sync(&mut self, client_data: ClientData) -> Result<()> { + let mut storage = self.store.storage.lock().unwrap(); + // We only know the client list at sync time, but need to return tabs potentially + // at any time -- so we store the clients in the meta table to be able to properly + // return a ClientRemoteTab struct + storage.put_meta( + schema::REMOTE_CLIENTS_KEY, + &serde_json::to_string(&client_data.recent_clients)?, + )?; + self.local_id = client_data.local_client_id; + Ok(()) + } + + pub fn apply_incoming( + &mut self, + inbound: Vec<IncomingBso>, + telem: &mut telemetry::Engine, + ) -> Result<Vec<OutgoingBso>> { + let local_id = self.local_id.clone(); + let mut remote_tabs = Vec::with_capacity(inbound.len()); + let mut incoming_telemetry = telemetry::EngineIncoming::new(); + + let remote_clients: HashMap<String, RemoteClient> = { + let mut storage = self.store.storage.lock().unwrap(); + match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? { + None => HashMap::default(), + Some(json) => serde_json::from_str(&json).unwrap(), + } + }; + for incoming in inbound { + if incoming.envelope.id == local_id { + // That's our own record, ignore it. + continue; + } + let modified = incoming.envelope.modified; + let record = match incoming.into_content::<TabsRecord>().content() { + Some(record) => record, + None => { + // Invalid record or a "tombstone" which tabs don't have. + log::warn!("Ignoring incoming invalid tab"); + incoming_telemetry.failed(1); + continue; + } + }; + remote_tabs.push((record, modified)); + } + + // We want to keep the mutex for as short as possible + let local_tabs = { + let mut storage = self.store.storage.lock().unwrap(); + // In desktop we might end up here with zero records when doing a quick-write, in + // which case we don't want to wipe the DB. + if !remote_tabs.is_empty() { + storage.replace_remote_tabs(remote_tabs)?; + } + storage.remove_stale_clients()?; + storage.prepare_local_tabs_for_upload() + }; + let outgoing = if let Some(local_tabs) = local_tabs { + let (client_name, device_type) = remote_clients + .get(&local_id) + .map(|client| (client.device_name.clone(), client.device_type)) + .unwrap_or_else(|| (String::new(), DeviceType::Unknown)); + let local_record = ClientRemoteTabs { + client_id: local_id.clone(), + client_name, + device_type, + last_modified: 0, // ignored for outgoing records. + remote_tabs: local_tabs.to_vec(), + }; + log::trace!("outgoing {:?}", local_record); + let envelope = OutgoingEnvelope { + id: local_id.into(), + ttl: Some(TABS_CLIENT_TTL), + ..Default::default() + }; + vec![OutgoingBso::from_content( + envelope, + local_record.to_record(), + )?] + } else { + vec![] + }; + telem.incoming(incoming_telemetry); + Ok(outgoing) + } + + pub fn sync_finished( + &mut self, + new_timestamp: ServerTimestamp, + records_synced: &[Guid], + ) -> Result<()> { + log::info!( + "sync completed after uploading {} records", + records_synced.len() + ); + self.set_last_sync(new_timestamp)?; + Ok(()) + } + + pub fn reset(&mut self, assoc: &EngineSyncAssociation) -> Result<()> { + self.set_last_sync(ServerTimestamp(0))?; + let mut storage = self.store.storage.lock().unwrap(); + storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?; + storage.wipe_remote_tabs()?; + match assoc { + EngineSyncAssociation::Disconnected => { + storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?; + storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?; + } + EngineSyncAssociation::Connected(ids) => { + storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?; + storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?; + } + }; + Ok(()) + } + + pub fn wipe(&mut self) -> Result<()> { + self.reset(&EngineSyncAssociation::Disconnected)?; + // not clear why we need to wipe the local tabs - the app is just going + // to re-add them? + self.store.storage.lock().unwrap().wipe_local_tabs(); + Ok(()) + } + + pub fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> { + let mut storage = self.store.storage.lock().unwrap(); + let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?; + let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?; + Ok(if let (Some(global), Some(coll)) = (global, coll) { + EngineSyncAssociation::Connected(CollSyncIds { + global: Guid::from_string(global), + coll: Guid::from_string(coll), + }) + } else { + EngineSyncAssociation::Disconnected + }) + } + + pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> { + let mut storage = self.store.storage.lock().unwrap(); + log::debug!("Updating last sync to {}", last_sync); + let last_sync_millis = last_sync.as_millis(); + storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis) + } + + pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> { + let mut storage = self.store.storage.lock().unwrap(); + let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?; + Ok(millis.map(ServerTimestamp)) + } +} + +// This is the "SyncEngine" used when syncing via the Sync Manager. +pub struct TabsEngine { + pub sync_impl: Mutex<TabsSyncImpl>, +} + +impl TabsEngine { + pub fn new(store: Arc<TabsStore>) -> Self { + Self { + sync_impl: Mutex::new(TabsSyncImpl::new(store)), + } + } +} + +impl SyncEngine for TabsEngine { + fn collection_name(&self) -> std::borrow::Cow<'static, str> { + "tabs".into() + } + + fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> anyhow::Result<()> { + Ok(self + .sync_impl + .lock() + .unwrap() + .prepare_for_sync(get_client_data())?) + } + + fn apply_incoming( + &self, + inbound: Vec<IncomingChangeset>, + telem: &mut telemetry::Engine, + ) -> anyhow::Result<OutgoingChangeset> { + assert_eq!(inbound.len(), 1, "only requested one set of records"); + let inbound = inbound.into_iter().next().unwrap(); + let outgoing_records = self + .sync_impl + .lock() + .unwrap() + .apply_incoming(inbound.changes, telem)?; + + Ok(OutgoingChangeset::new("tabs".into(), outgoing_records)) + } + + fn sync_finished( + &self, + new_timestamp: ServerTimestamp, + records_synced: Vec<Guid>, + ) -> anyhow::Result<()> { + Ok(self + .sync_impl + .lock() + .unwrap() + .sync_finished(new_timestamp, &records_synced)?) + } + + fn get_collection_requests( + &self, + server_timestamp: ServerTimestamp, + ) -> anyhow::Result<Vec<CollectionRequest>> { + let since = self + .sync_impl + .lock() + .unwrap() + .get_last_sync()? + .unwrap_or_default(); + Ok(if since == server_timestamp { + vec![] + } else { + vec![CollectionRequest::new("tabs".into()) + .full() + .newer_than(since)] + }) + } + + fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> { + Ok(self.sync_impl.lock().unwrap().get_sync_assoc()?) + } + + fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> { + Ok(self.sync_impl.lock().unwrap().reset(assoc)?) + } + + fn wipe(&self) -> anyhow::Result<()> { + Ok(self.sync_impl.lock().unwrap().wipe()?) + } +} + +impl crate::TabsStore { + // This allows the embedding app to say "make this instance available to + // the sync manager". The implementation is more like "offer to sync mgr" + // (thereby avoiding us needing to link with the sync manager) but + // `register_with_sync_manager()` is logically what's happening so that's + // the name it gets. + pub fn register_with_sync_manager(self: Arc<Self>) { + let mut state = STORE_FOR_MANAGER.lock().unwrap(); + *state = Arc::downgrade(&self); + } +} + +#[cfg(test)] +pub mod test { + use super::*; + use serde_json::json; + use sync15::bso::IncomingBso; + + #[test] + fn test_incoming_tabs() { + env_logger::try_init().ok(); + + let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path("test-incoming"))); + + let records = vec![ + json!({ + "id": "device-no-tabs", + "clientName": "device with no tabs", + "tabs": [], + }), + json!({ + "id": "device-with-a-tab", + "clientName": "device with a tab", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }), + // test an updated payload will replace the previous record + json!({ + "id": "device-with-a-tab", + "clientName": "device with an updated tab", + "tabs": [{ + "title": "the new title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764208 + }] + }), + // This has the main payload as OK but the tabs part invalid. + json!({ + "id": "device-with-invalid-tab", + "clientName": "device with a tab", + "tabs": [{ + "foo": "bar", + }] + }), + // We want this to be a valid payload but an invalid tab - so it needs an ID. + json!({ + "id": "invalid-tab", + "foo": "bar" + }), + ]; + + let incoming = IncomingChangeset::new_with_changes( + engine.collection_name(), + ServerTimestamp(0), + records + .into_iter() + .map(IncomingBso::from_test_content) + .collect(), + ); + let outgoing = engine + .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs")) + .expect("Should apply incoming and stage outgoing records"); + + assert!(outgoing.changes.is_empty()); + + // now check the store has what we think it has. + let sync_impl = engine.sync_impl.lock().unwrap(); + let mut storage = sync_impl.store.storage.lock().unwrap(); + let mut crts = storage.get_remote_tabs().expect("should work"); + crts.sort_by(|a, b| a.client_name.partial_cmp(&b.client_name).unwrap()); + assert_eq!(crts.len(), 2, "we currently include devices with no tabs"); + let crt = &crts[0]; + assert_eq!(crt.client_name, "device with an updated tab"); + assert_eq!(crt.device_type, DeviceType::Unknown); + assert_eq!(crt.remote_tabs.len(), 1); + assert_eq!(crt.remote_tabs[0].title, "the new title"); + + let crt = &crts[1]; + assert_eq!(crt.client_name, "device with no tabs"); + assert_eq!(crt.device_type, DeviceType::Unknown); + assert_eq!(crt.remote_tabs.len(), 0); + } + + #[test] + fn test_no_incoming_doesnt_write() { + env_logger::try_init().ok(); + + let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path( + "test_no_incoming_doesnt_write", + ))); + + let records = vec![json!({ + "id": "device-with-a-tab", + "clientName": "device with a tab", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + })]; + + let incoming = IncomingChangeset::new_with_changes( + engine.collection_name(), + ServerTimestamp(0), + records + .into_iter() + .map(IncomingBso::from_test_content) + .collect(), + ); + engine + .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs")) + .expect("Should apply incoming and stage outgoing records"); + + // now check the store has what we think it has. + { + let sync_impl = engine.sync_impl.lock().unwrap(); + let mut storage = sync_impl.store.storage.lock().unwrap(); + assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1); + } + + // Now another sync with zero incoming records, should still be able to get back + // our one client. + let incoming = IncomingChangeset::new_with_changes( + engine.collection_name(), + ServerTimestamp(0), + vec![], + ); + engine + .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs")) + .expect("Should succeed applying zero records"); + + { + let sync_impl = engine.sync_impl.lock().unwrap(); + let mut storage = sync_impl.store.storage.lock().unwrap(); + assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1); + } + } + + #[test] + fn test_sync_manager_registration() { + let store = Arc::new(TabsStore::new_with_mem_path("test-registration")); + assert_eq!(Arc::strong_count(&store), 1); + assert_eq!(Arc::weak_count(&store), 0); + Arc::clone(&store).register_with_sync_manager(); + assert_eq!(Arc::strong_count(&store), 1); + assert_eq!(Arc::weak_count(&store), 1); + let registered = STORE_FOR_MANAGER + .lock() + .unwrap() + .upgrade() + .expect("should upgrade"); + assert!(Arc::ptr_eq(&store, ®istered)); + drop(registered); + // should be no new references + assert_eq!(Arc::strong_count(&store), 1); + assert_eq!(Arc::weak_count(&store), 1); + // dropping the registered object should drop the registration. + drop(store); + assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none()); + } +} diff --git a/third_party/rust/tabs/src/sync/full_sync.rs b/third_party/rust/tabs/src/sync/full_sync.rs new file mode 100644 index 0000000000..768aecce0f --- /dev/null +++ b/third_party/rust/tabs/src/sync/full_sync.rs @@ -0,0 +1,70 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::{sync::engine::TabsSyncImpl, ApiResult, TabsEngine, TabsStore}; +use error_support::handle_error; +use interrupt_support::NeverInterrupts; +use std::sync::Arc; +use sync15::client::{sync_multiple, MemoryCachedState, Sync15StorageClientInit}; +use sync15::engine::EngineSyncAssociation; +use sync15::KeyBundle; + +impl TabsStore { + #[handle_error(crate::Error)] + pub fn reset(self: Arc<Self>) -> ApiResult<()> { + let mut sync_impl = TabsSyncImpl::new(Arc::clone(&self)); + sync_impl.reset(&EngineSyncAssociation::Disconnected)?; + Ok(()) + } + + /// A convenience wrapper around sync_multiple. + #[handle_error(crate::Error)] + pub fn sync( + self: Arc<Self>, + key_id: String, + access_token: String, + sync_key: String, + tokenserver_url: String, + local_id: String, + ) -> ApiResult<String> { + let mut mem_cached_state = MemoryCachedState::default(); + let engine = TabsEngine::new(Arc::clone(&self)); + + // Since we are syncing without the sync manager, there's no + // command processor, therefore no clients engine, and in + // consequence `TabsStore::prepare_for_sync` is never called + // which means our `local_id` will never be set. + // Do it here. + engine.sync_impl.lock().unwrap().local_id = local_id; + + let storage_init = &Sync15StorageClientInit { + key_id, + access_token, + tokenserver_url: url::Url::parse(tokenserver_url.as_str())?, + }; + let root_sync_key = &KeyBundle::from_ksync_base64(sync_key.as_str())?; + + let mut result = sync_multiple( + &[&engine], + &mut None, + &mut mem_cached_state, + storage_init, + root_sync_key, + &NeverInterrupts, + None, + ); + + // for b/w compat reasons, we do some dances with the result. + // XXX - note that this means telemetry isn't going to be reported back + // to the app - we need to check with lockwise about whether they really + // need these failures to be reported or whether we can loosen this. + if let Err(e) = result.result { + return Err(e.into()); + } + match result.engine_results.remove("tabs") { + None | Some(Ok(())) => Ok(serde_json::to_string(&result.telemetry)?), + Some(Err(e)) => Err(e.into()), + } + } +} diff --git a/third_party/rust/tabs/src/sync/mod.rs b/third_party/rust/tabs/src/sync/mod.rs new file mode 100644 index 0000000000..7d668ff843 --- /dev/null +++ b/third_party/rust/tabs/src/sync/mod.rs @@ -0,0 +1,35 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +pub(crate) mod bridge; +pub(crate) mod engine; +pub(crate) mod record; + +#[cfg(feature = "full-sync")] +pub mod full_sync; + +// When full-sync isn't enabled we need stub versions for these UDL exposed functions. +#[cfg(not(feature = "full-sync"))] +impl crate::TabsStore { + pub fn reset(self: std::sync::Arc<Self>) -> crate::error::ApiResult<()> { + log::warn!("reset: feature not enabled"); + Err(crate::error::TabsApiError::SyncError { + reason: "reset".to_string(), + }) + } + + pub fn sync( + self: std::sync::Arc<Self>, + _key_id: String, + _access_token: String, + _sync_key: String, + _tokenserver_url: String, + _local_id: String, + ) -> crate::error::ApiResult<String> { + log::warn!("sync: feature not enabled"); + Err(crate::error::TabsApiError::SyncError { + reason: "sync".to_string(), + }) + } +} diff --git a/third_party/rust/tabs/src/sync/record.rs b/third_party/rust/tabs/src/sync/record.rs new file mode 100644 index 0000000000..a1da1a33e8 --- /dev/null +++ b/third_party/rust/tabs/src/sync/record.rs @@ -0,0 +1,97 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use serde_derive::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct TabsRecordTab { + pub title: String, + pub url_history: Vec<String>, + pub icon: Option<String>, + pub last_used: i64, // Seconds since epoch! +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +// This struct mirrors what is stored on the server +pub struct TabsRecord { + // `String` instead of `SyncGuid` because some IDs are FxA device ID (XXX - that doesn't + // matter though - this could easily be a Guid!) + pub id: String, + pub client_name: String, + pub tabs: Vec<TabsRecordTab>, +} + +#[cfg(test)] +pub mod test { + use super::*; + use serde_json::json; + + #[test] + fn test_payload() { + let payload = json!({ + "id": "JkeBPC50ZI0m", + "clientName": "client name", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207 + }] + }); + let record: TabsRecord = serde_json::from_value(payload).expect("should work"); + assert_eq!(record.id, "JkeBPC50ZI0m"); + assert_eq!(record.client_name, "client name"); + assert_eq!(record.tabs.len(), 1); + let tab = &record.tabs[0]; + assert_eq!(tab.title, "the title"); + assert_eq!(tab.icon, Some("https://mozilla.org/icon".to_string())); + assert_eq!(tab.last_used, 1643764207); + } + + #[test] + fn test_roundtrip() { + let tab = TabsRecord { + id: "JkeBPC50ZI0m".into(), + client_name: "client name".into(), + tabs: vec![TabsRecordTab { + title: "the title".into(), + url_history: vec!["https://mozilla.org/".into()], + icon: Some("https://mozilla.org/icon".into()), + last_used: 1643764207, + }], + }; + let round_tripped = + serde_json::from_value(serde_json::to_value(tab.clone()).unwrap()).unwrap(); + assert_eq!(tab, round_tripped); + } + + #[test] + fn test_extra_fields() { + let payload = json!({ + "id": "JkeBPC50ZI0m", + // Let's say we agree on new tabs to record, we want old versions to + // ignore them! + "ignoredField": "??", + "clientName": "client name", + "tabs": [{ + "title": "the title", + "urlHistory": [ + "https://mozilla.org/" + ], + "icon": "https://mozilla.org/icon", + "lastUsed": 1643764207, + // Ditto - make sure we ignore unexpected fields in each tab. + "ignoredField": "??", + }] + }); + let record: TabsRecord = serde_json::from_value(payload).unwrap(); + // The point of this test is really just to ensure the deser worked, so + // just check the ID. + assert_eq!(record.id, "JkeBPC50ZI0m"); + } +} diff --git a/third_party/rust/tabs/src/tabs.udl b/third_party/rust/tabs/src/tabs.udl new file mode 100644 index 0000000000..d1964ea0d2 --- /dev/null +++ b/third_party/rust/tabs/src/tabs.udl @@ -0,0 +1,108 @@ +[Custom] +typedef string TabsGuid; + +namespace tabs { + +}; + +[Error] +interface TabsApiError { + SyncError(string reason); + SqlError(string reason); + UnexpectedTabsError(string reason); +}; + + +interface TabsStore { + constructor(string path); + + sequence<ClientRemoteTabs> get_all(); + + void set_local_tabs(sequence<RemoteTabRecord> remote_tabs); + + [Self=ByArc] + void register_with_sync_manager(); + + [Throws=TabsApiError, Self=ByArc] + void reset(); + + [Throws=TabsApiError, Self=ByArc] + string sync(string key_id, string access_token, string sync_key, string tokenserver_url, string local_id); + + [Self=ByArc] + TabsBridgedEngine bridged_engine(); + +}; + +// Note that this enum is duplicated in fxa-client.udl (although the underlying type *is* +// shared). This duplication exists because there's no direct dependency between that crate and +// this one. We can probably remove the duplication when sync15 gets a .udl file, then we could +// reference it via an `[Extern=...]typedef` +enum TabsDeviceType { "Desktop", "Mobile", "Tablet", "VR", "TV", "Unknown" }; + +dictionary RemoteTabRecord { + string title; + sequence<string> url_history; + string? icon; + // Number of ms since the unix epoch (as reported by the client's clock) + i64 last_used; +}; + +dictionary ClientRemoteTabs { + string client_id; + string client_name; + TabsDeviceType device_type; + // Number of ms since the unix epoch (as reported by the server's clock) + i64 last_modified; + sequence<RemoteTabRecord> remote_tabs; +}; + +// Note the canonical docs for this are in https://searchfox.org/mozilla-central/source/services/interfaces/mozIBridgedSyncEngine.idl +// It's only actually used in desktop, but it's fine to expose this everywhere. +// NOTE: all timestamps here are milliseconds. +interface TabsBridgedEngine { + //readonly attribute long storageVersion; + // readonly attribute boolean allowSkippedRecord; + + // XXX - better logging story than this? + // attribute mozIServicesLogSink logger; + + [Throws=TabsApiError] + i64 last_sync(); + + [Throws=TabsApiError] + void set_last_sync(i64 last_sync); + + [Throws=TabsApiError] + string? sync_id(); + + [Throws=TabsApiError] + string reset_sync_id(); + + [Throws=TabsApiError] + string ensure_current_sync_id([ByRef]string new_sync_id); + + [Throws=TabsApiError] + void prepare_for_sync([ByRef]string client_data); + + [Throws=TabsApiError] + void sync_started(); + + [Throws=TabsApiError] + void store_incoming(sequence<string> incoming_envelopes_as_json); + + [Throws=TabsApiError] + sequence<string> apply(); + + [Throws=TabsApiError] + void set_uploaded(i64 new_timestamp, sequence<TabsGuid> uploaded_ids); + + [Throws=TabsApiError] + void sync_finished(); + + [Throws=TabsApiError] + void reset(); + + [Throws=TabsApiError] + void wipe(); +}; |