diff options
Diffstat (limited to 'third_party/rust/sync15/src/sync.rs')
-rw-r--r-- | third_party/rust/sync15/src/sync.rs | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/sync.rs b/third_party/rust/sync15/src/sync.rs new file mode 100644 index 0000000000..a4030b5440 --- /dev/null +++ b/third_party/rust/sync15/src/sync.rs @@ -0,0 +1,127 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::changeset::CollectionUpdate; +use crate::client::Sync15StorageClient; +use crate::clients; +use crate::coll_state::LocalCollStateMachine; +use crate::error::Error; +use crate::key_bundle::KeyBundle; +use crate::state::GlobalState; +use crate::telemetry; +use interrupt_support::Interruptee; + +pub use sync15_traits::{IncomingChangeset, Store}; + +pub fn synchronize( + client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + store: &dyn Store, + fully_atomic: bool, + telem_engine: &mut telemetry::Engine, + interruptee: &dyn Interruptee, +) -> Result<(), crate::Error> { + synchronize_with_clients_engine( + client, + global_state, + root_sync_key, + None, + store, + fully_atomic, + telem_engine, + interruptee, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn synchronize_with_clients_engine( + client: &Sync15StorageClient, + global_state: &GlobalState, + root_sync_key: &KeyBundle, + clients: Option<&clients::Engine<'_>>, + store: &dyn Store, + fully_atomic: bool, + telem_engine: &mut telemetry::Engine, + interruptee: &dyn Interruptee, +) -> Result<(), Error> { + let collection = store.collection_name(); + log::info!("Syncing collection {}", collection); + + // our global state machine is ready - get the collection machine going. + let mut coll_state = match LocalCollStateMachine::get_state(store, global_state, root_sync_key)? + { + Some(coll_state) => coll_state, + None => { + // XXX - this is either "error" or "declined". + log::warn!( + "can't setup for the {} collection - hopefully it works later", + collection + ); + return Ok(()); + } + }; + + if let Some(clients) = clients { + store.prepare_for_sync(&|| clients.get_client_data())?; + } + + let collection_requests = store.get_collection_requests(coll_state.last_modified)?; + let incoming = if collection_requests.is_empty() { + log::info!("skipping incoming for {} - not needed.", collection); + vec![IncomingChangeset::new(collection, coll_state.last_modified)] + } else { + assert_eq!(collection_requests.last().unwrap().collection, collection); + + let count = collection_requests.len(); + collection_requests + .into_iter() + .enumerate() + .map(|(idx, collection_request)| { + interruptee.err_if_interrupted()?; + let incoming_changes = + crate::changeset::fetch_incoming(client, &mut coll_state, &collection_request)?; + + log::info!( + "Downloaded {} remote changes (request {} of {})", + incoming_changes.changes.len(), + idx, + count, + ); + Ok(incoming_changes) + }) + .collect::<Result<Vec<_>, Error>>()? + }; + + let new_timestamp = incoming.last().expect("must have >= 1").timestamp; + let mut outgoing = store.apply_incoming(incoming, telem_engine)?; + + interruptee.err_if_interrupted()?; + // Bump the timestamps now just incase the upload fails. + // xxx - duplication below smells wrong + outgoing.timestamp = new_timestamp; + coll_state.last_modified = new_timestamp; + + log::info!("Uploading {} outgoing changes", outgoing.changes.len()); + let upload_info = + CollectionUpdate::new_from_changeset(client, &coll_state, outgoing, fully_atomic)? + .upload()?; + + log::info!( + "Upload success ({} records success, {} records failed)", + upload_info.successful_ids.len(), + upload_info.failed_ids.len() + ); + // ideally we'd report this per-batch, but for now, let's just report it + // as a total. + let mut telem_outgoing = telemetry::EngineOutgoing::new(); + telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len()); + telem_outgoing.failed(upload_info.failed_ids.len()); + telem_engine.outgoing(telem_outgoing); + + store.sync_finished(upload_info.modified_timestamp, upload_info.successful_ids)?; + + log::info!("Sync finished!"); + Ok(()) +} |