summaryrefslogtreecommitdiffstats
path: root/toolkit/components/bitsdownload/src/bits_interface/task
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/bitsdownload/src/bits_interface/task')
-rw-r--r--toolkit/components/bitsdownload/src/bits_interface/task/client.rs102
-rw-r--r--toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs125
-rw-r--r--toolkit/components/bitsdownload/src/bits_interface/task/mod.rs18
-rw-r--r--toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs425
-rw-r--r--toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs332
5 files changed, 1002 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/client.rs b/toolkit/components/bitsdownload/src/bits_interface/task/client.rs
new file mode 100644
index 0000000000..edbf4d0698
--- /dev/null
+++ b/toolkit/components/bitsdownload/src/bits_interface/task/client.rs
@@ -0,0 +1,102 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use super::{
+ action::Action,
+ error::{BitsTaskError, ErrorStage::CommandThread, ErrorType::MissingBitsClient},
+ string::nsCString_to_OsString,
+};
+
+use bits_client::BitsClient;
+use nsstring::nsCString;
+use std::cell::Cell;
+
+thread_local! {
+ // This is used to store the `BitsClient` on the Command thread.
+ // Keeping it here solves the problem of how to allow multiple runnables to
+ // be simultaneously queued on the Command thread while giving them all
+ // access to the same `BitsClient`.
+ static BITS_CLIENT: Cell<Option<BitsClient>> = Cell::new(None);
+}
+
+/// This structure holds the data needed to initialize `BitsClient` and
+/// `BitsMonitorClient`.
+#[derive(Debug, Clone)]
+pub struct ClientInitData {
+ pub job_name: nsCString,
+ pub save_path_prefix: nsCString,
+ pub monitor_timeout_ms: u32,
+}
+
+impl ClientInitData {
+ pub fn new(
+ job_name: nsCString,
+ save_path_prefix: nsCString,
+ monitor_timeout_ms: u32,
+ ) -> ClientInitData {
+ ClientInitData {
+ job_name,
+ save_path_prefix,
+ monitor_timeout_ms,
+ }
+ }
+}
+
+/// This function constructs a `BitsClient`, if one does not already exist. If
+/// the `BitsClient` cannot be constructed, a `BitsTaskError` will be returned.
+/// If the `BitsClient` could be obtained, then the function then calls the
+/// closure passed to it, passing a mutable reference to the `BitsClient`.
+/// This function will then return whatever the closure returned, which must be
+/// a `Result<_, BitsTaskError>`.
+pub fn with_maybe_new_bits_client<F, R>(
+ init_data: &ClientInitData,
+ action: Action,
+ closure: F,
+) -> Result<R, BitsTaskError>
+where
+ F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>,
+{
+ _with_bits_client(Some(init_data), action, closure)
+}
+
+/// This function assumes that a `BitsClient` has already been constructed. If
+/// there is not one available, a `BitsTaskError` will be returned. Otherwise,
+/// the function calls the closure passed to it, passing a mutable reference to
+/// the `BitsClient`. This function will then return whatever the closure
+/// returned, which must be a `Result<_, BitsTaskError>`.
+pub fn with_bits_client<F, R>(action: Action, closure: F) -> Result<R, BitsTaskError>
+where
+ F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>,
+{
+ _with_bits_client(None, action, closure)
+}
+
+fn _with_bits_client<F, R>(
+ maybe_init_data: Option<&ClientInitData>,
+ action: Action,
+ closure: F,
+) -> Result<R, BitsTaskError>
+where
+ F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>,
+{
+ BITS_CLIENT.with(|cell| {
+ let maybe_client = cell.take();
+ let mut client = match (maybe_client, maybe_init_data) {
+ (Some(r), _) => r,
+ (None, Some(init_data)) => {
+ // Immediately invoked function to allow for the ? operator
+ BitsClient::new(
+ nsCString_to_OsString(&init_data.job_name, action, CommandThread)?,
+ nsCString_to_OsString(&init_data.save_path_prefix, action, CommandThread)?,
+ )
+ .map_err(|pipe_error| BitsTaskError::from_pipe(action, pipe_error))?
+ }
+ (None, None) => {
+ return Err(BitsTaskError::new(MissingBitsClient, action, CommandThread));
+ }
+ };
+ let result = closure(&mut client);
+ cell.set(Some(client));
+ result
+ })
+}
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs b/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs
new file mode 100644
index 0000000000..2cf2e9189a
--- /dev/null
+++ b/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs
@@ -0,0 +1,125 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use super::{
+ action::Action,
+ error::{BitsTaskError, ErrorStage, ErrorType},
+};
+use log::warn;
+use xpcom::{RefCounted, ThreadBoundRefPtr};
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum DataType {
+ Callback,
+ BitsService,
+ BitsRequest,
+ Observer,
+ Context,
+}
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+enum GetThreadboundError {
+ Missing,
+ WrongThread,
+}
+
+impl DataType {
+ fn error_type(&self, error: GetThreadboundError) -> ErrorType {
+ match self {
+ DataType::Callback => match error {
+ GetThreadboundError::Missing => ErrorType::MissingCallback,
+ GetThreadboundError::WrongThread => ErrorType::CallbackOnWrongThread,
+ },
+ DataType::BitsService => match error {
+ GetThreadboundError::Missing => ErrorType::MissingBitsService,
+ GetThreadboundError::WrongThread => ErrorType::BitsServiceOnWrongThread,
+ },
+ DataType::BitsRequest => match error {
+ GetThreadboundError::Missing => ErrorType::MissingBitsRequest,
+ GetThreadboundError::WrongThread => ErrorType::BitsRequestOnWrongThread,
+ },
+ DataType::Observer => match error {
+ GetThreadboundError::Missing => ErrorType::MissingObserver,
+ GetThreadboundError::WrongThread => ErrorType::ObserverOnWrongThread,
+ },
+ DataType::Context => match error {
+ GetThreadboundError::Missing => ErrorType::MissingContext,
+ GetThreadboundError::WrongThread => ErrorType::ContextOnWrongThread,
+ },
+ }
+ }
+
+ fn name(&self) -> &'static str {
+ match self {
+ DataType::Callback => "Callback",
+ DataType::BitsService => "BITS Service",
+ DataType::BitsRequest => "BITS Request",
+ DataType::Observer => "Observer",
+ DataType::Context => "Context",
+ }
+ }
+}
+
+/// Given a reference to a threadbound option
+/// (i.e. `&Option<ThreadBoundRefPtr<_>>`), this function will attempt to
+/// retrieve a reference to the value stored within. If it is not available
+/// (option is `None` or value is on the wrong thread), `None` is returned
+/// instead.
+pub fn get_from_threadbound_option<T>(
+ maybe_threadbound: &Option<ThreadBoundRefPtr<T>>,
+ data_type: DataType,
+ action: Action,
+) -> Option<&T>
+where
+ T: RefCounted + 'static,
+{
+ maybe_threadbound.as_ref().and_then(|threadbound| {
+ let maybe_reference = threadbound.get_ref();
+ if maybe_reference.is_none() {
+ warn!(
+ "Unexpected error {}: {} is on the wrong thread",
+ action.description(),
+ data_type.name(),
+ );
+ }
+ maybe_reference
+ })
+}
+
+/// Given a reference to a threadbound option
+/// (i.e. `&Option<ThreadBoundRefPtr<_>>`), this function will attempt to
+/// retrieve a reference to the value stored within. If it is not available
+/// (option is `None` or value is on the wrong thread), a `BitsTaskError` is
+/// returned instead.
+pub fn expect_from_threadbound_option<T>(
+ maybe_threadbound: &Option<ThreadBoundRefPtr<T>>,
+ data_type: DataType,
+ action: Action,
+) -> Result<&T, BitsTaskError>
+where
+ T: RefCounted + 'static,
+{
+ match maybe_threadbound.as_ref() {
+ Some(threadbound) => {
+ match threadbound.get_ref() {
+ Some(reference) => Ok(reference),
+ None => Err(BitsTaskError::new(
+ data_type.error_type(GetThreadboundError::WrongThread),
+ action,
+ // Retrieving data from threadbounds all happens on the main thread.
+ // No data is ever bound to other threads so there would be no
+ // reason to retrieve it there.
+ ErrorStage::MainThread,
+ )),
+ }
+ }
+ None => Err(BitsTaskError::new(
+ data_type.error_type(GetThreadboundError::Missing),
+ action,
+ // Retrieving data from threadbounds all happens on the main thread.
+ // No data is ever bound to other threads so there would be no
+ // reason to retrieve it there.
+ ErrorStage::MainThread,
+ )),
+ }
+}
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs b/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs
new file mode 100644
index 0000000000..b6b96d887b
--- /dev/null
+++ b/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs
@@ -0,0 +1,18 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+mod from_threadbound;
+
+use super::{action, dispatch_callback, error, request::BitsRequest, string, BitsService};
+
+mod client;
+pub use self::client::ClientInitData;
+
+mod service_task;
+pub use self::service_task::{MonitorDownloadTask, StartDownloadTask};
+
+mod request_task;
+pub use self::request_task::{
+ CancelTask, ChangeMonitorIntervalTask, CompleteTask, Priority, ResumeTask,
+ SetNoProgressTimeoutTask, SetPriorityTask, SuspendTask,
+};
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs b/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs
new file mode 100644
index 0000000000..f0fd331ed0
--- /dev/null
+++ b/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs
@@ -0,0 +1,425 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use super::{
+ action::{Action, RequestAction},
+ client::with_bits_client,
+ dispatch_callback::{
+ maybe_dispatch_via_callback, CallbackExpected, CallbackOptional, IsCallbackExpected,
+ },
+ error::BitsTaskError,
+ from_threadbound::{expect_from_threadbound_option, DataType},
+ BitsRequest,
+};
+
+use bits_client::{BitsClient, Guid};
+use crossbeam_utils::atomic::AtomicCell;
+use log::info;
+use moz_task::Task;
+use nserror::nsresult;
+use xpcom::{interfaces::nsIBitsCallback, RefPtr, ThreadBoundRefPtr};
+
+type RunFn<D> = fn(Guid, &D, &mut BitsClient) -> Result<(), BitsTaskError>;
+type DoneFn = fn(&BitsRequest, bool) -> Result<(), BitsTaskError>;
+
+pub struct RequestTask<D> {
+ request: AtomicCell<Option<ThreadBoundRefPtr<BitsRequest>>>,
+ guid: Guid,
+ action: RequestAction,
+ task_data: D,
+ run_fn: RunFn<D>,
+ maybe_done_fn: Option<DoneFn>,
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIBitsCallback>>>,
+ callback_presence: IsCallbackExpected,
+ result: AtomicCell<Option<Result<(), BitsTaskError>>>,
+}
+
+impl<D> RequestTask<D>
+where
+ D: Sync + Send,
+{
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ guid: Guid,
+ action: RequestAction,
+ task_data: D,
+ run_fn: RunFn<D>,
+ maybe_done_fn: Option<DoneFn>,
+ callback: Option<RefPtr<nsIBitsCallback>>,
+ callback_presence: IsCallbackExpected,
+ ) -> RequestTask<D> {
+ RequestTask {
+ request: AtomicCell::new(Some(ThreadBoundRefPtr::new(request))),
+ guid,
+ action,
+ task_data,
+ run_fn,
+ maybe_done_fn,
+ callback: AtomicCell::new(callback.map(ThreadBoundRefPtr::new)),
+ result: AtomicCell::new(None),
+ callback_presence,
+ }
+ }
+}
+
+impl<D> Task for RequestTask<D> {
+ fn run(&self) {
+ let result = with_bits_client(self.action.into(), |client| {
+ (self.run_fn)(self.guid.clone(), &self.task_data, client)
+ });
+ self.result.store(Some(result));
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ // If TaskRunnable.run() calls Task.done() to return a result
+ // on the main thread before TaskRunnable.run() returns on the worker
+ // thread, then the Task will get dropped on the worker thread.
+ //
+ // But the callback is an nsXPCWrappedJS that isn't safe to release
+ // on the worker thread. So we move it out of the Task here to ensure
+ // it gets released on the main thread.
+ let maybe_tb_callback = self.callback.swap(None);
+ // It also isn't safe to drop the BitsRequest RefPtr off-thread,
+ // because BitsRequest refcounting is non-atomic
+ let maybe_tb_request = self.request.swap(None);
+
+ let action: Action = self.action.into();
+ let maybe_callback =
+ expect_from_threadbound_option(&maybe_tb_callback, DataType::Callback, action);
+
+ // Immediately invoked function expression to allow for the ? operator
+ let result: Result<(), BitsTaskError> = (|| {
+ let request =
+ expect_from_threadbound_option(&maybe_tb_request, DataType::BitsRequest, action)?;
+
+ let maybe_result = self.result.swap(None);
+
+ let success = if let Some(result) = maybe_result.as_ref() {
+ result.is_ok()
+ } else {
+ false
+ };
+
+ if let Some(done_fn) = self.maybe_done_fn {
+ done_fn(request, success)?;
+ }
+
+ maybe_result.ok_or_else(|| BitsTaskError::missing_result(action))?
+ })();
+ info!("BITS Request Task completed: {:?}", result);
+ maybe_dispatch_via_callback(result, maybe_callback, self.callback_presence)
+ }
+}
+
+pub struct CompleteTask(RequestTask<()>);
+
+impl Task for CompleteTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl CompleteTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ callback: RefPtr<nsIBitsCallback>,
+ ) -> CompleteTask {
+ CompleteTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::Complete,
+ (),
+ CompleteTask::run_fn,
+ Some(CompleteTask::done_fn),
+ Some(callback),
+ CallbackExpected,
+ ))
+ }
+
+ fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .complete_job(id)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Complete, pipe_error))??;
+ Ok(())
+ }
+
+ fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> {
+ if success {
+ request.on_finished();
+ }
+ Ok(())
+ }
+}
+
+pub struct CancelTask(RequestTask<()>);
+
+impl Task for CancelTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl CancelTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ callback: Option<RefPtr<nsIBitsCallback>>,
+ ) -> CancelTask {
+ let callback_presence = if callback.is_some() {
+ CallbackExpected
+ } else {
+ CallbackOptional
+ };
+
+ CancelTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::Cancel,
+ (),
+ CancelTask::run_fn,
+ Some(CancelTask::done_fn),
+ callback,
+ callback_presence,
+ ))
+ }
+
+ fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .cancel_job(id)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Cancel, pipe_error))??;
+ Ok(())
+ }
+
+ fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> {
+ request.finish_cancel_action(success);
+ Ok(())
+ }
+}
+
+pub struct SuspendTask(RequestTask<()>);
+
+impl Task for SuspendTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl SuspendTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ callback: Option<RefPtr<nsIBitsCallback>>,
+ ) -> SuspendTask {
+ let callback_presence = if callback.is_some() {
+ CallbackExpected
+ } else {
+ CallbackOptional
+ };
+
+ SuspendTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::Suspend,
+ (),
+ SuspendTask::run_fn,
+ None,
+ callback,
+ callback_presence,
+ ))
+ }
+
+ fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .suspend_job(id)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Suspend, pipe_error))??;
+ Ok(())
+ }
+}
+
+pub struct ResumeTask(RequestTask<()>);
+
+impl Task for ResumeTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl ResumeTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ callback: Option<RefPtr<nsIBitsCallback>>,
+ ) -> ResumeTask {
+ let callback_presence = if callback.is_some() {
+ CallbackExpected
+ } else {
+ CallbackOptional
+ };
+
+ ResumeTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::Resume,
+ (),
+ ResumeTask::run_fn,
+ None,
+ callback,
+ callback_presence,
+ ))
+ }
+
+ fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .resume_job(id)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Resume, pipe_error))??;
+ Ok(())
+ }
+}
+
+pub struct ChangeMonitorIntervalTask(RequestTask<u32>);
+
+impl Task for ChangeMonitorIntervalTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl ChangeMonitorIntervalTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ update_interval_ms: u32,
+ callback: RefPtr<nsIBitsCallback>,
+ ) -> ChangeMonitorIntervalTask {
+ ChangeMonitorIntervalTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::SetMonitorInterval,
+ update_interval_ms,
+ ChangeMonitorIntervalTask::run_fn,
+ None,
+ Some(callback),
+ CallbackExpected,
+ ))
+ }
+
+ fn run_fn(
+ id: Guid,
+ update_interval_ms: &u32,
+ client: &mut BitsClient,
+ ) -> Result<(), BitsTaskError> {
+ client
+ .set_update_interval(id, *update_interval_ms)
+ .map_err(|pipe_error| {
+ BitsTaskError::from_pipe(Action::SetMonitorInterval, pipe_error)
+ })??;
+ Ok(())
+ }
+}
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum Priority {
+ High,
+ Low,
+}
+
+pub struct SetPriorityTask(RequestTask<Priority>);
+
+impl Task for SetPriorityTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl SetPriorityTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ priority: Priority,
+ callback: RefPtr<nsIBitsCallback>,
+ ) -> SetPriorityTask {
+ SetPriorityTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::SetPriority,
+ priority,
+ SetPriorityTask::run_fn,
+ None,
+ Some(callback),
+ CallbackExpected,
+ ))
+ }
+
+ fn run_fn(id: Guid, priority: &Priority, client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .set_job_priority(id, *priority == Priority::High)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(Action::SetPriority, pipe_error))??;
+ Ok(())
+ }
+}
+
+pub struct SetNoProgressTimeoutTask(RequestTask<u32>);
+
+impl Task for SetNoProgressTimeoutTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl SetNoProgressTimeoutTask {
+ pub fn new(
+ request: RefPtr<BitsRequest>,
+ id: Guid,
+ timeout_secs: u32,
+ callback: RefPtr<nsIBitsCallback>,
+ ) -> SetNoProgressTimeoutTask {
+ SetNoProgressTimeoutTask(RequestTask::new(
+ request,
+ id,
+ RequestAction::SetNoProgressTimeout,
+ timeout_secs,
+ SetNoProgressTimeoutTask::run_fn,
+ None,
+ Some(callback),
+ CallbackExpected,
+ ))
+ }
+
+ fn run_fn(id: Guid, timeout_secs: &u32, client: &mut BitsClient) -> Result<(), BitsTaskError> {
+ client
+ .set_no_progress_timeout(id, *timeout_secs)
+ .map_err(|pipe_error| {
+ BitsTaskError::from_pipe(Action::SetNoProgressTimeout, pipe_error)
+ })??;
+ Ok(())
+ }
+}
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs b/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs
new file mode 100644
index 0000000000..c66f127f7a
--- /dev/null
+++ b/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs
@@ -0,0 +1,332 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+use super::{
+ action::{
+ Action,
+ Action::{MonitorDownload, StartDownload},
+ ServiceAction,
+ },
+ client::{with_maybe_new_bits_client, ClientInitData},
+ dispatch_callback::{maybe_dispatch_request_via_callback, CallbackExpected},
+ error::{BitsTaskError, ErrorStage::CommandThread},
+ from_threadbound::{expect_from_threadbound_option, get_from_threadbound_option, DataType},
+ string::nsCString_to_OsString,
+ BitsRequest, BitsService,
+};
+
+use bits_client::{BitsClient, BitsMonitorClient, BitsProxyUsage, Guid};
+use crossbeam_utils::atomic::AtomicCell;
+use log::{info, warn};
+use moz_task::Task;
+use nserror::nsresult;
+use nsstring::nsCString;
+use xpcom::{
+ interfaces::{nsIBitsNewRequestCallback, nsIRequestObserver, nsISupports},
+ RefPtr, ThreadBoundRefPtr,
+};
+
+// D is the Data Type that the RunFn function needs to make S.
+// S is the Success Type that the RunFn returns on success and that the
+// DoneFn needs to make the BitsRequest.
+type RunFn<D, S> = fn(&D, &mut BitsClient) -> Result<S, BitsTaskError>;
+type DoneFn<D, S> = fn(
+ &D,
+ S,
+ &ClientInitData,
+ &BitsService,
+ &nsIRequestObserver,
+ Option<&nsISupports>,
+) -> Result<RefPtr<BitsRequest>, BitsTaskError>;
+
+pub struct ServiceTask<D, S> {
+ client_init_data: ClientInitData,
+ action: ServiceAction,
+ task_data: D,
+ run_fn: RunFn<D, S>,
+ done_fn: DoneFn<D, S>,
+ bits_service: AtomicCell<Option<ThreadBoundRefPtr<BitsService>>>,
+ observer: AtomicCell<Option<ThreadBoundRefPtr<nsIRequestObserver>>>,
+ context: AtomicCell<Option<ThreadBoundRefPtr<nsISupports>>>,
+ callback: AtomicCell<Option<ThreadBoundRefPtr<nsIBitsNewRequestCallback>>>,
+ result: AtomicCell<Option<Result<S, BitsTaskError>>>,
+}
+
+impl<D, S> ServiceTask<D, S>
+where
+ D: Sync + Send,
+ S: Sync + Send,
+{
+ pub fn new(
+ client_init_data: ClientInitData,
+ action: ServiceAction,
+ task_data: D,
+ run_fn: RunFn<D, S>,
+ done_fn: DoneFn<D, S>,
+ bits_service: RefPtr<BitsService>,
+ observer: RefPtr<nsIRequestObserver>,
+ context: Option<RefPtr<nsISupports>>,
+ callback: RefPtr<nsIBitsNewRequestCallback>,
+ ) -> ServiceTask<D, S> {
+ ServiceTask {
+ client_init_data,
+ action,
+ task_data,
+ run_fn,
+ done_fn,
+ bits_service: AtomicCell::new(Some(ThreadBoundRefPtr::new(bits_service))),
+ observer: AtomicCell::new(Some(ThreadBoundRefPtr::new(observer))),
+ context: AtomicCell::new(context.map(ThreadBoundRefPtr::new)),
+ callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
+ result: AtomicCell::new(None),
+ }
+ }
+}
+
+impl<D, S> Task for ServiceTask<D, S> {
+ fn run(&self) {
+ let result =
+ with_maybe_new_bits_client(&self.client_init_data, self.action.into(), |client| {
+ (self.run_fn)(&self.task_data, client)
+ });
+ self.result.store(Some(result));
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ // If TaskRunnable.run() calls Task.done() to return a result
+ // on the main thread before TaskRunnable.run() returns on the worker
+ // thread, then the Task will get dropped on the worker thread.
+ //
+ // But the callback is an nsXPCWrappedJS that isn't safe to release
+ // on the worker thread. So we move it out of the Task here to ensure
+ // it gets released on the main thread.
+ let maybe_tb_callback = self.callback.swap(None);
+ // It also isn't safe to drop the BitsService RefPtr off-thread,
+ // because BitsService refcounting is non-atomic
+ let maybe_tb_service = self.bits_service.swap(None);
+ // The observer and context are also an nsXPCWrappedJS that aren't safe
+ // to release on the worker thread.
+ let maybe_tb_observer = self.observer.swap(None);
+ let maybe_tb_context = self.context.swap(None);
+
+ let action: Action = self.action.into();
+ let maybe_callback =
+ expect_from_threadbound_option(&maybe_tb_callback, DataType::Callback, action);
+
+ // Immediately invoked function expression to allow for the ? operator
+ let result: Result<RefPtr<BitsRequest>, BitsTaskError> = (|| {
+ let bits_service =
+ expect_from_threadbound_option(&maybe_tb_service, DataType::BitsService, action)?;
+ let observer =
+ expect_from_threadbound_option(&maybe_tb_observer, DataType::Observer, action)?;
+ let maybe_context =
+ get_from_threadbound_option(&maybe_tb_context, DataType::Context, action);
+ let success = self
+ .result
+ .swap(None)
+ .ok_or_else(|| BitsTaskError::missing_result(action))??;
+
+ (self.done_fn)(
+ &self.task_data,
+ success,
+ &self.client_init_data,
+ bits_service,
+ observer,
+ maybe_context,
+ )
+ })();
+ info!("BITS Interface Task completed: {:?}", result);
+ // We incremented the request count when we dispatched an action to
+ // start the job. Now we will decrement since the action completed.
+ // See the declaration of InitBitsService::request_count for details.
+ let bits_service_result =
+ expect_from_threadbound_option(&maybe_tb_service, DataType::BitsService, action);
+ match bits_service_result {
+ Ok(bits_service) => {
+ bits_service.dec_request_count();
+ }
+ Err(error) => {
+ warn!(
+ concat!(
+ "Unable to decrement the request count when finishing ServiceTask. ",
+ "The command thread may not be shut down. Error: {:?}"
+ ),
+ error
+ );
+ }
+ }
+
+ maybe_dispatch_request_via_callback(result, maybe_callback, CallbackExpected)
+ }
+}
+
+struct StartDownloadData {
+ download_url: nsCString,
+ save_rel_path: nsCString,
+ proxy: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ update_interval_ms: u32,
+}
+
+struct StartDownloadSuccess {
+ guid: Guid,
+ monitor_client: BitsMonitorClient,
+}
+
+pub struct StartDownloadTask(ServiceTask<StartDownloadData, StartDownloadSuccess>);
+
+impl Task for StartDownloadTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl StartDownloadTask {
+ pub fn new(
+ client_init_data: ClientInitData,
+ download_url: nsCString,
+ save_rel_path: nsCString,
+ proxy: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ update_interval_ms: u32,
+ bits_service: RefPtr<BitsService>,
+ observer: RefPtr<nsIRequestObserver>,
+ context: Option<RefPtr<nsISupports>>,
+ callback: RefPtr<nsIBitsNewRequestCallback>,
+ ) -> StartDownloadTask {
+ StartDownloadTask(ServiceTask::new(
+ client_init_data,
+ ServiceAction::StartDownload,
+ StartDownloadData {
+ download_url,
+ save_rel_path,
+ proxy,
+ no_progress_timeout_secs,
+ update_interval_ms,
+ },
+ StartDownloadTask::run_fn,
+ StartDownloadTask::done_fn,
+ bits_service,
+ observer,
+ context,
+ callback,
+ ))
+ }
+
+ fn run_fn(
+ data: &StartDownloadData,
+ client: &mut BitsClient,
+ ) -> Result<StartDownloadSuccess, BitsTaskError> {
+ let url = nsCString_to_OsString(&data.download_url, StartDownload, CommandThread)?;
+ let path = nsCString_to_OsString(&data.save_rel_path, StartDownload, CommandThread)?;
+ let (success, monitor_client) = client
+ .start_job(
+ url,
+ path,
+ data.proxy,
+ data.no_progress_timeout_secs,
+ data.update_interval_ms,
+ )
+ .map_err(|pipe_error| BitsTaskError::from_pipe(StartDownload, pipe_error))??;
+ Ok(StartDownloadSuccess {
+ guid: success.guid,
+ monitor_client,
+ })
+ }
+
+ fn done_fn(
+ _data: &StartDownloadData,
+ success: StartDownloadSuccess,
+ client_init_data: &ClientInitData,
+ bits_service: &BitsService,
+ observer: &nsIRequestObserver,
+ maybe_context: Option<&nsISupports>,
+ ) -> Result<RefPtr<BitsRequest>, BitsTaskError> {
+ BitsRequest::new(
+ success.guid.clone(),
+ RefPtr::new(bits_service),
+ client_init_data.monitor_timeout_ms,
+ RefPtr::new(&observer),
+ maybe_context.map(RefPtr::new),
+ success.monitor_client,
+ ServiceAction::StartDownload,
+ )
+ }
+}
+
+struct MonitorDownloadData {
+ guid: Guid,
+ update_interval_ms: u32,
+}
+
+pub struct MonitorDownloadTask(ServiceTask<MonitorDownloadData, BitsMonitorClient>);
+
+impl Task for MonitorDownloadTask {
+ fn run(&self) {
+ self.0.run();
+ }
+
+ fn done(&self) -> Result<(), nsresult> {
+ self.0.done()
+ }
+}
+
+impl MonitorDownloadTask {
+ pub fn new(
+ client_init_data: ClientInitData,
+ guid: Guid,
+ update_interval_ms: u32,
+ bits_service: RefPtr<BitsService>,
+ observer: RefPtr<nsIRequestObserver>,
+ context: Option<RefPtr<nsISupports>>,
+ callback: RefPtr<nsIBitsNewRequestCallback>,
+ ) -> MonitorDownloadTask {
+ MonitorDownloadTask(ServiceTask::new(
+ client_init_data,
+ ServiceAction::MonitorDownload,
+ MonitorDownloadData {
+ guid,
+ update_interval_ms,
+ },
+ MonitorDownloadTask::run_fn,
+ MonitorDownloadTask::done_fn,
+ bits_service,
+ observer,
+ context,
+ callback,
+ ))
+ }
+
+ fn run_fn(
+ data: &MonitorDownloadData,
+ client: &mut BitsClient,
+ ) -> Result<BitsMonitorClient, BitsTaskError> {
+ let result = client
+ .monitor_job(data.guid.clone(), data.update_interval_ms)
+ .map_err(|pipe_error| BitsTaskError::from_pipe(MonitorDownload, pipe_error));
+ Ok(result??)
+ }
+
+ fn done_fn(
+ data: &MonitorDownloadData,
+ monitor_client: BitsMonitorClient,
+ client_init_data: &ClientInitData,
+ bits_service: &BitsService,
+ observer: &nsIRequestObserver,
+ maybe_context: Option<&nsISupports>,
+ ) -> Result<RefPtr<BitsRequest>, BitsTaskError> {
+ BitsRequest::new(
+ data.guid.clone(),
+ RefPtr::new(bits_service),
+ client_init_data.monitor_timeout_ms,
+ RefPtr::new(&observer),
+ maybe_context.map(RefPtr::new),
+ monitor_client,
+ ServiceAction::MonitorDownload,
+ )
+ }
+}