/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ use super::storage_client::Sync15ClientResponse; use crate::bso::OutgoingEncryptedBso; use crate::error::{self, Error as ErrorKind, Result}; use crate::ServerTimestamp; use serde_derive::*; use std::collections::HashMap; use std::default::Default; use std::ops::Deref; use sync_guid::Guid; use viaduct::status_codes; /// Manages a pair of (byte, count) limits for a PostQueue, such as /// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records). #[derive(Debug, Clone)] struct LimitTracker { max_bytes: usize, max_records: usize, cur_bytes: usize, cur_records: usize, } impl LimitTracker { pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker { LimitTracker { max_bytes, max_records, cur_bytes: 0, cur_records: 0, } } pub fn clear(&mut self) { self.cur_records = 0; self.cur_bytes = 0; } pub fn can_add_record(&self, payload_size: usize) -> bool { // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that // don't have https://github.com/mozilla-services/server-syncstorage/issues/73 self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes } pub fn can_never_add(&self, record_size: usize) -> bool { record_size >= self.max_bytes } pub fn record_added(&mut self, record_size: usize) { assert!( self.can_add_record(record_size), "LimitTracker::record_added caller must check can_add_record" ); self.cur_records += 1; self.cur_bytes += record_size; } } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct InfoConfiguration { /// The maximum size in bytes of the overall HTTP request body that will be accepted by the /// server. #[serde(default = "default_max_request_bytes")] pub max_request_bytes: usize, /// The maximum number of records that can be uploaded to a collection in a single POST request. #[serde(default = "usize::max_value")] pub max_post_records: usize, /// The maximum combined size in bytes of the record payloads that can be uploaded to a /// collection in a single POST request. #[serde(default = "usize::max_value")] pub max_post_bytes: usize, /// The maximum total number of records that can be uploaded to a collection as part of a /// batched upload. #[serde(default = "usize::max_value")] pub max_total_records: usize, /// The maximum total combined size in bytes of the record payloads that can be uploaded to a /// collection as part of a batched upload. #[serde(default = "usize::max_value")] pub max_total_bytes: usize, /// The maximum size of an individual BSO payload, in bytes. #[serde(default = "default_max_record_payload_bytes")] pub max_record_payload_bytes: usize, } // This is annoying but seems to be the only way to do it... fn default_max_request_bytes() -> usize { 260 * 1024 } fn default_max_record_payload_bytes() -> usize { 256 * 1024 } impl Default for InfoConfiguration { #[inline] fn default() -> InfoConfiguration { InfoConfiguration { max_request_bytes: default_max_request_bytes(), max_record_payload_bytes: default_max_record_payload_bytes(), max_post_records: usize::max_value(), max_post_bytes: usize::max_value(), max_total_records: usize::max_value(), max_total_bytes: usize::max_value(), } } } #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct InfoCollections(pub(crate) HashMap); impl InfoCollections { pub fn new(collections: HashMap) -> InfoCollections { InfoCollections(collections) } } impl Deref for InfoCollections { type Target = HashMap; fn deref(&self) -> &HashMap { &self.0 } } #[derive(Debug, Clone, Deserialize)] pub struct UploadResult { batch: Option, /// Maps record id => why failed #[serde(default = "HashMap::new")] pub failed: HashMap, /// Vec of ids #[serde(default = "Vec::new")] pub success: Vec, } pub type PostResponse = Sync15ClientResponse; #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum BatchState { Unsupported, NoBatch, InBatch(String), } #[derive(Debug)] pub struct PostQueue { poster: Post, on_response: OnResponse, post_limits: LimitTracker, batch_limits: LimitTracker, max_payload_bytes: usize, max_request_bytes: usize, queued: Vec, batch: BatchState, last_modified: ServerTimestamp, } pub trait BatchPoster { /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing /// Important: Poster should not report non-success HTTP statuses as errors!! fn post( &self, body: Vec, xius: ServerTimestamp, batch: Option, commit: bool, queue: &PostQueue, ) -> Result; } // We don't just use a FnMut here since we want to override it in mocking for RefCell, // which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this // is somewhat better for documentation. pub trait PostResponseHandler { fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>; } #[derive(Debug, Clone)] pub(crate) struct NormalResponseHandler { pub failed_ids: Vec, pub successful_ids: Vec, pub allow_failed: bool, pub pending_failed: Vec, pub pending_success: Vec, } impl NormalResponseHandler { pub fn new(allow_failed: bool) -> NormalResponseHandler { NormalResponseHandler { failed_ids: vec![], successful_ids: vec![], pending_failed: vec![], pending_success: vec![], allow_failed, } } } impl PostResponseHandler for NormalResponseHandler { fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> { match r { Sync15ClientResponse::Success { record, .. } => { if !record.failed.is_empty() && !self.allow_failed { return Err(ErrorKind::RecordUploadFailed); } for id in record.success.iter() { self.pending_success.push(id.clone()); } for kv in record.failed.iter() { self.pending_failed.push(kv.0.clone()); } if !mid_batch { self.successful_ids.append(&mut self.pending_success); self.failed_ids.append(&mut self.pending_failed); } Ok(()) } _ => Err(r.create_storage_error()), } } } impl PostQueue where Poster: BatchPoster, OnResponse: PostResponseHandler, { pub fn new( config: &InfoConfiguration, ts: ServerTimestamp, poster: Poster, on_response: OnResponse, ) -> PostQueue { PostQueue { poster, on_response, last_modified: ts, post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records), batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records), batch: BatchState::NoBatch, max_payload_bytes: config.max_record_payload_bytes, max_request_bytes: config.max_request_bytes, queued: Vec::new(), } } #[inline] fn in_batch(&self) -> bool { !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch) } pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result { let payload_length = record.serialized_payload_len(); if self.post_limits.can_never_add(payload_length) || self.batch_limits.can_never_add(payload_length) || payload_length >= self.max_payload_bytes { log::warn!( "Single record too large to submit to server ({} b)", payload_length ); return Ok(false); } // Write directly into `queued` but undo if necessary (the vast majority of the time // it won't be necessary). If we hit a problem we need to undo that, but the only error // case we have to worry about right now is in flush() let item_start = self.queued.len(); // This is conservative but can't hurt. self.queued.reserve(payload_length + 2); // Either the first character in an array, or a comma separating // it from the previous item. let c = if self.queued.is_empty() { b'[' } else { b',' }; self.queued.push(c); // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object // keys, which is impossible. If you decide to change this part, you *need* to call // `self.queued.truncate(item_start)` here in the failure case! serde_json::to_writer(&mut self.queued, &record).unwrap(); let item_end = self.queued.len(); debug_assert!( item_end >= payload_length, "EncryptedPayload::serialized_len is bugged" ); // The + 1 is only relevant for the final record, which will have a trailing ']'. let item_len = item_end - item_start + 1; if item_len >= self.max_request_bytes { self.queued.truncate(item_start); log::warn!( "Single record too large to submit to server ({} b)", item_len ); return Ok(false); } let can_post_record = self.post_limits.can_add_record(payload_length); let can_batch_record = self.batch_limits.can_add_record(payload_length); let can_send_record = self.queued.len() < self.max_request_bytes; if !can_post_record || !can_send_record || !can_batch_record { log::debug!( "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})", can_post_record, can_send_record, can_batch_record ); // "unwrite" the record. self.queued.truncate(item_start); // Flush whatever we have queued. self.flush(!can_batch_record)?; // And write it again. let c = if self.queued.is_empty() { b'[' } else { b',' }; self.queued.push(c); serde_json::to_writer(&mut self.queued, &record).unwrap(); } self.post_limits.record_added(payload_length); self.batch_limits.record_added(payload_length); Ok(true) } pub fn flush(&mut self, want_commit: bool) -> Result<()> { if self.queued.is_empty() { assert!( !self.in_batch(), "Bug: Somehow we're in a batch but have no queued records" ); // Nothing to do! return Ok(()); } self.queued.push(b']'); let batch_id = match &self.batch { // Not the first post and we know we have no batch semantics. BatchState::Unsupported => None, // First commit in possible batch BatchState::NoBatch => Some("true".into()), // In a batch and we have a batch id. BatchState::InBatch(ref s) => Some(s.clone()), }; log::info!( "Posting {} records of {} bytes", self.post_limits.cur_records, self.queued.len() ); let is_commit = want_commit && batch_id.is_some(); // Weird syntax for calling a function object that is a property. let resp_or_error = self.poster.post( self.queued.clone(), self.last_modified, batch_id, is_commit, self, ); self.queued.truncate(0); if want_commit || self.batch == BatchState::Unsupported { self.batch_limits.clear(); } self.post_limits.clear(); let resp = resp_or_error?; let (status, last_modified, record) = match resp { Sync15ClientResponse::Success { status, last_modified, ref record, .. } => (status, last_modified, record), _ => { self.on_response.handle_response(resp, !want_commit)?; // on_response() should always fail! unreachable!(); } }; if want_commit || self.batch == BatchState::Unsupported { self.last_modified = last_modified; } if want_commit { log::debug!("Committed batch {:?}", self.batch); self.batch = BatchState::NoBatch; self.on_response.handle_response(resp, false)?; return Ok(()); } if status != status_codes::ACCEPTED { if self.in_batch() { return Err(ErrorKind::ServerBatchProblem( "Server responded non-202 success code while a batch was in progress", )); } self.last_modified = last_modified; self.batch = BatchState::Unsupported; self.batch_limits.clear(); self.on_response.handle_response(resp, false)?; return Ok(()); } let batch_id = record .batch .as_ref() .ok_or({ ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID") })? .clone(); match &self.batch { BatchState::Unsupported => { log::warn!("Server changed its mind about supporting batching mid-batch..."); } BatchState::InBatch(ref cur_id) => { if cur_id != &batch_id { return Err(ErrorKind::ServerBatchProblem( "Invalid server response: 202 without a batch ID", )); } } _ => {} } // Can't change this in match arms without NLL self.batch = BatchState::InBatch(batch_id); self.last_modified = last_modified; self.on_response.handle_response(resp, true)?; Ok(()) } } #[derive(Clone)] pub struct UploadInfo { pub successful_ids: Vec, pub failed_ids: Vec, pub modified_timestamp: ServerTimestamp, } impl PostQueue { // TODO: should take by move pub fn completed_upload_info(&mut self) -> UploadInfo { let mut result = UploadInfo { successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()), failed_ids: Vec::with_capacity( self.on_response.failed_ids.len() + self.on_response.pending_failed.len() + self.on_response.pending_success.len(), ), modified_timestamp: self.last_modified, }; result .successful_ids .append(&mut self.on_response.successful_ids); result.failed_ids.append(&mut self.on_response.failed_ids); result .failed_ids .append(&mut self.on_response.pending_failed); result .failed_ids .append(&mut self.on_response.pending_success); result } } #[cfg(test)] mod test { use super::*; use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope}; use crate::EncryptedPayload; use lazy_static::lazy_static; use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; #[derive(Debug, Clone)] struct PostedData { body: String, _xius: ServerTimestamp, batch: Option, commit: bool, payload_bytes: usize, records: usize, } impl PostedData { fn records_as_json(&self) -> Vec { let values = serde_json::from_str::(&self.body).expect("Posted invalid json"); // Check that they actually deserialize as what we want let records_or_err = serde_json::from_value::>(values.clone()); records_or_err.expect("Failed to deserialize data"); serde_json::from_value(values).unwrap() } } #[derive(Debug, Clone)] struct BatchInfo { id: Option, posts: Vec, bytes: usize, records: usize, } #[derive(Debug, Clone)] struct TestPoster { all_posts: Vec, responses: VecDeque, batches: Vec, cur_batch: Option, cfg: InfoConfiguration, } type TestPosterRef = Rc>; impl TestPoster { pub fn new(cfg: &InfoConfiguration, responses: T) -> TestPosterRef where T: Into>, { Rc::new(RefCell::new(TestPoster { all_posts: vec![], responses: responses.into(), batches: vec![], cur_batch: None, cfg: cfg.clone(), })) } // Adds &mut fn do_post( &mut self, body: &[u8], xius: ServerTimestamp, batch: Option, commit: bool, queue: &PostQueue, ) -> Sync15ClientResponse { let mut post = PostedData { body: String::from_utf8(body.into()).expect("Posted invalid utf8..."), batch: batch.clone(), _xius: xius, commit, payload_bytes: 0, records: 0, }; assert!(body.len() <= self.cfg.max_request_bytes); let (num_records, record_payload_bytes) = { let recs = post.records_as_json(); assert!(recs.len() <= self.cfg.max_post_records); assert!(recs.len() <= self.cfg.max_total_records); let payload_bytes: usize = recs .iter() .map(|r| { let len = r["payload"] .as_str() .expect("Non string payload property") .len(); assert!(len <= self.cfg.max_record_payload_bytes); len }) .sum(); assert!(payload_bytes <= self.cfg.max_post_bytes); assert!(payload_bytes <= self.cfg.max_total_bytes); assert_eq!(queue.post_limits.cur_bytes, payload_bytes); assert_eq!(queue.post_limits.cur_records, recs.len()); (recs.len(), payload_bytes) }; post.payload_bytes = record_payload_bytes; post.records = num_records; self.all_posts.push(post.clone()); let response = self.responses.pop_front().unwrap(); let record = match response { Sync15ClientResponse::Success { ref record, .. } => record, _ => { panic!("only success codes are used in this test"); } }; if self.cur_batch.is_none() { assert!( batch.is_none() || batch == Some("true".into()), "We shouldn't be in a batch now" ); self.cur_batch = Some(BatchInfo { id: record.batch.clone(), posts: vec![], records: 0, bytes: 0, }); } else { assert_eq!( batch, self.cur_batch.as_ref().unwrap().id, "We're in a batch but got the wrong batch id" ); } { let batch = self.cur_batch.as_mut().unwrap(); batch.posts.push(post); batch.records += num_records; batch.bytes += record_payload_bytes; assert!(batch.bytes <= self.cfg.max_total_bytes); assert!(batch.records <= self.cfg.max_total_records); assert_eq!(batch.records, queue.batch_limits.cur_records); assert_eq!(batch.bytes, queue.batch_limits.cur_bytes); } if commit || record.batch.is_none() { let batch = self.cur_batch.take().unwrap(); self.batches.push(batch); } response } fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) { assert_eq!(mid_batch, self.cur_batch.is_some()); } } impl BatchPoster for TestPosterRef { fn post( &self, body: Vec, xius: ServerTimestamp, batch: Option, commit: bool, queue: &PostQueue, ) -> Result { Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue)) } } impl PostResponseHandler for TestPosterRef { fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> { self.borrow_mut().do_handle_response(r, mid_batch); Ok(()) } } type MockedPostQueue = PostQueue; fn pq_test_setup( cfg: InfoConfiguration, lm: i64, resps: Vec, ) -> (MockedPostQueue, TestPosterRef) { let tester = TestPoster::new(&cfg, resps); let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone()); (pq, tester) } fn fake_response<'a, T: Into>>(status: u16, lm: i64, batch: T) -> PostResponse { assert!(status_codes::is_success_code(status)); Sync15ClientResponse::Success { status, last_modified: ServerTimestamp(lm), record: UploadResult { batch: batch.into().map(Into::into), failed: HashMap::new(), success: vec![], }, route: "test/path".into(), } } lazy_static! { // ~40b static ref PAYLOAD_OVERHEAD: usize = { let payload = EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "".into() }; serde_json::to_string(&payload).unwrap().len() }; // ~80b static ref TOTAL_RECORD_OVERHEAD: usize = { let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope { id: "".into(), sortindex: None, ttl: None, }, EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "".into() }, )).unwrap(); serde_json::to_string(&val).unwrap().len() }; // There's some subtlety in how we calulate this having to do with the fact that // the quotes in the payload are escaped but the escape chars count to the request len // and *not* to the payload len (the payload len check happens after json parsing the // top level object). static ref NON_PAYLOAD_OVERHEAD: usize = { *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD }; } // Actual record size (for max_request_len) will be larger by some amount fn make_record(payload_size: usize) -> OutgoingEncryptedBso { assert!(payload_size > *PAYLOAD_OVERHEAD); let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD; OutgoingEncryptedBso::new( OutgoingEnvelope { id: "".into(), sortindex: None, ttl: None, }, EncryptedPayload { iv: "".into(), hmac: "".into(), ciphertext: "x".repeat(ciphertext_len), }, ) } fn request_bytes_for_payloads(payloads: &[usize]) -> usize { 1 + payloads .iter() .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD) .sum::() } #[test] fn test_pq_basic() { let cfg = InfoConfiguration { max_request_bytes: 1000, max_record_payload_bytes: 1000, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![fake_response(status_codes::OK, time + 100_000, None)], ); pq.enqueue(&make_record(100)).unwrap(); pq.flush(true).unwrap(); let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 1); assert_eq!(t.batches.len(), 1); assert_eq!(t.batches[0].posts.len(), 1); assert_eq!(t.batches[0].records, 1); assert_eq!(t.batches[0].bytes, 100); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[100]) ); } #[test] fn test_pq_max_request_bytes_no_batch() { let cfg = InfoConfiguration { max_request_bytes: 250, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::OK, time + 100_000, None), fake_response(status_codes::OK, time + 200_000, None), ], ); // Note that the total record overhead is around 85 bytes let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r] pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post. pq.flush(true).unwrap(); let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 2); assert_eq!(t.batches.len(), 2); assert_eq!(t.batches[0].posts.len(), 1); assert_eq!(t.batches[0].records, 2); assert_eq!(t.batches[0].bytes, payload_size * 2); assert_eq!(t.batches[0].posts[0].batch, Some("true".into())); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[payload_size, payload_size]) ); assert_eq!(t.batches[1].posts.len(), 1); assert_eq!(t.batches[1].records, 1); assert_eq!(t.batches[1].bytes, payload_size); // We know at this point that the server does not support batching. assert_eq!(t.batches[1].posts[0].batch, None); assert!(!t.batches[1].posts[0].commit); assert_eq!( t.batches[1].posts[0].body.len(), request_bytes_for_payloads(&[payload_size]) ); } #[test] fn test_pq_max_record_payload_bytes_no_batch() { let cfg = InfoConfiguration { max_record_payload_bytes: 150, max_request_bytes: 350, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::OK, time + 100_000, None), fake_response(status_codes::OK, time + 200_000, None), ], ); // Note that the total record overhead is around 85 bytes let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r] let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102 assert!(!enqueued, "Should not have fit"); pq.enqueue(&make_record(payload_size)).unwrap(); pq.flush(true).unwrap(); let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 1); assert_eq!(t.batches.len(), 1); assert_eq!(t.batches[0].posts.len(), 1); assert_eq!(t.batches[0].records, 2); assert_eq!(t.batches[0].bytes, payload_size * 2); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[payload_size, payload_size]) ); } #[test] fn test_pq_single_batch() { let cfg = InfoConfiguration::default(); let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![fake_response( status_codes::ACCEPTED, time + 100_000, Some("1234"), )], ); let payload_size = 100 - *NON_PAYLOAD_OVERHEAD; pq.enqueue(&make_record(payload_size)).unwrap(); pq.enqueue(&make_record(payload_size)).unwrap(); pq.enqueue(&make_record(payload_size)).unwrap(); pq.flush(true).unwrap(); let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 1); assert_eq!(t.batches.len(), 1); assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts.len(), 1); assert_eq!(t.batches[0].records, 3); assert_eq!(t.batches[0].bytes, payload_size * 3); assert!(t.batches[0].posts[0].commit); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[payload_size, payload_size, payload_size]) ); } #[test] fn test_pq_multi_post_batch_bytes() { let cfg = InfoConfiguration { max_post_bytes: 200, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::ACCEPTED, time, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), ], ); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); pq.flush(true).unwrap(); // COMMIT let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 2); assert_eq!(t.batches.len(), 1); assert_eq!(t.batches[0].posts.len(), 2); assert_eq!(t.batches[0].records, 3); assert_eq!(t.batches[0].bytes, 300); assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[0].posts[0].records, 2); assert_eq!(t.batches[0].posts[0].payload_bytes, 200); assert!(!t.batches[0].posts[0].commit); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[100, 100]) ); assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts[1].records, 1); assert_eq!(t.batches[0].posts[1].payload_bytes, 100); assert!(t.batches[0].posts[1].commit); assert_eq!( t.batches[0].posts[1].body.len(), request_bytes_for_payloads(&[100]) ); } #[test] fn test_pq_multi_post_batch_records() { let cfg = InfoConfiguration { max_post_records: 3, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::ACCEPTED, time, Some("1234")), fake_response(status_codes::ACCEPTED, time, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), ], ); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); pq.flush(true).unwrap(); // COMMIT let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 3); assert_eq!(t.batches.len(), 1); assert_eq!(t.batches[0].posts.len(), 3); assert_eq!(t.batches[0].records, 7); assert_eq!(t.batches[0].bytes, 700); assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[0].posts[0].records, 3); assert_eq!(t.batches[0].posts[0].payload_bytes, 300); assert!(!t.batches[0].posts[0].commit); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts[1].records, 3); assert_eq!(t.batches[0].posts[1].payload_bytes, 300); assert!(!t.batches[0].posts[1].commit); assert_eq!( t.batches[0].posts[1].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts[2].records, 1); assert_eq!(t.batches[0].posts[2].payload_bytes, 100); assert!(t.batches[0].posts[2].commit); assert_eq!( t.batches[0].posts[2].body.len(), request_bytes_for_payloads(&[100]) ); } #[test] #[allow(clippy::cognitive_complexity)] fn test_pq_multi_post_multi_batch_records() { let cfg = InfoConfiguration { max_post_records: 3, max_total_records: 5, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::ACCEPTED, time, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), ], ); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST + COMMIT pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); pq.flush(true).unwrap(); // COMMIT let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 4); assert_eq!(t.batches.len(), 2); assert_eq!(t.batches[0].posts.len(), 2); assert_eq!(t.batches[1].posts.len(), 2); assert_eq!(t.batches[0].records, 5); assert_eq!(t.batches[1].records, 4); assert_eq!(t.batches[0].bytes, 500); assert_eq!(t.batches[1].bytes, 400); assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[0].posts[0].records, 3); assert_eq!(t.batches[0].posts[0].payload_bytes, 300); assert!(!t.batches[0].posts[0].commit); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts[1].records, 2); assert_eq!(t.batches[0].posts[1].payload_bytes, 200); assert!(t.batches[0].posts[1].commit); assert_eq!( t.batches[0].posts[1].body.len(), request_bytes_for_payloads(&[100, 100]) ); assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[1].posts[0].records, 3); assert_eq!(t.batches[1].posts[0].payload_bytes, 300); assert!(!t.batches[1].posts[0].commit); assert_eq!( t.batches[1].posts[0].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); assert_eq!(t.batches[1].posts[1].records, 1); assert_eq!(t.batches[1].posts[1].payload_bytes, 100); assert!(t.batches[1].posts[1].commit); assert_eq!( t.batches[1].posts[1].body.len(), request_bytes_for_payloads(&[100]) ); } #[test] #[allow(clippy::cognitive_complexity)] fn test_pq_multi_post_multi_batch_bytes() { let cfg = InfoConfiguration { max_post_bytes: 300, max_total_bytes: 500, ..InfoConfiguration::default() }; let time = 11_111_111_000; let (mut pq, tester) = pq_test_setup( cfg, time, vec![ fake_response(status_codes::ACCEPTED, time, Some("1234")), fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")), fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit ], ); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); assert_eq!(pq.last_modified.0, time); // POST pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST + COMMIT pq.enqueue(&make_record(100)).unwrap(); assert_eq!(pq.last_modified.0, time + 100_000); pq.enqueue(&make_record(100)).unwrap(); pq.enqueue(&make_record(100)).unwrap(); // POST pq.enqueue(&make_record(100)).unwrap(); assert_eq!(pq.last_modified.0, time + 100_000); pq.flush(true).unwrap(); // COMMIT assert_eq!(pq.last_modified.0, time + 200_000); let t = tester.borrow(); assert!(t.cur_batch.is_none()); assert_eq!(t.all_posts.len(), 4); assert_eq!(t.batches.len(), 2); assert_eq!(t.batches[0].posts.len(), 2); assert_eq!(t.batches[1].posts.len(), 2); assert_eq!(t.batches[0].records, 5); assert_eq!(t.batches[1].records, 4); assert_eq!(t.batches[0].bytes, 500); assert_eq!(t.batches[1].bytes, 400); assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[0].posts[0].records, 3); assert_eq!(t.batches[0].posts[0].payload_bytes, 300); assert!(!t.batches[0].posts[0].commit); assert_eq!( t.batches[0].posts[0].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234"); assert_eq!(t.batches[0].posts[1].records, 2); assert_eq!(t.batches[0].posts[1].payload_bytes, 200); assert!(t.batches[0].posts[1].commit); assert_eq!( t.batches[0].posts[1].body.len(), request_bytes_for_payloads(&[100, 100]) ); assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true"); assert_eq!(t.batches[1].posts[0].records, 3); assert_eq!(t.batches[1].posts[0].payload_bytes, 300); assert!(!t.batches[1].posts[0].commit); assert_eq!( t.batches[1].posts[0].body.len(), request_bytes_for_payloads(&[100, 100, 100]) ); assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd"); assert_eq!(t.batches[1].posts[1].records, 1); assert_eq!(t.batches[1].posts[1].payload_bytes, 100); assert!(t.batches[1].posts[1].commit); assert_eq!( t.batches[1].posts[1].body.len(), request_bytes_for_payloads(&[100]) ); } // TODO: Test // // - error cases!!! We don't test our handling of server errors at all! // - mixed bytes/record limits // // A lot of these have good examples in test_postqueue.js on deskftop sync }