diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/sync15/src/client/sync_multiple.rs | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/sync15/src/client/sync_multiple.rs')
-rw-r--r-- | third_party/rust/sync15/src/client/sync_multiple.rs | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client/sync_multiple.rs b/third_party/rust/sync15/src/client/sync_multiple.rs new file mode 100644 index 0000000000..79ddceff3c --- /dev/null +++ b/third_party/rust/sync15/src/client/sync_multiple.rs @@ -0,0 +1,493 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// This helps you perform a sync of multiple engines and helps you manage +// global and local state between syncs. + +use super::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine}; +use super::status::{ServiceStatus, SyncResult}; +use super::storage_client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit}; +use crate::clients_engine::{self, CommandProcessor, CLIENTS_TTL_REFRESH}; +use crate::engine::{EngineSyncAssociation, SyncEngine}; +use crate::error::Error; +use crate::telemetry; +use crate::KeyBundle; +use interrupt_support::Interruptee; +use std::collections::HashMap; +use std::mem; +use std::result; +use std::time::{Duration, SystemTime}; + +/// Info about the client to use. We reuse the client unless +/// we discover the client_init has changed, in which case we re-create one. +#[derive(Debug)] +struct ClientInfo { + // the client_init used to create `client`. + client_init: Sync15StorageClientInit, + // the client (our tokenserver state machine state, and our http library's state) + client: Sync15StorageClient, +} + +impl ClientInfo { + fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> { + Ok(Self { + client_init: ci.clone(), + client: Sync15StorageClient::new(ci.clone())?, + }) + } +} + +/// Info we want callers to engine *in memory* for us so that subsequent +/// syncs are faster. This should never be persisted to storage as it holds +/// sensitive information, such as the sync decryption keys. +#[derive(Debug, Default)] +pub struct MemoryCachedState { + last_client_info: Option<ClientInfo>, + last_global_state: Option<GlobalState>, + // These are just engined in memory, as persisting an invalid value far in the + // future has the potential to break sync for good. + next_sync_after: Option<SystemTime>, + next_client_refresh_after: Option<SystemTime>, +} + +impl MemoryCachedState { + // Called we notice the cached state is stale. + pub fn clear_sensitive_info(&mut self) { + self.last_client_info = None; + self.last_global_state = None; + // Leave the backoff time, as there's no reason to think it's not still + // true. + } + pub fn get_next_sync_after(&self) -> Option<SystemTime> { + self.next_sync_after + } + pub fn should_refresh_client(&self) -> bool { + match self.next_client_refresh_after { + Some(t) => SystemTime::now() > t, + None => true, + } + } + pub fn note_client_refresh(&mut self) { + self.next_client_refresh_after = + Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH)); + } +} + +/// Sync multiple engines +/// * `engines` - The engines to sync +/// * `persisted_global_state` - The global state to use, or None if never +/// before provided. At the end of the sync, and even when the sync fails, +/// the value in this cell should be persisted to permanent storage and +/// provided next time the sync is called. +/// * `last_client_info` - The client state to use, or None if never before +/// provided. At the end of the sync, the value should be persisted +/// *in memory only* - it should not be persisted to disk. +/// * `storage_init` - Information about how the sync http client should be +/// configured. +/// * `root_sync_key` - The KeyBundle used for encryption. +/// +/// Returns a map, keyed by name and holding an error value - if any engine +/// fails, the sync will continue on to other engines, but the error will be +/// places in this map. The absence of a name in the map implies the engine +/// succeeded. +pub fn sync_multiple( + engines: &[&dyn SyncEngine], + persisted_global_state: &mut Option<String>, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option<SyncRequestInfo<'_>>, +) -> SyncResult { + sync_multiple_with_command_processor( + None, + engines, + persisted_global_state, + mem_cached_state, + storage_init, + root_sync_key, + interruptee, + req_info, + ) +} + +/// Like `sync_multiple`, but specifies an optional command processor to handle +/// commands from the clients collection. This function is called by the sync +/// manager, which provides its own processor. +#[allow(clippy::too_many_arguments)] +pub fn sync_multiple_with_command_processor( + command_processor: Option<&dyn CommandProcessor>, + engines: &[&dyn SyncEngine], + persisted_global_state: &mut Option<String>, + mem_cached_state: &mut MemoryCachedState, + storage_init: &Sync15StorageClientInit, + root_sync_key: &KeyBundle, + interruptee: &dyn Interruptee, + req_info: Option<SyncRequestInfo<'_>>, +) -> SyncResult { + log::info!("Syncing {} engines", engines.len()); + let mut sync_result = SyncResult { + service_status: ServiceStatus::OtherError, + result: Ok(()), + declined: None, + next_sync_after: None, + engine_results: HashMap::with_capacity(engines.len()), + telemetry: telemetry::SyncTelemetryPing::new(), + }; + let backoff = super::storage_client::new_backoff_listener(); + let req_info = req_info.unwrap_or_default(); + let driver = SyncMultipleDriver { + command_processor, + engines, + storage_init, + interruptee, + engines_to_state_change: req_info.engines_to_state_change, + backoff: backoff.clone(), + root_sync_key, + result: &mut sync_result, + persisted_global_state, + mem_cached_state, + saw_auth_error: false, + ignore_soft_backoff: req_info.is_user_action, + }; + match driver.sync() { + Ok(()) => { + log::debug!( + "sync was successful, final status={:?}", + sync_result.service_status + ); + } + Err(e) => { + log::warn!( + "sync failed: {}, final status={:?}", + e, + sync_result.service_status, + ); + sync_result.result = Err(e); + } + } + // Respect `backoff` value when computing the next sync time even if we were + // ignoring it during the sync + sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default()); + mem_cached_state.next_sync_after = sync_result.next_sync_after; + log::trace!("Sync result: {:?}", sync_result); + sync_result +} + +/// This is essentially a bag of information that the sync manager knows, but +/// otherwise we won't. It should probably be rethought if it gains many more +/// fields. +#[derive(Debug, Default)] +pub struct SyncRequestInfo<'a> { + pub engines_to_state_change: Option<&'a HashMap<String, bool>>, + pub is_user_action: bool, +} + +// The sync multiple driver +struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + command_processor: Option<&'info dyn CommandProcessor>, + engines: &'info [&'info dyn SyncEngine], + storage_init: &'info Sync15StorageClientInit, + root_sync_key: &'info KeyBundle, + interruptee: &'info dyn Interruptee, + backoff: BackoffListener, + engines_to_state_change: Option<&'info HashMap<String, bool>>, + result: &'res mut SyncResult, + persisted_global_state: &'pgs mut Option<String>, + mem_cached_state: &'mcs mut MemoryCachedState, + ignore_soft_backoff: bool, + saw_auth_error: bool, +} + +impl<'info, 'res, 'pgs, 'mcs> SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> { + /// The actual worker for sync_multiple. + fn sync(mut self) -> result::Result<(), Error> { + log::info!("Loading/initializing persisted state"); + let mut pgs = self.prepare_persisted_state(); + + log::info!("Preparing client info"); + let client_info = self.prepare_client_info()?; + + if self.was_interrupted() { + return Ok(()); + } + + log::info!("Entering sync state machine"); + // Advance the state machine to the point where it can perform a full + // sync. This may involve uploading meta/global, crypto/keys etc. + let mut global_state = self.run_state_machine(&client_info, &mut pgs)?; + + if self.was_interrupted() { + return Ok(()); + } + + // Set the service status to OK here - we may adjust it based on an individual + // engine failing. + self.result.service_status = ServiceStatus::Ok; + + let clients_engine = if let Some(command_processor) = self.command_processor { + log::info!("Synchronizing clients engine"); + let should_refresh = self.mem_cached_state.should_refresh_client(); + let mut engine = clients_engine::Engine::new(command_processor, self.interruptee); + if let Err(e) = engine.sync( + &client_info.client, + &global_state, + self.root_sync_key, + should_refresh, + ) { + // Record telemetry with the error just in case... + let mut telem_sync = telemetry::SyncTelemetry::new(); + let mut telem_engine = telemetry::Engine::new("clients"); + telem_engine.failure(&e); + telem_sync.engine(telem_engine); + self.result.service_status = ServiceStatus::from_err(&e); + + // ...And bail, because a clients engine sync failure is fatal. + return Err(e); + } + // We don't record telemetry for successful clients engine + // syncs, since we only keep client records in memory, we + // expect the counts to be the same most times, and a + // failure aborts the entire sync. + if self.was_interrupted() { + return Ok(()); + } + self.mem_cached_state.note_client_refresh(); + Some(engine) + } else { + None + }; + + log::info!("Synchronizing engines"); + + let telem_sync = + self.sync_engines(&client_info, &mut global_state, clients_engine.as_ref()); + self.result.telemetry.sync(telem_sync); + + log::info!("Finished syncing engines."); + + if !self.saw_auth_error { + log::trace!("Updating persisted global state"); + self.mem_cached_state.last_client_info = Some(client_info); + self.mem_cached_state.last_global_state = Some(global_state); + } + + Ok(()) + } + + fn was_interrupted(&mut self) -> bool { + if self.interruptee.was_interrupted() { + log::info!("Interrupted, bailing out"); + self.result.service_status = ServiceStatus::Interrupted; + true + } else { + false + } + } + + fn sync_engines( + &mut self, + client_info: &ClientInfo, + global_state: &mut GlobalState, + clients: Option<&clients_engine::Engine<'_>>, + ) -> telemetry::SyncTelemetry { + let mut telem_sync = telemetry::SyncTelemetry::new(); + for engine in self.engines { + let name = engine.collection_name(); + if self + .backoff + .get_required_wait(self.ignore_soft_backoff) + .is_some() + { + log::warn!("Got backoff, bailing out of sync early"); + break; + } + if global_state.global.declined.iter().any(|e| e == &*name) { + log::info!("The {} engine is declined. Skipping", name); + continue; + } + log::info!("Syncing {} engine!", name); + + let mut telem_engine = telemetry::Engine::new(&*name); + let result = super::sync::synchronize_with_clients_engine( + &client_info.client, + global_state, + self.root_sync_key, + clients, + *engine, + true, + &mut telem_engine, + self.interruptee, + ); + + match result { + Ok(()) => log::info!("Sync of {} was successful!", name), + Err(ref e) => { + log::warn!("Sync of {} failed! {:?}", name, e); + let this_status = ServiceStatus::from_err(e); + // The only error which forces us to discard our state is an + // auth error. + self.saw_auth_error = + self.saw_auth_error || this_status == ServiceStatus::AuthenticationError; + telem_engine.failure(e); + // If the failure from the engine looks like anything other than + // a "engine error" we don't bother trying the others. + if this_status != ServiceStatus::OtherError { + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + self.result.service_status = this_status; + break; + } + } + } + telem_sync.engine(telem_engine); + self.result.engine_results.insert(name.into(), result); + if self.was_interrupted() { + break; + } + } + telem_sync + } + + fn run_state_machine( + &mut self, + client_info: &ClientInfo, + pgs: &mut PersistedGlobalState, + ) -> result::Result<GlobalState, Error> { + let last_state = mem::replace(&mut self.mem_cached_state.last_global_state, None); + + let mut state_machine = SetupStateMachine::for_full_sync( + &client_info.client, + self.root_sync_key, + pgs, + self.engines_to_state_change, + self.interruptee, + ); + + log::info!("Advancing state machine to ready (full)"); + let res = state_machine.run_to_ready(last_state); + // Grab this now even though we don't need it until later to avoid a + // lifetime issue + let changes = state_machine.changes_needed.take(); + // The state machine might have updated our persisted_global_state, so + // update the caller's repr of it. + *self.persisted_global_state = Some(serde_json::to_string(&pgs)?); + + // Now that we've gone through the state machine, engine the declined list in + // the sync_result + self.result.declined = Some(pgs.get_declined().to_vec()); + log::debug!( + "Declined engines list after state machine set to: {:?}", + self.result.declined, + ); + + if let Some(c) = changes { + self.wipe_or_reset_engines(c, &client_info.client)?; + } + let state = match res { + Err(e) => { + self.result.service_status = ServiceStatus::from_err(&e); + return Err(e); + } + Ok(state) => state, + }; + self.result.telemetry.uid(client_info.client.hashed_uid()?); + // As for client_info, put None back now so we start from scratch on error. + self.mem_cached_state.last_global_state = None; + Ok(state) + } + + fn wipe_or_reset_engines( + &mut self, + changes: EngineChangesNeeded, + client: &Sync15StorageClient, + ) -> result::Result<(), Error> { + if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() { + return Ok(()); + } + for e in &changes.remote_wipes { + log::info!("Engine {:?} just got disabled locally, wiping server", e); + client.wipe_remote_engine(e)?; + } + + for s in self.engines { + let name = s.collection_name(); + if changes.local_resets.contains(&*name) { + log::info!("Resetting engine {}, as it was declined remotely", name); + s.reset(&EngineSyncAssociation::Disconnected)?; + } + } + + Ok(()) + } + + fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> { + let mut client_info = match mem::replace(&mut self.mem_cached_state.last_client_info, None) + { + Some(client_info) => { + // if our storage_init has changed it probably means the user has + // changed, courtesy of the 'kid' in the structure. Thus, we can't + // reuse the client or the memory cached state. We do keep the disk + // state as currently that's only the declined list. + if client_info.client_init != *self.storage_init { + log::info!("Discarding all state as the account might have changed"); + *self.mem_cached_state = MemoryCachedState::default(); + ClientInfo::new(self.storage_init)? + } else { + log::debug!("Reusing memory-cached client_info"); + // we can reuse it (which should be the common path) + client_info + } + } + None => { + log::debug!("mem_cached_state was stale or missing, need setup"); + // We almost certainly have no other state here, but to be safe, we + // throw away any memory state we do have. + self.mem_cached_state.clear_sensitive_info(); + ClientInfo::new(self.storage_init)? + } + }; + // Ensure we use the correct listener here rather than on all the branches + // above, since it seems less error prone. + client_info.client.backoff = self.backoff.clone(); + Ok(client_info) + } + + fn prepare_persisted_state(&mut self) -> PersistedGlobalState { + // Note that any failure to use a persisted state means we also decline + // to use our memory cached state, so that we fully rebuild that + // persisted state for next time. + match self.persisted_global_state { + Some(persisted_string) if !persisted_string.is_empty() => { + match serde_json::from_str::<PersistedGlobalState>(persisted_string) { + Ok(state) => { + log::trace!("Read persisted state: {:?}", state); + // Note that we don't set `result.declined` from the + // data in state - it remains None, which explicitly + // indicates "we don't have updated info". + state + } + _ => { + // Don't log the error since it might contain sensitive + // info (although currently it only contains the declined engines list) + error_support::report_error!( + "sync15-prepare-persisted-state", + "Failed to parse PersistedGlobalState from JSON! Falling back to default" + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } + _ => { + log::info!( + "The application didn't give us persisted state - \ + this is only expected on the very first run for a given user." + ); + *self.mem_cached_state = MemoryCachedState::default(); + PersistedGlobalState::default() + } + } + } +} |