summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/client/coll_update.rs
blob: d09b1c2eb43324b8f3d4662d620f2e51c1e373fa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::{
    request::{NormalResponseHandler, UploadInfo},
    CollState, Sync15ClientResponse, Sync15StorageClient,
};
use crate::bso::OutgoingEncryptedBso;
use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
use crate::error::{self, Error, Result};
use crate::{CollectionName, KeyBundle, ServerTimestamp};

pub fn encrypt_outgoing(
    o: OutgoingChangeset,
    key: &KeyBundle,
) -> Result<Vec<OutgoingEncryptedBso>> {
    o.changes
        .into_iter()
        .map(|change| change.into_encrypted(key))
        .collect()
}

pub fn fetch_incoming(
    client: &Sync15StorageClient,
    state: &CollState,
    collection_request: CollectionRequest,
) -> Result<IncomingChangeset> {
    let collection = collection_request.collection.clone();
    let (records, timestamp) = match client.get_encrypted_records(collection_request)? {
        Sync15ClientResponse::Success {
            record,
            last_modified,
            ..
        } => (record, last_modified),
        other => return Err(other.create_storage_error()),
    };
    let mut result = IncomingChangeset::new(collection, timestamp);
    result.changes.reserve(records.len());
    for record in records {
        // if we see a HMAC error, we've made an explicit decision to
        // NOT handle it here, but restart the global state machine.
        // That should cause us to re-read crypto/keys and things should
        // work (although if for some reason crypto/keys was updated but
        // not all storage was wiped we are probably screwed.)
        result.changes.push(record.into_decrypted(&state.key)?);
    }
    Ok(result)
}

pub struct CollectionUpdate<'a> {
    client: &'a Sync15StorageClient,
    state: &'a CollState,
    collection: CollectionName,
    xius: ServerTimestamp,
    to_update: Vec<OutgoingEncryptedBso>,
    fully_atomic: bool,
}

impl<'a> CollectionUpdate<'a> {
    pub fn new(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        collection: CollectionName,
        xius: ServerTimestamp,
        records: Vec<OutgoingEncryptedBso>,
        fully_atomic: bool,
    ) -> CollectionUpdate<'a> {
        CollectionUpdate {
            client,
            state,
            collection,
            xius,
            to_update: records,
            fully_atomic,
        }
    }

    pub fn new_from_changeset(
        client: &'a Sync15StorageClient,
        state: &'a CollState,
        changeset: OutgoingChangeset,
        fully_atomic: bool,
    ) -> Result<CollectionUpdate<'a>> {
        let collection = changeset.collection.clone();
        let to_update = encrypt_outgoing(changeset, &state.key)?;
        Ok(CollectionUpdate::new(
            client,
            state,
            collection,
            state.last_modified,
            to_update,
            fully_atomic,
        ))
    }

    /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise
    /// returns an empty vec.
    pub fn upload(self) -> error::Result<UploadInfo> {
        let mut failed = vec![];
        let mut q = self.client.new_post_queue(
            &self.collection,
            &self.state.config,
            self.xius,
            NormalResponseHandler::new(!self.fully_atomic),
        )?;

        for record in self.to_update.into_iter() {
            let enqueued = q.enqueue(&record)?;
            if !enqueued && self.fully_atomic {
                return Err(Error::RecordTooLargeError);
            }
        }

        q.flush(true)?;
        let mut info = q.completed_upload_info();
        info.failed_ids.append(&mut failed);
        if self.fully_atomic {
            assert_eq!(
                info.failed_ids.len(),
                0,
                "Bug: Should have failed by now if we aren't allowing dropped records"
            );
        }
        Ok(info)
    }
}