/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::{
request::{NormalResponseHandler, UploadInfo},
CollState, Sync15ClientResponse, Sync15StorageClient,
};
use crate::bso::OutgoingEncryptedBso;
use crate::engine::{CollectionRequest, IncomingChangeset, OutgoingChangeset};
use crate::error::{self, Error, ErrorResponse, Result};
use crate::{KeyBundle, ServerTimestamp};
use std::borrow::Cow;
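
/// Encrypt each record in an outgoing changeset with the collection key,
/// producing the encrypted BSOs that are actually uploaded to the server.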
pub fn encrypt_outgoing(
o: OutgoingChangeset,
key: &KeyBundle,
) -> Result<Vec<OutgoingEncryptedBso>> {
o.changes
.into_iter()
.map(|change| change.into_encrypted(key))
.collect()
}
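
/// Fetch the encrypted records matching `collection_request`, decrypt them with
/// the collection key held in `state`, and return them as an `IncomingChangeset`.
/// The server's `last_modified` timestamp for the collection is also recorded in
/// `state.last_modified`.
///
/// Illustrative sketch only; it assumes a `Sync15StorageClient`, a `CollState`
/// and a `CollectionRequest` named `client`, `state` and `request` already exist
/// in the caller:
///
/// ```ignore
/// let incoming = fetch_incoming(&client, &mut state, &request)?;
/// // `state.last_modified` now reflects the collection's server timestamp.
/// for change in &incoming.changes {
///     // hand each decrypted record off to the engine...
/// }
/// ```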
pub fn fetch_incoming(
client: &Sync15StorageClient,
state: &mut CollState,
collection_request: &CollectionRequest,
) -> Result<IncomingChangeset> {
let collection = collection_request.collection.clone();
let (records, timestamp) = match client.get_encrypted_records(collection_request)? {
Sync15ClientResponse::Success {
record,
last_modified,
..
} => (record, last_modified),
other => return Err(other.create_storage_error()),
};
// xxx - duplication below of `timestamp` smells wrong
state.last_modified = timestamp;
let mut result = IncomingChangeset::new(collection, timestamp);
result.changes.reserve(records.len());
for record in records {
        // If we see an HMAC error we've made an explicit decision NOT to
        // handle it here, but to instead restart the global state machine.
        // That should cause us to re-read crypto/keys, after which things
        // should work (although if for some reason crypto/keys was updated
        // but not all storage was wiped, we are probably screwed.)
result.changes.push(record.into_decrypted(&state.key)?);
}
Ok(result)
}
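
/// Everything needed to upload a batch of already-encrypted records to a single
/// collection: the storage client, the collection state, the X-If-Unmodified-Since
/// timestamp (`xius`) used to guard against concurrent writes, and the records
/// themselves. If `fully_atomic` is true the upload fails as soon as any record
/// can't be uploaded; otherwise failed records are dropped and reported.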
pub struct CollectionUpdate<'a> {
client: &'a Sync15StorageClient,
state: &'a CollState,
collection: Cow<'static, str>,
xius: ServerTimestamp,
to_update: Vec<OutgoingEncryptedBso>,
fully_atomic: bool,
}
impl<'a> CollectionUpdate<'a> {
pub fn new(
client: &'a Sync15StorageClient,
state: &'a CollState,
collection: Cow<'static, str>,
xius: ServerTimestamp,
records: Vec<OutgoingEncryptedBso>,
fully_atomic: bool,
) -> CollectionUpdate<'a> {
CollectionUpdate {
client,
state,
collection,
xius,
to_update: records,
fully_atomic,
}
}
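
    /// Build a `CollectionUpdate` from an outgoing changeset: bail out early if the
    /// changeset's timestamp is older than the collection's last-modified time (in
    /// which case the server would reject the write at the X-If-Unmodified-Since
    /// check anyway), then encrypt the records with the collection key.
    ///
    /// Illustrative sketch only; `client`, `state` and `outgoing` are assumed to be
    /// in scope in the caller:
    ///
    /// ```ignore
    /// let update = CollectionUpdate::new_from_changeset(&client, &state, outgoing, true)?;
    /// let info = update.upload()?;
    /// // With `fully_atomic == true`, `info.failed_ids` is empty whenever `upload()` succeeds.
    /// ```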
pub fn new_from_changeset(
client: &'a Sync15StorageClient,
state: &'a CollState,
changeset: OutgoingChangeset,
fully_atomic: bool,
) -> Result<CollectionUpdate<'a>> {
let collection = changeset.collection.clone();
let xius = changeset.timestamp;
if xius < state.last_modified {
// We know we are going to fail the XIUS check...
return Err(Error::StorageHttpError(ErrorResponse::PreconditionFailed {
route: collection.into_owned(),
}));
}
let to_update = encrypt_outgoing(changeset, &state.key)?;
Ok(CollectionUpdate::new(
client,
state,
collection,
xius,
to_update,
fully_atomic,
))
}
    /// Returns a list of the IDs that failed to upload if dropped records are
    /// allowed (ie, if `fully_atomic` is false); otherwise returns an empty vec.
pub fn upload(self) -> error::Result<UploadInfo> {
let mut q = self.client.new_post_queue(
&self.collection,
&self.state.config,
self.xius,
NormalResponseHandler::new(!self.fully_atomic),
)?;
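        // Enqueue every record; the post queue batches them according to the
        // server limits in `state.config`. `enqueue` returns false when a record
        // is too large to ever upload, which is fatal only for fully-atomic uploads.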
for record in self.to_update.into_iter() {
let enqueued = q.enqueue(&record)?;
if !enqueued && self.fully_atomic {
return Err(Error::RecordTooLargeError);
}
}
q.flush(true)?;
        let info = q.completed_upload_info();
if self.fully_atomic {
assert_eq!(
info.failed_ids.len(),
0,
"Bug: Should have failed by now if we aren't allowing dropped records"
);
}
Ok(info)
}
}