summaryrefslogtreecommitdiffstats
path: root/third_party/rust/glean-core/src/upload/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/glean-core/src/upload/mod.rs')
-rw-r--r--third_party/rust/glean-core/src/upload/mod.rs343
1 files changed, 278 insertions, 65 deletions
diff --git a/third_party/rust/glean-core/src/upload/mod.rs b/third_party/rust/glean-core/src/upload/mod.rs
index d764dcd29e..e51a9d9508 100644
--- a/third_party/rust/glean-core/src/upload/mod.rs
+++ b/third_party/rust/glean-core/src/upload/mod.rs
@@ -30,6 +30,7 @@ use directory::{PingDirectoryManager, PingPayloadsByDirectory};
use policy::Policy;
use request::create_date_header_value;
+pub use directory::{PingMetadata, PingPayload};
pub use request::{HeaderMap, PingRequest};
pub use result::{UploadResult, UploadTaskAction};
@@ -322,21 +323,24 @@ impl PingUploadManager {
///
/// Returns the `PingRequest` or `None` if unable to build,
/// in which case it will delete the ping file and record an error.
- fn build_ping_request(
- &self,
- glean: &Glean,
- document_id: &str,
- path: &str,
- body: &str,
- headers: Option<HeaderMap>,
- ) -> Option<PingRequest> {
+ fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
+ let PingPayload {
+ document_id,
+ upload_path: path,
+ json_body: body,
+ headers,
+ body_has_info_sections,
+ ping_name,
+ } = ping;
let mut request = PingRequest::builder(
&self.language_binding_name,
self.policy.max_ping_body_size(),
)
- .document_id(document_id)
+ .document_id(&document_id)
.path(path)
- .body(body);
+ .body(body)
+ .body_has_info_sections(body_has_info_sections)
+ .ping_name(ping_name);
if let Some(headers) = headers {
request = request.headers(headers);
@@ -346,7 +350,7 @@ impl PingUploadManager {
Ok(request) => Some(request),
Err(e) => {
log::warn!("Error trying to build ping request: {}", e);
- self.directory_manager.delete_file(document_id);
+ self.directory_manager.delete_file(&document_id);
// Record the error.
// Currently the only possible error is PingBodyOverflow.
@@ -362,23 +366,21 @@ impl PingUploadManager {
}
/// Enqueue a ping for upload.
- pub fn enqueue_ping(
- &self,
- glean: &Glean,
- document_id: &str,
- path: &str,
- body: &str,
- headers: Option<HeaderMap>,
- ) {
+ pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
let mut queue = self
.queue
.write()
.expect("Can't write to pending pings queue.");
+ let PingPayload {
+ ref document_id,
+ upload_path: ref path,
+ ..
+ } = ping;
// Checks if a ping with this `document_id` is already enqueued.
if queue
.iter()
- .any(|request| request.document_id == document_id)
+ .any(|request| request.document_id.as_str() == document_id)
{
log::warn!(
"Attempted to enqueue a duplicate ping {} at {}.",
@@ -404,7 +406,7 @@ impl PingUploadManager {
}
log::trace!("Enqueuing ping {} at {}", document_id, path);
- if let Some(request) = self.build_ping_request(glean, document_id, path, body, headers) {
+ if let Some(request) = self.build_ping_request(glean, ping) {
queue.push_back(request)
}
}
@@ -455,7 +457,7 @@ impl PingUploadManager {
// Thus, we reverse the order of the pending pings vector,
// so that we iterate in descending order (newest -> oldest).
cached_pings.pending_pings.reverse();
- cached_pings.pending_pings.retain(|(file_size, (document_id, _, _, _))| {
+ cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
pending_pings_count += 1;
pending_pings_directory_size += file_size;
@@ -493,14 +495,14 @@ impl PingUploadManager {
// Enqueue the remaining pending pings and
// enqueue all deletion-request pings.
- let deletion_request_pings = cached_pings.deletion_request_pings.drain(..);
- for (_, (document_id, path, body, headers)) in deletion_request_pings {
- self.enqueue_ping(glean, &document_id, &path, &body, headers);
- }
- let pending_pings = cached_pings.pending_pings.drain(..);
- for (_, (document_id, path, body, headers)) in pending_pings {
- self.enqueue_ping(glean, &document_id, &path, &body, headers);
- }
+ cached_pings
+ .deletion_request_pings
+ .drain(..)
+ .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
+ cached_pings
+ .pending_pings
+ .drain(..)
+ .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
}
}
@@ -532,10 +534,8 @@ impl PingUploadManager {
/// * `glean` - The Glean object holding the database.
/// * `document_id` - The UUID of the ping in question.
pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
- if let Some((doc_id, path, body, headers)) =
- self.directory_manager.process_file(document_id)
- {
- self.enqueue_ping(glean, &doc_id, &path, &body, headers)
+ if let Some(ping) = self.directory_manager.process_file(document_id) {
+ self.enqueue_ping(glean, ping);
}
}
@@ -883,7 +883,17 @@ mod test {
let upload_manager = PingUploadManager::no_policy(dir.path());
// Enqueue a ping
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
// Try and get the next request.
// Verify request was returned
@@ -900,7 +910,17 @@ mod test {
// Enqueue a ping multiple times
let n = 10;
for _ in 0..n {
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
}
// Verify a request is returned for each submitted ping
@@ -928,7 +948,17 @@ mod test {
// Enqueue the max number of pings allowed per uploading window
for _ in 0..max_pings_per_interval {
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
}
// Verify a request is returned for each submitted ping
@@ -938,7 +968,17 @@ mod test {
}
// Enqueue just one more ping
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
// Verify that we are indeed told to wait because we are at capacity
match upload_manager.get_upload_task(&glean, false) {
@@ -961,7 +1001,17 @@ mod test {
// Enqueue a ping multiple times
for _ in 0..10 {
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
}
// Clear the queue
@@ -979,7 +1029,14 @@ mod test {
let (mut glean, _t) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit the ping multiple times
@@ -1011,7 +1068,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit the ping multiple times
@@ -1041,7 +1105,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit a ping
@@ -1071,7 +1142,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit a ping
@@ -1101,7 +1179,14 @@ mod test {
let (mut glean, _t) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit a ping
@@ -1133,7 +1218,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit a ping
@@ -1174,7 +1266,17 @@ mod test {
let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
// Enqueue a ping
- upload_manager.enqueue_ping(&glean, &doc1, &path1, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: doc1.clone(),
+ upload_path: path1,
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "test-ping".into(),
+ },
+ );
// Try and get the first request.
let req = match upload_manager.get_upload_task(&glean, false) {
@@ -1184,7 +1286,17 @@ mod test {
assert_eq!(doc1, req.document_id);
// Schedule the next one while the first one is "in progress"
- upload_manager.enqueue_ping(&glean, &doc2, &path2, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: doc2.clone(),
+ upload_path: path2,
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "test-ping".into(),
+ },
+ );
// Mark as processed
upload_manager.process_ping_upload_response(
@@ -1221,7 +1333,14 @@ mod test {
glean.set_debug_view_tag("valid-tag");
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit a ping
@@ -1248,8 +1367,28 @@ mod test {
let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
// Try to enqueue a ping with the same doc_id twice
- upload_manager.enqueue_ping(&glean, &doc_id, &path, "", None);
- upload_manager.enqueue_ping(&glean, &doc_id, &path, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: doc_id.clone(),
+ upload_path: path.clone(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "test-ping".into(),
+ },
+ );
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: doc_id,
+ upload_path: path,
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "test-ping".into(),
+ },
+ );
// Get a task once
let task = upload_manager.get_upload_task(&glean, false);
@@ -1267,7 +1406,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit the ping multiple times
@@ -1317,7 +1463,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// Submit the ping multiple times
@@ -1331,7 +1484,10 @@ mod test {
// The pending pings array is sorted by date in ascending order,
// the newest element is the last one.
let (_, newest_ping) = &pending_pings.last().unwrap();
- let (newest_ping_id, _, _, _) = &newest_ping;
+ let PingPayload {
+ document_id: newest_ping_id,
+ ..
+ } = &newest_ping;
// Create a new upload manager pointing to the same data_path as the glean instance.
let mut upload_manager = PingUploadManager::no_policy(dir.path());
@@ -1385,7 +1541,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
// How many pings we allow at maximum
@@ -1406,7 +1569,7 @@ mod test {
.iter()
.rev()
.take(count_quota)
- .map(|(_, ping)| ping.0.clone())
+ .map(|(_, ping)| ping.document_id.clone())
.collect::<Vec<_>>();
// Create a new upload manager pointing to the same data_path as the glean instance.
@@ -1457,7 +1620,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
let expected_number_of_pings = 3;
@@ -1477,7 +1647,7 @@ mod test {
.iter()
.rev()
.take(expected_number_of_pings)
- .map(|(_, ping)| ping.0.clone())
+ .map(|(_, ping)| ping.document_id.clone())
.collect::<Vec<_>>();
// Create a new upload manager pointing to the same data_path as the glean instance.
@@ -1531,7 +1701,14 @@ mod test {
let (mut glean, dir) = new_glean(None);
// Register a ping for testing
- let ping_type = PingType::new("test", true, /* send_if_empty */ true, true, vec![]);
+ let ping_type = PingType::new(
+ "test",
+ true,
+ /* send_if_empty */ true,
+ true,
+ true,
+ vec![],
+ );
glean.register_ping_type(&ping_type);
let expected_number_of_pings = 2;
@@ -1551,7 +1728,7 @@ mod test {
.iter()
.rev()
.take(expected_number_of_pings)
- .map(|(_, ping)| ping.0.clone())
+ .map(|(_, ping)| ping.document_id.clone())
.collect::<Vec<_>>();
// Create a new upload manager pointing to the same data_path as the glean instance.
@@ -1622,8 +1799,28 @@ mod test {
upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
// Enqueue two pings
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
- upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
+ upload_manager.enqueue_ping(
+ &glean,
+ PingPayload {
+ document_id: Uuid::new_v4().to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ },
+ );
// Get the first ping, it should be returned normally.
match upload_manager.get_upload_task(&glean, false) {
@@ -1679,12 +1876,28 @@ mod test {
let upload_manager = PingUploadManager::no_policy(dir.path());
// Enqueue a ping and start processing it
- let identifier = &Uuid::new_v4().to_string();
- upload_manager.enqueue_ping(&glean, identifier, PATH, "", None);
+ let identifier = &Uuid::new_v4();
+ let ping = PingPayload {
+ document_id: identifier.to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ };
+ upload_manager.enqueue_ping(&glean, ping);
assert!(upload_manager.get_upload_task(&glean, false).is_upload());
// Attempt to re-enqueue the same ping
- upload_manager.enqueue_ping(&glean, identifier, PATH, "", None);
+ let ping = PingPayload {
+ document_id: identifier.to_string(),
+ upload_path: PATH.into(),
+ json_body: "".into(),
+ headers: None,
+ body_has_info_sections: true,
+ ping_name: "ping-name".into(),
+ };
+ upload_manager.enqueue_ping(&glean, ping);
// No new pings should have been enqueued so the upload task is Done.
assert_eq!(
@@ -1695,7 +1908,7 @@ mod test {
// Process the upload response
upload_manager.process_ping_upload_response(
&glean,
- identifier,
+ &identifier.to_string(),
UploadResult::http_status(200),
);
}