summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/engine
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/engine')
-rw-r--r--third_party/rust/sync15/src/engine/bridged_engine.rs119
-rw-r--r--third_party/rust/sync15/src/engine/changeset.rs52
-rw-r--r--third_party/rust/sync15/src/engine/mod.rs38
-rw-r--r--third_party/rust/sync15/src/engine/request.rs125
-rw-r--r--third_party/rust/sync15/src/engine/sync_engine.rs235
5 files changed, 569 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/engine/bridged_engine.rs b/third_party/rust/sync15/src/engine/bridged_engine.rs
new file mode 100644
index 0000000000..c12780ba04
--- /dev/null
+++ b/third_party/rust/sync15/src/engine/bridged_engine.rs
@@ -0,0 +1,119 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use anyhow::Result;
+
+use crate::bso::{IncomingBso, OutgoingBso};
+use crate::Guid;
+
+/// A BridgedEngine acts as a bridge between application-services, rust
+/// implemented sync engines and sync engines as defined by Desktop Firefox.
+///
+/// [Desktop Firefox has an abstract implementation of a Sync
+/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js)
+/// with a number of functions each engine is expected to override. Engines
+/// implemented in Rust use a different shape (specifically, the
+/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts
+/// between the 2.
+pub trait BridgedEngine: Send + Sync {
+    /// Returns the last sync time, in milliseconds, for this engine's
+    /// collection. This is called before each sync, to determine the lower
+    /// bound for new records to fetch from the server.
+    fn last_sync(&self) -> Result<i64>;
+
+    /// Sets the last sync time, in milliseconds. This is called throughout
+    /// the sync, to fast-forward the stored last sync time to match the
+    /// timestamp on the uploaded records.
+    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
+
+    /// Returns the sync ID for this engine's collection. This is only used in
+    /// tests.
+    fn sync_id(&self) -> Result<Option<String>>;
+
+    /// Resets the sync ID for this engine's collection, returning the new ID.
+    /// As a side effect, implementations should reset all local Sync state,
+    /// as in `reset`.
+    fn reset_sync_id(&self) -> Result<String>;
+
+    /// Ensures that the locally stored sync ID for this engine's collection
+    /// matches the `new_sync_id` from the server. If the two don't match,
+    /// implementations should reset all local Sync state, as in `reset`.
+    /// This method returns the assigned sync ID, which can be either the
+    /// `new_sync_id`, or a different one if the engine wants to force other
+    /// devices to reset their Sync state for this collection the next time they
+    /// sync.
+    fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>;
+
+    /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstraction as it only
+    /// makes sense for tabs.
+    /// The arg is a json serialized `ClientData` struct.
+    fn prepare_for_sync(&self, _client_data: &str) -> Result<()> {
+        Ok(())
+    }
+
+    /// Indicates that the engine is about to start syncing. This is called
+    /// once per sync, and always before `store_incoming`.
+    fn sync_started(&self) -> Result<()>;
+
+    /// Stages a batch of incoming Sync records. This is called multiple
+    /// times per sync, once for each batch.
+    /// (NOTE(review): earlier docs mentioned an abort "signal", but no such
+    /// signal is passed to this method — presumably stale text inherited from
+    /// the Desktop trait; confirm before relying on it.)
+    fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>;
+
+    /// Applies all staged records, reconciling changes on both sides and
+    /// resolving conflicts. Returns a list of records to upload.
+    fn apply(&self) -> Result<ApplyResults>;
+
+    /// Indicates that the given record IDs were uploaded successfully to the
+    /// server. This is called multiple times per sync, once for each batch
+    /// upload.
+    fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>;
+
+    /// Indicates that all records have been uploaded. At this point, any record
+    /// IDs marked for upload that haven't been passed to `set_uploaded`, can be
+    /// assumed to have failed: for example, because the server rejected a record
+    /// with an invalid TTL or sort index.
+    fn sync_finished(&self) -> Result<()>;
+
+    /// Resets all local Sync state, including any change flags, mirrors, and
+    /// the last sync time, such that the next sync is treated as a first sync
+    /// with all new local data. Does not erase any local user data.
+    fn reset(&self) -> Result<()>;
+
+    /// Erases all local user data for this collection, and any Sync metadata.
+    /// This method is destructive, and unused for most collections.
+    fn wipe(&self) -> Result<()>;
+}
+
+// TODO: We should replace this with OutgoingChangeset to reduce the number
+// of types engines need to deal with.
+/// The result of `BridgedEngine::apply`: the records to upload, plus an
+/// optional count of reconciled records (for reporting purposes).
+#[derive(Debug, Default)]
+pub struct ApplyResults {
+    /// List of outgoing records to upload to the server.
+    pub records: Vec<OutgoingBso>,
+    /// The number of incoming records whose contents were merged because they
+    /// changed on both sides. None indicates we aren't reporting this
+    /// information.
+    pub num_reconciled: Option<usize>,
+}
+
+impl ApplyResults {
+    /// Creates a result set; `num_reconciled` accepts either a `usize` or an
+    /// `Option<usize>` via the `Into` bound.
+    pub fn new(records: Vec<OutgoingBso>, num_reconciled: impl Into<Option<usize>>) -> Self {
+        Self {
+            records,
+            num_reconciled: num_reconciled.into(),
+        }
+    }
+}
+
+// Shorthand for engines that don't care.
+impl From<Vec<OutgoingBso>> for ApplyResults {
+    /// Builds a result set from the outgoing records alone, reporting no
+    /// reconciliation count.
+    fn from(records: Vec<OutgoingBso>) -> Self {
+        Self {
+            records,
+            num_reconciled: None,
+        }
+    }
+}
diff --git a/third_party/rust/sync15/src/engine/changeset.rs b/third_party/rust/sync15/src/engine/changeset.rs
new file mode 100644
index 0000000000..c5a4e65fcd
--- /dev/null
+++ b/third_party/rust/sync15/src/engine/changeset.rs
@@ -0,0 +1,52 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use crate::bso::{IncomingBso, OutgoingBso};
+use crate::{CollectionName, ServerTimestamp};
+
+// Incoming and Outgoing changesets are almost identical except for the timestamp.
+// Separate types still help avoid confusion with that timestamp, so they're split.
+/// A set of records fetched from the server for one collection, plus the
+/// server timestamp of that collection.
+#[derive(Debug)]
+pub struct IncomingChangeset {
+    /// The incoming records themselves.
+    pub changes: Vec<IncomingBso>,
+    /// The server timestamp of the collection.
+    pub timestamp: ServerTimestamp,
+    /// The collection these records belong to.
+    pub collection: CollectionName,
+}
+
+impl IncomingChangeset {
+    /// Creates an empty changeset for the given collection and timestamp.
+    #[inline]
+    pub fn new(collection: CollectionName, timestamp: ServerTimestamp) -> Self {
+        Self::new_with_changes(collection, timestamp, Vec::new())
+    }
+
+    /// Creates a changeset pre-populated with the supplied records.
+    #[inline]
+    pub fn new_with_changes(
+        collection: CollectionName,
+        timestamp: ServerTimestamp,
+        changes: Vec<IncomingBso>,
+    ) -> Self {
+        Self {
+            changes,
+            timestamp,
+            collection,
+        }
+    }
+}
+
+/// A set of records an engine wants uploaded to the server for one
+/// collection. Unlike `IncomingChangeset`, there is no server timestamp here.
+#[derive(Debug)]
+pub struct OutgoingChangeset {
+    /// The records to upload.
+    pub changes: Vec<OutgoingBso>,
+    /// The collection these records belong to.
+    pub collection: CollectionName,
+}
+
+impl OutgoingChangeset {
+    /// Creates a changeset with the given outgoing records.
+    #[inline]
+    pub fn new(collection: CollectionName, changes: Vec<OutgoingBso>) -> Self {
+        Self {
+            collection,
+            changes,
+        }
+    }
+}
diff --git a/third_party/rust/sync15/src/engine/mod.rs b/third_party/rust/sync15/src/engine/mod.rs
new file mode 100644
index 0000000000..427b779c9b
--- /dev/null
+++ b/third_party/rust/sync15/src/engine/mod.rs
@@ -0,0 +1,38 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! This module is used by crates which need to implement a "sync engine".
+//! At a high-level, a "sync engine" is code which knows how to take records
+//! from a sync server, apply and reconcile them with the local data, then
+//! provide records which should be uploaded to the server.
+//!
+//! Note that the "sync engine" does not itself talk to the server, nor does
+//! it manage the state of the remote server, nor does it do any of the
+//! encryption/decryption - that is the responsibility of the "sync client", as
+//! implemented in the [client] module (or in some cases, implemented externally)
+//!
+//! There are currently 2 types of engine:
+//! * Code which implements the [crate::engine::sync_engine::SyncEngine]
+//!    trait. These are the "original" Rust engines, designed to be used with
+//!    the [sync client](crate::client)
+//! * Code which implements the [crate::engine::bridged_engine::BridgedEngine]
+//!    trait. These engines are a "bridge" between the Desktop JS Sync world and
+//!    this rust code.
+//! While these engines end up doing the same thing, the difference is due to
+//! implementation differences between the Desktop Sync client and the Rust
+//! client.
+//! We intend merging these engines - the first step will be to merge the
+//! types and payload management used by these traits, then to combine the
+//! requirements into a single trait that captures both use-cases.
+mod bridged_engine;
+mod changeset;
+mod request;
+mod sync_engine;
+
+pub use bridged_engine::{ApplyResults, BridgedEngine};
+pub use changeset::{IncomingChangeset, OutgoingChangeset};
+// CollectionPost is crate-internal: only the sync client builds POST requests.
+#[cfg(feature = "sync-client")]
+pub(crate) use request::CollectionPost;
+pub use request::{CollectionRequest, RequestOrder};
+pub use sync_engine::{CollSyncIds, EngineSyncAssociation, SyncEngine, SyncEngineId};
diff --git a/third_party/rust/sync15/src/engine/request.rs b/third_party/rust/sync15/src/engine/request.rs
new file mode 100644
index 0000000000..7d634bb5e7
--- /dev/null
+++ b/third_party/rust/sync15/src/engine/request.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use crate::{CollectionName, Guid, ServerTimestamp};
+/// Parameters for fetching records from a collection on the server, built
+/// via the builder-style methods on the `impl` below.
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+pub struct CollectionRequest {
+    /// The name of the collection being requested.
+    pub collection: CollectionName,
+    /// Whether to fetch full records rather than just record IDs
+    /// (the server's `full` query parameter).
+    pub full: bool,
+    /// If set, restrict the request to records with these IDs.
+    pub ids: Option<Vec<Guid>>,
+
+    /// Maximum number of records to fetch, plus the order to fetch them in.
+    pub limit: Option<RequestLimit>,
+    /// Only fetch records older than this server timestamp.
+    pub older: Option<ServerTimestamp>,
+    /// Only fetch records newer than this server timestamp.
+    pub newer: Option<ServerTimestamp>,
+}
+
+impl CollectionRequest {
+    /// Creates a request for `collection` with no filters or limits.
+    #[inline]
+    pub fn new(collection: CollectionName) -> CollectionRequest {
+        CollectionRequest {
+            collection,
+            ..Default::default()
+        }
+    }
+
+    /// Restricts the request to the given record IDs (builder-style).
+    #[inline]
+    pub fn ids<V>(mut self, v: V) -> CollectionRequest
+    where
+        V: IntoIterator,
+        V::Item: Into<Guid>,
+    {
+        self.ids = Some(v.into_iter().map(|id| id.into()).collect());
+        self
+    }
+
+    /// Requests full records rather than just record IDs (builder-style).
+    #[inline]
+    pub fn full(mut self) -> CollectionRequest {
+        self.full = true;
+        self
+    }
+
+    /// Only fetch records older than `ts` (builder-style).
+    #[inline]
+    pub fn older_than(mut self, ts: ServerTimestamp) -> CollectionRequest {
+        self.older = Some(ts);
+        self
+    }
+
+    /// Only fetch records newer than `ts` (builder-style).
+    #[inline]
+    pub fn newer_than(mut self, ts: ServerTimestamp) -> CollectionRequest {
+        self.newer = Some(ts);
+        self
+    }
+
+    /// Limits the request to `num` records, fetched in `order` (builder-style).
+    #[inline]
+    pub fn limit(mut self, num: usize, order: RequestOrder) -> CollectionRequest {
+        self.limit = Some(RequestLimit { num, order });
+        self
+    }
+}
+
+// This is just used internally - consumers just provide the content, not request params.
+/// Parameters for posting (uploading) records to a collection.
+#[cfg(feature = "sync-client")]
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+pub(crate) struct CollectionPost {
+    /// The name of the collection being written to.
+    pub collection: CollectionName,
+    /// Whether this POST commits the batch — presumably maps to the server's
+    /// batch-commit semantics; verify against the client that builds the URL.
+    pub commit: bool,
+    /// The batch identifier, if this POST is part of a batch upload.
+    pub batch: Option<String>,
+}
+
+#[cfg(feature = "sync-client")]
+impl CollectionPost {
+    /// Creates a post request for `collection` with no batch semantics.
+    #[inline]
+    pub fn new(collection: CollectionName) -> Self {
+        Self {
+            collection,
+            ..Default::default()
+        }
+    }
+
+    /// Sets the batch identifier (builder-style).
+    #[inline]
+    pub fn batch(mut self, batch: Option<String>) -> Self {
+        self.batch = batch;
+        self
+    }
+
+    /// Sets whether this post commits the batch (builder-style).
+    #[inline]
+    pub fn commit(mut self, v: bool) -> Self {
+        self.commit = v;
+        self
+    }
+}
+
+// Asking for the order of records only makes sense if you are limiting them
+// in some way - consumers don't care about the order otherwise as everything
+// is processed as a set.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub enum RequestOrder {
+    /// Oldest records first.
+    Oldest,
+    /// Newest records first.
+    Newest,
+    /// Ordered by the records' sort index.
+    Index,
+}
+
+impl RequestOrder {
+    /// Returns the string value used for this order in the request.
+    #[inline]
+    pub fn as_str(self) -> &'static str {
+        match self {
+            RequestOrder::Oldest => "oldest",
+            RequestOrder::Newest => "newest",
+            RequestOrder::Index => "index",
+        }
+    }
+}
+
+impl std::fmt::Display for RequestOrder {
+    // Display is identical to `as_str()`.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.as_str())
+    }
+}
+
+// If you specify a numerical limit you must provide the order so backfilling
+// is possible (ie, so you know which ones you got!)
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct RequestLimit {
+    /// Maximum number of records to fetch.
+    pub(crate) num: usize,
+    /// The order in which to fetch them.
+    pub(crate) order: RequestOrder,
+}
diff --git a/third_party/rust/sync15/src/engine/sync_engine.rs b/third_party/rust/sync15/src/engine/sync_engine.rs
new file mode 100644
index 0000000000..79b9b1a522
--- /dev/null
+++ b/third_party/rust/sync15/src/engine/sync_engine.rs
@@ -0,0 +1,235 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use super::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
+use crate::client_types::ClientData;
+use crate::{telemetry, CollectionName, Guid, ServerTimestamp};
+use anyhow::Result;
+use std::fmt;
+
+/// The pair of sync IDs an engine must agree on with the server to be
+/// considered connected: the global sync ID and this collection's own ID.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CollSyncIds {
+    /// The global sync ID.
+    pub global: Guid,
+    /// The sync ID for this engine's collection.
+    pub coll: Guid,
+}
+
+/// Defines how an engine is associated with a particular set of records
+/// on a sync storage server. It's either disconnected, or believes it is
+/// connected with a specific set of GUIDs. If the server and the engine don't
+/// agree on the exact GUIDs, the engine will assume something radical happened
+/// so it can't believe anything it thinks it knows about the state of the
+/// server (ie, it will "reset" then do a full reconcile)
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum EngineSyncAssociation {
+    /// This store is disconnected (although it may be connected in the future).
+    Disconnected,
+    /// Sync is connected, and has the following sync IDs.
+    Connected(CollSyncIds),
+}
+
+/// The concrete `SyncEngine` implementations
+#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub enum SyncEngineId {
+    // Note that we've derived PartialOrd etc, which orders by the declaration
+    // order of the variants (not their names). We leverage that such that the
+    // higher priority engines are listed first.
+    // This order matches desktop.
+    Passwords,
+    Tabs,
+    Bookmarks,
+    Addresses,
+    CreditCards,
+    History,
+}
+
+impl SyncEngineId {
+    // Iterate over all possible engines. Note that we've made a policy decision
+    // that this should enumerate in "order" as defined by the derived `Ord`,
+    // and tests enforce this.
+    pub fn iter() -> impl Iterator<Item = SyncEngineId> {
+        [
+            Self::Passwords,
+            Self::Tabs,
+            Self::Bookmarks,
+            Self::Addresses,
+            Self::CreditCards,
+            Self::History,
+        ]
+        .into_iter()
+    }
+
+    // Get the string identifier for this engine. This must match the strings in SyncEngineSelection.
+    pub fn name(&self) -> &'static str {
+        match self {
+            Self::Passwords => "passwords",
+            Self::History => "history",
+            Self::Bookmarks => "bookmarks",
+            Self::Tabs => "tabs",
+            Self::Addresses => "addresses",
+            Self::CreditCards => "creditcards",
+        }
+    }
+}
+
+impl fmt::Display for SyncEngineId {
+    // Display is identical to `name()`.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+impl TryFrom<&str> for SyncEngineId {
+    type Error = String;
+
+    /// Parses the string identifier produced by `name()`, returning the
+    /// unrecognised input string as the error value.
+    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
+        match value {
+            "passwords" => Ok(Self::Passwords),
+            "history" => Ok(Self::History),
+            "bookmarks" => Ok(Self::Bookmarks),
+            "tabs" => Ok(Self::Tabs),
+            "addresses" => Ok(Self::Addresses),
+            "creditcards" => Ok(Self::CreditCards),
+            _ => Err(value.into()),
+        }
+    }
+}
+
+/// A "sync engine" is a thing that knows how to sync. It's often implemented
+/// by a "store" (which is the generic term responsible for all storage
+/// associated with a component, including storage required for sync.)
+///
+/// Low-level engine functionality. Engines that need custom reconciliation
+/// logic should use this.
+///
+/// Different engines will produce errors of different types. To accommodate
+/// this, we force them all to return anyhow::Error.
+pub trait SyncEngine {
+    /// The name of the collection this engine syncs.
+    fn collection_name(&self) -> CollectionName;
+
+    /// Prepares the engine for syncing. The tabs engine currently uses this to
+    /// store the current list of clients, which it uses to look up device names
+    /// and types.
+    ///
+    /// Note that this method is only called by `sync_multiple`, and only if a
+    /// command processor is registered. In particular, `prepare_for_sync` will
+    /// not be called if the store is synced using `sync::synchronize` or
+    /// `sync_multiple::sync_multiple`. It _will_ be called if the store is
+    /// synced via the Sync Manager.
+    ///
+    /// TODO(issue #2590): This is pretty kludgey and will be hard to extend for
+    /// any case other than the tabs case. We should find another way to support
+    /// tabs...
+    fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
+        Ok(())
+    }
+
+    /// Tells the engine what the local encryption key is for the data managed
+    /// by the engine. This is only used by collections that store data
+    /// encrypted locally and is unrelated to the encryption used by Sync.
+    /// The intent is that for such collections, this key can be used to
+    /// decrypt local data before it is re-encrypted by Sync and sent to the
+    /// storage servers, and similarly, data from the storage servers will be
+    /// decrypted by Sync, then encrypted by the local encryption key before
+    /// being added to the local database.
+    ///
+    /// The expectation is that the key value is being maintained by the
+    /// embedding application in some secure way suitable for the environment
+    /// in which the app is running - eg, the OS "keychain". The value of the
+    /// key is implementation dependent - it is expected that the engine and
+    /// embedding application already have some external agreement about how
+    /// to generate keys and in what form they are exchanged. Finally, there's
+    /// an assumption that sync engines are short-lived and only live for a
+    /// single sync - this means that sync doesn't hold on to the key for an
+    /// extended period.
+    ///
+    /// This will panic if called by an engine that doesn't have explicit
+    /// support for local encryption keys as that implies a degree of confusion
+    /// which shouldn't be possible to ignore.
+    fn set_local_encryption_key(&mut self, _key: &str) -> Result<()> {
+        unimplemented!("This engine does not support local encryption");
+    }
+
+    /// Applies the fetched records, returning the outgoing changeset to upload.
+    ///
+    /// `inbound` is a vector to support the case where
+    /// `get_collection_requests` returned multiple requests. The changesets are
+    /// in the same order as the requests were -- e.g. if `vec![req_a, req_b]`
+    /// was returned from `get_collection_requests`, `inbound` will have the
+    /// results from `req_a` as its first index, and those from `req_b` as its
+    /// second.
+    fn apply_incoming(
+        &self,
+        inbound: Vec<IncomingChangeset>,
+        telem: &mut telemetry::Engine,
+    ) -> Result<OutgoingChangeset>;
+
+    /// Called once the sync is complete, with the new server timestamp and
+    /// the IDs of the records that were successfully uploaded.
+    fn sync_finished(
+        &self,
+        new_timestamp: ServerTimestamp,
+        records_synced: Vec<Guid>,
+    ) -> Result<()>;
+
+    /// The engine is responsible for building collection requests. Engines
+    /// typically will store a lastModified timestamp and use that to build a
+    /// request saying "give me full records since that date" - however, other
+    /// engines might do something fancier. This could even later be extended to
+    /// handle "backfills" etc
+    ///
+    /// To support more advanced use cases, multiple requests can be returned
+    /// here - either from the same or different collections. The vast majority
+    /// of engines will just want to return zero or one item in their vector
+    /// (zero is a valid optimization when the server timestamp is the same as
+    /// the engine last saw, one when it is not)
+    ///
+    /// Important: In the case when more than one collection is requested, it's
+    /// assumed the last one is the "canonical" one. (That is, it must be for
+    /// "this" collection, its timestamp is used to represent the sync, etc).
+    /// (Note that multiple collection request support is currently unused, so
+    /// it might make sense to delete it - if we need it later, we may find a
+    /// better shape for our use-case)
+    fn get_collection_requests(
+        &self,
+        server_timestamp: ServerTimestamp,
+    ) -> Result<Vec<CollectionRequest>>;
+
+    /// Get persisted sync IDs. If they don't match the global state we'll be
+    /// `reset()` with the new IDs.
+    fn get_sync_assoc(&self) -> Result<EngineSyncAssociation>;
+
+    /// Reset the engine (and associated store) without wiping local data,
+    /// ready for a "first sync".
+    /// `assoc` defines how this store is to be associated with sync.
+    fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()>;
+
+    /// Erases all local user data managed by this engine, plus any Sync
+    /// metadata. Destructive; compare `reset`, which preserves user data.
+    fn wipe(&self) -> Result<()>;
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::iter::zip;
+
+    // `sort()` uses the derived `Ord`, so higher-priority engines (earlier
+    // variants) must sort first.
+    #[test]
+    fn test_engine_priority() {
+        fn sorted(mut engines: Vec<SyncEngineId>) -> Vec<SyncEngineId> {
+            engines.sort();
+            engines
+        }
+        assert_eq!(
+            vec![SyncEngineId::Passwords, SyncEngineId::Tabs],
+            sorted(vec![SyncEngineId::Passwords, SyncEngineId::Tabs])
+        );
+        assert_eq!(
+            vec![SyncEngineId::Passwords, SyncEngineId::Tabs],
+            sorted(vec![SyncEngineId::Tabs, SyncEngineId::Passwords])
+        );
+    }
+
+    // `SyncEngineId::iter()` must enumerate in the same order sorting produces.
+    #[test]
+    fn test_engine_enum_order() {
+        let unsorted = SyncEngineId::iter().collect::<Vec<SyncEngineId>>();
+        let mut sorted = SyncEngineId::iter().collect::<Vec<SyncEngineId>>();
+        sorted.sort();
+
+        // iterating should supply identical elements in each.
+        assert!(zip(unsorted, sorted).fold(true, |acc, (a, b)| acc && (a == b)))
+    }
+}