summaryrefslogtreecommitdiffstats
path: root/third_party/rust/sync15/src/client/request.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/sync15/src/client/request.rs')
-rw-r--r--third_party/rust/sync15/src/client/request.rs1199
1 files changed, 1199 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client/request.rs b/third_party/rust/sync15/src/client/request.rs
new file mode 100644
index 0000000000..c69b630c8d
--- /dev/null
+++ b/third_party/rust/sync15/src/client/request.rs
@@ -0,0 +1,1199 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+use super::storage_client::Sync15ClientResponse;
+use crate::bso::OutgoingEncryptedBso;
+use crate::error::{self, Error as ErrorKind, Result};
+use crate::ServerTimestamp;
+use serde_derive::*;
+use std::collections::HashMap;
+use std::default::Default;
+use std::ops::Deref;
+use sync_guid::Guid;
+use viaduct::status_codes;
+
+/// Manages a pair of (byte, count) limits for a PostQueue, such as
+/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records).
+#[derive(Debug, Clone)]
+struct LimitTracker {
+ max_bytes: usize,
+ max_records: usize,
+ cur_bytes: usize,
+ cur_records: usize,
+}
+
+impl LimitTracker {
+ pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
+ LimitTracker {
+ max_bytes,
+ max_records,
+ cur_bytes: 0,
+ cur_records: 0,
+ }
+ }
+
+ pub fn clear(&mut self) {
+ self.cur_records = 0;
+ self.cur_bytes = 0;
+ }
+
+ pub fn can_add_record(&self, payload_size: usize) -> bool {
+ // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that
+ // don't have https://github.com/mozilla-services/server-syncstorage/issues/73
+ self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes
+ }
+
+ pub fn can_never_add(&self, record_size: usize) -> bool {
+ record_size >= self.max_bytes
+ }
+
+ pub fn record_added(&mut self, record_size: usize) {
+ assert!(
+ self.can_add_record(record_size),
+ "LimitTracker::record_added caller must check can_add_record"
+ );
+ self.cur_records += 1;
+ self.cur_bytes += record_size;
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct InfoConfiguration {
+ /// The maximum size in bytes of the overall HTTP request body that will be accepted by the
+ /// server.
+ #[serde(default = "default_max_request_bytes")]
+ pub max_request_bytes: usize,
+
+ /// The maximum number of records that can be uploaded to a collection in a single POST request.
+ #[serde(default = "usize::max_value")]
+ pub max_post_records: usize,
+
+ /// The maximum combined size in bytes of the record payloads that can be uploaded to a
+ /// collection in a single POST request.
+ #[serde(default = "usize::max_value")]
+ pub max_post_bytes: usize,
+
+ /// The maximum total number of records that can be uploaded to a collection as part of a
+ /// batched upload.
+ #[serde(default = "usize::max_value")]
+ pub max_total_records: usize,
+
+ /// The maximum total combined size in bytes of the record payloads that can be uploaded to a
+ /// collection as part of a batched upload.
+ #[serde(default = "usize::max_value")]
+ pub max_total_bytes: usize,
+
+ /// The maximum size of an individual BSO payload, in bytes.
+ #[serde(default = "default_max_record_payload_bytes")]
+ pub max_record_payload_bytes: usize,
+}
+
+// This is annoying but seems to be the only way to do it...
+fn default_max_request_bytes() -> usize {
+ 260 * 1024
+}
+fn default_max_record_payload_bytes() -> usize {
+ 256 * 1024
+}
+
+impl Default for InfoConfiguration {
+ #[inline]
+ fn default() -> InfoConfiguration {
+ InfoConfiguration {
+ max_request_bytes: default_max_request_bytes(),
+ max_record_payload_bytes: default_max_record_payload_bytes(),
+ max_post_records: usize::max_value(),
+ max_post_bytes: usize::max_value(),
+ max_total_records: usize::max_value(),
+ max_total_bytes: usize::max_value(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize)]
+pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);
+
+impl InfoCollections {
+ pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
+ InfoCollections(collections)
+ }
+}
+
+impl Deref for InfoCollections {
+ type Target = HashMap<String, ServerTimestamp>;
+
+ fn deref(&self) -> &HashMap<String, ServerTimestamp> {
+ &self.0
+ }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct UploadResult {
+ batch: Option<String>,
+ /// Maps record id => why failed
+ #[serde(default = "HashMap::new")]
+ pub failed: HashMap<Guid, String>,
+ /// Vec of ids
+ #[serde(default = "Vec::new")]
+ pub success: Vec<Guid>,
+}
+
+pub type PostResponse = Sync15ClientResponse<UploadResult>;
+
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub enum BatchState {
+ Unsupported,
+ NoBatch,
+ InBatch(String),
+}
+
+#[derive(Debug)]
+pub struct PostQueue<Post, OnResponse> {
+ poster: Post,
+ on_response: OnResponse,
+ post_limits: LimitTracker,
+ batch_limits: LimitTracker,
+ max_payload_bytes: usize,
+ max_request_bytes: usize,
+ queued: Vec<u8>,
+ batch: BatchState,
+ last_modified: ServerTimestamp,
+}
+
+pub trait BatchPoster {
+ /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing
+ /// Important: Poster should not report non-success HTTP statuses as errors!!
+ fn post<P, O>(
+ &self,
+ body: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<P, O>,
+ ) -> Result<PostResponse>;
+}
+
+// We don't just use a FnMut here since we want to override it in mocking for RefCell<TestType>,
+// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this
+// is somewhat better for documentation.
+pub trait PostResponseHandler {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct NormalResponseHandler {
+ pub failed_ids: Vec<Guid>,
+ pub successful_ids: Vec<Guid>,
+ pub allow_failed: bool,
+ pub pending_failed: Vec<Guid>,
+ pub pending_success: Vec<Guid>,
+}
+
+impl NormalResponseHandler {
+ pub fn new(allow_failed: bool) -> NormalResponseHandler {
+ NormalResponseHandler {
+ failed_ids: vec![],
+ successful_ids: vec![],
+ pending_failed: vec![],
+ pending_success: vec![],
+ allow_failed,
+ }
+ }
+}
+
+impl PostResponseHandler for NormalResponseHandler {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
+ match r {
+ Sync15ClientResponse::Success { record, .. } => {
+ if !record.failed.is_empty() && !self.allow_failed {
+ return Err(ErrorKind::RecordUploadFailed);
+ }
+ for id in record.success.iter() {
+ self.pending_success.push(id.clone());
+ }
+ for kv in record.failed.iter() {
+ self.pending_failed.push(kv.0.clone());
+ }
+ if !mid_batch {
+ self.successful_ids.append(&mut self.pending_success);
+ self.failed_ids.append(&mut self.pending_failed);
+ }
+ Ok(())
+ }
+ _ => Err(r.create_storage_error()),
+ }
+ }
+}
+
+impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
+where
+ Poster: BatchPoster,
+ OnResponse: PostResponseHandler,
+{
+ pub fn new(
+ config: &InfoConfiguration,
+ ts: ServerTimestamp,
+ poster: Poster,
+ on_response: OnResponse,
+ ) -> PostQueue<Poster, OnResponse> {
+ PostQueue {
+ poster,
+ on_response,
+ last_modified: ts,
+ post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
+ batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
+ batch: BatchState::NoBatch,
+ max_payload_bytes: config.max_record_payload_bytes,
+ max_request_bytes: config.max_request_bytes,
+ queued: Vec::new(),
+ }
+ }
+
+ #[inline]
+ fn in_batch(&self) -> bool {
+ !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
+ }
+
+ pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result<bool> {
+ let payload_length = record.serialized_payload_len();
+
+ if self.post_limits.can_never_add(payload_length)
+ || self.batch_limits.can_never_add(payload_length)
+ || payload_length >= self.max_payload_bytes
+ {
+ log::warn!(
+ "Single record too large to submit to server ({} b)",
+ payload_length
+ );
+ return Ok(false);
+ }
+
+ // Write directly into `queued` but undo if necessary (the vast majority of the time
+ // it won't be necessary). If we hit a problem we need to undo that, but the only error
+ // case we have to worry about right now is in flush()
+ let item_start = self.queued.len();
+
+ // This is conservative but can't hurt.
+ self.queued.reserve(payload_length + 2);
+
+ // Either the first character in an array, or a comma separating
+ // it from the previous item.
+ let c = if self.queued.is_empty() { b'[' } else { b',' };
+ self.queued.push(c);
+
+ // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object
+ // keys, which is impossible. If you decide to change this part, you *need* to call
+ // `self.queued.truncate(item_start)` here in the failure case!
+ serde_json::to_writer(&mut self.queued, &record).unwrap();
+
+ let item_end = self.queued.len();
+
+ debug_assert!(
+ item_end >= payload_length,
+ "EncryptedPayload::serialized_len is bugged"
+ );
+
+ // The + 1 is only relevant for the final record, which will have a trailing ']'.
+ let item_len = item_end - item_start + 1;
+
+ if item_len >= self.max_request_bytes {
+ self.queued.truncate(item_start);
+ log::warn!(
+ "Single record too large to submit to server ({} b)",
+ item_len
+ );
+ return Ok(false);
+ }
+
+ let can_post_record = self.post_limits.can_add_record(payload_length);
+ let can_batch_record = self.batch_limits.can_add_record(payload_length);
+ let can_send_record = self.queued.len() < self.max_request_bytes;
+
+ if !can_post_record || !can_send_record || !can_batch_record {
+ log::debug!(
+ "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
+ can_post_record,
+ can_send_record,
+ can_batch_record
+ );
+ // "unwrite" the record.
+ self.queued.truncate(item_start);
+ // Flush whatever we have queued.
+ self.flush(!can_batch_record)?;
+ // And write it again.
+ let c = if self.queued.is_empty() { b'[' } else { b',' };
+ self.queued.push(c);
+ serde_json::to_writer(&mut self.queued, &record).unwrap();
+ }
+
+ self.post_limits.record_added(payload_length);
+ self.batch_limits.record_added(payload_length);
+
+ Ok(true)
+ }
+
+ pub fn flush(&mut self, want_commit: bool) -> Result<()> {
+ if self.queued.is_empty() {
+ assert!(
+ !self.in_batch(),
+ "Bug: Somehow we're in a batch but have no queued records"
+ );
+ // Nothing to do!
+ return Ok(());
+ }
+
+ self.queued.push(b']');
+ let batch_id = match &self.batch {
+ // Not the first post and we know we have no batch semantics.
+ BatchState::Unsupported => None,
+ // First commit in possible batch
+ BatchState::NoBatch => Some("true".into()),
+ // In a batch and we have a batch id.
+ BatchState::InBatch(ref s) => Some(s.clone()),
+ };
+
+ log::info!(
+ "Posting {} records of {} bytes",
+ self.post_limits.cur_records,
+ self.queued.len()
+ );
+
+ let is_commit = want_commit && batch_id.is_some();
+ // Weird syntax for calling a function object that is a property.
+ let resp_or_error = self.poster.post(
+ self.queued.clone(),
+ self.last_modified,
+ batch_id,
+ is_commit,
+ self,
+ );
+
+ self.queued.truncate(0);
+
+ if want_commit || self.batch == BatchState::Unsupported {
+ self.batch_limits.clear();
+ }
+ self.post_limits.clear();
+
+ let resp = resp_or_error?;
+
+ let (status, last_modified, record) = match resp {
+ Sync15ClientResponse::Success {
+ status,
+ last_modified,
+ ref record,
+ ..
+ } => (status, last_modified, record),
+ _ => {
+ self.on_response.handle_response(resp, !want_commit)?;
+ // on_response() should always fail!
+ unreachable!();
+ }
+ };
+
+ if want_commit || self.batch == BatchState::Unsupported {
+ self.last_modified = last_modified;
+ }
+
+ if want_commit {
+ log::debug!("Committed batch {:?}", self.batch);
+ self.batch = BatchState::NoBatch;
+ self.on_response.handle_response(resp, false)?;
+ return Ok(());
+ }
+
+ if status != status_codes::ACCEPTED {
+ if self.in_batch() {
+ return Err(ErrorKind::ServerBatchProblem(
+ "Server responded non-202 success code while a batch was in progress",
+ ));
+ }
+ self.last_modified = last_modified;
+ self.batch = BatchState::Unsupported;
+ self.batch_limits.clear();
+ self.on_response.handle_response(resp, false)?;
+ return Ok(());
+ }
+
+ let batch_id = record
+ .batch
+ .as_ref()
+ .ok_or({
+ ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID")
+ })?
+ .clone();
+
+ match &self.batch {
+ BatchState::Unsupported => {
+ log::warn!("Server changed its mind about supporting batching mid-batch...");
+ }
+
+ BatchState::InBatch(ref cur_id) => {
+ if cur_id != &batch_id {
+ return Err(ErrorKind::ServerBatchProblem(
+ "Invalid server response: 202 without a batch ID",
+ ));
+ }
+ }
+ _ => {}
+ }
+
+ // Can't change this in match arms without NLL
+ self.batch = BatchState::InBatch(batch_id);
+ self.last_modified = last_modified;
+
+ self.on_response.handle_response(resp, true)?;
+
+ Ok(())
+ }
+}
+
+#[derive(Clone)]
+pub struct UploadInfo {
+ pub successful_ids: Vec<Guid>,
+ pub failed_ids: Vec<Guid>,
+ pub modified_timestamp: ServerTimestamp,
+}
+
+impl<Poster> PostQueue<Poster, NormalResponseHandler> {
+ // TODO: should take by move
+ pub fn completed_upload_info(&mut self) -> UploadInfo {
+ let mut result = UploadInfo {
+ successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
+ failed_ids: Vec::with_capacity(
+ self.on_response.failed_ids.len()
+ + self.on_response.pending_failed.len()
+ + self.on_response.pending_success.len(),
+ ),
+ modified_timestamp: self.last_modified,
+ };
+
+ result
+ .successful_ids
+ .append(&mut self.on_response.successful_ids);
+
+ result.failed_ids.append(&mut self.on_response.failed_ids);
+ result
+ .failed_ids
+ .append(&mut self.on_response.pending_failed);
+ result
+ .failed_ids
+ .append(&mut self.on_response.pending_success);
+
+ result
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope};
+ use crate::EncryptedPayload;
+ use lazy_static::lazy_static;
+ use std::cell::RefCell;
+ use std::collections::VecDeque;
+ use std::rc::Rc;
+
+ #[derive(Debug, Clone)]
+ struct PostedData {
+ body: String,
+ _xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ payload_bytes: usize,
+ records: usize,
+ }
+
+ impl PostedData {
+ fn records_as_json(&self) -> Vec<serde_json::Value> {
+ let values =
+ serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
+ // Check that they actually deserialize as what we want
+ let records_or_err =
+ serde_json::from_value::<Vec<IncomingEncryptedBso>>(values.clone());
+ records_or_err.expect("Failed to deserialize data");
+ serde_json::from_value(values).unwrap()
+ }
+ }
+
+ #[derive(Debug, Clone)]
+ struct BatchInfo {
+ id: Option<String>,
+ posts: Vec<PostedData>,
+ bytes: usize,
+ records: usize,
+ }
+
+ #[derive(Debug, Clone)]
+ struct TestPoster {
+ all_posts: Vec<PostedData>,
+ responses: VecDeque<PostResponse>,
+ batches: Vec<BatchInfo>,
+ cur_batch: Option<BatchInfo>,
+ cfg: InfoConfiguration,
+ }
+
+ type TestPosterRef = Rc<RefCell<TestPoster>>;
+ impl TestPoster {
+ pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
+ where
+ T: Into<VecDeque<PostResponse>>,
+ {
+ Rc::new(RefCell::new(TestPoster {
+ all_posts: vec![],
+ responses: responses.into(),
+ batches: vec![],
+ cur_batch: None,
+ cfg: cfg.clone(),
+ }))
+ }
+ // Adds &mut
+ fn do_post<T, O>(
+ &mut self,
+ body: &[u8],
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<T, O>,
+ ) -> Sync15ClientResponse<UploadResult> {
+ let mut post = PostedData {
+ body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
+ batch: batch.clone(),
+ _xius: xius,
+ commit,
+ payload_bytes: 0,
+ records: 0,
+ };
+
+ assert!(body.len() <= self.cfg.max_request_bytes);
+
+ let (num_records, record_payload_bytes) = {
+ let recs = post.records_as_json();
+ assert!(recs.len() <= self.cfg.max_post_records);
+ assert!(recs.len() <= self.cfg.max_total_records);
+ let payload_bytes: usize = recs
+ .iter()
+ .map(|r| {
+ let len = r["payload"]
+ .as_str()
+ .expect("Non string payload property")
+ .len();
+ assert!(len <= self.cfg.max_record_payload_bytes);
+ len
+ })
+ .sum();
+ assert!(payload_bytes <= self.cfg.max_post_bytes);
+ assert!(payload_bytes <= self.cfg.max_total_bytes);
+
+ assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
+ assert_eq!(queue.post_limits.cur_records, recs.len());
+ (recs.len(), payload_bytes)
+ };
+ post.payload_bytes = record_payload_bytes;
+ post.records = num_records;
+
+ self.all_posts.push(post.clone());
+ let response = self.responses.pop_front().unwrap();
+
+ let record = match response {
+ Sync15ClientResponse::Success { ref record, .. } => record,
+ _ => {
+ panic!("only success codes are used in this test");
+ }
+ };
+
+ if self.cur_batch.is_none() {
+ assert!(
+ batch.is_none() || batch == Some("true".into()),
+ "We shouldn't be in a batch now"
+ );
+ self.cur_batch = Some(BatchInfo {
+ id: record.batch.clone(),
+ posts: vec![],
+ records: 0,
+ bytes: 0,
+ });
+ } else {
+ assert_eq!(
+ batch,
+ self.cur_batch.as_ref().unwrap().id,
+ "We're in a batch but got the wrong batch id"
+ );
+ }
+
+ {
+ let batch = self.cur_batch.as_mut().unwrap();
+ batch.posts.push(post);
+ batch.records += num_records;
+ batch.bytes += record_payload_bytes;
+
+ assert!(batch.bytes <= self.cfg.max_total_bytes);
+ assert!(batch.records <= self.cfg.max_total_records);
+
+ assert_eq!(batch.records, queue.batch_limits.cur_records);
+ assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
+ }
+
+ if commit || record.batch.is_none() {
+ let batch = self.cur_batch.take().unwrap();
+ self.batches.push(batch);
+ }
+
+ response
+ }
+
+ fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) {
+ assert_eq!(mid_batch, self.cur_batch.is_some());
+ }
+ }
+ impl BatchPoster for TestPosterRef {
+ fn post<T, O>(
+ &self,
+ body: Vec<u8>,
+ xius: ServerTimestamp,
+ batch: Option<String>,
+ commit: bool,
+ queue: &PostQueue<T, O>,
+ ) -> Result<PostResponse> {
+ Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue))
+ }
+ }
+
+ impl PostResponseHandler for TestPosterRef {
+ fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
+ self.borrow_mut().do_handle_response(r, mid_batch);
+ Ok(())
+ }
+ }
+
+ type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;
+
+ fn pq_test_setup(
+ cfg: InfoConfiguration,
+ lm: i64,
+ resps: Vec<PostResponse>,
+ ) -> (MockedPostQueue, TestPosterRef) {
+ let tester = TestPoster::new(&cfg, resps);
+ let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
+ (pq, tester)
+ }
+
+ fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
+ assert!(status_codes::is_success_code(status));
+ Sync15ClientResponse::Success {
+ status,
+ last_modified: ServerTimestamp(lm),
+ record: UploadResult {
+ batch: batch.into().map(Into::into),
+ failed: HashMap::new(),
+ success: vec![],
+ },
+ route: "test/path".into(),
+ }
+ }
+
+ lazy_static! {
+ // ~40b
+ static ref PAYLOAD_OVERHEAD: usize = {
+ let payload = EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "".into()
+ };
+ serde_json::to_string(&payload).unwrap().len()
+ };
+ // ~80b
+ static ref TOTAL_RECORD_OVERHEAD: usize = {
+ let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope {
+ id: "".into(),
+ sortindex: None,
+ ttl: None,
+ },
+ EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "".into()
+ },
+ )).unwrap();
+ serde_json::to_string(&val).unwrap().len()
+ };
+ // There's some subtlety in how we calulate this having to do with the fact that
+ // the quotes in the payload are escaped but the escape chars count to the request len
+ // and *not* to the payload len (the payload len check happens after json parsing the
+ // top level object).
+ static ref NON_PAYLOAD_OVERHEAD: usize = {
+ *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
+ };
+ }
+
+ // Actual record size (for max_request_len) will be larger by some amount
+ fn make_record(payload_size: usize) -> OutgoingEncryptedBso {
+ assert!(payload_size > *PAYLOAD_OVERHEAD);
+ let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
+ OutgoingEncryptedBso::new(
+ OutgoingEnvelope {
+ id: "".into(),
+ sortindex: None,
+ ttl: None,
+ },
+ EncryptedPayload {
+ iv: "".into(),
+ hmac: "".into(),
+ ciphertext: "x".repeat(ciphertext_len),
+ },
+ )
+ }
+
+ fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
+ 1 + payloads
+ .iter()
+ .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
+ .sum::<usize>()
+ }
+
+ #[test]
+ fn test_pq_basic() {
+ let cfg = InfoConfiguration {
+ max_request_bytes: 1000,
+ max_record_payload_bytes: 1000,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![fake_response(status_codes::OK, time + 100_000, None)],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 1);
+ assert_eq!(t.batches[0].bytes, 100);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ fn test_pq_max_request_bytes_no_batch() {
+ let cfg = InfoConfiguration {
+ max_request_bytes: 250,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::OK, time + 100_000, None),
+ fake_response(status_codes::OK, time + 200_000, None),
+ ],
+ );
+
+ // Note that the total record overhead is around 85 bytes
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r]
+ pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post.
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 2);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 2);
+ assert_eq!(t.batches[0].bytes, payload_size * 2);
+ assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size])
+ );
+
+ assert_eq!(t.batches[1].posts.len(), 1);
+ assert_eq!(t.batches[1].records, 1);
+ assert_eq!(t.batches[1].bytes, payload_size);
+ // We know at this point that the server does not support batching.
+ assert_eq!(t.batches[1].posts[0].batch, None);
+ assert!(!t.batches[1].posts[0].commit);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_max_record_payload_bytes_no_batch() {
+ let cfg = InfoConfiguration {
+ max_record_payload_bytes: 150,
+ max_request_bytes: 350,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::OK, time + 100_000, None),
+ fake_response(status_codes::OK, time + 200_000, None),
+ ],
+ );
+
+ // Note that the total record overhead is around 85 bytes
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
+ let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102
+ assert!(!enqueued, "Should not have fit");
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 2);
+ assert_eq!(t.batches[0].bytes, payload_size * 2);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_single_batch() {
+ let cfg = InfoConfiguration::default();
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![fake_response(
+ status_codes::ACCEPTED,
+ time + 100_000,
+ Some("1234"),
+ )],
+ );
+
+ let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.enqueue(&make_record(payload_size)).unwrap();
+ pq.flush(true).unwrap();
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 1);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts.len(), 1);
+ assert_eq!(t.batches[0].records, 3);
+ assert_eq!(t.batches[0].bytes, payload_size * 3);
+ assert!(t.batches[0].posts[0].commit);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
+ );
+ }
+
+ #[test]
+ fn test_pq_multi_post_batch_bytes() {
+ let cfg = InfoConfiguration {
+ max_post_bytes: 200,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 2);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[0].records, 3);
+ assert_eq!(t.batches[0].bytes, 300);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 2);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
+ assert!(!t.batches[0].posts[0].commit);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 1);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
+ assert!(t.batches[0].posts[1].commit);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ fn test_pq_multi_post_batch_records() {
+ let cfg = InfoConfiguration {
+ max_post_records: 3,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 3);
+ assert_eq!(t.batches.len(), 1);
+ assert_eq!(t.batches[0].posts.len(), 3);
+ assert_eq!(t.batches[0].records, 7);
+ assert_eq!(t.batches[0].bytes, 700);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert!(!t.batches[0].posts[0].commit);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 3);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
+ assert!(!t.batches[0].posts[1].commit);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[2].records, 1);
+ assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
+ assert!(t.batches[0].posts[2].commit);
+ assert_eq!(
+ t.batches[0].posts[2].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ #[allow(clippy::cognitive_complexity)]
+ fn test_pq_multi_post_multi_batch_records() {
+ let cfg = InfoConfiguration {
+ max_post_records: 3,
+ max_total_records: 5,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
+ fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST + COMMIT
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.flush(true).unwrap(); // COMMIT
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 4);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[1].posts.len(), 2);
+
+ assert_eq!(t.batches[0].records, 5);
+ assert_eq!(t.batches[1].records, 4);
+
+ assert_eq!(t.batches[0].bytes, 500);
+ assert_eq!(t.batches[1].bytes, 400);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert!(!t.batches[0].posts[0].commit);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 2);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
+ assert!(t.batches[0].posts[1].commit);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[1].posts[0].records, 3);
+ assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
+ assert!(!t.batches[1].posts[0].commit);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
+ assert_eq!(t.batches[1].posts[1].records, 1);
+ assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
+ assert!(t.batches[1].posts[1].commit);
+ assert_eq!(
+ t.batches[1].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ #[test]
+ #[allow(clippy::cognitive_complexity)]
+ fn test_pq_multi_post_multi_batch_bytes() {
+ let cfg = InfoConfiguration {
+ max_post_bytes: 300,
+ max_total_bytes: 500,
+ ..InfoConfiguration::default()
+ };
+ let time = 11_111_111_000;
+ let (mut pq, tester) = pq_test_setup(
+ cfg,
+ time,
+ vec![
+ fake_response(status_codes::ACCEPTED, time, Some("1234")),
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit
+ fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
+ fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit
+ ],
+ );
+
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time);
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+ // POST + COMMIT
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time + 100_000);
+ pq.enqueue(&make_record(100)).unwrap();
+ pq.enqueue(&make_record(100)).unwrap();
+
+ // POST
+ pq.enqueue(&make_record(100)).unwrap();
+ assert_eq!(pq.last_modified.0, time + 100_000);
+ pq.flush(true).unwrap(); // COMMIT
+
+ assert_eq!(pq.last_modified.0, time + 200_000);
+
+ let t = tester.borrow();
+ assert!(t.cur_batch.is_none());
+ assert_eq!(t.all_posts.len(), 4);
+ assert_eq!(t.batches.len(), 2);
+ assert_eq!(t.batches[0].posts.len(), 2);
+ assert_eq!(t.batches[1].posts.len(), 2);
+
+ assert_eq!(t.batches[0].records, 5);
+ assert_eq!(t.batches[1].records, 4);
+
+ assert_eq!(t.batches[0].bytes, 500);
+ assert_eq!(t.batches[1].bytes, 400);
+
+ assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[0].posts[0].records, 3);
+ assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
+ assert!(!t.batches[0].posts[0].commit);
+ assert_eq!(
+ t.batches[0].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
+ assert_eq!(t.batches[0].posts[1].records, 2);
+ assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
+ assert!(t.batches[0].posts[1].commit);
+ assert_eq!(
+ t.batches[0].posts[1].body.len(),
+ request_bytes_for_payloads(&[100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
+ assert_eq!(t.batches[1].posts[0].records, 3);
+ assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
+ assert!(!t.batches[1].posts[0].commit);
+ assert_eq!(
+ t.batches[1].posts[0].body.len(),
+ request_bytes_for_payloads(&[100, 100, 100])
+ );
+
+ assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
+ assert_eq!(t.batches[1].posts[1].records, 1);
+ assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
+ assert!(t.batches[1].posts[1].commit);
+ assert_eq!(
+ t.batches[1].posts[1].body.len(),
+ request_bytes_for_payloads(&[100])
+ );
+ }
+
+ // TODO: Test
+ //
+ // - error cases!!! We don't test our handling of server errors at all!
+ // - mixed bytes/record limits
+ //
+ // A lot of these have good examples in test_postqueue.js on deskftop sync
+}