1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::{
request::{NormalResponseHandler, UploadInfo},
CollState, Sync15ClientResponse, Sync15StorageClient,
};
use crate::bso::OutgoingEncryptedBso;
use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
use crate::error::{self, Error, Result};
use crate::{CollectionName, KeyBundle, ServerTimestamp};
pub fn encrypt_outgoing(
o: OutgoingChangeset,
key: &KeyBundle,
) -> Result<Vec<OutgoingEncryptedBso>> {
o.changes
.into_iter()
.map(|change| change.into_encrypted(key))
.collect()
}
pub fn fetch_incoming(
client: &Sync15StorageClient,
state: &CollState,
collection_request: CollectionRequest,
) -> Result<IncomingChangeset> {
let collection = collection_request.collection.clone();
let (records, timestamp) = match client.get_encrypted_records(collection_request)? {
Sync15ClientResponse::Success {
record,
last_modified,
..
} => (record, last_modified),
other => return Err(other.create_storage_error()),
};
let mut result = IncomingChangeset::new(collection, timestamp);
result.changes.reserve(records.len());
for record in records {
// if we see a HMAC error, we've made an explicit decision to
// NOT handle it here, but restart the global state machine.
// That should cause us to re-read crypto/keys and things should
// work (although if for some reason crypto/keys was updated but
// not all storage was wiped we are probably screwed.)
result.changes.push(record.into_decrypted(&state.key)?);
}
Ok(result)
}
pub struct CollectionUpdate<'a> {
client: &'a Sync15StorageClient,
state: &'a CollState,
collection: CollectionName,
xius: ServerTimestamp,
to_update: Vec<OutgoingEncryptedBso>,
fully_atomic: bool,
}
impl<'a> CollectionUpdate<'a> {
pub fn new(
client: &'a Sync15StorageClient,
state: &'a CollState,
collection: CollectionName,
xius: ServerTimestamp,
records: Vec<OutgoingEncryptedBso>,
fully_atomic: bool,
) -> CollectionUpdate<'a> {
CollectionUpdate {
client,
state,
collection,
xius,
to_update: records,
fully_atomic,
}
}
pub fn new_from_changeset(
client: &'a Sync15StorageClient,
state: &'a CollState,
changeset: OutgoingChangeset,
fully_atomic: bool,
) -> Result<CollectionUpdate<'a>> {
let collection = changeset.collection.clone();
let to_update = encrypt_outgoing(changeset, &state.key)?;
Ok(CollectionUpdate::new(
client,
state,
collection,
state.last_modified,
to_update,
fully_atomic,
))
}
/// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise
/// returns an empty vec.
pub fn upload(self) -> error::Result<UploadInfo> {
let mut failed = vec![];
let mut q = self.client.new_post_queue(
&self.collection,
&self.state.config,
self.xius,
NormalResponseHandler::new(!self.fully_atomic),
)?;
for record in self.to_update.into_iter() {
let enqueued = q.enqueue(&record)?;
if !enqueued && self.fully_atomic {
return Err(Error::RecordTooLargeError);
}
}
q.flush(true)?;
let mut info = q.completed_upload_info();
info.failed_ids.append(&mut failed);
if self.fully_atomic {
assert_eq!(
info.failed_ids.len(),
0,
"Bug: Should have failed by now if we aren't allowing dropped records"
);
}
Ok(info)
}
}
|