Diffstat (limited to 'third_party/rust/sync15/src/client/request.rs')
-rw-r--r-- | third_party/rust/sync15/src/client/request.rs | 1199
1 files changed, 1199 insertions, 0 deletions
diff --git a/third_party/rust/sync15/src/client/request.rs b/third_party/rust/sync15/src/client/request.rs
new file mode 100644
index 0000000000..c69b630c8d
--- /dev/null
+++ b/third_party/rust/sync15/src/client/request.rs
@@ -0,0 +1,1199 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::storage_client::Sync15ClientResponse;
use crate::bso::OutgoingEncryptedBso;
use crate::error::{self, Error as ErrorKind, Result};
use crate::ServerTimestamp;
use serde_derive::*;
use std::collections::HashMap;
use std::default::Default;
use std::ops::Deref;
use sync_guid::Guid;
use viaduct::status_codes;

/// Manages a pair of (byte, count) limits for a PostQueue, such as
/// (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records).
#[derive(Debug, Clone)]
struct LimitTracker {
    max_bytes: usize,
    max_records: usize,
    cur_bytes: usize,
    cur_records: usize,
}

impl LimitTracker {
    pub fn new(max_bytes: usize, max_records: usize) -> LimitTracker {
        LimitTracker {
            max_bytes,
            max_records,
            cur_bytes: 0,
            cur_records: 0,
        }
    }

    pub fn clear(&mut self) {
        self.cur_records = 0;
        self.cur_bytes = 0;
    }

    pub fn can_add_record(&self, payload_size: usize) -> bool {
        // Desktop does the cur_bytes check as exclusive, but we shouldn't see any servers that
        // don't have https://github.com/mozilla-services/server-syncstorage/issues/73
        self.cur_records < self.max_records && self.cur_bytes + payload_size <= self.max_bytes
    }

    pub fn can_never_add(&self, record_size: usize) -> bool {
        record_size >= self.max_bytes
    }

    pub fn record_added(&mut self, record_size: usize) {
        assert!(
            self.can_add_record(record_size),
            "LimitTracker::record_added caller must check can_add_record"
        );
        self.cur_records += 1;
        self.cur_bytes += record_size;
    }
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InfoConfiguration {
    /// The maximum size in bytes of the overall HTTP request body that will be accepted by the
    /// server.
    #[serde(default = "default_max_request_bytes")]
    pub max_request_bytes: usize,

    /// The maximum number of records that can be uploaded to a collection in a single POST request.
    #[serde(default = "usize::max_value")]
    pub max_post_records: usize,

    /// The maximum combined size in bytes of the record payloads that can be uploaded to a
    /// collection in a single POST request.
    #[serde(default = "usize::max_value")]
    pub max_post_bytes: usize,

    /// The maximum total number of records that can be uploaded to a collection as part of a
    /// batched upload.
    #[serde(default = "usize::max_value")]
    pub max_total_records: usize,

    /// The maximum total combined size in bytes of the record payloads that can be uploaded to a
    /// collection as part of a batched upload.
    #[serde(default = "usize::max_value")]
    pub max_total_bytes: usize,

    /// The maximum size of an individual BSO payload, in bytes.
    #[serde(default = "default_max_record_payload_bytes")]
    pub max_record_payload_bytes: usize,
}
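
// A rough usage sketch of how the limits above are consumed (not part of the
// patch itself; the values are illustrative): the `max_post_*` and
// `max_total_*` pairs each feed a `LimitTracker`, which callers consult before
// recording an addition.
//
//     let cfg = InfoConfiguration::default();
//     let mut post_limits = LimitTracker::new(cfg.max_post_bytes, cfg.max_post_records);
//     let payload_len = 512;
//     if post_limits.can_never_add(payload_len) {
//         // Larger than the server will ever accept in one POST; skip the record.
//     } else if post_limits.can_add_record(payload_len) {
//         post_limits.record_added(payload_len);
//     } else {
//         // Limits reached: flush the queued POST, call clear(), then retry.
//     }
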
// This is annoying but seems to be the only way to do it...
fn default_max_request_bytes() -> usize {
    260 * 1024
}
fn default_max_record_payload_bytes() -> usize {
    256 * 1024
}

impl Default for InfoConfiguration {
    #[inline]
    fn default() -> InfoConfiguration {
        InfoConfiguration {
            max_request_bytes: default_max_request_bytes(),
            max_record_payload_bytes: default_max_record_payload_bytes(),
            max_post_records: usize::max_value(),
            max_post_bytes: usize::max_value(),
            max_total_records: usize::max_value(),
            max_total_bytes: usize::max_value(),
        }
    }
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct InfoCollections(pub(crate) HashMap<String, ServerTimestamp>);

impl InfoCollections {
    pub fn new(collections: HashMap<String, ServerTimestamp>) -> InfoCollections {
        InfoCollections(collections)
    }
}

impl Deref for InfoCollections {
    type Target = HashMap<String, ServerTimestamp>;

    fn deref(&self) -> &HashMap<String, ServerTimestamp> {
        &self.0
    }
}

#[derive(Debug, Clone, Deserialize)]
pub struct UploadResult {
    batch: Option<String>,
    /// Maps record id => why failed
    #[serde(default = "HashMap::new")]
    pub failed: HashMap<Guid, String>,
    /// Vec of ids
    #[serde(default = "Vec::new")]
    pub success: Vec<Guid>,
}

pub type PostResponse = Sync15ClientResponse<UploadResult>;

#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum BatchState {
    Unsupported,
    NoBatch,
    InBatch(String),
}

#[derive(Debug)]
pub struct PostQueue<Post, OnResponse> {
    poster: Post,
    on_response: OnResponse,
    post_limits: LimitTracker,
    batch_limits: LimitTracker,
    max_payload_bytes: usize,
    max_request_bytes: usize,
    queued: Vec<u8>,
    batch: BatchState,
    last_modified: ServerTimestamp,
}

pub trait BatchPoster {
    /// Note: Last argument (reference to the batch poster) is provided for the purposes of testing
    /// Important: Poster should not report non-success HTTP statuses as errors!!
    fn post<P, O>(
        &self,
        body: Vec<u8>,
        xius: ServerTimestamp,
        batch: Option<String>,
        commit: bool,
        queue: &PostQueue<P, O>,
    ) -> Result<PostResponse>;
}

// We don't just use a FnMut here since we want to override it in mocking for RefCell<TestType>,
// which we can't do for FnMut since neither FnMut nor RefCell are defined here. Also, this
// is somewhat better for documentation.
pub trait PostResponseHandler {
    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()>;
}

#[derive(Debug, Clone)]
pub(crate) struct NormalResponseHandler {
    pub failed_ids: Vec<Guid>,
    pub successful_ids: Vec<Guid>,
    pub allow_failed: bool,
    pub pending_failed: Vec<Guid>,
    pub pending_success: Vec<Guid>,
}

impl NormalResponseHandler {
    pub fn new(allow_failed: bool) -> NormalResponseHandler {
        NormalResponseHandler {
            failed_ids: vec![],
            successful_ids: vec![],
            pending_failed: vec![],
            pending_success: vec![],
            allow_failed,
        }
    }
}

impl PostResponseHandler for NormalResponseHandler {
    fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> error::Result<()> {
        match r {
            Sync15ClientResponse::Success { record, .. } => {
                if !record.failed.is_empty() && !self.allow_failed {
                    return Err(ErrorKind::RecordUploadFailed);
                }
                for id in record.success.iter() {
                    self.pending_success.push(id.clone());
                }
                for kv in record.failed.iter() {
                    self.pending_failed.push(kv.0.clone());
                }
                if !mid_batch {
                    self.successful_ids.append(&mut self.pending_success);
                    self.failed_ids.append(&mut self.pending_failed);
                }
                Ok(())
            }
            _ => Err(r.create_storage_error()),
        }
    }
}

impl<Poster, OnResponse> PostQueue<Poster, OnResponse>
where
    Poster: BatchPoster,
    OnResponse: PostResponseHandler,
{
    pub fn new(
        config: &InfoConfiguration,
        ts: ServerTimestamp,
        poster: Poster,
        on_response: OnResponse,
    ) -> PostQueue<Poster, OnResponse> {
        PostQueue {
            poster,
            on_response,
            last_modified: ts,
            post_limits: LimitTracker::new(config.max_post_bytes, config.max_post_records),
            batch_limits: LimitTracker::new(config.max_total_bytes, config.max_total_records),
            batch: BatchState::NoBatch,
            max_payload_bytes: config.max_record_payload_bytes,
            max_request_bytes: config.max_request_bytes,
            queued: Vec::new(),
        }
    }

    #[inline]
    fn in_batch(&self) -> bool {
        !matches!(&self.batch, BatchState::Unsupported | BatchState::NoBatch)
    }

    pub fn enqueue(&mut self, record: &OutgoingEncryptedBso) -> Result<bool> {
        let payload_length = record.serialized_payload_len();

        if self.post_limits.can_never_add(payload_length)
            || self.batch_limits.can_never_add(payload_length)
            || payload_length >= self.max_payload_bytes
        {
            log::warn!(
                "Single record too large to submit to server ({} b)",
                payload_length
            );
            return Ok(false);
        }

        // Write directly into `queued` but undo if necessary (the vast majority of the time
        // it won't be necessary). If we hit a problem we need to undo that, but the only error
        // case we have to worry about right now is in flush()
        let item_start = self.queued.len();

        // This is conservative but can't hurt.
        self.queued.reserve(payload_length + 2);

        // Either the first character in an array, or a comma separating
        // it from the previous item.
        let c = if self.queued.is_empty() { b'[' } else { b',' };
        self.queued.push(c);

        // This unwrap is fine, since serde_json's failure case is HashMaps that have non-object
        // keys, which is impossible. If you decide to change this part, you *need* to call
        // `self.queued.truncate(item_start)` here in the failure case!
        serde_json::to_writer(&mut self.queued, &record).unwrap();

        let item_end = self.queued.len();

        debug_assert!(
            item_end >= payload_length,
            "EncryptedPayload::serialized_len is bugged"
        );

        // The + 1 is only relevant for the final record, which will have a trailing ']'.
        let item_len = item_end - item_start + 1;

        if item_len >= self.max_request_bytes {
            self.queued.truncate(item_start);
            log::warn!(
                "Single record too large to submit to server ({} b)",
                item_len
            );
            return Ok(false);
        }

        let can_post_record = self.post_limits.can_add_record(payload_length);
        let can_batch_record = self.batch_limits.can_add_record(payload_length);
        let can_send_record = self.queued.len() < self.max_request_bytes;

        if !can_post_record || !can_send_record || !can_batch_record {
            log::debug!(
                "PostQueue flushing! (can_post = {}, can_send = {}, can_batch = {})",
                can_post_record,
                can_send_record,
                can_batch_record
            );
            // "unwrite" the record.
            self.queued.truncate(item_start);
            // Flush whatever we have queued.
            self.flush(!can_batch_record)?;
            // And write it again.
            let c = if self.queued.is_empty() { b'[' } else { b',' };
            self.queued.push(c);
            serde_json::to_writer(&mut self.queued, &record).unwrap();
        }

        self.post_limits.record_added(payload_length);
        self.batch_limits.record_added(payload_length);

        Ok(true)
    }

    pub fn flush(&mut self, want_commit: bool) -> Result<()> {
        if self.queued.is_empty() {
            assert!(
                !self.in_batch(),
                "Bug: Somehow we're in a batch but have no queued records"
            );
            // Nothing to do!
            return Ok(());
        }

        self.queued.push(b']');
        let batch_id = match &self.batch {
            // Not the first post and we know we have no batch semantics.
            BatchState::Unsupported => None,
            // First commit in possible batch
            BatchState::NoBatch => Some("true".into()),
            // In a batch and we have a batch id.
            BatchState::InBatch(ref s) => Some(s.clone()),
        };

        log::info!(
            "Posting {} records of {} bytes",
            self.post_limits.cur_records,
            self.queued.len()
        );

        let is_commit = want_commit && batch_id.is_some();
        // Weird syntax for calling a function object that is a property.
        let resp_or_error = self.poster.post(
            self.queued.clone(),
            self.last_modified,
            batch_id,
            is_commit,
            self,
        );

        self.queued.truncate(0);

        if want_commit || self.batch == BatchState::Unsupported {
            self.batch_limits.clear();
        }
        self.post_limits.clear();

        let resp = resp_or_error?;

        let (status, last_modified, record) = match resp {
            Sync15ClientResponse::Success {
                status,
                last_modified,
                ref record,
                ..
            } => (status, last_modified, record),
            _ => {
                self.on_response.handle_response(resp, !want_commit)?;
                // on_response() should always fail!
                unreachable!();
            }
        };

        if want_commit || self.batch == BatchState::Unsupported {
            self.last_modified = last_modified;
        }

        if want_commit {
            log::debug!("Committed batch {:?}", self.batch);
            self.batch = BatchState::NoBatch;
            self.on_response.handle_response(resp, false)?;
            return Ok(());
        }

        if status != status_codes::ACCEPTED {
            if self.in_batch() {
                return Err(ErrorKind::ServerBatchProblem(
                    "Server responded non-202 success code while a batch was in progress",
                ));
            }
            self.last_modified = last_modified;
            self.batch = BatchState::Unsupported;
            self.batch_limits.clear();
            self.on_response.handle_response(resp, false)?;
            return Ok(());
        }

        let batch_id = record
            .batch
            .as_ref()
            .ok_or({
                ErrorKind::ServerBatchProblem("Invalid server response: 202 without a batch ID")
            })?
            .clone();

        match &self.batch {
            BatchState::Unsupported => {
                log::warn!("Server changed its mind about supporting batching mid-batch...");
            }

            BatchState::InBatch(ref cur_id) => {
                if cur_id != &batch_id {
                    return Err(ErrorKind::ServerBatchProblem(
                        "Invalid server response: 202 without a batch ID",
                    ));
                }
            }
            _ => {}
        }

        // Can't change this in match arms without NLL
        self.batch = BatchState::InBatch(batch_id);
        self.last_modified = last_modified;

        self.on_response.handle_response(resp, true)?;

        Ok(())
    }
}

#[derive(Clone)]
pub struct UploadInfo {
    pub successful_ids: Vec<Guid>,
    pub failed_ids: Vec<Guid>,
    pub modified_timestamp: ServerTimestamp,
}

impl<Poster> PostQueue<Poster, NormalResponseHandler> {
    // TODO: should take by move
    pub fn completed_upload_info(&mut self) -> UploadInfo {
        let mut result = UploadInfo {
            successful_ids: Vec::with_capacity(self.on_response.successful_ids.len()),
            failed_ids: Vec::with_capacity(
                self.on_response.failed_ids.len()
                    + self.on_response.pending_failed.len()
                    + self.on_response.pending_success.len(),
            ),
            modified_timestamp: self.last_modified,
        };

        result
            .successful_ids
            .append(&mut self.on_response.successful_ids);

        result.failed_ids.append(&mut self.on_response.failed_ids);
        result
            .failed_ids
            .append(&mut self.on_response.pending_failed);
        result
            .failed_ids
            .append(&mut self.on_response.pending_success);

        result
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::bso::{IncomingEncryptedBso, OutgoingEncryptedBso, OutgoingEnvelope};
    use crate::EncryptedPayload;
    use lazy_static::lazy_static;
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;

    #[derive(Debug, Clone)]
    struct PostedData {
        body: String,
        _xius: ServerTimestamp,
        batch: Option<String>,
        commit: bool,
        payload_bytes: usize,
        records: usize,
    }

    impl PostedData {
        fn records_as_json(&self) -> Vec<serde_json::Value> {
            let values =
                serde_json::from_str::<serde_json::Value>(&self.body).expect("Posted invalid json");
            // Check that they actually deserialize as what we want
            let records_or_err =
                serde_json::from_value::<Vec<IncomingEncryptedBso>>(values.clone());
            records_or_err.expect("Failed to deserialize data");
            serde_json::from_value(values).unwrap()
        }
    }

    #[derive(Debug, Clone)]
    struct BatchInfo {
        id: Option<String>,
        posts: Vec<PostedData>,
        bytes: usize,
        records: usize,
    }

    #[derive(Debug, Clone)]
    struct TestPoster {
        all_posts: Vec<PostedData>,
        responses: VecDeque<PostResponse>,
        batches: Vec<BatchInfo>,
        cur_batch: Option<BatchInfo>,
        cfg: InfoConfiguration,
    }

    type TestPosterRef = Rc<RefCell<TestPoster>>;
    impl TestPoster {
        pub fn new<T>(cfg: &InfoConfiguration, responses: T) -> TestPosterRef
        where
            T: Into<VecDeque<PostResponse>>,
        {
            Rc::new(RefCell::new(TestPoster {
                all_posts: vec![],
                responses: responses.into(),
                batches: vec![],
                cur_batch: None,
                cfg: cfg.clone(),
            }))
        }
        // Adds &mut
        fn do_post<T, O>(
            &mut self,
            body: &[u8],
            xius: ServerTimestamp,
            batch: Option<String>,
            commit: bool,
            queue: &PostQueue<T, O>,
        ) -> Sync15ClientResponse<UploadResult> {
            let mut post = PostedData {
                body: String::from_utf8(body.into()).expect("Posted invalid utf8..."),
                batch: batch.clone(),
                _xius: xius,
                commit,
                payload_bytes: 0,
                records: 0,
            };

            assert!(body.len() <= self.cfg.max_request_bytes);

            let (num_records, record_payload_bytes) = {
                let recs = post.records_as_json();
                assert!(recs.len() <= self.cfg.max_post_records);
                assert!(recs.len() <= self.cfg.max_total_records);
                let payload_bytes: usize = recs
                    .iter()
                    .map(|r| {
                        let len = r["payload"]
                            .as_str()
                            .expect("Non string payload property")
                            .len();
                        assert!(len <= self.cfg.max_record_payload_bytes);
                        len
                    })
                    .sum();
                assert!(payload_bytes <= self.cfg.max_post_bytes);
                assert!(payload_bytes <= self.cfg.max_total_bytes);

                assert_eq!(queue.post_limits.cur_bytes, payload_bytes);
                assert_eq!(queue.post_limits.cur_records, recs.len());
                (recs.len(), payload_bytes)
            };
            post.payload_bytes = record_payload_bytes;
            post.records = num_records;

            self.all_posts.push(post.clone());
            let response = self.responses.pop_front().unwrap();

            let record = match response {
                Sync15ClientResponse::Success { ref record, .. } => record,
                _ => {
                    panic!("only success codes are used in this test");
                }
            };

            if self.cur_batch.is_none() {
                assert!(
                    batch.is_none() || batch == Some("true".into()),
                    "We shouldn't be in a batch now"
                );
                self.cur_batch = Some(BatchInfo {
                    id: record.batch.clone(),
                    posts: vec![],
                    records: 0,
                    bytes: 0,
                });
            } else {
                assert_eq!(
                    batch,
                    self.cur_batch.as_ref().unwrap().id,
                    "We're in a batch but got the wrong batch id"
                );
            }

            {
                let batch = self.cur_batch.as_mut().unwrap();
                batch.posts.push(post);
                batch.records += num_records;
                batch.bytes += record_payload_bytes;

                assert!(batch.bytes <= self.cfg.max_total_bytes);
                assert!(batch.records <= self.cfg.max_total_records);

                assert_eq!(batch.records, queue.batch_limits.cur_records);
                assert_eq!(batch.bytes, queue.batch_limits.cur_bytes);
            }

            if commit || record.batch.is_none() {
                let batch = self.cur_batch.take().unwrap();
                self.batches.push(batch);
            }

            response
        }

        fn do_handle_response(&mut self, _: PostResponse, mid_batch: bool) {
            assert_eq!(mid_batch, self.cur_batch.is_some());
        }
    }
    impl BatchPoster for TestPosterRef {
        fn post<T, O>(
            &self,
            body: Vec<u8>,
            xius: ServerTimestamp,
            batch: Option<String>,
            commit: bool,
            queue: &PostQueue<T, O>,
        ) -> Result<PostResponse> {
            Ok(self.borrow_mut().do_post(&body, xius, batch, commit, queue))
        }
    }

    impl PostResponseHandler for TestPosterRef {
        fn handle_response(&mut self, r: PostResponse, mid_batch: bool) -> Result<()> {
            self.borrow_mut().do_handle_response(r, mid_batch);
            Ok(())
        }
    }

    type MockedPostQueue = PostQueue<TestPosterRef, TestPosterRef>;

    fn pq_test_setup(
        cfg: InfoConfiguration,
        lm: i64,
        resps: Vec<PostResponse>,
    ) -> (MockedPostQueue, TestPosterRef) {
        let tester = TestPoster::new(&cfg, resps);
        let pq = PostQueue::new(&cfg, ServerTimestamp(lm), tester.clone(), tester.clone());
        (pq, tester)
    }

    fn fake_response<'a, T: Into<Option<&'a str>>>(status: u16, lm: i64, batch: T) -> PostResponse {
        assert!(status_codes::is_success_code(status));
        Sync15ClientResponse::Success {
            status,
            last_modified: ServerTimestamp(lm),
            record: UploadResult {
                batch: batch.into().map(Into::into),
                failed: HashMap::new(),
                success: vec![],
            },
            route: "test/path".into(),
        }
    }

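    // A rough sketch of the size arithmetic the constants and helpers below rely
    // on (the numbers here are illustrative only): a POST body is a JSON array,
    // so its length is 1 byte for the opening '[' plus, per record, the
    // serialized record length (payload string plus envelope overhead) plus 1
    // byte for the separating ',' or the closing ']'. For example, with ~85
    // bytes of per-record overhead, two 100-byte payloads give
    // 1 + (100 + 85 + 1) + (100 + 85 + 1) = 373 bytes of request body.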
    lazy_static! {
        // ~40b
        static ref PAYLOAD_OVERHEAD: usize = {
            let payload = EncryptedPayload {
                iv: "".into(),
                hmac: "".into(),
                ciphertext: "".into()
            };
            serde_json::to_string(&payload).unwrap().len()
        };
        // ~80b
        static ref TOTAL_RECORD_OVERHEAD: usize = {
            let val = serde_json::to_value(OutgoingEncryptedBso::new(OutgoingEnvelope {
                    id: "".into(),
                    sortindex: None,
                    ttl: None,
                },
                EncryptedPayload {
                    iv: "".into(),
                    hmac: "".into(),
                    ciphertext: "".into()
                },
            )).unwrap();
            serde_json::to_string(&val).unwrap().len()
        };
        // There's some subtlety in how we calculate this having to do with the fact that
        // the quotes in the payload are escaped but the escape chars count to the request len
        // and *not* to the payload len (the payload len check happens after json parsing the
        // top level object).
        static ref NON_PAYLOAD_OVERHEAD: usize = {
            *TOTAL_RECORD_OVERHEAD - *PAYLOAD_OVERHEAD
        };
    }

    // Actual record size (for max_request_len) will be larger by some amount
    fn make_record(payload_size: usize) -> OutgoingEncryptedBso {
        assert!(payload_size > *PAYLOAD_OVERHEAD);
        let ciphertext_len = payload_size - *PAYLOAD_OVERHEAD;
        OutgoingEncryptedBso::new(
            OutgoingEnvelope {
                id: "".into(),
                sortindex: None,
                ttl: None,
            },
            EncryptedPayload {
                iv: "".into(),
                hmac: "".into(),
                ciphertext: "x".repeat(ciphertext_len),
            },
        )
    }

    fn request_bytes_for_payloads(payloads: &[usize]) -> usize {
        1 + payloads
            .iter()
            .map(|&size| size + 1 + *NON_PAYLOAD_OVERHEAD)
            .sum::<usize>()
    }

    #[test]
    fn test_pq_basic() {
        let cfg = InfoConfiguration {
            max_request_bytes: 1000,
            max_record_payload_bytes: 1000,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![fake_response(status_codes::OK, time + 100_000, None)],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 1);
        assert_eq!(t.batches[0].bytes, 100);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    fn test_pq_max_request_bytes_no_batch() {
        let cfg = InfoConfiguration {
            max_request_bytes: 250,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::OK, time + 100_000, None),
                fake_response(status_codes::OK, time + 200_000, None),
            ],
        );

        // Note that the total record overhead is around 85 bytes
        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 203; [r,r]
        pq.enqueue(&make_record(payload_size)).unwrap(); // too big, 2nd post.
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 2);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 2);
        assert_eq!(t.batches[0].bytes, payload_size * 2);
        assert_eq!(t.batches[0].posts[0].batch, Some("true".into()));
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size])
        );

        assert_eq!(t.batches[1].posts.len(), 1);
        assert_eq!(t.batches[1].records, 1);
        assert_eq!(t.batches[1].bytes, payload_size);
        // We know at this point that the server does not support batching.
        assert_eq!(t.batches[1].posts[0].batch, None);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size])
        );
    }

    #[test]
    fn test_pq_max_record_payload_bytes_no_batch() {
        let cfg = InfoConfiguration {
            max_record_payload_bytes: 150,
            max_request_bytes: 350,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::OK, time + 100_000, None),
                fake_response(status_codes::OK, time + 200_000, None),
            ],
        );

        // Note that the total record overhead is around 85 bytes
        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap(); // total size == 102; [r]
        let enqueued = pq.enqueue(&make_record(151)).unwrap(); // still 102
        assert!(!enqueued, "Should not have fit");
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 2);
        assert_eq!(t.batches[0].bytes, payload_size * 2);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size])
        );
    }

    #[test]
    fn test_pq_single_batch() {
        let cfg = InfoConfiguration::default();
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![fake_response(
                status_codes::ACCEPTED,
                time + 100_000,
                Some("1234"),
            )],
        );

        let payload_size = 100 - *NON_PAYLOAD_OVERHEAD;
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.enqueue(&make_record(payload_size)).unwrap();
        pq.flush(true).unwrap();

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 1);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].id.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts.len(), 1);
        assert_eq!(t.batches[0].records, 3);
        assert_eq!(t.batches[0].bytes, payload_size * 3);
        assert!(t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[payload_size, payload_size, payload_size])
        );
    }

    #[test]
    fn test_pq_multi_post_batch_bytes() {
        let cfg = InfoConfiguration {
            max_post_bytes: 200,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap(); // COMMIT

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 2);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[0].records, 3);
        assert_eq!(t.batches[0].bytes, 300);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 2);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 200);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 1);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 100);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    fn test_pq_multi_post_batch_records() {
        let cfg = InfoConfiguration {
            max_post_records: 3,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap(); // COMMIT

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 3);
        assert_eq!(t.batches.len(), 1);
        assert_eq!(t.batches[0].posts.len(), 3);
        assert_eq!(t.batches[0].records, 7);
        assert_eq!(t.batches[0].bytes, 700);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 3);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 300);
        assert!(!t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[2].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[2].records, 1);
        assert_eq!(t.batches[0].posts[2].payload_bytes, 100);
        assert!(t.batches[0].posts[2].commit);
        assert_eq!(
            t.batches[0].posts[2].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_pq_multi_post_multi_batch_records() {
        let cfg = InfoConfiguration {
            max_post_records: 3,
            max_total_records: 5,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")),
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST + COMMIT
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.flush(true).unwrap(); // COMMIT

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 4);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[1].posts.len(), 2);

        assert_eq!(t.batches[0].records, 5);
        assert_eq!(t.batches[1].records, 4);

        assert_eq!(t.batches[0].bytes, 500);
        assert_eq!(t.batches[1].bytes, 400);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 2);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[1].posts[0].records, 3);
        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
        assert_eq!(t.batches[1].posts[1].records, 1);
        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
        assert!(t.batches[1].posts[1].commit);
        assert_eq!(
            t.batches[1].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    #[test]
    #[allow(clippy::cognitive_complexity)]
    fn test_pq_multi_post_multi_batch_bytes() {
        let cfg = InfoConfiguration {
            max_post_bytes: 300,
            max_total_bytes: 500,
            ..InfoConfiguration::default()
        };
        let time = 11_111_111_000;
        let (mut pq, tester) = pq_test_setup(
            cfg,
            time,
            vec![
                fake_response(status_codes::ACCEPTED, time, Some("1234")),
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("1234")), // should commit
                fake_response(status_codes::ACCEPTED, time + 100_000, Some("abcd")),
                fake_response(status_codes::ACCEPTED, time + 200_000, Some("abcd")), // should commit
            ],
        );

        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time);
        // POST
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();
        // POST + COMMIT
        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time + 100_000);
        pq.enqueue(&make_record(100)).unwrap();
        pq.enqueue(&make_record(100)).unwrap();

        // POST
        pq.enqueue(&make_record(100)).unwrap();
        assert_eq!(pq.last_modified.0, time + 100_000);
        pq.flush(true).unwrap(); // COMMIT

        assert_eq!(pq.last_modified.0, time + 200_000);

        let t = tester.borrow();
        assert!(t.cur_batch.is_none());
        assert_eq!(t.all_posts.len(), 4);
        assert_eq!(t.batches.len(), 2);
        assert_eq!(t.batches[0].posts.len(), 2);
        assert_eq!(t.batches[1].posts.len(), 2);

        assert_eq!(t.batches[0].records, 5);
        assert_eq!(t.batches[1].records, 4);

        assert_eq!(t.batches[0].bytes, 500);
        assert_eq!(t.batches[1].bytes, 400);

        assert_eq!(t.batches[0].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[0].posts[0].records, 3);
        assert_eq!(t.batches[0].posts[0].payload_bytes, 300);
        assert!(!t.batches[0].posts[0].commit);
        assert_eq!(
            t.batches[0].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[0].posts[1].batch.as_ref().unwrap(), "1234");
        assert_eq!(t.batches[0].posts[1].records, 2);
        assert_eq!(t.batches[0].posts[1].payload_bytes, 200);
        assert!(t.batches[0].posts[1].commit);
        assert_eq!(
            t.batches[0].posts[1].body.len(),
            request_bytes_for_payloads(&[100, 100])
        );

        assert_eq!(t.batches[1].posts[0].batch.as_ref().unwrap(), "true");
        assert_eq!(t.batches[1].posts[0].records, 3);
        assert_eq!(t.batches[1].posts[0].payload_bytes, 300);
        assert!(!t.batches[1].posts[0].commit);
        assert_eq!(
            t.batches[1].posts[0].body.len(),
            request_bytes_for_payloads(&[100, 100, 100])
        );

        assert_eq!(t.batches[1].posts[1].batch.as_ref().unwrap(), "abcd");
        assert_eq!(t.batches[1].posts[1].records, 1);
        assert_eq!(t.batches[1].posts[1].payload_bytes, 100);
        assert!(t.batches[1].posts[1].commit);
        assert_eq!(
            t.batches[1].posts[1].body.len(),
            request_bytes_for_payloads(&[100])
        );
    }

    // TODO: Test
    //
    // - error cases!!! We don't test our handling of server errors at all!
    // - mixed bytes/record limits
    //
    // A lot of these have good examples in test_postqueue.js on desktop sync
}
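
Taken together, a caller drives these types roughly as sketched below. This is a minimal, hypothetical example rather than part of the patch: `MyPoster` stands in for any `BatchPoster` implementation, and the loop mirrors how `enqueue`/`flush` are exercised in the tests above.

    fn upload_all(
        config: &InfoConfiguration,
        since: ServerTimestamp,
        poster: MyPoster, // any BatchPoster implementation (hypothetical name)
        records: &[OutgoingEncryptedBso],
    ) -> Result<UploadInfo> {
        let handler = NormalResponseHandler::new(false);
        let mut queue = PostQueue::new(config, since, poster, handler);
        for record in records {
            // Records that can never fit within the server limits are skipped
            // (enqueue returns Ok(false)); everything else is queued and posted
            // as the per-POST and per-batch limits fill up.
            queue.enqueue(record)?;
        }
        // The final flush commits the open batch (or the single POST).
        queue.flush(true)?;
        Ok(queue.completed_upload_info())
    }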