diff options
Diffstat (limited to 'third_party/rust/sync15/src/client/coll_update.rs')
-rw-r--r-- | third_party/rust/sync15/src/client/coll_update.rs | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client/coll_update.rs b/third_party/rust/sync15/src/client/coll_update.rs new file mode 100644 index 0000000000..baa551c9a4 --- /dev/null +++ b/third_party/rust/sync15/src/client/coll_update.rs @@ -0,0 +1,137 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use super::{ + request::{NormalResponseHandler, UploadInfo}, + CollState, Sync15ClientResponse, Sync15StorageClient, +}; +use crate::bso::OutgoingEncryptedBso; +use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset}; +use crate::error::{self, Error, ErrorResponse, Result}; +use crate::{KeyBundle, ServerTimestamp}; +use std::borrow::Cow; + +pub fn encrypt_outgoing( + o: OutgoingChangeset, + key: &KeyBundle, +) -> Result<Vec<OutgoingEncryptedBso>> { + o.changes + .into_iter() + .map(|change| change.into_encrypted(key)) + .collect() +} + +pub fn fetch_incoming( + client: &Sync15StorageClient, + state: &mut CollState, + collection_request: &CollectionRequest, +) -> Result<IncomingChangeset> { + let collection = collection_request.collection.clone(); + let (records, timestamp) = match client.get_encrypted_records(collection_request)? { + Sync15ClientResponse::Success { + record, + last_modified, + .. + } => (record, last_modified), + other => return Err(other.create_storage_error()), + }; + // xxx - duplication below of `timestamp` smells wrong + state.last_modified = timestamp; + let mut result = IncomingChangeset::new(collection, timestamp); + result.changes.reserve(records.len()); + for record in records { + // if we see a HMAC error, we've made an explicit decision to + // NOT handle it here, but restart the global state machine. + // That should cause us to re-read crypto/keys and things should + // work (although if for some reason crypto/keys was updated but + // not all storage was wiped we are probably screwed.) + result.changes.push(record.into_decrypted(&state.key)?); + } + Ok(result) +} + +pub struct CollectionUpdate<'a> { + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + to_update: Vec<OutgoingEncryptedBso>, + fully_atomic: bool, +} + +impl<'a> CollectionUpdate<'a> { + pub fn new( + client: &'a Sync15StorageClient, + state: &'a CollState, + collection: Cow<'static, str>, + xius: ServerTimestamp, + records: Vec<OutgoingEncryptedBso>, + fully_atomic: bool, + ) -> CollectionUpdate<'a> { + CollectionUpdate { + client, + state, + collection, + xius, + to_update: records, + fully_atomic, + } + } + + pub fn new_from_changeset( + client: &'a Sync15StorageClient, + state: &'a CollState, + changeset: OutgoingChangeset, + fully_atomic: bool, + ) -> Result<CollectionUpdate<'a>> { + let collection = changeset.collection.clone(); + let xius = changeset.timestamp; + if xius < state.last_modified { + // We know we are going to fail the XIUS check... + return Err(Error::StorageHttpError(ErrorResponse::PreconditionFailed { + route: collection.into_owned(), + })); + } + let to_update = encrypt_outgoing(changeset, &state.key)?; + Ok(CollectionUpdate::new( + client, + state, + collection, + xius, + to_update, + fully_atomic, + )) + } + + /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise + /// returns an empty vec. + pub fn upload(self) -> error::Result<UploadInfo> { + let mut failed = vec![]; + let mut q = self.client.new_post_queue( + &self.collection, + &self.state.config, + self.xius, + NormalResponseHandler::new(!self.fully_atomic), + )?; + + for record in self.to_update.into_iter() { + let enqueued = q.enqueue(&record)?; + if !enqueued && self.fully_atomic { + return Err(Error::RecordTooLargeError); + } + } + + q.flush(true)?; + let mut info = q.completed_upload_info(); + info.failed_ids.append(&mut failed); + if self.fully_atomic { + assert_eq!( + info.failed_ids.len(), + 0, + "Bug: Should have failed by now if we aren't allowing dropped records" + ); + } + Ok(info) + } +} |