summaryrefslogtreecommitdiffstats
path: root/xpcom/rust/moz_task/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--xpcom/rust/moz_task/src/lib.rs365
1 files changed, 365 insertions, 0 deletions
diff --git a/xpcom/rust/moz_task/src/lib.rs b/xpcom/rust/moz_task/src/lib.rs
new file mode 100644
index 0000000000..d6db10ef38
--- /dev/null
+++ b/xpcom/rust/moz_task/src/lib.rs
@@ -0,0 +1,365 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+//! This module wraps XPCOM threading functions with Rust functions
+//! to make it safer and more convenient to call the XPCOM functions.
+//! It also provides the Task trait and TaskRunnable struct,
+//! which make it easier to dispatch tasks to threads.
+
+extern crate cstr;
+extern crate futures_task;
+extern crate libc;
+extern crate nserror;
+extern crate nsstring;
+extern crate thiserror;
+extern crate xpcom;
+
+mod event_loop;
+mod executor;
+pub use executor::spawn_current_thread;
+
+// Expose functions intended to be used only in gtest via this module.
+// We don't use a feature gate here to stop the need to compile all crates that
+// depend upon `moz_task` twice.
+pub mod gtest_only {
+ pub use event_loop::spin_event_loop_until;
+}
+
+use nserror::{nsresult, NS_OK};
+use nsstring::{nsACString, nsCString};
+use std::{
+ ffi::CStr,
+ marker::PhantomData,
+ mem, ptr,
+ sync::atomic::{AtomicBool, Ordering},
+};
+use xpcom::{
+ getter_addrefs,
+ interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget, nsISupports, nsIThread},
+ xpcom, xpcom_method, AtomicRefcnt, RefCounted, RefPtr, XpCom,
+};
+
+extern "C" {
+ fn NS_GetCurrentThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
+ fn NS_GetMainThreadEventTarget(result: *mut *const nsIThread) -> nsresult;
+ fn NS_IsMainThread() -> bool;
+ fn NS_NewNamedThreadWithDefaultStackSize(
+ name: *const nsACString,
+ result: *mut *const nsIThread,
+ event: *const nsIRunnable,
+ ) -> nsresult;
+ fn NS_IsCurrentThread(thread: *const nsIEventTarget) -> bool;
+ fn NS_ProxyReleaseISupports(
+ name: *const libc::c_char,
+ target: *const nsIEventTarget,
+ doomed: *const nsISupports,
+ always_proxy: bool,
+ );
+ fn NS_CreateBackgroundTaskQueue(
+ name: *const libc::c_char,
+ target: *mut *const nsISerialEventTarget,
+ ) -> nsresult;
+ fn NS_DispatchBackgroundTask(event: *const nsIRunnable, flags: u32) -> nsresult;
+}
+
+pub fn get_current_thread() -> Result<RefPtr<nsIThread>, nsresult> {
+ getter_addrefs(|p| unsafe { NS_GetCurrentThreadEventTarget(p) })
+}
+
+pub fn get_main_thread() -> Result<RefPtr<nsIThread>, nsresult> {
+ getter_addrefs(|p| unsafe { NS_GetMainThreadEventTarget(p) })
+}
+
+pub fn is_main_thread() -> bool {
+ unsafe { NS_IsMainThread() }
+}
+
+pub fn create_thread(name: &str) -> Result<RefPtr<nsIThread>, nsresult> {
+ getter_addrefs(|p| unsafe {
+ NS_NewNamedThreadWithDefaultStackSize(&*nsCString::from(name), p, ptr::null())
+ })
+}
+
+pub fn is_current_thread(thread: &nsIThread) -> bool {
+ unsafe { NS_IsCurrentThread(thread.coerce()) }
+}
+
+/// Creates a queue that runs tasks on the background thread pool. The tasks
+/// will run in the order they're dispatched, one after the other.
+pub fn create_background_task_queue(
+ name: &'static CStr,
+) -> Result<RefPtr<nsISerialEventTarget>, nsresult> {
+ getter_addrefs(|p| unsafe { NS_CreateBackgroundTaskQueue(name.as_ptr(), p) })
+}
+
+/// Dispatches a one-shot runnable to an event target with the default options.
+///
+/// # Safety
+///
+/// As there is no guarantee that the runnable is actually `Send + Sync`, we
+/// can't know that it's safe to dispatch an `nsIRunnable` to any
+/// `nsIEventTarget`.
+#[inline]
+pub unsafe fn dispatch(runnable: &nsIRunnable, target: &nsIEventTarget) -> Result<(), nsresult> {
+ dispatch_with_options(runnable, target, DispatchOptions::default())
+}
+
+/// Dispatches a one-shot runnable to an event target, like a thread or a
+/// task queue, with the given options.
+///
+/// This function leaks the runnable if dispatch fails.
+///
+/// # Safety
+///
+/// As there is no guarantee that the runnable is actually `Send + Sync`, we
+/// can't know that it's safe to dispatch an `nsIRunnable` to any
+/// `nsIEventTarget`.
+pub unsafe fn dispatch_with_options(
+ runnable: &nsIRunnable,
+ target: &nsIEventTarget,
+ options: DispatchOptions,
+) -> Result<(), nsresult> {
+ // NOTE: DispatchFromScript performs an AddRef on `runnable` which is
+ // why this function leaks on failure.
+ target
+ .DispatchFromScript(runnable, options.flags())
+ .to_result()
+}
+
+/// Dispatches a one-shot task runnable to the background thread pool with the
+/// default options.
+#[inline]
+pub fn dispatch_background_task(runnable: RefPtr<nsIRunnable>) -> Result<(), nsresult> {
+ dispatch_background_task_with_options(runnable, DispatchOptions::default())
+}
+
+/// Dispatches a one-shot task runnable to the background thread pool with the
+/// given options. The task may run concurrently with other background tasks.
+/// If you need tasks to run in a specific order, please create a background
+/// task queue using `create_background_task_queue`, and dispatch tasks to it
+/// instead.
+///
+/// ### Safety
+///
+/// This function leaks the runnable if dispatch fails. This avoids a race where
+/// a runnable can be destroyed on either the original or target thread, which
+/// is important if the runnable holds thread-unsafe members.
+pub fn dispatch_background_task_with_options(
+ runnable: RefPtr<nsIRunnable>,
+ options: DispatchOptions,
+) -> Result<(), nsresult> {
+ // This eventually calls the non-`already_AddRefed<nsIRunnable>` overload of
+ // `nsIEventTarget::Dispatch` (see xpcom/threads/nsIEventTarget.idl#20-25),
+ // which adds an owning reference and leaks if dispatch fails.
+ unsafe { NS_DispatchBackgroundTask(runnable.coerce(), options.flags()) }.to_result()
+}
+
+/// Options to control how task runnables are dispatched.
+#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
+pub struct DispatchOptions(u32);
+
+impl Default for DispatchOptions {
+ #[inline]
+ fn default() -> Self {
+ DispatchOptions(nsIEventTarget::DISPATCH_NORMAL as u32)
+ }
+}
+
+impl DispatchOptions {
+ /// Creates a blank set of options. The runnable will be dispatched using
+ /// the default mode.
+ #[inline]
+ pub fn new() -> Self {
+ DispatchOptions::default()
+ }
+
+ /// Indicates whether or not the dispatched runnable may block its target
+ /// thread by waiting on I/O. If `true`, the runnable may be dispatched to a
+ /// dedicated thread pool, leaving the main pool free for CPU-bound tasks.
+ #[inline]
+ pub fn may_block(self, may_block: bool) -> DispatchOptions {
+ const FLAG: u32 = nsIEventTarget::DISPATCH_EVENT_MAY_BLOCK as u32;
+ if may_block {
+ DispatchOptions(self.flags() | FLAG)
+ } else {
+ DispatchOptions(self.flags() & !FLAG)
+ }
+ }
+
+ /// Returns the set of bitflags to pass to `DispatchFromScript`.
+ #[inline]
+ fn flags(self) -> u32 {
+ self.0
+ }
+}
+
+/// A task represents an operation that asynchronously executes on a target
+/// thread, and returns its result to the original thread.
+pub trait Task {
+ fn run(&self);
+ fn done(&self) -> Result<(), nsresult>;
+}
+
+/// The struct responsible for dispatching a Task by calling its run() method
+/// on the target thread and returning its result by calling its done() method
+/// on the original thread.
+///
+/// The struct uses its has_run field to determine whether it should call
+/// run() or done(). It could instead check if task.result is Some or None,
+/// but if run() failed to set task.result, then it would loop infinitely.
+#[derive(xpcom)]
+#[xpimplements(nsIRunnable, nsINamed)]
+#[refcnt = "atomic"]
+pub struct InitTaskRunnable {
+ name: &'static str,
+ original_thread: RefPtr<nsIThread>,
+ task: Box<dyn Task + Send + Sync>,
+ has_run: AtomicBool,
+}
+
+impl TaskRunnable {
+ pub fn new(
+ name: &'static str,
+ task: Box<dyn Task + Send + Sync>,
+ ) -> Result<RefPtr<TaskRunnable>, nsresult> {
+ Ok(TaskRunnable::allocate(InitTaskRunnable {
+ name,
+ original_thread: get_current_thread()?,
+ task,
+ has_run: AtomicBool::new(false),
+ }))
+ }
+
+ /// Dispatches this task runnable to an event target with the default
+ /// options.
+ #[inline]
+ pub fn dispatch(this: RefPtr<Self>, target: &nsIEventTarget) -> Result<(), nsresult> {
+ Self::dispatch_with_options(this, target, DispatchOptions::default())
+ }
+
+ /// Dispatches this task runnable to an event target, like a thread or a
+ /// task queue, with the given options.
+ ///
+ /// Note that this is an associated function, not a method, because it takes
+ /// an owned reference to the runnable, and must be called like
+ /// `TaskRunnable::dispatch_with_options(runnable, options)` and *not*
+ /// `runnable.dispatch_with_options(options)`.
+ ///
+ /// This function leaks the runnable if dispatch fails.
+ pub fn dispatch_with_options(
+ this: RefPtr<Self>,
+ target: &nsIEventTarget,
+ options: DispatchOptions,
+ ) -> Result<(), nsresult> {
+ unsafe { target.DispatchFromScript(this.coerce(), options.flags()) }.to_result()
+ }
+
+ xpcom_method!(run => Run());
+ fn run(&self) -> Result<(), nsresult> {
+ match self
+ .has_run
+ .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+ {
+ Ok(_) => {
+ self.task.run();
+ Self::dispatch(RefPtr::new(self), &self.original_thread)
+ }
+ Err(_) => {
+ assert!(is_current_thread(&self.original_thread));
+ self.task.done()
+ }
+ }
+ }
+
+ xpcom_method!(get_name => GetName() -> nsACString);
+ fn get_name(&self) -> Result<nsCString, nsresult> {
+ Ok(nsCString::from(self.name))
+ }
+}
+
+pub type ThreadPtrHandle<T> = RefPtr<ThreadPtrHolder<T>>;
+
+/// A Rust analog to `nsMainThreadPtrHolder` that wraps an `nsISupports` object
+/// with thread-safe refcounting. The holder keeps one reference to the wrapped
+/// object that's released when the holder's refcount reaches zero.
+pub struct ThreadPtrHolder<T: XpCom + 'static> {
+ ptr: *const T,
+ marker: PhantomData<T>,
+ name: &'static CStr,
+ owning_thread: RefPtr<nsIThread>,
+ refcnt: AtomicRefcnt,
+}
+
+unsafe impl<T: XpCom + 'static> Send for ThreadPtrHolder<T> {}
+unsafe impl<T: XpCom + 'static> Sync for ThreadPtrHolder<T> {}
+
+unsafe impl<T: XpCom + 'static> RefCounted for ThreadPtrHolder<T> {
+ unsafe fn addref(&self) {
+ self.refcnt.inc();
+ }
+
+ unsafe fn release(&self) {
+ let rc = self.refcnt.dec();
+ if rc == 0 {
+ // Once the holder's count reaches zero, release the wrapped
+ // object...
+ if !self.ptr.is_null() {
+ // The holder can be released on any thread. If we're on the
+ // owning thread, we can release the object directly. Otherwise,
+ // we need to post a proxy release event to release the object
+ // on the owning thread.
+ if is_current_thread(&self.owning_thread) {
+ (*self.ptr).release()
+ } else {
+ NS_ProxyReleaseISupports(
+ self.name.as_ptr(),
+ self.owning_thread.coerce(),
+ self.ptr as *const T as *const nsISupports,
+ false,
+ );
+ }
+ }
+ // ...And deallocate the holder.
+ Box::from_raw(self as *const Self as *mut Self);
+ }
+ }
+}
+
+impl<T: XpCom + 'static> ThreadPtrHolder<T> {
+ /// Creates a new owning thread pointer holder. Returns an error if the
+ /// thread manager has shut down. Panics if `name` isn't a valid C string.
+ pub fn new(name: &'static CStr, ptr: RefPtr<T>) -> Result<RefPtr<Self>, nsresult> {
+ let owning_thread = get_current_thread()?;
+ // Take ownership of the `RefPtr`. This does _not_ decrement its
+ // refcount, which is what we want. Once we've released all references
+ // to the holder, we'll release the wrapped `RefPtr`.
+ let raw: *const T = &*ptr;
+ mem::forget(ptr);
+ unsafe {
+ let boxed = Box::new(ThreadPtrHolder {
+ name,
+ ptr: raw,
+ marker: PhantomData,
+ owning_thread,
+ refcnt: AtomicRefcnt::new(),
+ });
+ Ok(RefPtr::from_raw(Box::into_raw(boxed)).unwrap())
+ }
+ }
+
+ /// Returns the wrapped object's owning thread.
+ pub fn owning_thread(&self) -> &nsIThread {
+ &self.owning_thread
+ }
+
+ /// Returns the wrapped object if called from the owning thread, or
+ /// `None` if called from any other thread.
+ pub fn get(&self) -> Option<&T> {
+ if is_current_thread(&self.owning_thread) && !self.ptr.is_null() {
+ unsafe { Some(&*self.ptr) }
+ } else {
+ None
+ }
+ }
+}