/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::{
    request::{NormalResponseHandler, UploadInfo},
    CollState, Sync15ClientResponse, Sync15StorageClient,
};
use crate::bso::OutgoingEncryptedBso;
use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
use crate::error::{self, Error, Result};
use crate::{CollectionName, KeyBundle, ServerTimestamp};

/// Encrypts the records of an outgoing changeset with the given key bundle.
pub fn encrypt_outgoing(
    o: OutgoingChangeset,
    key: &KeyBundle,
) -> Result<Vec<OutgoingEncryptedBso>> {
    o.changes
        .into_iter()
        .map(|change| change.into_encrypted(key))
        .collect()
}

/// Fetches the records matching `collection_request` from the server and
/// decrypts them with the collection key held in `state`.
pub fn fetch_incoming(
    client: &Sync15StorageClient,
    state: &CollState,
    collection_request: CollectionRequest,
) -> Result<IncomingChangeset> {
    let collection = collection_request.collection.clone();
    let (records, timestamp) = match client.get_encrypted_records(collection_request)? {
        Sync15ClientResponse::Success {
            record,
            last_modified,
            ..
        } => (record, last_modified),
        other => return Err(other.create_storage_error()),
    };
    let mut result = IncomingChangeset::new(collection, timestamp);
    result.changes.reserve(records.len());
    for record in records {
        // If we see an HMAC error, we've made an explicit decision NOT to
        // handle it here, but to restart the global state machine instead.
        // That should cause us to re-read crypto/keys, after which things
        // should work (although if for some reason crypto/keys was updated
        // but not all storage was wiped, we are probably screwed.)
        result.changes.push(record.into_decrypted(&state.key)?);
    }
    Ok(result)
}

/// A batch of encrypted outgoing records, ready to be uploaded to a collection.
pub struct CollectionUpdate<'a> {
    client: &'a Sync15StorageClient,
    state: &'a CollState,
    collection: CollectionName,
    xius: ServerTimestamp,
    to_update: Vec<OutgoingEncryptedBso>,
    fully_atomic: bool,
}

impl<'a> CollectionUpdate<'a> {
    pub fn new(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        collection: CollectionName,
        xius: ServerTimestamp,
        records: Vec<OutgoingEncryptedBso>,
        fully_atomic: bool,
    ) -> CollectionUpdate<'a> {
        CollectionUpdate {
            client,
            state,
            collection,
            xius,
            to_update: records,
            fully_atomic,
        }
    }

    /// Encrypts `changeset` with the collection key from `state` and prepares
    /// it for upload, using the collection's last-modified timestamp from
    /// `state` as the X-If-Unmodified-Since value.
    pub fn new_from_changeset(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        changeset: OutgoingChangeset,
        fully_atomic: bool,
    ) -> Result<CollectionUpdate<'a>> {
        let collection = changeset.collection.clone();
        let to_update = encrypt_outgoing(changeset, &state.key)?;
        Ok(CollectionUpdate::new(
            client,
            state,
            collection,
            state.last_modified,
            to_update,
            fully_atomic,
        ))
    }

    /// Uploads the queued records. Returns a list of the IDs that failed to
    /// upload when `fully_atomic` is false; when it is true, any failure is
    /// reported as an error instead and the returned list is always empty.
    pub fn upload(self) -> error::Result<UploadInfo> {
        let mut failed = vec![];
        let mut q = self.client.new_post_queue(
            &self.collection,
            &self.state.config,
            self.xius,
            NormalResponseHandler::new(!self.fully_atomic),
        )?;

        for record in self.to_update.into_iter() {
            let enqueued = q.enqueue(&record)?;
            if !enqueued && self.fully_atomic {
                return Err(Error::RecordTooLargeError);
            }
        }

        q.flush(true)?;
        let mut info = q.completed_upload_info();
        info.failed_ids.append(&mut failed);
        if self.fully_atomic {
            assert_eq!(
                info.failed_ids.len(),
                0,
                "Bug: Should have failed by now if we aren't allowing dropped records"
            );
        }
        Ok(info)
    }
}
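
// A hedged usage sketch (not part of the upstream module): roughly how a sync
// engine would wire these helpers together, assuming an already-initialized
// `Sync15StorageClient` and `CollState`, and assuming the `CollectionRequest`
// builder calls shown here. `apply_incoming` is a hypothetical engine-side
// function, included only to mark where reconciliation happens between the
// download and upload halves.
//
//     let request = CollectionRequest::new("bookmarks".into()).full();
//     let incoming = fetch_incoming(&client, &state, request)?;
//     let outgoing: OutgoingChangeset = apply_incoming(incoming)?;
//     let info = CollectionUpdate::new_from_changeset(
//         &client, &state, outgoing, /* fully_atomic */ true,
//     )?
//     .upload()?;
//     log::info!(
//         "uploaded {} records, {} failed",
//         info.successful_ids.len(),
//         info.failed_ids.len()
//     );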