summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/client/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/client/sync.rs')
-rw-r--r--third_party/rust/sync15/src/client/sync.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client/sync.rs b/third_party/rust/sync15/src/client/sync.rs
new file mode 100644
index 0000000000..b25220032f
--- /dev/null
+++ b/third_party/rust/sync15/src/client/sync.rs
@@ -0,0 +1,114 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use super::{CollectionUpdate, GlobalState, LocalCollStateMachine, Sync15StorageClient};
+use crate::clients_engine;
+use crate::engine::SyncEngine;
+use crate::error::Error;
+use crate::telemetry;
+use crate::KeyBundle;
+use interrupt_support::Interruptee;
+
+#[allow(clippy::too_many_arguments)]
+pub fn synchronize_with_clients_engine(
+ client: &Sync15StorageClient,
+ global_state: &GlobalState,
+ root_sync_key: &KeyBundle,
+ clients: Option<&clients_engine::Engine<'_>>,
+ engine: &dyn SyncEngine,
+ fully_atomic: bool,
+ telem_engine: &mut telemetry::Engine,
+ interruptee: &dyn Interruptee,
+) -> Result<(), Error> {
+ let collection = engine.collection_name();
+ log::info!("Syncing collection {}", collection);
+
+ // our global state machine is ready - get the collection machine going.
+ let coll_state = match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? {
+ Some(coll_state) => coll_state,
+ None => {
+ // XXX - this is either "error" or "declined".
+ log::warn!(
+ "can't setup for the {} collection - hopefully it works later",
+ collection
+ );
+ return Ok(());
+ }
+ };
+
+ if let Some(clients) = clients {
+ engine.prepare_for_sync(&|| clients.get_client_data())?;
+ }
+ interruptee.err_if_interrupted()?;
+ // We assume an "engine" manages exactly one "collection" with the engine's name.
+ match engine.get_collection_request(coll_state.last_modified)? {
+ None => {
+ log::info!("skipping incoming for {} - not needed.", collection);
+ }
+ Some(collection_request) => {
+ // Ideally we would "batch" incoming records (eg, fetch just 1000 at a time)
+ // and ask the engine to "stage" them as they come in - but currently we just read
+ // them all in one request.
+
+ // Doing this batching will involve specifying a "limit=" param and
+ // "x-if-unmodified-since" for each request, looking for an
+ // "X-Weave-Next-Offset header in the response and using that in subsequent
+ // requests.
+ // See https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html#syncstorage-paging
+ //
+ // But even if we had that, we need to deal with a 412 response on a subsequent batch,
+ // so we can't know if we've staged *every* record for that timestamp; the next
+ // sync must use an earlier one.
+ //
+ // For this reason, an engine can't really trust a server timestamp until the
+ // very end when we know we've staged them all.
+ let incoming = super::fetch_incoming(client, &coll_state, collection_request)?;
+ log::info!("Downloaded {} remote changes", incoming.len());
+ engine.stage_incoming(incoming, telem_engine)?;
+ interruptee.err_if_interrupted()?;
+ }
+ };
+
+ // Should consider adding a new `fetch_outgoing()` and having `apply()` only apply.
+ // It *might* even make sense to only call `apply()` when something was staged,
+ // but that's not clear - see the discussion at
+ // https://github.com/mozilla/application-services/pull/5441/files/f36274f455a6299f10e7ce56b167882c369aa806#r1189267540
+ log::info!("Applying changes");
+ let outgoing = engine.apply(coll_state.last_modified, telem_engine)?;
+ interruptee.err_if_interrupted()?;
+
+ // XXX - this upload strategy is buggy due to batching. With enough records, we will commit
+ // 2 batches on the server. If the second fails, we get an Err<> here, so can't tell the
+ // engine about the successful server batch commit.
+ // Most stuff below should be called per-batch rather than at the successful end of all
+ // batches, but that's not trivial.
+ log::info!("Uploading {} outgoing changes", outgoing.len());
+ let upload_info = CollectionUpdate::new_from_changeset(
+ client,
+ &coll_state,
+ collection,
+ outgoing,
+ fully_atomic,
+ )?
+ .upload()?;
+ log::info!(
+ "Upload success ({} records success, {} records failed)",
+ upload_info.successful_ids.len(),
+ upload_info.failed_ids.len()
+ );
+
+ let mut telem_outgoing = telemetry::EngineOutgoing::new();
+ telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
+ telem_outgoing.failed(upload_info.failed_ids.len());
+ telem_engine.outgoing(telem_outgoing);
+
+ engine.set_uploaded(upload_info.modified_timestamp, upload_info.successful_ids)?;
+
+ // The above should all be per-batch :(
+
+ engine.sync_finished()?;
+
+ log::info!("Sync finished!");
+ Ok(())
+}