summaryrefslogtreecommitdiffstats
path: root/third_party/rust/glean/src/dispatcher/global.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/glean/src/dispatcher/global.rs')
-rw-r--r--third_party/rust/glean/src/dispatcher/global.rs191
1 files changed, 191 insertions, 0 deletions
diff --git a/third_party/rust/glean/src/dispatcher/global.rs b/third_party/rust/glean/src/dispatcher/global.rs
new file mode 100644
index 0000000000..c35428d6f6
--- /dev/null
+++ b/third_party/rust/glean/src/dispatcher/global.rs
@@ -0,0 +1,191 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+use once_cell::sync::Lazy;
+use std::sync::RwLock;
+
+use super::{DispatchError, DispatchGuard, Dispatcher};
+
+const GLOBAL_DISPATCHER_LIMIT: usize = 100;
+static GLOBAL_DISPATCHER: Lazy<RwLock<Option<Dispatcher>>> =
+ Lazy::new(|| RwLock::new(Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT))));
+
+/// Get a dispatcher for the global queue.
+///
+/// A dispatcher is cheap to create, so we create one on every access instead of caching it.
+/// This avoids troubles for tests where the global dispatcher _can_ change.
+fn guard() -> DispatchGuard {
+ GLOBAL_DISPATCHER
+ .read()
+ .unwrap()
+ .as_ref()
+ .map(|dispatcher| dispatcher.guard())
+ .unwrap()
+}
+
+/// Launches a new task on the global dispatch queue.
+///
+/// The new task will be enqueued immediately.
+/// If the pre-init queue was already flushed,
+/// the background thread will process tasks in the queue (see [`flush_init`]).
+///
+/// This will not block.
+///
+/// [`flush_init`]: fn.flush_init.html
+pub fn launch(task: impl FnOnce() + Send + 'static) {
+ match guard().launch(task) {
+ Ok(_) => {}
+ Err(DispatchError::QueueFull) => {
+ log::info!("Exceeded maximum queue size, discarding task");
+ // TODO: Record this as an error.
+ }
+ Err(_) => {
+ log::info!("Failed to launch a task on the queue. Discarding task.");
+ }
+ }
+}
+
+/// Block until all tasks prior to this call are processed.
+pub fn block_on_queue() {
+ guard().block_on_queue();
+}
+
+/// Starts processing queued tasks in the global dispatch queue.
+///
+/// This function blocks until queued tasks prior to this call are finished.
+/// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
+pub fn flush_init() -> Result<(), DispatchError> {
+ guard().flush_init()
+}
+
+fn join_dispatcher_thread() -> Result<(), DispatchError> {
+ // After we issue the shutdown command, make sure to wait for the
+ // worker thread to join.
+ let mut lock = GLOBAL_DISPATCHER.write().unwrap();
+ let dispatcher = lock.as_mut().expect("Global dispatcher has gone missing");
+
+ if let Some(worker) = dispatcher.worker.take() {
+ return worker.join().map_err(|_| DispatchError::WorkerPanic);
+ }
+
+ Ok(())
+}
+
+/// Kill the blocked dispatcher without processing the queue.
+///
+/// This will immediately shutdown the worker thread
+/// and no other tasks will be processed.
+/// This only has an effect when the queue is still blocked.
+pub fn kill() -> Result<(), DispatchError> {
+ guard().kill()?;
+ join_dispatcher_thread()
+}
+
+/// Shuts down the dispatch queue.
+///
+/// This will initiate a shutdown of the worker thread
+/// and no new tasks will be processed after this.
+pub fn shutdown() -> Result<(), DispatchError> {
+ guard().shutdown()?;
+ join_dispatcher_thread()
+}
+
+/// TEST ONLY FUNCTION.
+/// Resets the Glean state and triggers init again.
+pub(crate) fn reset_dispatcher() {
+ // We don't care about shutdown errors, since they will
+ // definitely happen if this
+ let _ = shutdown();
+
+ // Now that the dispatcher is shut down, replace it.
+ // For that we
+ // 1. Create a new
+ // 2. Replace the global one
+ // 3. Only then return (and thus release the lock)
+ let mut lock = GLOBAL_DISPATCHER.write().unwrap();
+ let new_dispatcher = Some(Dispatcher::new(GLOBAL_DISPATCHER_LIMIT));
+ *lock = new_dispatcher;
+}
+
+#[cfg(test)]
+mod test {
+ use std::sync::{Arc, Mutex};
+
+ use super::*;
+
+ #[test]
+ #[ignore] // We can't reset the queue at the moment, so filling it up breaks other tests.
+ fn global_fills_up_in_order_and_works() {
+ let _ = env_logger::builder().is_test(true).try_init();
+
+ let result = Arc::new(Mutex::new(vec![]));
+
+ for i in 1..=GLOBAL_DISPATCHER_LIMIT {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(i);
+ });
+ }
+
+ {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(150);
+ });
+ }
+
+ flush_init().unwrap();
+
+ {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(200);
+ });
+ }
+
+ block_on_queue();
+
+ let mut expected = (1..=GLOBAL_DISPATCHER_LIMIT).collect::<Vec<_>>();
+ expected.push(200);
+ assert_eq!(&*result.lock().unwrap(), &expected);
+ }
+
+ #[test]
+ #[ignore] // We can't reset the queue at the moment, so flushing it breaks other tests.
+ fn global_nested_calls() {
+ let _ = env_logger::builder().is_test(true).try_init();
+
+ let result = Arc::new(Mutex::new(vec![]));
+
+ {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(1);
+ });
+ }
+
+ flush_init().unwrap();
+
+ {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(21);
+
+ {
+ let result = Arc::clone(&result);
+ launch(move || {
+ result.lock().unwrap().push(3);
+ });
+ }
+
+ result.lock().unwrap().push(22);
+ });
+ }
+
+ block_on_queue();
+
+ let expected = vec![1, 21, 22, 3];
+ assert_eq!(&*result.lock().unwrap(), &expected);
+ }
+}