summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tabs/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tabs/src
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tabs/src')
-rw-r--r--third_party/rust/tabs/src/error.rs90
-rw-r--r--third_party/rust/tabs/src/lib.rs39
-rw-r--r--third_party/rust/tabs/src/schema.rs150
-rw-r--r--third_party/rust/tabs/src/storage.rs727
-rw-r--r--third_party/rust/tabs/src/store.rs41
-rw-r--r--third_party/rust/tabs/src/sync/bridge.rs436
-rw-r--r--third_party/rust/tabs/src/sync/engine.rs543
-rw-r--r--third_party/rust/tabs/src/sync/full_sync.rs70
-rw-r--r--third_party/rust/tabs/src/sync/mod.rs35
-rw-r--r--third_party/rust/tabs/src/sync/record.rs97
-rw-r--r--third_party/rust/tabs/src/tabs.udl108
11 files changed, 2336 insertions, 0 deletions
diff --git a/third_party/rust/tabs/src/error.rs b/third_party/rust/tabs/src/error.rs
new file mode 100644
index 0000000000..748c21f656
--- /dev/null
+++ b/third_party/rust/tabs/src/error.rs
@@ -0,0 +1,90 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use error_support::{ErrorHandling, GetErrorHandling};
+
+/// Result enum for the public interface
+pub type ApiResult<T> = std::result::Result<T, TabsApiError>;
+/// Result enum for internal functions
+pub type Result<T> = std::result::Result<T, Error>;
+
+// Errors we return via the public interface.
+#[derive(Debug, thiserror::Error)]
+pub enum TabsApiError {
+ #[error("SyncError: {reason}")]
+ SyncError { reason: String },
+
+ #[error("SqlError: {reason}")]
+ SqlError { reason: String },
+
+ #[error("Unexpected tabs error: {reason}")]
+ UnexpectedTabsError { reason: String },
+}
+
+// Error we use internally
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[cfg(feature = "full-sync")]
+ #[error("Error synchronizing: {0}")]
+ SyncAdapterError(#[from] sync15::Error),
+
+ // Note we are abusing this as a kind of "mis-matched feature" error.
+ // This works because when `full-sync` isn't enabled we don't actually
+ // handle any sync15 errors as the bridged-engine never returns them.
+ #[cfg(not(feature = "full-sync"))]
+ #[error("Sync feature is disabled: {0}")]
+ SyncAdapterError(String),
+
+ #[error("Error parsing JSON data: {0}")]
+ JsonError(#[from] serde_json::Error),
+
+ #[error("Missing SyncUnlockInfo Local ID")]
+ MissingLocalIdError,
+
+ #[error("Error parsing URL: {0}")]
+ UrlParseError(#[from] url::ParseError),
+
+ #[error("Error executing SQL: {0}")]
+ SqlError(#[from] rusqlite::Error),
+
+ #[error("Error opening database: {0}")]
+ OpenDatabaseError(#[from] sql_support::open_database::Error),
+}
+
+// Define how our internal errors are handled and converted to external errors
+// See `support/error/README.md` for how this works, especially the warning about PII.
+impl GetErrorHandling for Error {
+ type ExternalError = TabsApiError;
+
+ fn get_error_handling(&self) -> ErrorHandling<Self::ExternalError> {
+ match self {
+ Self::SyncAdapterError(e) => ErrorHandling::convert(TabsApiError::SyncError {
+ reason: e.to_string(),
+ })
+ .report_error("tabs-sync-error"),
+ Self::JsonError(e) => ErrorHandling::convert(TabsApiError::UnexpectedTabsError {
+ reason: e.to_string(),
+ })
+ .report_error("tabs-json-error"),
+ Self::MissingLocalIdError => {
+ ErrorHandling::convert(TabsApiError::UnexpectedTabsError {
+ reason: "MissingLocalId".to_string(),
+ })
+ .report_error("tabs-missing-local-id-error")
+ }
+ Self::UrlParseError(e) => ErrorHandling::convert(TabsApiError::UnexpectedTabsError {
+ reason: e.to_string(),
+ })
+ .report_error("tabs-url-parse-error"),
+ Self::SqlError(e) => ErrorHandling::convert(TabsApiError::SqlError {
+ reason: e.to_string(),
+ })
+ .report_error("tabs-sql-error"),
+ Self::OpenDatabaseError(e) => ErrorHandling::convert(TabsApiError::SqlError {
+ reason: e.to_string(),
+ })
+ .report_error("tabs-open-database-error"),
+ }
+ }
+}
diff --git a/third_party/rust/tabs/src/lib.rs b/third_party/rust/tabs/src/lib.rs
new file mode 100644
index 0000000000..58f32ea5e0
--- /dev/null
+++ b/third_party/rust/tabs/src/lib.rs
@@ -0,0 +1,39 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#![allow(unknown_lints)]
+#![warn(rust_2018_idioms)]
+
+#[macro_use]
+pub mod error;
+mod schema;
+mod storage;
+mod store;
+mod sync;
+
+uniffi::include_scaffolding!("tabs");
+
+// Our UDL uses a `Guid` type.
+use sync_guid::Guid as TabsGuid;
+impl UniffiCustomTypeConverter for TabsGuid {
+ type Builtin = String;
+
+ fn into_custom(val: Self::Builtin) -> uniffi::Result<TabsGuid> {
+ Ok(TabsGuid::new(val.as_str()))
+ }
+
+ fn from_custom(obj: Self) -> Self::Builtin {
+ obj.into()
+ }
+}
+
+pub use crate::storage::{ClientRemoteTabs, RemoteTabRecord, TabsDeviceType};
+pub use crate::store::TabsStore;
+pub use error::{ApiResult, Error, Result, TabsApiError};
+use sync15::DeviceType;
+
+pub use crate::sync::engine::get_registered_sync_engine;
+
+pub use crate::sync::bridge::TabsBridgedEngine;
+pub use crate::sync::engine::TabsEngine;
diff --git a/third_party/rust/tabs/src/schema.rs b/third_party/rust/tabs/src/schema.rs
new file mode 100644
index 0000000000..ad8713b487
--- /dev/null
+++ b/third_party/rust/tabs/src/schema.rs
@@ -0,0 +1,150 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// Tabs is a bit special - it's a trivial SQL schema and is only used as a persistent
+// cache, and the semantics of the "tabs" collection means there's no need for
+// syncChangeCounter/syncStatus nor a mirror etc.
+
+use rusqlite::{Connection, Transaction};
+use sql_support::{
+ open_database::{
+ ConnectionInitializer as MigrationLogic, Error as MigrationError, Result as MigrationResult,
+ },
+ ConnExt,
+};
+
+// The record is the TabsRecord struct in json and this module doesn't need to deserialize, so we just
+// store each client as its own row.
+const CREATE_SCHEMA_SQL: &str = "
+ CREATE TABLE IF NOT EXISTS tabs (
+ guid TEXT NOT NULL PRIMARY KEY,
+ record TEXT NOT NULL,
+ last_modified INTEGER NOT NULL
+ );
+";
+
+const CREATE_META_TABLE_SQL: &str = "
+ CREATE TABLE IF NOT EXISTS moz_meta (
+ key TEXT PRIMARY KEY,
+ value NOT NULL
+ )
+";
+
+pub(crate) static LAST_SYNC_META_KEY: &str = "last_sync_time";
+pub(crate) static GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";
+pub(crate) static COLLECTION_SYNCID_META_KEY: &str = "tabs_sync_id";
+// Tabs stores this in the meta table due to a unique requirement that we only know the list
+// of connected clients when syncing, however getting the list of tabs could be called at anytime
+// so we store it so we can translate from the tabs sync record ID to the FxA device id for the client
+pub(crate) static REMOTE_CLIENTS_KEY: &str = "remote_clients";
+
+pub struct TabsMigrationLogic;
+
+impl MigrationLogic for TabsMigrationLogic {
+ const NAME: &'static str = "tabs storage db";
+ const END_VERSION: u32 = 2;
+
+ fn prepare(&self, conn: &Connection, _db_empty: bool) -> MigrationResult<()> {
+ let initial_pragmas = "
+ -- We don't care about temp tables being persisted to disk.
+ PRAGMA temp_store = 2;
+ -- we unconditionally want write-ahead-logging mode.
+ PRAGMA journal_mode=WAL;
+ -- foreign keys seem worth enforcing (and again, we don't care in practice)
+ PRAGMA foreign_keys = ON;
+ ";
+ conn.execute_batch(initial_pragmas)?;
+ // This is where we'd define our sql functions if we had any!
+ conn.set_prepared_statement_cache_capacity(128);
+ Ok(())
+ }
+
+ fn init(&self, db: &Transaction<'_>) -> MigrationResult<()> {
+ log::debug!("Creating schemas");
+ db.execute_all(&[CREATE_SCHEMA_SQL, CREATE_META_TABLE_SQL])?;
+ Ok(())
+ }
+
+ fn upgrade_from(&self, db: &Transaction<'_>, version: u32) -> MigrationResult<()> {
+ match version {
+ 1 => upgrade_from_v1(db),
+ _ => Err(MigrationError::IncompatibleVersion(version)),
+ }
+ }
+}
+
+fn upgrade_from_v1(db: &Connection) -> MigrationResult<()> {
+ // The previous version stored the entire payload in one row
+ // and cleared on each sync -- it's fine to just drop it
+ db.execute_batch("DROP TABLE tabs;")?;
+ // Recreate the world
+ db.execute_all(&[CREATE_SCHEMA_SQL, CREATE_META_TABLE_SQL])?;
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::storage::TabsStorage;
+ use rusqlite::OptionalExtension;
+ use serde_json::json;
+ use sql_support::open_database::test_utils::MigratedDatabaseFile;
+
+ const CREATE_V1_SCHEMA_SQL: &str = "
+ CREATE TABLE IF NOT EXISTS tabs (
+ payload TEXT NOT NULL
+ );
+ PRAGMA user_version=1;
+ ";
+
+ #[test]
+ fn test_create_schema_twice() {
+ let mut db = TabsStorage::new_with_mem_path("test");
+ let conn = db.open_or_create().unwrap();
+ conn.execute_batch(CREATE_SCHEMA_SQL)
+ .expect("should allow running twice");
+ }
+
+ #[test]
+ fn test_tabs_db_upgrade_from_v1() {
+ let db_file = MigratedDatabaseFile::new(TabsMigrationLogic, CREATE_V1_SCHEMA_SQL);
+ db_file.run_all_upgrades();
+ // Verify we can open the DB just fine, since migration is essentially a drop
+ // we don't need to check any data integrity
+ let mut storage = TabsStorage::new(db_file.path);
+ storage.open_or_create().unwrap();
+ assert!(storage.open_if_exists().unwrap().is_some());
+
+ let test_payload = json!({
+ "id": "device-with-a-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207,
+ }]
+ });
+ let db = storage.open_if_exists().unwrap().unwrap();
+ // We should be able to insert without a SQL error after upgrade
+ db.execute(
+ "INSERT INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);",
+ rusqlite::named_params! {
+ ":guid": "my-device",
+ ":record": serde_json::to_string(&test_payload).unwrap(),
+ ":last_modified": "1643764207"
+ },
+ )
+ .unwrap();
+
+ let row: Option<String> = db
+ .query_row("SELECT guid FROM tabs;", [], |row| row.get(0))
+ .optional()
+ .unwrap();
+ // Verify we can query for a valid guid now
+ assert_eq!(row.unwrap(), "my-device");
+ }
+}
diff --git a/third_party/rust/tabs/src/storage.rs b/third_party/rust/tabs/src/storage.rs
new file mode 100644
index 0000000000..eaccbc3446
--- /dev/null
+++ b/third_party/rust/tabs/src/storage.rs
@@ -0,0 +1,727 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+// From https://searchfox.org/mozilla-central/rev/ea63a0888d406fae720cf24f4727d87569a8cab5/services/sync/modules/constants.js#75
+const URI_LENGTH_MAX: usize = 65536;
+// https://searchfox.org/mozilla-central/rev/ea63a0888d406fae720cf24f4727d87569a8cab5/services/sync/modules/engines/tabs.js#8
+const TAB_ENTRIES_LIMIT: usize = 5;
+
+use crate::error::*;
+use crate::schema;
+use crate::sync::record::TabsRecord;
+use crate::DeviceType;
+use rusqlite::{
+ types::{FromSql, ToSql},
+ Connection, OpenFlags,
+};
+use serde_derive::{Deserialize, Serialize};
+use sql_support::open_database::{self, open_database_with_flags};
+use sql_support::ConnExt;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use sync15::{RemoteClient, ServerTimestamp};
+pub type TabsDeviceType = crate::DeviceType;
+pub type RemoteTabRecord = RemoteTab;
+
+pub(crate) const TABS_CLIENT_TTL: u32 = 15_552_000; // 180 days, same as CLIENTS_TTL
+const FAR_FUTURE: i64 = 4_102_405_200_000; // 2100/01/01
+const MAX_PAYLOAD_SIZE: usize = 512 * 1024; // Twice as big as desktop, still smaller than server max (2MB)
+const MAX_TITLE_CHAR_LENGTH: usize = 512; // We put an upper limit on title sizes for tabs to reduce memory
+
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+pub struct RemoteTab {
+ pub title: String,
+ pub url_history: Vec<String>,
+ pub icon: Option<String>,
+ pub last_used: i64, // In ms.
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ClientRemoteTabs {
+ // The fxa_device_id of the client. *Should not* come from the id in the `clients` collection,
+ // because that may or may not be the fxa_device_id (currently, it will not be for desktop
+ // records.)
+ pub client_id: String,
+ pub client_name: String,
+ #[serde(
+ default = "devicetype_default_deser",
+ skip_serializing_if = "devicetype_is_unknown"
+ )]
+ pub device_type: DeviceType,
+ // serde default so we can read old rows that didn't persist this.
+ #[serde(default)]
+ pub last_modified: i64,
+ pub remote_tabs: Vec<RemoteTab>,
+}
+
+fn devicetype_default_deser() -> DeviceType {
+ // replace with `DeviceType::default_deser` once #4861 lands.
+ DeviceType::Unknown
+}
+
+// Unlike most other uses-cases, here we do allow serializing ::Unknown, but skip it.
+fn devicetype_is_unknown(val: &DeviceType) -> bool {
+ matches!(val, DeviceType::Unknown)
+}
+
+// Tabs has unique requirements for storage:
+// * The "local_tabs" exist only so we can sync them out. There's no facility to
+// query "local tabs", so there's no need to store these persistently - ie, they
+// are write-only.
+// * The "remote_tabs" exist purely for incoming items via sync - there's no facility
+// to set them locally - they are read-only.
+// Note that this means a database is only actually needed after Sync fetches remote tabs,
+// and because sync users are in the minority, the use of a database here is purely
+// optional and created on demand. The implication here is that asking for the "remote tabs"
+// when no database exists is considered a normal situation and just implies no remote tabs exist.
+// (Note however we don't attempt to remove the database when no remote tabs exist, so having
+// no remote tabs in an existing DB is also a normal situation)
+pub struct TabsStorage {
+ local_tabs: RefCell<Option<Vec<RemoteTab>>>,
+ db_path: PathBuf,
+ db_connection: Option<Connection>,
+}
+
+impl TabsStorage {
+ pub fn new(db_path: impl AsRef<Path>) -> Self {
+ Self {
+ local_tabs: RefCell::default(),
+ db_path: db_path.as_ref().to_path_buf(),
+ db_connection: None,
+ }
+ }
+
+ /// Arrange for a new memory-based TabsStorage. As per other DB semantics, creating
+ /// this isn't enough to actually create the db!
+ pub fn new_with_mem_path(db_path: &str) -> Self {
+ let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_path));
+ Self::new(name)
+ }
+
+ /// If a DB file exists, open and return it.
+ pub fn open_if_exists(&mut self) -> Result<Option<&Connection>> {
+ if let Some(ref existing) = self.db_connection {
+ return Ok(Some(existing));
+ }
+ let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
+ | OpenFlags::SQLITE_OPEN_URI
+ | OpenFlags::SQLITE_OPEN_READ_WRITE;
+ match open_database_with_flags(
+ self.db_path.clone(),
+ flags,
+ &crate::schema::TabsMigrationLogic,
+ ) {
+ Ok(conn) => {
+ self.db_connection = Some(conn);
+ Ok(self.db_connection.as_ref())
+ }
+ Err(open_database::Error::SqlError(rusqlite::Error::SqliteFailure(code, _)))
+ if code.code == rusqlite::ErrorCode::CannotOpen =>
+ {
+ Ok(None)
+ }
+ Err(e) => Err(e.into()),
+ }
+ }
+
+ /// Open and return the DB, creating it if necessary.
+ pub fn open_or_create(&mut self) -> Result<&Connection> {
+ if let Some(ref existing) = self.db_connection {
+ return Ok(existing);
+ }
+ let flags = OpenFlags::SQLITE_OPEN_NO_MUTEX
+ | OpenFlags::SQLITE_OPEN_URI
+ | OpenFlags::SQLITE_OPEN_READ_WRITE
+ | OpenFlags::SQLITE_OPEN_CREATE;
+ let conn = open_database_with_flags(
+ self.db_path.clone(),
+ flags,
+ &crate::schema::TabsMigrationLogic,
+ )?;
+ self.db_connection = Some(conn);
+ Ok(self.db_connection.as_ref().unwrap())
+ }
+
+ pub fn update_local_state(&mut self, local_state: Vec<RemoteTab>) {
+ self.local_tabs.borrow_mut().replace(local_state);
+ }
+
+ // We try our best to fit as many tabs in a payload as possible, this includes
+ // limiting the url history entries, title character count and finally drop enough tabs
+ // until we have small enough payload that the server will accept
+ pub fn prepare_local_tabs_for_upload(&self) -> Option<Vec<RemoteTab>> {
+ if let Some(local_tabs) = self.local_tabs.borrow().as_ref() {
+ let mut sanitized_tabs: Vec<RemoteTab> = local_tabs
+ .iter()
+ .cloned()
+ .filter_map(|mut tab| {
+ if tab.url_history.is_empty() || !is_url_syncable(&tab.url_history[0]) {
+ return None;
+ }
+ let mut sanitized_history = Vec::with_capacity(TAB_ENTRIES_LIMIT);
+ for url in tab.url_history {
+ if sanitized_history.len() == TAB_ENTRIES_LIMIT {
+ break;
+ }
+ if is_url_syncable(&url) {
+ sanitized_history.push(url);
+ }
+ }
+
+ tab.url_history = sanitized_history;
+ // Potentially truncate the title to some limit
+ tab.title = slice_up_to(tab.title, MAX_TITLE_CHAR_LENGTH);
+ Some(tab)
+ })
+ .collect();
+ // Sort the tabs so when we trim tabs it's the oldest tabs
+ sanitized_tabs.sort_by(|a, b| b.last_used.cmp(&a.last_used));
+ // If trimming the tab length failed for some reason, just return the untrimmed tabs
+ trim_tabs_length(&mut sanitized_tabs, MAX_PAYLOAD_SIZE);
+ return Some(sanitized_tabs);
+ }
+ None
+ }
+
+ pub fn get_remote_tabs(&mut self) -> Option<Vec<ClientRemoteTabs>> {
+ let conn = match self.open_if_exists() {
+ Err(e) => {
+ error_support::report_error!(
+ "tabs-read-remote",
+ "Failed to read remote tabs: {}",
+ e
+ );
+ return None;
+ }
+ Ok(None) => return None,
+ Ok(Some(conn)) => conn,
+ };
+
+ let records: Vec<(TabsRecord, ServerTimestamp)> = match conn.query_rows_and_then_cached(
+ "SELECT record, last_modified FROM tabs",
+ [],
+ |row| -> Result<_> {
+ Ok((
+ serde_json::from_str(&row.get::<_, String>(0)?)?,
+ ServerTimestamp(row.get::<_, i64>(1)?),
+ ))
+ },
+ ) {
+ Ok(records) => records,
+ Err(e) => {
+ error_support::report_error!("tabs-read-remote", "Failed to read database: {}", e);
+ return None;
+ }
+ };
+ let mut crts: Vec<ClientRemoteTabs> = Vec::new();
+ let remote_clients: HashMap<String, RemoteClient> =
+ match self.get_meta::<String>(schema::REMOTE_CLIENTS_KEY) {
+ Err(e) => {
+ error_support::report_error!(
+ "tabs-read-remote",
+ "Failed to get remote clients: {}",
+ e
+ );
+ return None;
+ }
+ // We don't return early here since we still store tabs even if we don't
+ // "know" about the client it's associated with (incase it becomes available later)
+ Ok(None) => HashMap::default(),
+ Ok(Some(json)) => serde_json::from_str(&json).unwrap(),
+ };
+ for (record, last_modified) in records {
+ let id = record.id.clone();
+ let crt = if let Some(remote_client) = remote_clients.get(&id) {
+ ClientRemoteTabs::from_record_with_remote_client(
+ remote_client
+ .fxa_device_id
+ .as_ref()
+ .unwrap_or(&id)
+ .to_owned(),
+ last_modified,
+ remote_client,
+ record,
+ )
+ } else {
+ // A record with a device that's not in our remote clients seems unlikely, but
+ // could happen - in most cases though, it will be due to a disconnected client -
+ // so we really should consider just dropping it? (Sadly though, it does seem
+ // possible it's actually a very recently connected client, so we keep it)
+ // We should get rid of this eventually - https://github.com/mozilla/application-services/issues/5199
+ log::info!(
+ "Storing tabs from a client that doesn't appear in the devices list: {}",
+ id,
+ );
+ ClientRemoteTabs::from_record(id, last_modified, record)
+ };
+ crts.push(crt);
+ }
+ Some(crts)
+ }
+
+ // Keep DB from growing infinitely since we only ask for records since our last sync
+ // and may or may not know about the client it's associated with -- but we could at some point
+ // and should start returning those tabs immediately. If that client hasn't been seen in 3 weeks,
+ // we remove it until it reconnects
+ pub fn remove_stale_clients(&mut self) -> Result<()> {
+ let last_sync = self.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
+ if let Some(conn) = self.open_if_exists()? {
+ if let Some(last_sync) = last_sync {
+ let client_ttl_ms = (TABS_CLIENT_TTL as i64) * 1000;
+ // On desktop, a quick write temporarily sets the last_sync to FAR_FUTURE
+ // but if it doesn't set it back to the original (crash, etc) it
+ // means we'll most likely trash all our records (as it's more than any TTL we'd ever do)
+ // so we need to detect this for now until we have native quick write support
+ if last_sync - client_ttl_ms >= 0 && last_sync != (FAR_FUTURE * 1000) {
+ let tx = conn.unchecked_transaction()?;
+ let num_removed = tx.execute_cached(
+ "DELETE FROM tabs WHERE last_modified <= :last_sync - :ttl",
+ rusqlite::named_params! {
+ ":last_sync": last_sync,
+ ":ttl": client_ttl_ms,
+ },
+ )?;
+ log::info!(
+ "removed {} stale clients (threshold was {})",
+ num_removed,
+ last_sync - client_ttl_ms
+ );
+ tx.commit()?;
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+impl TabsStorage {
+ pub(crate) fn replace_remote_tabs(
+ &mut self,
+ // This is a tuple because we need to know what the server reports
+ // as the last time a record was modified
+ new_remote_tabs: Vec<(TabsRecord, ServerTimestamp)>,
+ ) -> Result<()> {
+ let connection = self.open_or_create()?;
+ let tx = connection.unchecked_transaction()?;
+
+ // For tabs it's fine if we override the existing tabs for a remote
+ // there can only ever be one record for each client
+ for remote_tab in new_remote_tabs {
+ let record = remote_tab.0;
+ let last_modified = remote_tab.1;
+ log::info!(
+ "inserting tab for device {}, last modified at {}",
+ record.id,
+ last_modified.as_millis()
+ );
+ tx.execute_cached(
+ "INSERT OR REPLACE INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);",
+ rusqlite::named_params! {
+ ":guid": &record.id,
+ ":record": serde_json::to_string(&record).expect("tabs don't fail to serialize"),
+ ":last_modified": last_modified.as_millis()
+ },
+ )?;
+ }
+ tx.commit()?;
+ Ok(())
+ }
+
+ pub(crate) fn wipe_remote_tabs(&mut self) -> Result<()> {
+ if let Some(db) = self.open_if_exists()? {
+ db.execute_batch("DELETE FROM tabs")?;
+ }
+ Ok(())
+ }
+
+ pub(crate) fn wipe_local_tabs(&self) {
+ self.local_tabs.replace(None);
+ }
+
+ pub(crate) fn put_meta(&mut self, key: &str, value: &dyn ToSql) -> Result<()> {
+ let db = self.open_or_create()?;
+ db.execute_cached(
+ "REPLACE INTO moz_meta (key, value) VALUES (:key, :value)",
+ &[(":key", &key as &dyn ToSql), (":value", value)],
+ )?;
+ Ok(())
+ }
+
+ pub(crate) fn get_meta<T: FromSql>(&mut self, key: &str) -> Result<Option<T>> {
+ match self.open_if_exists() {
+ Ok(Some(db)) => {
+ let res = db.try_query_one(
+ "SELECT value FROM moz_meta WHERE key = :key",
+ &[(":key", &key)],
+ true,
+ )?;
+ Ok(res)
+ }
+ Err(e) => Err(e),
+ Ok(None) => Ok(None),
+ }
+ }
+
+ pub(crate) fn delete_meta(&mut self, key: &str) -> Result<()> {
+ if let Some(db) = self.open_if_exists()? {
+ db.execute_cached("DELETE FROM moz_meta WHERE key = :key", &[(":key", &key)])?;
+ }
+ Ok(())
+ }
+}
+
+// Trim the amount of tabs in a list to fit the specified memory size
+fn trim_tabs_length(tabs: &mut Vec<RemoteTab>, payload_size_max_bytes: usize) {
+ // Ported from https://searchfox.org/mozilla-central/rev/84fb1c4511312a0b9187f647d90059e3a6dd27f8/services/sync/modules/util.sys.mjs#422
+ // See bug 535326 comment 8 for an explanation of the estimation
+ let max_serialized_size = (payload_size_max_bytes / 4) * 3 - 1500;
+ let size = compute_serialized_size(tabs);
+ if size > max_serialized_size {
+ // Estimate a little more than the direct fraction to maximize packing
+ let cutoff = (tabs.len() * max_serialized_size) / size;
+ tabs.truncate(cutoff);
+
+ // Keep dropping off the last entry until the data fits.
+ while compute_serialized_size(tabs) > max_serialized_size {
+ tabs.pop();
+ }
+ }
+}
+
+fn compute_serialized_size(v: &Vec<RemoteTab>) -> usize {
+ serde_json::to_string(v).unwrap_or_default().len()
+}
+
+// Similar to places/utils.js
+// This method ensures we safely truncate a string up to a certain max_len while
+// respecting char bounds to prevent rust panics. If we do end up truncating, we
+// append an ellipsis to the string
+pub fn slice_up_to(s: String, max_len: usize) -> String {
+ if max_len >= s.len() {
+ return s;
+ }
+
+ let ellipsis = '\u{2026}';
+ // Ensure we leave space for the ellipsis while still being under the max
+ let mut idx = max_len - ellipsis.len_utf8();
+ while !s.is_char_boundary(idx) {
+ idx -= 1;
+ }
+ let mut new_str = s[..idx].to_string();
+ new_str.push(ellipsis);
+ new_str
+}
+
+// Try to keep in sync with https://searchfox.org/mozilla-central/rev/2ad13433da20a0749e1e9a10ec0ab49b987c2c8e/modules/libpref/init/all.js#3927
+fn is_url_syncable(url: &str) -> bool {
+ url.len() <= URI_LENGTH_MAX
+ && !(url.starts_with("about:")
+ || url.starts_with("resource:")
+ || url.starts_with("chrome:")
+ || url.starts_with("wyciwyg:")
+ || url.starts_with("blob:")
+ || url.starts_with("file:")
+ || url.starts_with("moz-extension:")
+ || url.starts_with("data:"))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::sync::record::TabsRecordTab;
+
+ #[test]
+ fn test_is_url_syncable() {
+ assert!(is_url_syncable("https://bobo.com"));
+ assert!(is_url_syncable("ftp://bobo.com"));
+ assert!(!is_url_syncable("about:blank"));
+ // XXX - this smells wrong - we should insist on a valid complete URL?
+ assert!(is_url_syncable("aboutbobo.com"));
+ assert!(!is_url_syncable("file:///Users/eoger/bobo"));
+ }
+
+ #[test]
+ fn test_open_if_exists_no_file() {
+ let dir = tempfile::tempdir().unwrap();
+ let db_name = dir.path().join("test_open_for_read_no_file.db");
+ let mut storage = TabsStorage::new(db_name.clone());
+ assert!(storage.open_if_exists().unwrap().is_none());
+ storage.open_or_create().unwrap(); // will have created it.
+ // make a new storage, but leave the file alone.
+ let mut storage = TabsStorage::new(db_name);
+ // db file exists, so opening for read should open it.
+ assert!(storage.open_if_exists().unwrap().is_some());
+ }
+
+ #[test]
+ fn test_tabs_meta() {
+ let dir = tempfile::tempdir().unwrap();
+ let db_name = dir.path().join("test_tabs_meta.db");
+ let mut db = TabsStorage::new(db_name);
+ let test_key = "TEST KEY A";
+ let test_value = "TEST VALUE A";
+ let test_key2 = "TEST KEY B";
+ let test_value2 = "TEST VALUE B";
+
+ // should automatically make the DB if one doesn't exist
+ db.put_meta(test_key, &test_value).unwrap();
+ db.put_meta(test_key2, &test_value2).unwrap();
+
+ let retrieved_value: String = db.get_meta(test_key).unwrap().expect("test value");
+ let retrieved_value2: String = db.get_meta(test_key2).unwrap().expect("test value 2");
+
+ assert_eq!(retrieved_value, test_value);
+ assert_eq!(retrieved_value2, test_value2);
+
+ // check that the value of an existing key can be updated
+ let test_value3 = "TEST VALUE C";
+ db.put_meta(test_key, &test_value3).unwrap();
+
+ let retrieved_value3: String = db.get_meta(test_key).unwrap().expect("test value 3");
+
+ assert_eq!(retrieved_value3, test_value3);
+
+ // check that a deleted key is not retrieved
+ db.delete_meta(test_key).unwrap();
+ let retrieved_value4: Option<String> = db.get_meta(test_key).unwrap();
+ assert!(retrieved_value4.is_none());
+ }
+
+ #[test]
+ fn test_prepare_local_tabs_for_upload() {
+ let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload");
+ assert_eq!(storage.prepare_local_tabs_for_upload(), None);
+ storage.update_local_state(vec![
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec!["about:blank".to_owned(), "https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec![
+ "https://foo.bar".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ "about:blank".to_owned(),
+ ],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec![
+ "https://foo.bar".to_owned(),
+ "about:blank".to_owned(),
+ "https://foo2.bar".to_owned(),
+ "https://foo3.bar".to_owned(),
+ "https://foo4.bar".to_owned(),
+ "https://foo5.bar".to_owned(),
+ "https://foo6.bar".to_owned(),
+ ],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec![],
+ icon: None,
+ last_used: 0,
+ },
+ ]);
+ assert_eq!(
+ storage.prepare_local_tabs_for_upload(),
+ Some(vec![
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec!["https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: "".to_owned(),
+ url_history: vec![
+ "https://foo.bar".to_owned(),
+ "https://foo2.bar".to_owned(),
+ "https://foo3.bar".to_owned(),
+ "https://foo4.bar".to_owned(),
+ "https://foo5.bar".to_owned()
+ ],
+ icon: None,
+ last_used: 0,
+ },
+ ])
+ );
+ }
+ #[test]
+ fn test_trimming_tab_title() {
+ let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload");
+ assert_eq!(storage.prepare_local_tabs_for_upload(), None);
+ storage.update_local_state(vec![RemoteTab {
+ title: "a".repeat(MAX_TITLE_CHAR_LENGTH + 10), // Fill a string more than max
+ url_history: vec!["https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ }]);
+ let ellipsis_char = '\u{2026}';
+ let mut truncated_title = "a".repeat(MAX_TITLE_CHAR_LENGTH - ellipsis_char.len_utf8());
+ truncated_title.push(ellipsis_char);
+ assert_eq!(
+ storage.prepare_local_tabs_for_upload(),
+ Some(vec![
+ // title trimmed to 50 characters
+ RemoteTab {
+ title: truncated_title, // title was trimmed to only max char length
+ url_history: vec!["https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ ])
+ );
+ }
+ #[test]
+ fn test_utf8_safe_title_trim() {
+ let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload");
+ assert_eq!(storage.prepare_local_tabs_for_upload(), None);
+ storage.update_local_state(vec![
+ RemoteTab {
+ title: "😍".repeat(MAX_TITLE_CHAR_LENGTH + 10), // Fill a string more than max
+ url_history: vec!["https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: "を".repeat(MAX_TITLE_CHAR_LENGTH + 5), // Fill a string more than max
+ url_history: vec!["https://foo_jp.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ ]);
+ let ellipsis_char = '\u{2026}';
+ // (MAX_TITLE_CHAR_LENGTH - ellipsis / "😍" bytes)
+ let mut truncated_title = "😍".repeat(127);
+ // (MAX_TITLE_CHAR_LENGTH - ellipsis / "を" bytes)
+ let mut truncated_jp_title = "を".repeat(169);
+ truncated_title.push(ellipsis_char);
+ truncated_jp_title.push(ellipsis_char);
+ let remote_tabs = storage.prepare_local_tabs_for_upload().unwrap();
+ assert_eq!(
+ remote_tabs,
+ vec![
+ RemoteTab {
+ title: truncated_title, // title was trimmed to only max char length
+ url_history: vec!["https://foo.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ RemoteTab {
+ title: truncated_jp_title, // title was trimmed to only max char length
+ url_history: vec!["https://foo_jp.bar".to_owned()],
+ icon: None,
+ last_used: 0,
+ },
+ ]
+ );
+ // We should be less than max
+ assert!(remote_tabs[0].title.chars().count() <= MAX_TITLE_CHAR_LENGTH);
+ assert!(remote_tabs[1].title.chars().count() <= MAX_TITLE_CHAR_LENGTH);
+ }
+ #[test]
+ fn test_trim_tabs_length() {
+ let mut storage = TabsStorage::new_with_mem_path("test_prepare_local_tabs_for_upload");
+ assert_eq!(storage.prepare_local_tabs_for_upload(), None);
+ let mut too_many_tabs: Vec<RemoteTab> = Vec::new();
+ for n in 1..5000 {
+ too_many_tabs.push(RemoteTab {
+ title: "aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa aaaa" //50 characters
+ .to_owned(),
+ url_history: vec![format!("https://foo{}.bar", n)],
+ icon: None,
+ last_used: 0,
+ });
+ }
+ let tabs_mem_size = compute_serialized_size(&too_many_tabs);
+ // ensure we are definitely over the payload limit
+ assert!(tabs_mem_size > MAX_PAYLOAD_SIZE);
+ // Add our over-the-limit tabs to the local state
+ storage.update_local_state(too_many_tabs.clone());
+ // prepare_local_tabs_for_upload did the trimming we needed to get under payload size
+ let tabs_to_upload = &storage.prepare_local_tabs_for_upload().unwrap();
+ assert!(compute_serialized_size(tabs_to_upload) <= MAX_PAYLOAD_SIZE);
+ }
+ // Helper struct to model what's stored in the DB
+ struct TabsSQLRecord {
+ guid: String,
+ record: TabsRecord,
+ last_modified: i64,
+ }
+ #[test]
+ fn test_remove_stale_clients() {
+ let dir = tempfile::tempdir().unwrap();
+ let db_name = dir.path().join("test_remove_stale_clients.db");
+ let mut storage = TabsStorage::new(db_name);
+ storage.open_or_create().unwrap();
+ assert!(storage.open_if_exists().unwrap().is_some());
+
+ let records = vec![
+ TabsSQLRecord {
+ guid: "device-1".to_string(),
+ record: TabsRecord {
+ id: "device-1".to_string(),
+ client_name: "Device #1".to_string(),
+ tabs: vec![TabsRecordTab {
+ title: "the title".to_string(),
+ url_history: vec!["https://mozilla.org/".to_string()],
+ icon: Some("https://mozilla.org/icon".to_string()),
+ last_used: 1643764207000,
+ }],
+ },
+ last_modified: 1643764207000,
+ },
+ TabsSQLRecord {
+ guid: "device-outdated".to_string(),
+ record: TabsRecord {
+ id: "device-outdated".to_string(),
+ client_name: "Device outdated".to_string(),
+ tabs: vec![TabsRecordTab {
+ title: "the title".to_string(),
+ url_history: vec!["https://mozilla.org/".to_string()],
+ icon: Some("https://mozilla.org/icon".to_string()),
+ last_used: 1643764207000,
+ }],
+ },
+ last_modified: 1443764207000, // old
+ },
+ ];
+ let db = storage.open_if_exists().unwrap().unwrap();
+ for record in records {
+ db.execute(
+ "INSERT INTO tabs (guid, record, last_modified) VALUES (:guid, :record, :last_modified);",
+ rusqlite::named_params! {
+ ":guid": &record.guid,
+ ":record": serde_json::to_string(&record.record).unwrap(),
+ ":last_modified": &record.last_modified,
+ },
+ ).unwrap();
+ }
+ // pretend we just synced
+ let last_synced = 1643764207000_i64;
+ storage
+ .put_meta(schema::LAST_SYNC_META_KEY, &last_synced)
+ .unwrap();
+ storage.remove_stale_clients().unwrap();
+
+ let remote_tabs = storage.get_remote_tabs().unwrap();
+ // We should've removed the outdated device
+ assert_eq!(remote_tabs.len(), 1);
+ // Assert the correct record is still being returned
+ assert_eq!(remote_tabs[0].client_id, "device-1");
+ }
+}
diff --git a/third_party/rust/tabs/src/store.rs b/third_party/rust/tabs/src/store.rs
new file mode 100644
index 0000000000..67bae498f7
--- /dev/null
+++ b/third_party/rust/tabs/src/store.rs
@@ -0,0 +1,41 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::storage::{ClientRemoteTabs, RemoteTab, TabsStorage};
+use std::path::Path;
+use std::sync::Mutex;
+
+pub struct TabsStore {
+ pub storage: Mutex<TabsStorage>,
+}
+
+impl TabsStore {
+ pub fn new(db_path: impl AsRef<Path>) -> Self {
+ Self {
+ storage: Mutex::new(TabsStorage::new(db_path)),
+ }
+ }
+
+ pub fn new_with_mem_path(db_path: &str) -> Self {
+ Self {
+ storage: Mutex::new(TabsStorage::new_with_mem_path(db_path)),
+ }
+ }
+
+ pub fn set_local_tabs(&self, local_state: Vec<RemoteTab>) {
+ self.storage.lock().unwrap().update_local_state(local_state);
+ }
+
+ // like remote_tabs, but serves the uniffi layer
+ pub fn get_all(&self) -> Vec<ClientRemoteTabs> {
+ match self.remote_tabs() {
+ Some(list) => list,
+ None => vec![],
+ }
+ }
+
+ pub fn remote_tabs(&self) -> Option<Vec<ClientRemoteTabs>> {
+ self.storage.lock().unwrap().get_remote_tabs()
+ }
+}
diff --git a/third_party/rust/tabs/src/sync/bridge.rs b/third_party/rust/tabs/src/sync/bridge.rs
new file mode 100644
index 0000000000..6e37a0cf5c
--- /dev/null
+++ b/third_party/rust/tabs/src/sync/bridge.rs
@@ -0,0 +1,436 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::error::TabsApiError;
+use crate::sync::engine::TabsSyncImpl;
+use crate::TabsStore;
+use anyhow::Result;
+use std::sync::{Arc, Mutex};
+use sync15::bso::{IncomingBso, OutgoingBso};
+use sync15::engine::{ApplyResults, BridgedEngine, CollSyncIds, EngineSyncAssociation};
+use sync15::{ClientData, ServerTimestamp};
+use sync_guid::Guid as SyncGuid;
+
+impl TabsStore {
+ // Returns a bridged sync engine for Desktop for this store.
+ pub fn bridged_engine(self: Arc<Self>) -> Arc<TabsBridgedEngine> {
+ let bridge_impl = crate::sync::bridge::BridgedEngineImpl::new(&self);
+ // This is a concrete struct exposed via uniffi.
+ let concrete = TabsBridgedEngine::new(bridge_impl);
+ Arc::new(concrete)
+ }
+}
+
+/// A bridged engine implements all the methods needed to make the
+/// `storage.sync` store work with Desktop's Sync implementation.
+/// Conceptually it's very similar to our SyncEngine, and once we have SyncEngine
+/// and BridgedEngine using the same types we can probably combine them (or at least
+/// implement this bridged engine purely in terms of SyncEngine)
+/// See also #2841 and #5139
+pub struct BridgedEngineImpl {
+ sync_impl: Mutex<TabsSyncImpl>,
+ incoming: Mutex<Vec<IncomingBso>>,
+}
+
+impl BridgedEngineImpl {
+ /// Creates a bridged engine for syncing.
+ pub fn new(store: &Arc<TabsStore>) -> Self {
+ Self {
+ sync_impl: Mutex::new(TabsSyncImpl::new(store.clone())),
+ incoming: Mutex::default(),
+ }
+ }
+}
+
+impl BridgedEngine for BridgedEngineImpl {
+ fn last_sync(&self) -> Result<i64> {
+ Ok(self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .get_last_sync()?
+ .unwrap_or_default()
+ .as_millis())
+ }
+
+ fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
+ self.sync_impl
+ .lock()
+ .unwrap()
+ .set_last_sync(ServerTimestamp::from_millis(last_sync_millis))?;
+ Ok(())
+ }
+
+ fn sync_id(&self) -> Result<Option<String>> {
+ Ok(match self.sync_impl.lock().unwrap().get_sync_assoc()? {
+ EngineSyncAssociation::Connected(id) => Some(id.coll.to_string()),
+ EngineSyncAssociation::Disconnected => None,
+ })
+ }
+
+ fn reset_sync_id(&self) -> Result<String> {
+ let new_id = SyncGuid::random().to_string();
+ let new_coll_ids = CollSyncIds {
+ global: SyncGuid::empty(),
+ coll: new_id.clone().into(),
+ };
+ self.sync_impl
+ .lock()
+ .unwrap()
+ .reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
+ Ok(new_id)
+ }
+
+ fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
+ let mut sync_impl = self.sync_impl.lock().unwrap();
+ let assoc = sync_impl.get_sync_assoc()?;
+ if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) {
+ log::debug!("ensure_current_sync_id is current");
+ } else {
+ let new_coll_ids = CollSyncIds {
+ global: SyncGuid::empty(),
+ coll: sync_id.into(),
+ };
+ sync_impl.reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
+ }
+ Ok(sync_id.to_string())
+ }
+
+ fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
+ let data: ClientData = serde_json::from_str(client_data)?;
+ Ok(self.sync_impl.lock().unwrap().prepare_for_sync(data)?)
+ }
+
+ fn sync_started(&self) -> Result<()> {
+ // This is a no-op for the Tabs Engine
+ Ok(())
+ }
+
+ fn store_incoming(&self, incoming: Vec<IncomingBso>) -> Result<()> {
+ // Store the incoming payload in memory so we can use it in apply
+ *(self.incoming.lock().unwrap()) = incoming;
+ Ok(())
+ }
+
+ fn apply(&self) -> Result<ApplyResults> {
+ let mut incoming = self.incoming.lock().unwrap();
+ // We've a reference to a Vec<> but it's owned by the mutex - swap the mutex owned
+ // value for an empty vec so we can consume the original.
+ let mut records = Vec::new();
+ std::mem::swap(&mut records, &mut *incoming);
+ let mut telem = sync15::telemetry::Engine::new("tabs");
+
+ let mut sync_impl = self.sync_impl.lock().unwrap();
+ let outgoing = sync_impl.apply_incoming(records, &mut telem)?;
+
+ Ok(ApplyResults {
+ records: outgoing,
+ num_reconciled: Some(0),
+ })
+ }
+
+ fn set_uploaded(&self, server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
+ Ok(self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .sync_finished(ServerTimestamp::from_millis(server_modified_millis), ids)?)
+ }
+
+ fn sync_finished(&self) -> Result<()> {
+ *(self.incoming.lock().unwrap()) = Vec::default();
+ Ok(())
+ }
+
+ fn reset(&self) -> Result<()> {
+ self.sync_impl
+ .lock()
+ .unwrap()
+ .reset(&EngineSyncAssociation::Disconnected)?;
+ Ok(())
+ }
+
+ fn wipe(&self) -> Result<()> {
+ self.sync_impl.lock().unwrap().wipe()?;
+ Ok(())
+ }
+}
+
+// This is for uniffi to expose, and does nothing than delegate back to the trait.
+pub struct TabsBridgedEngine {
+ bridge_impl: BridgedEngineImpl,
+}
+
+impl TabsBridgedEngine {
+ pub fn new(bridge_impl: BridgedEngineImpl) -> Self {
+ Self { bridge_impl }
+ }
+
+ pub fn last_sync(&self) -> Result<i64> {
+ self.bridge_impl.last_sync()
+ }
+
+ pub fn set_last_sync(&self, last_sync: i64) -> Result<()> {
+ self.bridge_impl.set_last_sync(last_sync)
+ }
+
+ pub fn sync_id(&self) -> Result<Option<String>> {
+ self.bridge_impl.sync_id()
+ }
+
+ pub fn reset_sync_id(&self) -> Result<String> {
+ self.bridge_impl.reset_sync_id()
+ }
+
+ pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
+ self.bridge_impl.ensure_current_sync_id(sync_id)
+ }
+
+ pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
+ self.bridge_impl.prepare_for_sync(client_data)
+ }
+
+ pub fn sync_started(&self) -> Result<()> {
+ self.bridge_impl.sync_started()
+ }
+
+ // Decode the JSON-encoded IncomingBso's that UniFFI passes to us
+ fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> {
+ let mut bsos = Vec::with_capacity(incoming.len());
+ for inc in incoming {
+ bsos.push(serde_json::from_str::<IncomingBso>(&inc)?);
+ }
+ Ok(bsos)
+ }
+
+ // Encode OutgoingBso's into JSON for UniFFI
+ fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> {
+ let mut bsos = Vec::with_capacity(outgoing.len());
+ for e in outgoing {
+ bsos.push(serde_json::to_string(&e)?);
+ }
+ Ok(bsos)
+ }
+
+ pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> {
+ self.bridge_impl
+ .store_incoming(self.convert_incoming_bsos(incoming)?)
+ }
+
+ pub fn apply(&self) -> Result<Vec<String>> {
+ let apply_results = self.bridge_impl.apply()?;
+ self.convert_outgoing_bsos(apply_results.records)
+ }
+
+ pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> {
+ self.bridge_impl
+ .set_uploaded(server_modified_millis, &guids)
+ }
+
+ pub fn sync_finished(&self) -> Result<()> {
+ self.bridge_impl.sync_finished()
+ }
+
+ pub fn reset(&self) -> Result<()> {
+ self.bridge_impl.reset()
+ }
+
+ pub fn wipe(&self) -> Result<()> {
+ self.bridge_impl.wipe()
+ }
+}
+
+impl From<anyhow::Error> for TabsApiError {
+ fn from(value: anyhow::Error) -> Self {
+ TabsApiError::UnexpectedTabsError {
+ reason: value.to_string(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::storage::{RemoteTab, TABS_CLIENT_TTL};
+ use crate::sync::record::TabsRecordTab;
+ use serde_json::json;
+ use std::collections::HashMap;
+ use sync15::{ClientData, DeviceType, RemoteClient};
+
+ // A copy of the normal "engine" tests but which go via the bridge
+ #[test]
+ fn test_sync_via_bridge() {
+ env_logger::try_init().ok();
+
+ let store = Arc::new(TabsStore::new_with_mem_path("test-bridge_incoming"));
+
+ // Set some local tabs for our device.
+ let my_tabs = vec![
+ RemoteTab {
+ title: "my first tab".to_string(),
+ url_history: vec!["http://1.com".to_string()],
+ icon: None,
+ last_used: 2,
+ },
+ RemoteTab {
+ title: "my second tab".to_string(),
+ url_history: vec!["http://2.com".to_string()],
+ icon: None,
+ last_used: 1,
+ },
+ ];
+ store.set_local_tabs(my_tabs.clone());
+
+ let bridge = store.bridged_engine();
+
+ let client_data = ClientData {
+ local_client_id: "my-device".to_string(),
+ recent_clients: HashMap::from([
+ (
+ "my-device".to_string(),
+ RemoteClient {
+ fxa_device_id: None,
+ device_name: "my device".to_string(),
+ device_type: sync15::DeviceType::Unknown,
+ },
+ ),
+ (
+ "device-no-tabs".to_string(),
+ RemoteClient {
+ fxa_device_id: None,
+ device_name: "device with no tabs".to_string(),
+ device_type: DeviceType::Unknown,
+ },
+ ),
+ (
+ "device-with-a-tab".to_string(),
+ RemoteClient {
+ fxa_device_id: None,
+ device_name: "device with a tab".to_string(),
+ device_type: DeviceType::Unknown,
+ },
+ ),
+ ]),
+ };
+ bridge
+ .prepare_for_sync(&serde_json::to_string(&client_data).unwrap())
+ .expect("should work");
+
+ let records = vec![
+ // my-device should be ignored by sync - here it is acting as what our engine last
+ // wrote, but the actual tabs in our store we set above are what should be used.
+ json!({
+ "id": "my-device",
+ "clientName": "my device",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207
+ }]
+ }),
+ json!({
+ "id": "device-no-tabs",
+ "clientName": "device with no tabs",
+ "tabs": [],
+ }),
+ json!({
+ "id": "device-with-a-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207
+ }]
+ }),
+ // This has the main payload as OK but the tabs part invalid.
+ json!({
+ "id": "device-with-invalid-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "foo": "bar",
+ }]
+ }),
+ // We want this to be a valid payload but an invalid tab - so it needs an ID.
+ json!({
+ "id": "invalid-tab",
+ "foo": "bar"
+ }),
+ ];
+
+ let mut incoming = Vec::new();
+ for record in records {
+ // Annoyingly we can't use `IncomingEnvelope` directly as it intentionally doesn't
+ // support Serialize - so need to use explicit json.
+ let envelope = json!({
+ "id": record.get("id"),
+ "modified": 0,
+ "payload": serde_json::to_string(&record).unwrap(),
+ });
+ incoming.push(serde_json::to_string(&envelope).unwrap());
+ }
+
+ bridge.store_incoming(incoming).expect("should store");
+
+ let out = bridge.apply().expect("should apply");
+
+ assert_eq!(out.len(), 1);
+ let ours = serde_json::from_str::<serde_json::Value>(&out[0]).unwrap();
+ // As above, can't use `OutgoingEnvelope` as it doesn't Deserialize.
+ // First, convert my_tabs from the local `RemoteTab` to the Sync specific `TabsRecord`
+ let expected_tabs: Vec<TabsRecordTab> =
+ my_tabs.into_iter().map(|t| t.to_record_tab()).collect();
+ let expected = json!({
+ "id": "my-device".to_string(),
+ "payload": json!({
+ "id": "my-device".to_string(),
+ "clientName": "my device",
+ "tabs": serde_json::to_value(expected_tabs).unwrap(),
+ }).to_string(),
+ "ttl": TABS_CLIENT_TTL,
+ });
+
+ assert_eq!(ours, expected);
+ bridge.set_uploaded(1234, vec![]).unwrap();
+ assert_eq!(bridge.last_sync().unwrap(), 1234);
+ }
+
+ #[test]
+ fn test_sync_meta() {
+ env_logger::try_init().ok();
+
+ let store = Arc::new(TabsStore::new_with_mem_path("test-meta"));
+ let bridge = store.bridged_engine();
+
+ // Should not error or panic
+ assert_eq!(bridge.last_sync().unwrap(), 0);
+ bridge.set_last_sync(3).unwrap();
+ assert_eq!(bridge.last_sync().unwrap(), 3);
+
+ assert!(bridge.sync_id().unwrap().is_none());
+
+ bridge.ensure_current_sync_id("some_guid").unwrap();
+ assert_eq!(bridge.sync_id().unwrap(), Some("some_guid".to_string()));
+ // changing the sync ID should reset the timestamp
+ assert_eq!(bridge.last_sync().unwrap(), 0);
+ bridge.set_last_sync(3).unwrap();
+
+ bridge.reset_sync_id().unwrap();
+ // should now be a random guid.
+ assert_ne!(bridge.sync_id().unwrap(), Some("some_guid".to_string()));
+ // should have reset the last sync timestamp.
+ assert_eq!(bridge.last_sync().unwrap(), 0);
+ bridge.set_last_sync(3).unwrap();
+
+ // `reset` clears the guid and the timestamp
+ bridge.reset().unwrap();
+ assert_eq!(bridge.last_sync().unwrap(), 0);
+ assert!(bridge.sync_id().unwrap().is_none());
+ }
+}
diff --git a/third_party/rust/tabs/src/sync/engine.rs b/third_party/rust/tabs/src/sync/engine.rs
new file mode 100644
index 0000000000..bc0978cbf2
--- /dev/null
+++ b/third_party/rust/tabs/src/sync/engine.rs
@@ -0,0 +1,543 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::schema;
+use crate::storage::{ClientRemoteTabs, RemoteTab, TABS_CLIENT_TTL};
+use crate::store::TabsStore;
+use crate::sync::record::{TabsRecord, TabsRecordTab};
+use crate::Result;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, Weak};
+use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
+use sync15::engine::{
+ CollSyncIds, CollectionRequest, EngineSyncAssociation, IncomingChangeset, OutgoingChangeset,
+ SyncEngine, SyncEngineId,
+};
+use sync15::{telemetry, ClientData, DeviceType, RemoteClient, ServerTimestamp};
+use sync_guid::Guid;
+
+// Our "sync manager" will use whatever is stashed here.
+lazy_static::lazy_static! {
+ // Mutex: just taken long enough to update the inner stuff
+ static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new());
+}
+
+/// Called by the sync manager to get a sync engine via the store previously
+/// registered with the sync manager.
+pub fn get_registered_sync_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
+ let weak = STORE_FOR_MANAGER.lock().unwrap();
+ match weak.upgrade() {
+ None => None,
+ Some(store) => match engine_id {
+ SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))),
+ // panicing here seems reasonable - it's a static error if this
+ // it hit, not something that runtime conditions can influence.
+ _ => unreachable!("can't provide unknown engine: {}", engine_id),
+ },
+ }
+}
+
+impl ClientRemoteTabs {
+ pub(crate) fn from_record_with_remote_client(
+ client_id: String,
+ last_modified: ServerTimestamp,
+ remote_client: &RemoteClient,
+ record: TabsRecord,
+ ) -> Self {
+ Self {
+ client_id,
+ client_name: remote_client.device_name.clone(),
+ device_type: remote_client.device_type,
+ last_modified: last_modified.as_millis(),
+ remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
+ }
+ }
+
+ // Note that this should die as part of https://github.com/mozilla/application-services/issues/5199
+ // If we don't have a `RemoteClient` record, then we don't know whether the ID passed here is
+ // the fxa_device_id (which is must be) or the client_id (which it will be if this ends up being
+ // called for desktop records, where client_id != fxa_device_id)
+ pub(crate) fn from_record(
+ client_id: String,
+ last_modified: ServerTimestamp,
+ record: TabsRecord,
+ ) -> Self {
+ Self {
+ client_id,
+ client_name: record.client_name,
+ device_type: DeviceType::Unknown,
+ last_modified: last_modified.as_millis(),
+ remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
+ }
+ }
+ fn to_record(&self) -> TabsRecord {
+ TabsRecord {
+ id: self.client_id.clone(),
+ client_name: self.client_name.clone(),
+ tabs: self
+ .remote_tabs
+ .iter()
+ .map(RemoteTab::to_record_tab)
+ .collect(),
+ }
+ }
+}
+
+impl RemoteTab {
+ pub(crate) fn from_record_tab(tab: &TabsRecordTab) -> Self {
+ Self {
+ title: tab.title.clone(),
+ url_history: tab.url_history.clone(),
+ icon: tab.icon.clone(),
+ last_used: tab.last_used.checked_mul(1000).unwrap_or_default(),
+ }
+ }
+ pub(super) fn to_record_tab(&self) -> TabsRecordTab {
+ TabsRecordTab {
+ title: self.title.clone(),
+ url_history: self.url_history.clone(),
+ icon: self.icon.clone(),
+ last_used: self.last_used.checked_div(1000).unwrap_or_default(),
+ }
+ }
+}
+
+// This is the implementation of syncing, which is used by the 2 different "sync engines"
+// (We hope to get these 2 engines even closer in the future, but for now, we suck this up)
+pub struct TabsSyncImpl {
+ pub(super) store: Arc<TabsStore>,
+ pub(super) local_id: String,
+}
+
+impl TabsSyncImpl {
+ pub fn new(store: Arc<TabsStore>) -> Self {
+ Self {
+ store,
+ local_id: Default::default(),
+ }
+ }
+
+ pub fn prepare_for_sync(&mut self, client_data: ClientData) -> Result<()> {
+ let mut storage = self.store.storage.lock().unwrap();
+ // We only know the client list at sync time, but need to return tabs potentially
+ // at any time -- so we store the clients in the meta table to be able to properly
+ // return a ClientRemoteTab struct
+ storage.put_meta(
+ schema::REMOTE_CLIENTS_KEY,
+ &serde_json::to_string(&client_data.recent_clients)?,
+ )?;
+ self.local_id = client_data.local_client_id;
+ Ok(())
+ }
+
+ pub fn apply_incoming(
+ &mut self,
+ inbound: Vec<IncomingBso>,
+ telem: &mut telemetry::Engine,
+ ) -> Result<Vec<OutgoingBso>> {
+ let local_id = self.local_id.clone();
+ let mut remote_tabs = Vec::with_capacity(inbound.len());
+ let mut incoming_telemetry = telemetry::EngineIncoming::new();
+
+ let remote_clients: HashMap<String, RemoteClient> = {
+ let mut storage = self.store.storage.lock().unwrap();
+ match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? {
+ None => HashMap::default(),
+ Some(json) => serde_json::from_str(&json).unwrap(),
+ }
+ };
+ for incoming in inbound {
+ if incoming.envelope.id == local_id {
+ // That's our own record, ignore it.
+ continue;
+ }
+ let modified = incoming.envelope.modified;
+ let record = match incoming.into_content::<TabsRecord>().content() {
+ Some(record) => record,
+ None => {
+ // Invalid record or a "tombstone" which tabs don't have.
+ log::warn!("Ignoring incoming invalid tab");
+ incoming_telemetry.failed(1);
+ continue;
+ }
+ };
+ remote_tabs.push((record, modified));
+ }
+
+ // We want to keep the mutex for as short as possible
+ let local_tabs = {
+ let mut storage = self.store.storage.lock().unwrap();
+ // In desktop we might end up here with zero records when doing a quick-write, in
+ // which case we don't want to wipe the DB.
+ if !remote_tabs.is_empty() {
+ storage.replace_remote_tabs(remote_tabs)?;
+ }
+ storage.remove_stale_clients()?;
+ storage.prepare_local_tabs_for_upload()
+ };
+ let outgoing = if let Some(local_tabs) = local_tabs {
+ let (client_name, device_type) = remote_clients
+ .get(&local_id)
+ .map(|client| (client.device_name.clone(), client.device_type))
+ .unwrap_or_else(|| (String::new(), DeviceType::Unknown));
+ let local_record = ClientRemoteTabs {
+ client_id: local_id.clone(),
+ client_name,
+ device_type,
+ last_modified: 0, // ignored for outgoing records.
+ remote_tabs: local_tabs.to_vec(),
+ };
+ log::trace!("outgoing {:?}", local_record);
+ let envelope = OutgoingEnvelope {
+ id: local_id.into(),
+ ttl: Some(TABS_CLIENT_TTL),
+ ..Default::default()
+ };
+ vec![OutgoingBso::from_content(
+ envelope,
+ local_record.to_record(),
+ )?]
+ } else {
+ vec![]
+ };
+ telem.incoming(incoming_telemetry);
+ Ok(outgoing)
+ }
+
+ pub fn sync_finished(
+ &mut self,
+ new_timestamp: ServerTimestamp,
+ records_synced: &[Guid],
+ ) -> Result<()> {
+ log::info!(
+ "sync completed after uploading {} records",
+ records_synced.len()
+ );
+ self.set_last_sync(new_timestamp)?;
+ Ok(())
+ }
+
+ pub fn reset(&mut self, assoc: &EngineSyncAssociation) -> Result<()> {
+ self.set_last_sync(ServerTimestamp(0))?;
+ let mut storage = self.store.storage.lock().unwrap();
+ storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?;
+ storage.wipe_remote_tabs()?;
+ match assoc {
+ EngineSyncAssociation::Disconnected => {
+ storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
+ storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
+ }
+ EngineSyncAssociation::Connected(ids) => {
+ storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?;
+ storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?;
+ }
+ };
+ Ok(())
+ }
+
+ pub fn wipe(&mut self) -> Result<()> {
+ self.reset(&EngineSyncAssociation::Disconnected)?;
+ // not clear why we need to wipe the local tabs - the app is just going
+ // to re-add them?
+ self.store.storage.lock().unwrap().wipe_local_tabs();
+ Ok(())
+ }
+
+ pub fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> {
+ let mut storage = self.store.storage.lock().unwrap();
+ let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?;
+ let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?;
+ Ok(if let (Some(global), Some(coll)) = (global, coll) {
+ EngineSyncAssociation::Connected(CollSyncIds {
+ global: Guid::from_string(global),
+ coll: Guid::from_string(coll),
+ })
+ } else {
+ EngineSyncAssociation::Disconnected
+ })
+ }
+
+ pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> {
+ let mut storage = self.store.storage.lock().unwrap();
+ log::debug!("Updating last sync to {}", last_sync);
+ let last_sync_millis = last_sync.as_millis();
+ storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)
+ }
+
+ pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> {
+ let mut storage = self.store.storage.lock().unwrap();
+ let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
+ Ok(millis.map(ServerTimestamp))
+ }
+}
+
+// This is the "SyncEngine" used when syncing via the Sync Manager.
+pub struct TabsEngine {
+ pub sync_impl: Mutex<TabsSyncImpl>,
+}
+
+impl TabsEngine {
+ pub fn new(store: Arc<TabsStore>) -> Self {
+ Self {
+ sync_impl: Mutex::new(TabsSyncImpl::new(store)),
+ }
+ }
+}
+
+impl SyncEngine for TabsEngine {
+ fn collection_name(&self) -> std::borrow::Cow<'static, str> {
+ "tabs".into()
+ }
+
+ fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> anyhow::Result<()> {
+ Ok(self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .prepare_for_sync(get_client_data())?)
+ }
+
+ fn apply_incoming(
+ &self,
+ inbound: Vec<IncomingChangeset>,
+ telem: &mut telemetry::Engine,
+ ) -> anyhow::Result<OutgoingChangeset> {
+ assert_eq!(inbound.len(), 1, "only requested one set of records");
+ let inbound = inbound.into_iter().next().unwrap();
+ let outgoing_records = self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .apply_incoming(inbound.changes, telem)?;
+
+ Ok(OutgoingChangeset::new("tabs".into(), outgoing_records))
+ }
+
+ fn sync_finished(
+ &self,
+ new_timestamp: ServerTimestamp,
+ records_synced: Vec<Guid>,
+ ) -> anyhow::Result<()> {
+ Ok(self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .sync_finished(new_timestamp, &records_synced)?)
+ }
+
+ fn get_collection_requests(
+ &self,
+ server_timestamp: ServerTimestamp,
+ ) -> anyhow::Result<Vec<CollectionRequest>> {
+ let since = self
+ .sync_impl
+ .lock()
+ .unwrap()
+ .get_last_sync()?
+ .unwrap_or_default();
+ Ok(if since == server_timestamp {
+ vec![]
+ } else {
+ vec![CollectionRequest::new("tabs".into())
+ .full()
+ .newer_than(since)]
+ })
+ }
+
+ fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
+ Ok(self.sync_impl.lock().unwrap().get_sync_assoc()?)
+ }
+
+ fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
+ Ok(self.sync_impl.lock().unwrap().reset(assoc)?)
+ }
+
+ fn wipe(&self) -> anyhow::Result<()> {
+ Ok(self.sync_impl.lock().unwrap().wipe()?)
+ }
+}
+
+impl crate::TabsStore {
+ // This allows the embedding app to say "make this instance available to
+ // the sync manager". The implementation is more like "offer to sync mgr"
+ // (thereby avoiding us needing to link with the sync manager) but
+ // `register_with_sync_manager()` is logically what's happening so that's
+ // the name it gets.
+ pub fn register_with_sync_manager(self: Arc<Self>) {
+ let mut state = STORE_FOR_MANAGER.lock().unwrap();
+ *state = Arc::downgrade(&self);
+ }
+}
+
+#[cfg(test)]
+pub mod test {
+ use super::*;
+ use serde_json::json;
+ use sync15::bso::IncomingBso;
+
+ #[test]
+ fn test_incoming_tabs() {
+ env_logger::try_init().ok();
+
+ let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path("test-incoming")));
+
+ let records = vec![
+ json!({
+ "id": "device-no-tabs",
+ "clientName": "device with no tabs",
+ "tabs": [],
+ }),
+ json!({
+ "id": "device-with-a-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207
+ }]
+ }),
+ // test an updated payload will replace the previous record
+ json!({
+ "id": "device-with-a-tab",
+ "clientName": "device with an updated tab",
+ "tabs": [{
+ "title": "the new title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764208
+ }]
+ }),
+ // This has the main payload as OK but the tabs part invalid.
+ json!({
+ "id": "device-with-invalid-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "foo": "bar",
+ }]
+ }),
+ // We want this to be a valid payload but an invalid tab - so it needs an ID.
+ json!({
+ "id": "invalid-tab",
+ "foo": "bar"
+ }),
+ ];
+
+ let incoming = IncomingChangeset::new_with_changes(
+ engine.collection_name(),
+ ServerTimestamp(0),
+ records
+ .into_iter()
+ .map(IncomingBso::from_test_content)
+ .collect(),
+ );
+ let outgoing = engine
+ .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs"))
+ .expect("Should apply incoming and stage outgoing records");
+
+ assert!(outgoing.changes.is_empty());
+
+ // now check the store has what we think it has.
+ let sync_impl = engine.sync_impl.lock().unwrap();
+ let mut storage = sync_impl.store.storage.lock().unwrap();
+ let mut crts = storage.get_remote_tabs().expect("should work");
+ crts.sort_by(|a, b| a.client_name.partial_cmp(&b.client_name).unwrap());
+ assert_eq!(crts.len(), 2, "we currently include devices with no tabs");
+ let crt = &crts[0];
+ assert_eq!(crt.client_name, "device with an updated tab");
+ assert_eq!(crt.device_type, DeviceType::Unknown);
+ assert_eq!(crt.remote_tabs.len(), 1);
+ assert_eq!(crt.remote_tabs[0].title, "the new title");
+
+ let crt = &crts[1];
+ assert_eq!(crt.client_name, "device with no tabs");
+ assert_eq!(crt.device_type, DeviceType::Unknown);
+ assert_eq!(crt.remote_tabs.len(), 0);
+ }
+
+ #[test]
+ fn test_no_incoming_doesnt_write() {
+ env_logger::try_init().ok();
+
+ let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
+ "test_no_incoming_doesnt_write",
+ )));
+
+ let records = vec![json!({
+ "id": "device-with-a-tab",
+ "clientName": "device with a tab",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207
+ }]
+ })];
+
+ let incoming = IncomingChangeset::new_with_changes(
+ engine.collection_name(),
+ ServerTimestamp(0),
+ records
+ .into_iter()
+ .map(IncomingBso::from_test_content)
+ .collect(),
+ );
+ engine
+ .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs"))
+ .expect("Should apply incoming and stage outgoing records");
+
+ // now check the store has what we think it has.
+ {
+ let sync_impl = engine.sync_impl.lock().unwrap();
+ let mut storage = sync_impl.store.storage.lock().unwrap();
+ assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
+ }
+
+ // Now another sync with zero incoming records, should still be able to get back
+ // our one client.
+ let incoming = IncomingChangeset::new_with_changes(
+ engine.collection_name(),
+ ServerTimestamp(0),
+ vec![],
+ );
+ engine
+ .apply_incoming(vec![incoming], &mut telemetry::Engine::new("tabs"))
+ .expect("Should succeed applying zero records");
+
+ {
+ let sync_impl = engine.sync_impl.lock().unwrap();
+ let mut storage = sync_impl.store.storage.lock().unwrap();
+ assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
+ }
+ }
+
+ #[test]
+ fn test_sync_manager_registration() {
+ let store = Arc::new(TabsStore::new_with_mem_path("test-registration"));
+ assert_eq!(Arc::strong_count(&store), 1);
+ assert_eq!(Arc::weak_count(&store), 0);
+ Arc::clone(&store).register_with_sync_manager();
+ assert_eq!(Arc::strong_count(&store), 1);
+ assert_eq!(Arc::weak_count(&store), 1);
+ let registered = STORE_FOR_MANAGER
+ .lock()
+ .unwrap()
+ .upgrade()
+ .expect("should upgrade");
+ assert!(Arc::ptr_eq(&store, &registered));
+ drop(registered);
+ // should be no new references
+ assert_eq!(Arc::strong_count(&store), 1);
+ assert_eq!(Arc::weak_count(&store), 1);
+ // dropping the registered object should drop the registration.
+ drop(store);
+ assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none());
+ }
+}
diff --git a/third_party/rust/tabs/src/sync/full_sync.rs b/third_party/rust/tabs/src/sync/full_sync.rs
new file mode 100644
index 0000000000..768aecce0f
--- /dev/null
+++ b/third_party/rust/tabs/src/sync/full_sync.rs
@@ -0,0 +1,70 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::{sync::engine::TabsSyncImpl, ApiResult, TabsEngine, TabsStore};
+use error_support::handle_error;
+use interrupt_support::NeverInterrupts;
+use std::sync::Arc;
+use sync15::client::{sync_multiple, MemoryCachedState, Sync15StorageClientInit};
+use sync15::engine::EngineSyncAssociation;
+use sync15::KeyBundle;
+
+impl TabsStore {
+ #[handle_error(crate::Error)]
+ pub fn reset(self: Arc<Self>) -> ApiResult<()> {
+ let mut sync_impl = TabsSyncImpl::new(Arc::clone(&self));
+ sync_impl.reset(&EngineSyncAssociation::Disconnected)?;
+ Ok(())
+ }
+
+ /// A convenience wrapper around sync_multiple.
+ #[handle_error(crate::Error)]
+ pub fn sync(
+ self: Arc<Self>,
+ key_id: String,
+ access_token: String,
+ sync_key: String,
+ tokenserver_url: String,
+ local_id: String,
+ ) -> ApiResult<String> {
+ let mut mem_cached_state = MemoryCachedState::default();
+ let engine = TabsEngine::new(Arc::clone(&self));
+
+ // Since we are syncing without the sync manager, there's no
+ // command processor, therefore no clients engine, and in
+ // consequence `TabsStore::prepare_for_sync` is never called
+ // which means our `local_id` will never be set.
+ // Do it here.
+ engine.sync_impl.lock().unwrap().local_id = local_id;
+
+ let storage_init = &Sync15StorageClientInit {
+ key_id,
+ access_token,
+ tokenserver_url: url::Url::parse(tokenserver_url.as_str())?,
+ };
+ let root_sync_key = &KeyBundle::from_ksync_base64(sync_key.as_str())?;
+
+ let mut result = sync_multiple(
+ &[&engine],
+ &mut None,
+ &mut mem_cached_state,
+ storage_init,
+ root_sync_key,
+ &NeverInterrupts,
+ None,
+ );
+
+ // for b/w compat reasons, we do some dances with the result.
+ // XXX - note that this means telemetry isn't going to be reported back
+ // to the app - we need to check with lockwise about whether they really
+ // need these failures to be reported or whether we can loosen this.
+ if let Err(e) = result.result {
+ return Err(e.into());
+ }
+ match result.engine_results.remove("tabs") {
+ None | Some(Ok(())) => Ok(serde_json::to_string(&result.telemetry)?),
+ Some(Err(e)) => Err(e.into()),
+ }
+ }
+}
diff --git a/third_party/rust/tabs/src/sync/mod.rs b/third_party/rust/tabs/src/sync/mod.rs
new file mode 100644
index 0000000000..7d668ff843
--- /dev/null
+++ b/third_party/rust/tabs/src/sync/mod.rs
@@ -0,0 +1,35 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+pub(crate) mod bridge;
+pub(crate) mod engine;
+pub(crate) mod record;
+
+#[cfg(feature = "full-sync")]
+pub mod full_sync;
+
+// When full-sync isn't enabled we need stub versions for these UDL exposed functions.
+#[cfg(not(feature = "full-sync"))]
+impl crate::TabsStore {
+ pub fn reset(self: std::sync::Arc<Self>) -> crate::error::ApiResult<()> {
+ log::warn!("reset: feature not enabled");
+ Err(crate::error::TabsApiError::SyncError {
+ reason: "reset".to_string(),
+ })
+ }
+
+ pub fn sync(
+ self: std::sync::Arc<Self>,
+ _key_id: String,
+ _access_token: String,
+ _sync_key: String,
+ _tokenserver_url: String,
+ _local_id: String,
+ ) -> crate::error::ApiResult<String> {
+ log::warn!("sync: feature not enabled");
+ Err(crate::error::TabsApiError::SyncError {
+ reason: "sync".to_string(),
+ })
+ }
+}
diff --git a/third_party/rust/tabs/src/sync/record.rs b/third_party/rust/tabs/src/sync/record.rs
new file mode 100644
index 0000000000..a1da1a33e8
--- /dev/null
+++ b/third_party/rust/tabs/src/sync/record.rs
@@ -0,0 +1,97 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use serde_derive::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct TabsRecordTab {
+ pub title: String,
+ pub url_history: Vec<String>,
+ pub icon: Option<String>,
+ pub last_used: i64, // Seconds since epoch!
+}
+
+#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+// This struct mirrors what is stored on the server
+pub struct TabsRecord {
+ // `String` instead of `SyncGuid` because some IDs are FxA device ID (XXX - that doesn't
+ // matter though - this could easily be a Guid!)
+ pub id: String,
+ pub client_name: String,
+ pub tabs: Vec<TabsRecordTab>,
+}
+
+#[cfg(test)]
+pub mod test {
+ use super::*;
+ use serde_json::json;
+
+ #[test]
+ fn test_payload() {
+ let payload = json!({
+ "id": "JkeBPC50ZI0m",
+ "clientName": "client name",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207
+ }]
+ });
+ let record: TabsRecord = serde_json::from_value(payload).expect("should work");
+ assert_eq!(record.id, "JkeBPC50ZI0m");
+ assert_eq!(record.client_name, "client name");
+ assert_eq!(record.tabs.len(), 1);
+ let tab = &record.tabs[0];
+ assert_eq!(tab.title, "the title");
+ assert_eq!(tab.icon, Some("https://mozilla.org/icon".to_string()));
+ assert_eq!(tab.last_used, 1643764207);
+ }
+
+ #[test]
+ fn test_roundtrip() {
+ let tab = TabsRecord {
+ id: "JkeBPC50ZI0m".into(),
+ client_name: "client name".into(),
+ tabs: vec![TabsRecordTab {
+ title: "the title".into(),
+ url_history: vec!["https://mozilla.org/".into()],
+ icon: Some("https://mozilla.org/icon".into()),
+ last_used: 1643764207,
+ }],
+ };
+ let round_tripped =
+ serde_json::from_value(serde_json::to_value(tab.clone()).unwrap()).unwrap();
+ assert_eq!(tab, round_tripped);
+ }
+
+ #[test]
+ fn test_extra_fields() {
+ let payload = json!({
+ "id": "JkeBPC50ZI0m",
+ // Let's say we agree on new tabs to record, we want old versions to
+ // ignore them!
+ "ignoredField": "??",
+ "clientName": "client name",
+ "tabs": [{
+ "title": "the title",
+ "urlHistory": [
+ "https://mozilla.org/"
+ ],
+ "icon": "https://mozilla.org/icon",
+ "lastUsed": 1643764207,
+ // Ditto - make sure we ignore unexpected fields in each tab.
+ "ignoredField": "??",
+ }]
+ });
+ let record: TabsRecord = serde_json::from_value(payload).unwrap();
+ // The point of this test is really just to ensure the deser worked, so
+ // just check the ID.
+ assert_eq!(record.id, "JkeBPC50ZI0m");
+ }
+}
diff --git a/third_party/rust/tabs/src/tabs.udl b/third_party/rust/tabs/src/tabs.udl
new file mode 100644
index 0000000000..d1964ea0d2
--- /dev/null
+++ b/third_party/rust/tabs/src/tabs.udl
@@ -0,0 +1,108 @@
+[Custom]
+typedef string TabsGuid;
+
+namespace tabs {
+
+};
+
+[Error]
+interface TabsApiError {
+ SyncError(string reason);
+ SqlError(string reason);
+ UnexpectedTabsError(string reason);
+};
+
+
+interface TabsStore {
+ constructor(string path);
+
+ sequence<ClientRemoteTabs> get_all();
+
+ void set_local_tabs(sequence<RemoteTabRecord> remote_tabs);
+
+ [Self=ByArc]
+ void register_with_sync_manager();
+
+ [Throws=TabsApiError, Self=ByArc]
+ void reset();
+
+ [Throws=TabsApiError, Self=ByArc]
+ string sync(string key_id, string access_token, string sync_key, string tokenserver_url, string local_id);
+
+ [Self=ByArc]
+ TabsBridgedEngine bridged_engine();
+
+};
+
+// Note that this enum is duplicated in fxa-client.udl (although the underlying type *is*
+// shared). This duplication exists because there's no direct dependency between that crate and
+// this one. We can probably remove the duplication when sync15 gets a .udl file, then we could
+// reference it via an `[Extern=...]typedef`
+enum TabsDeviceType { "Desktop", "Mobile", "Tablet", "VR", "TV", "Unknown" };
+
+dictionary RemoteTabRecord {
+ string title;
+ sequence<string> url_history;
+ string? icon;
+ // Number of ms since the unix epoch (as reported by the client's clock)
+ i64 last_used;
+};
+
+dictionary ClientRemoteTabs {
+ string client_id;
+ string client_name;
+ TabsDeviceType device_type;
+ // Number of ms since the unix epoch (as reported by the server's clock)
+ i64 last_modified;
+ sequence<RemoteTabRecord> remote_tabs;
+};
+
+// Note the canonical docs for this are in https://searchfox.org/mozilla-central/source/services/interfaces/mozIBridgedSyncEngine.idl
+// It's only actually used in desktop, but it's fine to expose this everywhere.
+// NOTE: all timestamps here are milliseconds.
+interface TabsBridgedEngine {
+ //readonly attribute long storageVersion;
+ // readonly attribute boolean allowSkippedRecord;
+
+ // XXX - better logging story than this?
+ // attribute mozIServicesLogSink logger;
+
+ [Throws=TabsApiError]
+ i64 last_sync();
+
+ [Throws=TabsApiError]
+ void set_last_sync(i64 last_sync);
+
+ [Throws=TabsApiError]
+ string? sync_id();
+
+ [Throws=TabsApiError]
+ string reset_sync_id();
+
+ [Throws=TabsApiError]
+ string ensure_current_sync_id([ByRef]string new_sync_id);
+
+ [Throws=TabsApiError]
+ void prepare_for_sync([ByRef]string client_data);
+
+ [Throws=TabsApiError]
+ void sync_started();
+
+ [Throws=TabsApiError]
+ void store_incoming(sequence<string> incoming_envelopes_as_json);
+
+ [Throws=TabsApiError]
+ sequence<string> apply();
+
+ [Throws=TabsApiError]
+ void set_uploaded(i64 new_timestamp, sequence<TabsGuid> uploaded_ids);
+
+ [Throws=TabsApiError]
+ void sync_finished();
+
+ [Throws=TabsApiError]
+ void reset();
+
+ [Throws=TabsApiError]
+ void wipe();
+};