diff options
Diffstat (limited to 'third_party/rust/glean/src/net/mod.rs')
-rw-r--r-- | third_party/rust/glean/src/net/mod.rs | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/third_party/rust/glean/src/net/mod.rs b/third_party/rust/glean/src/net/mod.rs new file mode 100644 index 0000000000..5571d30e67 --- /dev/null +++ b/third_party/rust/glean/src/net/mod.rs @@ -0,0 +1,238 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Handling the Glean upload logic. +//! +//! This doesn't perform the actual upload but rather handles +//! retries, upload limitations and error tracking. + +use std::sync::{atomic::Ordering, Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use glean_core::upload::PingUploadTask; +pub use glean_core::upload::{PingRequest, UploadResult, UploadTaskAction}; + +pub use http_uploader::*; +use thread_state::{AtomicState, State}; + +mod http_uploader; + +/// A description of a component used to upload pings. +pub trait PingUploader: std::fmt::Debug + Send + Sync { + /// Uploads a ping to a server. + /// + /// # Arguments + /// + /// * `url` - the URL path to upload the data to. + /// * `body` - the serialized text data to send. + /// * `headers` - a vector of tuples containing the headers to send with + /// the request, i.e. (Name, Value). + fn upload(&self, url: String, body: Vec<u8>, headers: Vec<(String, String)>) -> UploadResult; +} + +/// The logic for uploading pings: this leaves the actual upload mechanism as +/// a detail of the user-provided object implementing [`PingUploader`]. +#[derive(Debug)] +pub(crate) struct UploadManager { + inner: Arc<Inner>, +} + +#[derive(Debug)] +struct Inner { + server_endpoint: String, + uploader: Box<dyn PingUploader + 'static>, + thread_running: AtomicState, + handle: Mutex<Option<JoinHandle<()>>>, +} + +impl UploadManager { + /// Create a new instance of the upload manager. + /// + /// # Arguments + /// + /// * `server_endpoint` - the server pings are sent to. + /// * `new_uploader` - the instance of the uploader used to send pings. + pub(crate) fn new( + server_endpoint: String, + new_uploader: Box<dyn PingUploader + 'static>, + ) -> Self { + Self { + inner: Arc::new(Inner { + server_endpoint, + uploader: new_uploader, + thread_running: AtomicState::new(State::Stopped), + handle: Mutex::new(None), + }), + } + } + + /// Signals Glean to upload pings at the next best opportunity. + pub(crate) fn trigger_upload(&self) { + // If no other upload proces is running, we're the one starting it. + // Need atomic compare/exchange to avoid any further races + // or we can end up with 2+ uploader threads. + if self + .inner + .thread_running + .compare_exchange( + State::Stopped, + State::Running, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_err() + { + return; + } + + let inner = Arc::clone(&self.inner); + + // Need to lock before we start so that noone thinks we're not running. + let mut handle = self.inner.handle.lock().unwrap(); + let thread = thread::Builder::new() + .name("glean.upload".into()) + .spawn(move || { + log::trace!("Started glean.upload thread"); + loop { + let incoming_task = glean_core::glean_get_upload_task(); + + match incoming_task { + PingUploadTask::Upload { request } => { + log::trace!("Received upload task with request {:?}", request); + let doc_id = request.document_id.clone(); + let upload_url = format!("{}{}", inner.server_endpoint, request.path); + let headers: Vec<(String, String)> = + request.headers.into_iter().collect(); + let result = inner.uploader.upload(upload_url, request.body, headers); + // Process the upload response. + match glean_core::glean_process_ping_upload_response(doc_id, result) { + UploadTaskAction::Next => (), + UploadTaskAction::End => break, + } + + let status = inner.thread_running.load(Ordering::SeqCst); + // asked to shut down. let's do it. + if status == State::ShuttingDown { + break; + } + } + PingUploadTask::Wait { time } => { + log::trace!("Instructed to wait for {:?}ms", time); + thread::sleep(Duration::from_millis(time)); + } + PingUploadTask::Done { .. } => { + log::trace!("Received PingUploadTask::Done. Exiting."); + // Nothing to do here, break out of the loop. + break; + } + } + } + + // Clear the running flag to signal that this thread is done, + // but only if there's no shutdown thread. + let _ = inner.thread_running.compare_exchange( + State::Running, + State::Stopped, + Ordering::SeqCst, + Ordering::SeqCst, + ); + }); + + match thread { + Ok(thread) => *handle = Some(thread), + Err(err) => { + log::warn!("Failed to spawn Glean's uploader thread. This will be retried on next upload. Error: {err}"); + // Swapping back the thread state. We're the ones setting it to `Running`, so + // should be able to set it back. + let state_change = self.inner.thread_running.compare_exchange( + State::Running, + State::Stopped, + Ordering::SeqCst, + Ordering::SeqCst, + ); + + if state_change.is_err() { + log::warn!("Failed to swap back thread state. Someone else jumped in and changed the state."); + } + } + }; + } + + pub(crate) fn shutdown(&self) { + // mark as shutting down. + self.inner + .thread_running + .store(State::ShuttingDown, Ordering::SeqCst); + + // take the thread handle out. + let mut handle = self.inner.handle.lock().unwrap(); + let thread = handle.take(); + + if let Some(thread) = thread { + thread + .join() + .expect("couldn't join on the uploader thread."); + } + } +} + +mod thread_state { + use std::sync::atomic::{AtomicU8, Ordering}; + + #[derive(Debug, PartialEq)] + #[repr(u8)] + pub enum State { + Stopped = 0, + Running = 1, + ShuttingDown = 2, + } + + #[derive(Debug)] + pub struct AtomicState(AtomicU8); + + impl AtomicState { + const fn to_u8(val: State) -> u8 { + val as u8 + } + + fn from_u8(val: u8) -> State { + #![allow(non_upper_case_globals)] + const U8_Stopped: u8 = State::Stopped as u8; + const U8_Running: u8 = State::Running as u8; + const U8_ShuttingDown: u8 = State::ShuttingDown as u8; + match val { + U8_Stopped => State::Stopped, + U8_Running => State::Running, + U8_ShuttingDown => State::ShuttingDown, + _ => panic!("Invalid enum discriminant"), + } + } + + pub const fn new(v: State) -> AtomicState { + AtomicState(AtomicU8::new(Self::to_u8(v))) + } + + pub fn load(&self, order: Ordering) -> State { + Self::from_u8(self.0.load(order)) + } + + pub fn store(&self, val: State, order: Ordering) { + self.0.store(Self::to_u8(val), order) + } + + pub fn compare_exchange( + &self, + current: State, + new: State, + success: Ordering, + failure: Ordering, + ) -> Result<State, State> { + self.0 + .compare_exchange(Self::to_u8(current), Self::to_u8(new), success, failure) + .map(Self::from_u8) + .map_err(Self::from_u8) + } + } +} |