diff options
Diffstat (limited to 'third_party/rust/glean-core/src/upload/mod.rs')
-rw-r--r-- | third_party/rust/glean-core/src/upload/mod.rs | 343 |
1 files changed, 278 insertions, 65 deletions
diff --git a/third_party/rust/glean-core/src/upload/mod.rs b/third_party/rust/glean-core/src/upload/mod.rs index d764dcd29e..e51a9d9508 100644 --- a/third_party/rust/glean-core/src/upload/mod.rs +++ b/third_party/rust/glean-core/src/upload/mod.rs @@ -30,6 +30,7 @@ use directory::{PingDirectoryManager, PingPayloadsByDirectory}; use policy::Policy; use request::create_date_header_value; +pub use directory::{PingMetadata, PingPayload}; pub use request::{HeaderMap, PingRequest}; pub use result::{UploadResult, UploadTaskAction}; @@ -322,21 +323,24 @@ impl PingUploadManager { /// /// Returns the `PingRequest` or `None` if unable to build, /// in which case it will delete the ping file and record an error. - fn build_ping_request( - &self, - glean: &Glean, - document_id: &str, - path: &str, - body: &str, - headers: Option<HeaderMap>, - ) -> Option<PingRequest> { + fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> { + let PingPayload { + document_id, + upload_path: path, + json_body: body, + headers, + body_has_info_sections, + ping_name, + } = ping; let mut request = PingRequest::builder( &self.language_binding_name, self.policy.max_ping_body_size(), ) - .document_id(document_id) + .document_id(&document_id) .path(path) - .body(body); + .body(body) + .body_has_info_sections(body_has_info_sections) + .ping_name(ping_name); if let Some(headers) = headers { request = request.headers(headers); @@ -346,7 +350,7 @@ impl PingUploadManager { Ok(request) => Some(request), Err(e) => { log::warn!("Error trying to build ping request: {}", e); - self.directory_manager.delete_file(document_id); + self.directory_manager.delete_file(&document_id); // Record the error. // Currently the only possible error is PingBodyOverflow. @@ -362,23 +366,21 @@ impl PingUploadManager { } /// Enqueue a ping for upload. - pub fn enqueue_ping( - &self, - glean: &Glean, - document_id: &str, - path: &str, - body: &str, - headers: Option<HeaderMap>, - ) { + pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) { let mut queue = self .queue .write() .expect("Can't write to pending pings queue."); + let PingPayload { + ref document_id, + upload_path: ref path, + .. + } = ping; // Checks if a ping with this `document_id` is already enqueued. if queue .iter() - .any(|request| request.document_id == document_id) + .any(|request| request.document_id.as_str() == document_id) { log::warn!( "Attempted to enqueue a duplicate ping {} at {}.", @@ -404,7 +406,7 @@ impl PingUploadManager { } log::trace!("Enqueuing ping {} at {}", document_id, path); - if let Some(request) = self.build_ping_request(glean, document_id, path, body, headers) { + if let Some(request) = self.build_ping_request(glean, ping) { queue.push_back(request) } } @@ -455,7 +457,7 @@ impl PingUploadManager { // Thus, we reverse the order of the pending pings vector, // so that we iterate in descending order (newest -> oldest). cached_pings.pending_pings.reverse(); - cached_pings.pending_pings.retain(|(file_size, (document_id, _, _, _))| { + cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| { pending_pings_count += 1; pending_pings_directory_size += file_size; @@ -493,14 +495,14 @@ impl PingUploadManager { // Enqueue the remaining pending pings and // enqueue all deletion-request pings. - let deletion_request_pings = cached_pings.deletion_request_pings.drain(..); - for (_, (document_id, path, body, headers)) in deletion_request_pings { - self.enqueue_ping(glean, &document_id, &path, &body, headers); - } - let pending_pings = cached_pings.pending_pings.drain(..); - for (_, (document_id, path, body, headers)) in pending_pings { - self.enqueue_ping(glean, &document_id, &path, &body, headers); - } + cached_pings + .deletion_request_pings + .drain(..) + .for_each(|(_, ping)| self.enqueue_ping(glean, ping)); + cached_pings + .pending_pings + .drain(..) + .for_each(|(_, ping)| self.enqueue_ping(glean, ping)); } } @@ -532,10 +534,8 @@ impl PingUploadManager { /// * `glean` - The Glean object holding the database. /// * `document_id` - The UUID of the ping in question. pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) { - if let Some((doc_id, path, body, headers)) = - self.directory_manager.process_file(document_id) - { - self.enqueue_ping(glean, &doc_id, &path, &body, headers) + if let Some(ping) = self.directory_manager.process_file(document_id) { + self.enqueue_ping(glean, ping); } } @@ -883,7 +883,17 @@ mod test { let upload_manager = PingUploadManager::no_policy(dir.path()); // Enqueue a ping - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); // Try and get the next request. // Verify request was returned @@ -900,7 +910,17 @@ mod test { // Enqueue a ping multiple times let n = 10; for _ in 0..n { - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); } // Verify a request is returned for each submitted ping @@ -928,7 +948,17 @@ mod test { // Enqueue the max number of pings allowed per uploading window for _ in 0..max_pings_per_interval { - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); } // Verify a request is returned for each submitted ping @@ -938,7 +968,17 @@ mod test { } // Enqueue just one more ping - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); // Verify that we are indeed told to wait because we are at capacity match upload_manager.get_upload_task(&glean, false) { @@ -961,7 +1001,17 @@ mod test { // Enqueue a ping multiple times for _ in 0..10 { - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); } // Clear the queue @@ -979,7 +1029,14 @@ mod test { let (mut glean, _t) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit the ping multiple times @@ -1011,7 +1068,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit the ping multiple times @@ -1041,7 +1105,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit a ping @@ -1071,7 +1142,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit a ping @@ -1101,7 +1179,14 @@ mod test { let (mut glean, _t) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit a ping @@ -1133,7 +1218,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit a ping @@ -1174,7 +1266,17 @@ mod test { let path2 = format!("/submit/app_id/test-ping/1/{}", doc2); // Enqueue a ping - upload_manager.enqueue_ping(&glean, &doc1, &path1, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: doc1.clone(), + upload_path: path1, + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "test-ping".into(), + }, + ); // Try and get the first request. let req = match upload_manager.get_upload_task(&glean, false) { @@ -1184,7 +1286,17 @@ mod test { assert_eq!(doc1, req.document_id); // Schedule the next one while the first one is "in progress" - upload_manager.enqueue_ping(&glean, &doc2, &path2, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: doc2.clone(), + upload_path: path2, + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "test-ping".into(), + }, + ); // Mark as processed upload_manager.process_ping_upload_response( @@ -1221,7 +1333,14 @@ mod test { glean.set_debug_view_tag("valid-tag"); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit a ping @@ -1248,8 +1367,28 @@ mod test { let path = format!("/submit/app_id/test-ping/1/{}", doc_id); // Try to enqueue a ping with the same doc_id twice - upload_manager.enqueue_ping(&glean, &doc_id, &path, "", None); - upload_manager.enqueue_ping(&glean, &doc_id, &path, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: doc_id.clone(), + upload_path: path.clone(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "test-ping".into(), + }, + ); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: doc_id, + upload_path: path, + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "test-ping".into(), + }, + ); // Get a task once let task = upload_manager.get_upload_task(&glean, false); @@ -1267,7 +1406,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit the ping multiple times @@ -1317,7 +1463,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // Submit the ping multiple times @@ -1331,7 +1484,10 @@ mod test { // The pending pings array is sorted by date in ascending order, // the newest element is the last one. let (_, newest_ping) = &pending_pings.last().unwrap(); - let (newest_ping_id, _, _, _) = &newest_ping; + let PingPayload { + document_id: newest_ping_id, + .. + } = &newest_ping; // Create a new upload manager pointing to the same data_path as the glean instance. let mut upload_manager = PingUploadManager::no_policy(dir.path()); @@ -1385,7 +1541,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); // How many pings we allow at maximum @@ -1406,7 +1569,7 @@ mod test { .iter() .rev() .take(count_quota) - .map(|(_, ping)| ping.0.clone()) + .map(|(_, ping)| ping.document_id.clone()) .collect::<Vec<_>>(); // Create a new upload manager pointing to the same data_path as the glean instance. @@ -1457,7 +1620,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); let expected_number_of_pings = 3; @@ -1477,7 +1647,7 @@ mod test { .iter() .rev() .take(expected_number_of_pings) - .map(|(_, ping)| ping.0.clone()) + .map(|(_, ping)| ping.document_id.clone()) .collect::<Vec<_>>(); // Create a new upload manager pointing to the same data_path as the glean instance. @@ -1531,7 +1701,14 @@ mod test { let (mut glean, dir) = new_glean(None); // Register a ping for testing - let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]); + let ping_type = PingType::new( + "test", + true, + /* send_if_empty */ true, + true, + true, + vec![], + ); glean.register_ping_type(&ping_type); let expected_number_of_pings = 2; @@ -1551,7 +1728,7 @@ mod test { .iter() .rev() .take(expected_number_of_pings) - .map(|(_, ping)| ping.0.clone()) + .map(|(_, ping)| ping.document_id.clone()) .collect::<Vec<_>>(); // Create a new upload manager pointing to the same data_path as the glean instance. @@ -1622,8 +1799,28 @@ mod test { upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval); // Enqueue two pings - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); - upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); + upload_manager.enqueue_ping( + &glean, + PingPayload { + document_id: Uuid::new_v4().to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }, + ); // Get the first ping, it should be returned normally. match upload_manager.get_upload_task(&glean, false) { @@ -1679,12 +1876,28 @@ mod test { let upload_manager = PingUploadManager::no_policy(dir.path()); // Enqueue a ping and start processing it - let identifier = &Uuid::new_v4().to_string(); - upload_manager.enqueue_ping(&glean, identifier, PATH, "", None); + let identifier = &Uuid::new_v4(); + let ping = PingPayload { + document_id: identifier.to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }; + upload_manager.enqueue_ping(&glean, ping); assert!(upload_manager.get_upload_task(&glean, false).is_upload()); // Attempt to re-enqueue the same ping - upload_manager.enqueue_ping(&glean, identifier, PATH, "", None); + let ping = PingPayload { + document_id: identifier.to_string(), + upload_path: PATH.into(), + json_body: "".into(), + headers: None, + body_has_info_sections: true, + ping_name: "ping-name".into(), + }; + upload_manager.enqueue_ping(&glean, ping); // No new pings should have been enqueued so the upload task is Done. assert_eq!( @@ -1695,7 +1908,7 @@ mod test { // Process the upload response upload_manager.process_ping_upload_response( &glean, - identifier, + &identifier.to_string(), UploadResult::http_status(200), ); } |