diff options
Diffstat (limited to 'third_party/rust/sync15/src/engine')
-rw-r--r-- | third_party/rust/sync15/src/engine/bridged_engine.rs | 119 | ||||
-rw-r--r-- | third_party/rust/sync15/src/engine/changeset.rs | 52 | ||||
-rw-r--r-- | third_party/rust/sync15/src/engine/mod.rs | 38 | ||||
-rw-r--r-- | third_party/rust/sync15/src/engine/request.rs | 125 | ||||
-rw-r--r-- | third_party/rust/sync15/src/engine/sync_engine.rs | 235 |
5 files changed, 569 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/engine/bridged_engine.rs b/third_party/rust/sync15/src/engine/bridged_engine.rs new file mode 100644 index 0000000000..c12780ba04 --- /dev/null +++ b/third_party/rust/sync15/src/engine/bridged_engine.rs @@ -0,0 +1,119 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use anyhow::Result; + +use crate::bso::{IncomingBso, OutgoingBso}; +use crate::Guid; + +/// A BridgedEngine acts as a bridge between application-services, rust +/// implemented sync engines and sync engines as defined by Desktop Firefox. +/// +/// [Desktop Firefox has an abstract implementation of a Sync +/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js) +/// with a number of functions each engine is expected to override. Engines +/// implemented in Rust use a different shape (specifically, the +/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts +/// between the 2. +pub trait BridgedEngine: Send + Sync { + /// Returns the last sync time, in milliseconds, for this engine's + /// collection. This is called before each sync, to determine the lower + /// bound for new records to fetch from the server. + fn last_sync(&self) -> Result<i64>; + + /// Sets the last sync time, in milliseconds. This is called throughout + /// the sync, to fast-forward the stored last sync time to match the + /// timestamp on the uploaded records. + fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>; + + /// Returns the sync ID for this engine's collection. This is only used in + /// tests. + fn sync_id(&self) -> Result<Option<String>>; + + /// Resets the sync ID for this engine's collection, returning the new ID. + /// As a side effect, implementations should reset all local Sync state, + /// as in `reset`. + fn reset_sync_id(&self) -> Result<String>; + + /// Ensures that the locally stored sync ID for this engine's collection + /// matches the `new_sync_id` from the server. If the two don't match, + /// implementations should reset all local Sync state, as in `reset`. + /// This method returns the assigned sync ID, which can be either the + /// `new_sync_id`, or a different one if the engine wants to force other + /// devices to reset their Sync state for this collection the next time they + /// sync. + fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>; + + /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstration as it only + /// makes sense for tabs. + /// The arg is a json serialized `ClientData` struct. + fn prepare_for_sync(&self, _client_data: &str) -> Result<()> { + Ok(()) + } + + /// Indicates that the engine is about to start syncing. This is called + /// once per sync, and always before `store_incoming`. + fn sync_started(&self) -> Result<()>; + + /// Stages a batch of incoming Sync records. This is called multiple + /// times per sync, once for each batch. Implementations can use the + /// signal to check if the operation was aborted, and cancel any + /// pending work. + fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>; + + /// Applies all staged records, reconciling changes on both sides and + /// resolving conflicts. Returns a list of records to upload. + fn apply(&self) -> Result<ApplyResults>; + + /// Indicates that the given record IDs were uploaded successfully to the + /// server. This is called multiple times per sync, once for each batch + /// upload. + fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>; + + /// Indicates that all records have been uploaded. At this point, any record + /// IDs marked for upload that haven't been passed to `set_uploaded`, can be + /// assumed to have failed: for example, because the server rejected a record + /// with an invalid TTL or sort index. + fn sync_finished(&self) -> Result<()>; + + /// Resets all local Sync state, including any change flags, mirrors, and + /// the last sync time, such that the next sync is treated as a first sync + /// with all new local data. Does not erase any local user data. + fn reset(&self) -> Result<()>; + + /// Erases all local user data for this collection, and any Sync metadata. + /// This method is destructive, and unused for most collections. + fn wipe(&self) -> Result<()>; +} + +// TODO: We should replace this with OutgoingChangeset to reduce the number +// of types engines need to deal with. +#[derive(Debug, Default)] +pub struct ApplyResults { + /// List of records + pub records: Vec<OutgoingBso>, + /// The number of incoming records whose contents were merged because they + /// changed on both sides. None indicates we aren't reporting this + /// information. + pub num_reconciled: Option<usize>, +} + +impl ApplyResults { + pub fn new(records: Vec<OutgoingBso>, num_reconciled: impl Into<Option<usize>>) -> Self { + Self { + records, + num_reconciled: num_reconciled.into(), + } + } +} + +// Shorthand for engines that don't care. +impl From<Vec<OutgoingBso>> for ApplyResults { + fn from(records: Vec<OutgoingBso>) -> Self { + Self { + records, + num_reconciled: None, + } + } +} diff --git a/third_party/rust/sync15/src/engine/changeset.rs b/third_party/rust/sync15/src/engine/changeset.rs new file mode 100644 index 0000000000..c5a4e65fcd --- /dev/null +++ b/third_party/rust/sync15/src/engine/changeset.rs @@ -0,0 +1,52 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::bso::{IncomingBso, OutgoingBso}; +use crate::{CollectionName, ServerTimestamp}; + +// Incoming and Outgoing changesets are almost identical except for the timestamp. +// Separate types still helps avoid confusion with that timestamp, so they're split. +#[derive(Debug)] +pub struct IncomingChangeset { + pub changes: Vec<IncomingBso>, + /// The server timestamp of the collection. + pub timestamp: ServerTimestamp, + pub collection: CollectionName, +} + +impl IncomingChangeset { + #[inline] + pub fn new(collection: CollectionName, timestamp: ServerTimestamp) -> Self { + Self::new_with_changes(collection, timestamp, Vec::new()) + } + + #[inline] + pub fn new_with_changes( + collection: CollectionName, + timestamp: ServerTimestamp, + changes: Vec<IncomingBso>, + ) -> Self { + Self { + changes, + timestamp, + collection, + } + } +} + +#[derive(Debug)] +pub struct OutgoingChangeset { + pub changes: Vec<OutgoingBso>, + pub collection: CollectionName, +} + +impl OutgoingChangeset { + #[inline] + pub fn new(collection: CollectionName, changes: Vec<OutgoingBso>) -> Self { + Self { + collection, + changes, + } + } +} diff --git a/third_party/rust/sync15/src/engine/mod.rs b/third_party/rust/sync15/src/engine/mod.rs new file mode 100644 index 0000000000..427b779c9b --- /dev/null +++ b/third_party/rust/sync15/src/engine/mod.rs @@ -0,0 +1,38 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! This module is used by crates which need to implement a "sync engine". +//! At a high-level, a "sync engine" is code which knows how to take records +//! from a sync server, apply and reconcile them with the local data, then +//! provide records which should be uploaded to the server. +//! +//! Note that the "sync engine" does not itself talk to the server, nor does +//! it manage the state of the remote server, nor does it do any of the +//! encryption/decryption - that is the responsbility of the "sync client", as +//! implemented in the [client] module (or in some cases, implemented externally) +//! +//! There are currently 2 types of engine: +//! * Code which implements the [crate::engine::sync_engine::SyncEngine] +//! trait. These are the "original" Rust engines, designed to be used with +//! the [crate::client](sync client) +//! * Code which implements the [crate::engine::bridged_engine::BridgedEngine] +//! trait. These engines are a "bridge" between the Desktop JS Sync world and +//! this rust code. +//! While these engines end up doing the same thing, the difference is due to +//! implementation differences between the Desktop Sync client and the Rust +//! client. +//! We intend merging these engines - the first step will be to merge the +//! types and payload management used by these traits, then to combine the +//! requirements into a single trait that captures both use-cases. +mod bridged_engine; +mod changeset; +mod request; +mod sync_engine; + +pub use bridged_engine::{ApplyResults, BridgedEngine}; +pub use changeset::{IncomingChangeset, OutgoingChangeset}; +#[cfg(feature = "sync-client")] +pub(crate) use request::CollectionPost; +pub use request::{CollectionRequest, RequestOrder}; +pub use sync_engine::{CollSyncIds, EngineSyncAssociation, SyncEngine, SyncEngineId}; diff --git a/third_party/rust/sync15/src/engine/request.rs b/third_party/rust/sync15/src/engine/request.rs new file mode 100644 index 0000000000..7d634bb5e7 --- /dev/null +++ b/third_party/rust/sync15/src/engine/request.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use crate::{CollectionName, Guid, ServerTimestamp}; +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct CollectionRequest { + pub collection: CollectionName, + pub full: bool, + pub ids: Option<Vec<Guid>>, + + pub limit: Option<RequestLimit>, + pub older: Option<ServerTimestamp>, + pub newer: Option<ServerTimestamp>, +} + +impl CollectionRequest { + #[inline] + pub fn new(collection: CollectionName) -> CollectionRequest { + CollectionRequest { + collection, + ..Default::default() + } + } + + #[inline] + pub fn ids<V>(mut self, v: V) -> CollectionRequest + where + V: IntoIterator, + V::Item: Into<Guid>, + { + self.ids = Some(v.into_iter().map(|id| id.into()).collect()); + self + } + + #[inline] + pub fn full(mut self) -> CollectionRequest { + self.full = true; + self + } + + #[inline] + pub fn older_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.older = Some(ts); + self + } + + #[inline] + pub fn newer_than(mut self, ts: ServerTimestamp) -> CollectionRequest { + self.newer = Some(ts); + self + } + + #[inline] + pub fn limit(mut self, num: usize, order: RequestOrder) -> CollectionRequest { + self.limit = Some(RequestLimit { num, order }); + self + } +} + +// This is just used interally - consumers just provide the content, not request params. +#[cfg(feature = "sync-client")] +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub(crate) struct CollectionPost { + pub collection: CollectionName, + pub commit: bool, + pub batch: Option<String>, +} + +#[cfg(feature = "sync-client")] +impl CollectionPost { + #[inline] + pub fn new(collection: CollectionName) -> Self { + Self { + collection, + ..Default::default() + } + } + + #[inline] + pub fn batch(mut self, batch: Option<String>) -> Self { + self.batch = batch; + self + } + + #[inline] + pub fn commit(mut self, v: bool) -> Self { + self.commit = v; + self + } +} + +// Asking for the order of records only makes sense if you are limiting them +// in some way - consumers don't care about the order otherwise as everything +// is processed as a set. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum RequestOrder { + Oldest, + Newest, + Index, +} + +impl RequestOrder { + #[inline] + pub fn as_str(self) -> &'static str { + match self { + RequestOrder::Oldest => "oldest", + RequestOrder::Newest => "newest", + RequestOrder::Index => "index", + } + } +} + +impl std::fmt::Display for RequestOrder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +// If you specify a numerical limit you must provide the order so backfilling +// is possible (ie, so you know which ones you got!) +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct RequestLimit { + pub(crate) num: usize, + pub(crate) order: RequestOrder, +} diff --git a/third_party/rust/sync15/src/engine/sync_engine.rs b/third_party/rust/sync15/src/engine/sync_engine.rs new file mode 100644 index 0000000000..79b9b1a522 --- /dev/null +++ b/third_party/rust/sync15/src/engine/sync_engine.rs @@ -0,0 +1,235 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::client_types::ClientData; +use crate::{telemetry, CollectionName, Guid, ServerTimestamp}; +use anyhow::Result; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CollSyncIds { + pub global: Guid, + pub coll: Guid, +} + +/// Defines how an engine is associated with a particular set of records +/// on a sync storage server. It's either disconnected, or believes it is +/// connected with a specific set of GUIDs. If the server and the engine don't +/// agree on the exact GUIDs, the engine will assume something radical happened +/// so it can't believe anything it thinks it knows about the state of the +/// server (ie, it will "reset" then do a full reconcile) +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EngineSyncAssociation { + /// This store is disconnected (although it may be connected in the future). + Disconnected, + /// Sync is connected, and has the following sync IDs. + Connected(CollSyncIds), +} + +/// The concrete `SyncEngine` implementations +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub enum SyncEngineId { + // Note that we've derived PartialOrd etc, which uses lexicographic ordering + // of the variants. We leverage that such that the higher priority engines + // are listed first. + // This order matches desktop. + Passwords, + Tabs, + Bookmarks, + Addresses, + CreditCards, + History, +} + +impl SyncEngineId { + // Iterate over all possible engines. Note that we've made a policy decision + // that this should enumerate in "order" as defined by PartialCmp, and tests + // enforce this. + pub fn iter() -> impl Iterator<Item = SyncEngineId> { + [ + Self::Passwords, + Self::Tabs, + Self::Bookmarks, + Self::Addresses, + Self::CreditCards, + Self::History, + ] + .into_iter() + } + + // Get the string identifier for this engine. This must match the strings in SyncEngineSelection. + pub fn name(&self) -> &'static str { + match self { + Self::Passwords => "passwords", + Self::History => "history", + Self::Bookmarks => "bookmarks", + Self::Tabs => "tabs", + Self::Addresses => "addresses", + Self::CreditCards => "creditcards", + } + } +} + +impl fmt::Display for SyncEngineId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl TryFrom<&str> for SyncEngineId { + type Error = String; + + fn try_from(value: &str) -> std::result::Result<Self, Self::Error> { + match value { + "passwords" => Ok(Self::Passwords), + "history" => Ok(Self::History), + "bookmarks" => Ok(Self::Bookmarks), + "tabs" => Ok(Self::Tabs), + "addresses" => Ok(Self::Addresses), + "creditcards" => Ok(Self::CreditCards), + _ => Err(value.into()), + } + } +} + +/// A "sync engine" is a thing that knows how to sync. It's often implemented +/// by a "store" (which is the generic term responsible for all storage +/// associated with a component, including storage required for sync.) +/// +/// Low-level engine functionality. Engines that need custom reconciliation +/// logic should use this. +/// +/// Different engines will produce errors of different types. To accommodate +/// this, we force them all to return anyhow::Error. +pub trait SyncEngine { + fn collection_name(&self) -> CollectionName; + + /// Prepares the engine for syncing. The tabs engine currently uses this to + /// store the current list of clients, which it uses to look up device names + /// and types. + /// + /// Note that this method is only called by `sync_multiple`, and only if a + /// command processor is registered. In particular, `prepare_for_sync` will + /// not be called if the store is synced using `sync::synchronize` or + /// `sync_multiple::sync_multiple`. It _will_ be called if the store is + /// synced via the Sync Manager. + /// + /// TODO(issue #2590): This is pretty cludgey and will be hard to extend for + /// any case other than the tabs case. We should find another way to support + /// tabs... + fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> { + Ok(()) + } + + /// Tells the engine what the local encryption key is for the data managed + /// by the engine. This is only used by collections that store data + /// encrypted locally and is unrelated to the encryption used by Sync. + /// The intent is that for such collections, this key can be used to + /// decrypt local data before it is re-encrypted by Sync and sent to the + /// storage servers, and similarly, data from the storage servers will be + /// decrypted by Sync, then encrypted by the local encryption key before + /// being added to the local database. + /// + /// The expectation is that the key value is being maintained by the + /// embedding application in some secure way suitable for the environment + /// in which the app is running - eg, the OS "keychain". The value of the + /// key is implementation dependent - it is expected that the engine and + /// embedding application already have some external agreement about how + /// to generate keys and in what form they are exchanged. Finally, there's + /// an assumption that sync engines are short-lived and only live for a + /// single sync - this means that sync doesn't hold on to the key for an + /// extended period. + /// + /// This will panic if called by an engine that doesn't have explicit + /// support for local encryption keys as that implies a degree of confusion + /// which shouldn't be possible to ignore. + fn set_local_encryption_key(&mut self, _key: &str) -> Result<()> { + unimplemented!("This engine does not support local encryption"); + } + + /// `inbound` is a vector to support the case where + /// `get_collection_requests` returned multiple requests. The changesets are + /// in the same order as the requests were -- e.g. if `vec![req_a, req_b]` + /// was returned from `get_collection_requests`, `inbound` will have the + /// results from `req_a` as its first index, and those from `req_b` as it's + /// second. + fn apply_incoming( + &self, + inbound: Vec<IncomingChangeset>, + telem: &mut telemetry::Engine, + ) -> Result<OutgoingChangeset>; + + fn sync_finished( + &self, + new_timestamp: ServerTimestamp, + records_synced: Vec<Guid>, + ) -> Result<()>; + + /// The engine is responsible for building collection requests. Engines + /// typically will store a lastModified timestamp and use that to build a + /// request saying "give me full records since that date" - however, other + /// engines might do something fancier. This could even later be extended to + /// handle "backfills" etc + /// + /// To support more advanced use cases, multiple requests can be returned + /// here - either from the same or different collections. The vast majority + /// of engines will just want to return zero or one item in their vector + /// (zero is a valid optimization when the server timestamp is the same as + /// the engine last saw, one when it is not) + /// + /// Important: In the case when more than one collection is requested, it's + /// assumed the last one is the "canonical" one. (That is, it must be for + /// "this" collection, its timestamp is used to represent the sync, etc). + /// (Note that multiple collection request support is currently unused, so + /// it might make sense to delete it - if we need it later, we may find a + /// better shape for our use-case) + fn get_collection_requests( + &self, + server_timestamp: ServerTimestamp, + ) -> Result<Vec<CollectionRequest>>; + + /// Get persisted sync IDs. If they don't match the global state we'll be + /// `reset()` with the new IDs. + fn get_sync_assoc(&self) -> Result<EngineSyncAssociation>; + + /// Reset the engine (and associated store) without wiping local data, + /// ready for a "first sync". + /// `assoc` defines how this store is to be associated with sync. + fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()>; + + fn wipe(&self) -> Result<()>; +} + +#[cfg(test)] +mod test { + use super::*; + use std::iter::zip; + + #[test] + fn test_engine_priority() { + fn sorted(mut engines: Vec<SyncEngineId>) -> Vec<SyncEngineId> { + engines.sort(); + engines + } + assert_eq!( + vec![SyncEngineId::Passwords, SyncEngineId::Tabs], + sorted(vec![SyncEngineId::Passwords, SyncEngineId::Tabs]) + ); + assert_eq!( + vec![SyncEngineId::Passwords, SyncEngineId::Tabs], + sorted(vec![SyncEngineId::Tabs, SyncEngineId::Passwords]) + ); + } + + #[test] + fn test_engine_enum_order() { + let unsorted = SyncEngineId::iter().collect::<Vec<SyncEngineId>>(); + let mut sorted = SyncEngineId::iter().collect::<Vec<SyncEngineId>>(); + sorted.sort(); + + // iterating should supply identical elements in each. + assert!(zip(unsorted, sorted).fold(true, |acc, (a, b)| acc && (a == b))) + } +} |