diff options
Diffstat (limited to '')
-rw-r--r-- | xpcom/rust/moz_task/Cargo.toml | 16 | ||||
-rw-r--r-- | xpcom/rust/moz_task/src/dispatcher.rs | 153 | ||||
-rw-r--r-- | xpcom/rust/moz_task/src/event_loop.rs | 66 | ||||
-rw-r--r-- | xpcom/rust/moz_task/src/executor.rs | 291 | ||||
-rw-r--r-- | xpcom/rust/moz_task/src/lib.rs | 377 |
5 files changed, 903 insertions, 0 deletions
diff --git a/xpcom/rust/moz_task/Cargo.toml b/xpcom/rust/moz_task/Cargo.toml new file mode 100644 index 0000000000..ec60bbf156 --- /dev/null +++ b/xpcom/rust/moz_task/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "moz_task" +version = "0.1.0" +authors = ["Myk Melez <myk@mykzilla.org>"] +license = "MPL-2.0" +description = "Rust wrappers around XPCOM threading functions" +edition = "2018" + +[dependencies] +log = "0.4" +cstr = "0.2" +libc = "0.2" +async-task = { version = "4.0" } +nserror = { path = "../nserror" } +nsstring = { path = "../nsstring" } +xpcom = { path = "../xpcom" } diff --git a/xpcom/rust/moz_task/src/dispatcher.rs b/xpcom/rust/moz_task/src/dispatcher.rs new file mode 100644 index 0000000000..17ad9ceb81 --- /dev/null +++ b/xpcom/rust/moz_task/src/dispatcher.rs @@ -0,0 +1,153 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::{ + dispatch_background_task_runnable, dispatch_runnable, get_current_thread, DispatchOptions, +}; +use nserror::{nsresult, NS_OK}; +use nsstring::nsACString; +use std::sync::Mutex; +use xpcom::interfaces::{nsIEventTarget, nsIRunnablePriority}; +use xpcom::xpcom; + +/// Basic wrapper to convert a FnOnce callback into a `nsIRunnable` to be +/// dispatched using XPCOM. +#[xpcom(implement(nsIRunnable, nsINamed, nsIRunnablePriority), atomic)] +struct RunnableFunction<F: FnOnce() + 'static> { + name: &'static str, + priority: u32, + function: Mutex<Option<F>>, +} + +impl<F: FnOnce() + 'static> RunnableFunction<F> { + #[allow(non_snake_case)] + fn Run(&self) -> nsresult { + let function = self.function.lock().unwrap().take(); + debug_assert!(function.is_some(), "runnable invoked twice?"); + if let Some(function) = function { + function(); + } + NS_OK + } + + #[allow(non_snake_case)] + unsafe fn GetName(&self, result: *mut nsACString) -> nsresult { + (*result).assign(self.name); + NS_OK + } + + #[allow(non_snake_case)] + unsafe fn GetPriority(&self, result: *mut u32) -> nsresult { + *result = self.priority; + NS_OK + } +} + +pub struct RunnableBuilder<F> { + name: &'static str, + function: F, + priority: u32, + options: DispatchOptions, +} + +impl<F> RunnableBuilder<F> { + pub fn new(name: &'static str, function: F) -> Self { + RunnableBuilder { + name, + function, + priority: nsIRunnablePriority::PRIORITY_NORMAL, + options: DispatchOptions::default(), + } + } + + pub fn priority(mut self, priority: u32) -> Self { + self.priority = priority; + self + } + + pub fn options(mut self, options: DispatchOptions) -> Self { + self.options = options; + self + } + + pub fn may_block(mut self, may_block: bool) -> Self { + self.options = self.options.may_block(may_block); + self + } + + pub unsafe fn at_end(mut self, at_end: bool) -> Self { + self.options = self.options.at_end(at_end); + self + } +} + +impl<F> RunnableBuilder<F> +where + F: FnOnce() + Send + 'static, +{ + /// Dispatch this Runnable to the specified EventTarget. The runnable function must be `Send`. + pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), nsresult> { + let runnable = RunnableFunction::allocate(InitRunnableFunction { + name: self.name, + priority: self.priority, + function: Mutex::new(Some(self.function)), + }); + unsafe { dispatch_runnable(runnable.coerce(), target, self.options) } + } + + /// Dispatch this Runnable to the specified EventTarget as a background + /// task. The runnable function must be `Send`. + pub fn dispatch_background_task(self) -> Result<(), nsresult> { + let runnable = RunnableFunction::allocate(InitRunnableFunction { + name: self.name, + priority: self.priority, + function: Mutex::new(Some(self.function)), + }); + unsafe { dispatch_background_task_runnable(runnable.coerce(), self.options) } + } +} + +impl<F> RunnableBuilder<F> +where + F: FnOnce() + 'static, +{ + /// Dispatch this Runnable to the current thread. + /// + /// Unlike `dispatch` and `dispatch_background_task`, the runnable does not + /// need to be `Send` to dispatch to the current thread. + pub fn dispatch_local(self) -> Result<(), nsresult> { + let target = get_current_thread()?; + let runnable = RunnableFunction::allocate(InitRunnableFunction { + name: self.name, + priority: self.priority, + function: Mutex::new(Some(self.function)), + }); + unsafe { dispatch_runnable(runnable.coerce(), target.coerce(), self.options) } + } +} + +pub fn dispatch_onto<F>( + name: &'static str, + target: &nsIEventTarget, + function: F, +) -> Result<(), nsresult> +where + F: FnOnce() + Send + 'static, +{ + RunnableBuilder::new(name, function).dispatch(target) +} + +pub fn dispatch_background_task<F>(name: &'static str, function: F) -> Result<(), nsresult> +where + F: FnOnce() + Send + 'static, +{ + RunnableBuilder::new(name, function).dispatch_background_task() +} + +pub fn dispatch_local<F>(name: &'static str, function: F) -> Result<(), nsresult> +where + F: FnOnce() + 'static, +{ + RunnableBuilder::new(name, function).dispatch_local() +} diff --git a/xpcom/rust/moz_task/src/event_loop.rs b/xpcom/rust/moz_task/src/event_loop.rs new file mode 100644 index 0000000000..f8d113ed57 --- /dev/null +++ b/xpcom/rust/moz_task/src/event_loop.rs @@ -0,0 +1,66 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +extern crate nsstring; + +use cstr::cstr; +use nserror::{nsresult, NS_ERROR_SERVICE_NOT_AVAILABLE, NS_ERROR_UNEXPECTED, NS_OK}; +use nsstring::*; +use std::cell::RefCell; +use std::future::Future; +use xpcom::{interfaces::nsIThreadManager, xpcom, xpcom_method}; + +#[xpcom(implement(nsINestedEventLoopCondition), nonatomic)] +struct FutureCompleteCondition<T: 'static> { + value: RefCell<Option<T>>, +} + +impl<T: 'static> FutureCompleteCondition<T> { + xpcom_method!(is_done => IsDone() -> bool); + fn is_done(&self) -> Result<bool, nsresult> { + Ok(self.value.borrow().is_some()) + } +} + +/// Spin the event loop on the current thread until `future` is resolved. +/// +/// # Safety +/// +/// Spinning a nested event loop should always be avoided when possible, as it +/// can cause hangs, break JS run-to-completion guarantees, and break other C++ +/// code currently on the stack relying on heap invariants. While in a pure-rust +/// codebase this method would only be ill-advised and not technically "unsafe", +/// it is marked as unsafe due to the potential for triggering unsafety in +/// unrelated C++ code. +pub unsafe fn spin_event_loop_until<F>( + reason: &'static str, + future: F, +) -> Result<F::Output, nsresult> +where + F: Future + 'static, + F::Output: 'static, +{ + let thread_manager = + xpcom::get_service::<nsIThreadManager>(cstr!("@mozilla.org/thread-manager;1")) + .ok_or(NS_ERROR_SERVICE_NOT_AVAILABLE)?; + + let cond = FutureCompleteCondition::<F::Output>::allocate(InitFutureCompleteCondition { + value: RefCell::new(None), + }); + + // Spawn our future onto the current thread event loop, and record the + // completed value as it completes. + let cond2 = cond.clone(); + crate::spawn_local(reason, async move { + let rv = future.await; + *cond2.value.borrow_mut() = Some(rv); + }) + .detach(); + + thread_manager + .SpinEventLoopUntil(&*nsCStr::from(reason), cond.coerce()) + .to_result()?; + let rv = cond.value.borrow_mut().take(); + rv.ok_or(NS_ERROR_UNEXPECTED) +} diff --git a/xpcom/rust/moz_task/src/executor.rs b/xpcom/rust/moz_task/src/executor.rs new file mode 100644 index 0000000000..0016839373 --- /dev/null +++ b/xpcom/rust/moz_task/src/executor.rs @@ -0,0 +1,291 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +use crate::{get_current_thread, DispatchOptions, RunnableBuilder}; +use std::{ + cell::Cell, + fmt::Debug, + future::Future, + pin::Pin, + ptr, + sync::Arc, + task::{Context, Poll}, +}; +use xpcom::interfaces::{nsIEventTarget, nsIRunnablePriority}; +use xpcom::RefPtr; + +/// A spawned task. +/// +/// A [`AsyncTask`] can be awaited to retrieve the output of its future. +/// +/// Dropping an [`AsyncTask`] cancels it, which means its future won't be polled +/// again. To drop the [`AsyncTask`] handle without canceling it, use +/// [`detach()`][`AsyncTask::detach()`] instead. To cancel a task gracefully and +/// wait until it is fully destroyed, use the [`cancel()`][AsyncTask::cancel()] +/// method. +/// +/// A task which is cancelled due to the nsIEventTarget it was dispatched to no +/// longer accepting events will never be resolved. +#[derive(Debug)] +#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] +pub struct AsyncTask<T> { + task: async_task::FallibleTask<T>, +} + +impl<T> AsyncTask<T> { + fn new(task: async_task::Task<T>) -> Self { + AsyncTask { + task: task.fallible(), + } + } + + /// Detaches the task to let it keep running in the background. + pub fn detach(self) { + self.task.detach() + } + + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + pub async fn cancel(self) -> Option<T> { + self.task.cancel().await + } +} + +impl<T> Future for AsyncTask<T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Wrap the future produced by `AsyncTask` to never resolve if the + // Runnable was dropped, and the task was cancelled. + match Pin::new(&mut self.task).poll(cx) { + Poll::Ready(Some(t)) => Poll::Ready(t), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } + } +} + +enum SpawnTarget { + BackgroundTask, + EventTarget(RefPtr<nsIEventTarget>), +} + +// SAFETY: All XPCOM interfaces are considered !Send + !Sync, however all +// well-behaved nsIEventTarget instances must be threadsafe. +unsafe impl Send for SpawnTarget {} +unsafe impl Sync for SpawnTarget {} + +/// Information used by tasks as they are spawned. Stored in an Arc such that +/// their identity can be used for `POLLING_TASK`. +struct TaskSpawnConfig { + name: &'static str, + priority: u32, + options: DispatchOptions, + target: SpawnTarget, +} + +thread_local! { + /// Raw pointer to the TaskSpawnConfig for the currently polling task. Used + /// to detect scheduling callbacks for a runnable while it is polled, to set + /// `DISPATCH_AT_END` on the notification. + static POLLING_TASK: Cell<*const TaskSpawnConfig> = Cell::new(ptr::null()); +} + +fn schedule(config: Arc<TaskSpawnConfig>, runnable: async_task::Runnable) { + // If we're dispatching this task while it is currently running on the same + // thread, set the `DISPATCH_AT_END` flag in the dispatch options to tell + // our threadpool target to not bother to spin up another thread. + let currently_polling = POLLING_TASK.with(|t| t.get() == Arc::as_ptr(&config)); + + // SAFETY: We use the POLLING_TASK thread local to check if we meet the + // requirements for `at_end`. + let options = unsafe { config.options.at_end(currently_polling) }; + + // Build the RunnableBuilder for our task to be dispatched. + let config2 = config.clone(); + let builder = RunnableBuilder::new(config.name, move || { + // Record the pointer for the currently executing task in the + // POLLING_TASK thread-local so that nested dispatches can detect it. + POLLING_TASK.with(|t| { + let prev = t.get(); + t.set(Arc::as_ptr(&config2)); + runnable.run(); + t.set(prev); + }); + }) + .priority(config.priority) + .options(options); + + let rv = match &config.target { + SpawnTarget::BackgroundTask => builder.dispatch_background_task(), + SpawnTarget::EventTarget(target) => builder.dispatch(&*target), + }; + if let Err(err) = rv { + log::warn!( + "dispatch for spawned task '{}' failed: {:?}", + config.name, + err + ); + } +} + +/// Helper for starting an async task which will run a future to completion. +#[derive(Debug)] +pub struct TaskBuilder<F> { + name: &'static str, + future: F, + priority: u32, + options: DispatchOptions, +} + +impl<F> TaskBuilder<F> { + pub fn new(name: &'static str, future: F) -> TaskBuilder<F> { + TaskBuilder { + name, + future, + priority: nsIRunnablePriority::PRIORITY_NORMAL, + options: DispatchOptions::default(), + } + } + + /// Specify the priority of the task's runnables. + pub fn priority(mut self, priority: u32) -> Self { + self.priority = priority; + self + } + + /// Specify options to use when dispatching the task. + pub fn options(mut self, options: DispatchOptions) -> Self { + self.options = options; + self + } + + /// Set whether or not the event may block, and should be run on the IO + /// thread pool. + pub fn may_block(mut self, may_block: bool) -> Self { + self.options = self.options.may_block(may_block); + self + } +} + +impl<F> TaskBuilder<F> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + /// Run the future on the background task pool. + pub fn spawn(self) -> AsyncTask<F::Output> { + let config = Arc::new(TaskSpawnConfig { + name: self.name, + priority: self.priority, + options: self.options, + target: SpawnTarget::BackgroundTask, + }); + let (runnable, task) = async_task::spawn(self.future, move |runnable| { + schedule(config.clone(), runnable) + }); + runnable.schedule(); + AsyncTask::new(task) + } + + /// Run the future on the specified nsIEventTarget. + pub fn spawn_onto(self, target: &nsIEventTarget) -> AsyncTask<F::Output> { + let config = Arc::new(TaskSpawnConfig { + name: self.name, + priority: self.priority, + options: self.options, + target: SpawnTarget::EventTarget(RefPtr::new(target)), + }); + let (runnable, task) = async_task::spawn(self.future, move |runnable| { + schedule(config.clone(), runnable) + }); + runnable.schedule(); + AsyncTask::new(task) + } +} + +impl<F> TaskBuilder<F> +where + F: Future + 'static, + F::Output: 'static, +{ + /// Run the future on the current thread. + /// + /// Unlike the other `spawn` methods, this method supports non-Send futures. + /// + /// # Panics + /// + /// This method may panic if run on a thread which cannot run local futures + /// (e.g. due to it is not being an XPCOM thread, or if we are very late + /// during shutdown). + pub fn spawn_local(self) -> AsyncTask<F::Output> { + let current_thread = get_current_thread().expect("cannot get current thread"); + let config = Arc::new(TaskSpawnConfig { + name: self.name, + priority: self.priority, + options: self.options, + target: SpawnTarget::EventTarget(RefPtr::new(current_thread.coerce())), + }); + let (runnable, task) = async_task::spawn_local(self.future, move |runnable| { + schedule(config.clone(), runnable) + }); + runnable.schedule(); + AsyncTask::new(task) + } +} + +/// Spawn a future onto the background task pool. The future will not be run on +/// the main thread. +pub fn spawn<F>(name: &'static str, future: F) -> AsyncTask<F::Output> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + TaskBuilder::new(name, future).spawn() +} + +/// Spawn a potentially-blocking future onto the background task pool. The +/// future will not be run on the main thread. +pub fn spawn_blocking<F>(name: &'static str, future: F) -> AsyncTask<F::Output> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + TaskBuilder::new(name, future).may_block(true).spawn() +} + +/// Spawn a local future onto the current thread. +pub fn spawn_local<F>(name: &'static str, future: F) -> AsyncTask<F::Output> +where + F: Future + 'static, + F::Output: 'static, +{ + TaskBuilder::new(name, future).spawn_local() +} + +pub fn spawn_onto<F>(name: &'static str, target: &nsIEventTarget, future: F) -> AsyncTask<F::Output> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + TaskBuilder::new(name, future).spawn_onto(target) +} + +pub fn spawn_onto_blocking<F>( + name: &'static str, + target: &nsIEventTarget, + future: F, +) -> AsyncTask<F::Output> +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + TaskBuilder::new(name, future) + .may_block(true) + .spawn_onto(target) +} diff --git a/xpcom/rust/moz_task/src/lib.rs b/xpcom/rust/moz_task/src/lib.rs new file mode 100644 index 0000000000..7c1ea3f970 --- /dev/null +++ b/xpcom/rust/moz_task/src/lib.rs @@ -0,0 +1,377 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +//! This module wraps XPCOM threading functions with Rust functions +//! to make it safer and more convenient to call the XPCOM functions. +//! It also provides the Task trait and TaskRunnable struct, +//! which make it easier to dispatch tasks to threads. + +mod dispatcher; +pub use dispatcher::{dispatch_background_task, dispatch_local, dispatch_onto, RunnableBuilder}; +mod event_loop; +mod executor; +pub use executor::{ + spawn, spawn_blocking, spawn_local, spawn_onto, spawn_onto_blocking, AsyncTask, TaskBuilder, +}; + +// Expose functions intended to be used only in gtest via this module. +// We don't use a feature gate here to stop the need to compile all crates that +// depend upon `moz_task` twice. +pub mod gtest_only { + pub use crate::event_loop::spin_event_loop_until; +} + +use nserror::nsresult; +use nsstring::{nsACString, nsCString}; +use std::{ffi::CStr, marker::PhantomData, mem, ptr}; +use xpcom::{ + getter_addrefs, + interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget, nsISupports, nsIThread}, + AtomicRefcnt, RefCounted, RefPtr, XpCom, +}; + +extern "C" { + fn NS_GetCurrentThreadRust(result: *mut *const nsIThread) -> nsresult; + fn NS_GetMainThreadRust(result: *mut *const nsIThread) -> nsresult; + fn NS_IsMainThread() -> bool; + fn NS_NewNamedThreadWithDefaultStackSize( + name: *const nsACString, + result: *mut *const nsIThread, + event: *const nsIRunnable, + ) -> nsresult; + fn NS_IsOnCurrentThread(target: *const nsIEventTarget) -> bool; + fn NS_ProxyReleaseISupports( + name: *const libc::c_char, + target: *const nsIEventTarget, + doomed: *const nsISupports, + always_proxy: bool, + ); + fn NS_CreateBackgroundTaskQueue( + name: *const libc::c_char, + target: *mut *const nsISerialEventTarget, + ) -> nsresult; + fn NS_DispatchBackgroundTask(event: *const nsIRunnable, flags: u32) -> nsresult; +} + +pub fn get_current_thread() -> Result<RefPtr<nsIThread>, nsresult> { + getter_addrefs(|p| unsafe { NS_GetCurrentThreadRust(p) }) +} + +pub fn get_main_thread() -> Result<RefPtr<nsIThread>, nsresult> { + getter_addrefs(|p| unsafe { NS_GetMainThreadRust(p) }) +} + +pub fn is_main_thread() -> bool { + unsafe { NS_IsMainThread() } +} + +pub fn create_thread(name: &str) -> Result<RefPtr<nsIThread>, nsresult> { + getter_addrefs(|p| unsafe { + NS_NewNamedThreadWithDefaultStackSize(&*nsCString::from(name), p, ptr::null()) + }) +} + +pub fn is_on_current_thread(target: &nsIEventTarget) -> bool { + unsafe { NS_IsOnCurrentThread(target) } +} + +/// Creates a queue that runs tasks on the background thread pool. The tasks +/// will run in the order they're dispatched, one after the other. +pub fn create_background_task_queue( + name: &'static CStr, +) -> Result<RefPtr<nsISerialEventTarget>, nsresult> { + getter_addrefs(|p| unsafe { NS_CreateBackgroundTaskQueue(name.as_ptr(), p) }) +} + +/// Dispatches a one-shot runnable to an event target, like a thread or a +/// task queue, with the given options. +/// +/// This function leaks the runnable if dispatch fails. +/// +/// # Safety +/// +/// As there is no guarantee that the runnable is actually `Send + Sync`, we +/// can't know that it's safe to dispatch an `nsIRunnable` to any +/// `nsIEventTarget`. +pub unsafe fn dispatch_runnable( + runnable: &nsIRunnable, + target: &nsIEventTarget, + options: DispatchOptions, +) -> Result<(), nsresult> { + // NOTE: DispatchFromScript performs an AddRef on `runnable` which is + // why this function leaks on failure. + target + .DispatchFromScript(runnable, options.flags()) + .to_result() +} + +/// Dispatches a one-shot task runnable to the background thread pool with the +/// given options. The task may run concurrently with other background tasks. +/// If you need tasks to run in a specific order, please create a background +/// task queue using `create_background_task_queue`, and dispatch tasks to it +/// instead. +/// +/// This function leaks the runnable if dispatch fails. This avoids a race where +/// a runnable can be destroyed on either the original or target thread, which +/// is important if the runnable holds thread-unsafe members. +/// +/// ### Safety +/// +/// As there is no guarantee that the runnable is actually `Send + Sync`, we +/// can't know that it's safe to dispatch an `nsIRunnable` to any +/// `nsIEventTarget`. +pub unsafe fn dispatch_background_task_runnable( + runnable: &nsIRunnable, + options: DispatchOptions, +) -> Result<(), nsresult> { + // This eventually calls the non-`already_AddRefed<nsIRunnable>` overload of + // `nsIEventTarget::Dispatch` (see xpcom/threads/nsIEventTarget.idl#20-25), + // which adds an owning reference and leaks if dispatch fails. + NS_DispatchBackgroundTask(runnable, options.flags()).to_result() +} + +/// Options to control how task runnables are dispatched. +/// +/// NOTE: The `DISPATCH_SYNC` flag is intentionally not supported by this type. +#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] +pub struct DispatchOptions(u32); + +impl Default for DispatchOptions { + #[inline] + fn default() -> Self { + DispatchOptions(nsIEventTarget::DISPATCH_NORMAL) + } +} + +impl DispatchOptions { + /// Creates a blank set of options. The runnable will be dispatched using + /// the default mode. + #[inline] + pub fn new() -> Self { + DispatchOptions::default() + } + + /// Indicates whether or not the dispatched runnable may block its target + /// thread by waiting on I/O. If `true`, the runnable may be dispatched to a + /// dedicated thread pool, leaving the main pool free for CPU-bound tasks. + #[inline] + pub fn may_block(self, may_block: bool) -> DispatchOptions { + const FLAG: u32 = nsIEventTarget::DISPATCH_EVENT_MAY_BLOCK; + if may_block { + DispatchOptions(self.flags() | FLAG) + } else { + DispatchOptions(self.flags() & !FLAG) + } + } + + /// Specifies that the dispatch is occurring from a running event that was + /// dispatched to the same event target, and that event is about to finish. + /// + /// A thread pool can use this as an optimization hint to not spin up + /// another thread, since the current thread is about to become idle. + /// + /// Setting this flag is unsafe, as it may only be used from the target + /// event target when the event is about to finish. + #[inline] + pub unsafe fn at_end(self, may_block: bool) -> DispatchOptions { + const FLAG: u32 = nsIEventTarget::DISPATCH_AT_END; + if may_block { + DispatchOptions(self.flags() | FLAG) + } else { + DispatchOptions(self.flags() & !FLAG) + } + } + + /// Returns the set of bitflags to pass to `DispatchFromScript`. + #[inline] + fn flags(self) -> u32 { + self.0 + } +} + +/// A task represents an operation that asynchronously executes on a target +/// thread, and returns its result to the original thread. +/// +/// # Alternatives +/// +/// This trait is no longer necessary for basic tasks to be dispatched to +/// another thread with a callback on the originating thread. `moz_task` now has +/// a series of more rust-like primitives which can be used instead. For +/// example, it may be preferable to use the async executor over `Task`: +/// +/// ```ignore +/// // Spawn a task onto the background task pool, and capture the result of its +/// // execution. +/// let bg_task = moz_task::spawn("Example", async move { +/// do_background_work(captured_state) +/// }); +/// +/// // Spawn another task on the calling thread which will await on the result +/// // of the async operation, and invoke a non-Send callback. This task won't +/// // be awaited on, so needs to be `detach`-ed. +/// moz_task::spawn_local("Example", async move { +/// callback.completed(bg_task.await); +/// }) +/// .detach(); +/// ``` +/// +/// If no result is needed, the task returned from `spawn` may be also detached +/// directly. +pub trait Task { + // FIXME: These could accept `&mut`. + fn run(&self); + fn done(&self) -> Result<(), nsresult>; +} + +pub struct TaskRunnable { + name: &'static str, + task: Box<dyn Task + Send + Sync>, +} + +impl TaskRunnable { + // XXX: Fixme: clean up this old API. (bug 1744312) + pub fn new( + name: &'static str, + task: Box<dyn Task + Send + Sync>, + ) -> Result<TaskRunnable, nsresult> { + Ok(TaskRunnable { name, task }) + } + + pub fn dispatch(self, target: &nsIEventTarget) -> Result<(), nsresult> { + self.dispatch_with_options(target, DispatchOptions::default()) + } + + pub fn dispatch_with_options( + self, + target: &nsIEventTarget, + options: DispatchOptions, + ) -> Result<(), nsresult> { + // Perform `task.run()` on a background thread. + let task = self.task; + let handle = TaskBuilder::new(self.name, async move { + task.run(); + task + }) + .options(options) + .spawn_onto(target); + + // Run `task.done()` on the starting thread once the background thread + // is done with the task. + spawn_local(self.name, async move { + let task = handle.await; + let _ = task.done(); + }) + .detach(); + Ok(()) + } + + pub fn dispatch_background_task_with_options( + self, + options: DispatchOptions, + ) -> Result<(), nsresult> { + // Perform `task.run()` on a background thread. + let task = self.task; + let handle = TaskBuilder::new(self.name, async move { + task.run(); + task + }) + .options(options) + .spawn(); + + // Run `task.done()` on the starting thread once the background thread + // is done with the task. + spawn_local(self.name, async move { + let task = handle.await; + let _ = task.done(); + }) + .detach(); + Ok(()) + } +} + +pub type ThreadPtrHandle<T> = RefPtr<ThreadPtrHolder<T>>; + +/// A Rust analog to `nsMainThreadPtrHolder` that wraps an `nsISupports` object +/// with thread-safe refcounting. The holder keeps one reference to the wrapped +/// object that's released when the holder's refcount reaches zero. +pub struct ThreadPtrHolder<T: XpCom + 'static> { + ptr: *const T, + marker: PhantomData<T>, + name: &'static CStr, + owning_thread: RefPtr<nsIThread>, + refcnt: AtomicRefcnt, +} + +unsafe impl<T: XpCom + 'static> Send for ThreadPtrHolder<T> {} +unsafe impl<T: XpCom + 'static> Sync for ThreadPtrHolder<T> {} + +unsafe impl<T: XpCom + 'static> RefCounted for ThreadPtrHolder<T> { + unsafe fn addref(&self) { + self.refcnt.inc(); + } + + unsafe fn release(&self) { + let rc = self.refcnt.dec(); + if rc == 0 { + // Once the holder's count reaches zero, release the wrapped + // object... + if !self.ptr.is_null() { + // The holder can be released on any thread. If we're on the + // owning thread, we can release the object directly. Otherwise, + // we need to post a proxy release event to release the object + // on the owning thread. + if is_on_current_thread(&self.owning_thread) { + (*self.ptr).release() + } else { + NS_ProxyReleaseISupports( + self.name.as_ptr(), + self.owning_thread.coerce(), + self.ptr as *const T as *const nsISupports, + false, + ); + } + } + // ...And deallocate the holder. + mem::drop(Box::from_raw(self as *const Self as *mut Self)); + } + } +} + +impl<T: XpCom + 'static> ThreadPtrHolder<T> { + /// Creates a new owning thread pointer holder. Returns an error if the + /// thread manager has shut down. Panics if `name` isn't a valid C string. + pub fn new(name: &'static CStr, ptr: RefPtr<T>) -> Result<RefPtr<Self>, nsresult> { + let owning_thread = get_current_thread()?; + // Take ownership of the `RefPtr`. This does _not_ decrement its + // refcount, which is what we want. Once we've released all references + // to the holder, we'll release the wrapped `RefPtr`. + let raw: *const T = &*ptr; + mem::forget(ptr); + unsafe { + let boxed = Box::new(ThreadPtrHolder { + name, + ptr: raw, + marker: PhantomData, + owning_thread, + refcnt: AtomicRefcnt::new(), + }); + Ok(RefPtr::from_raw(Box::into_raw(boxed)).unwrap()) + } + } + + /// Returns the wrapped object's owning thread. + pub fn owning_thread(&self) -> &nsIThread { + &self.owning_thread + } + + /// Returns the wrapped object if called from the owning thread, or + /// `None` if called from any other thread. + pub fn get(&self) -> Option<&T> { + if is_on_current_thread(&self.owning_thread) && !self.ptr.is_null() { + unsafe { Some(&*self.ptr) } + } else { + None + } + } +} |