diff options
Diffstat (limited to 'toolkit/components/bitsdownload/src/bits_interface/task')
5 files changed, 1002 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/client.rs b/toolkit/components/bitsdownload/src/bits_interface/task/client.rs new file mode 100644 index 0000000000..edbf4d0698 --- /dev/null +++ b/toolkit/components/bitsdownload/src/bits_interface/task/client.rs @@ -0,0 +1,102 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use super::{ + action::Action, + error::{BitsTaskError, ErrorStage::CommandThread, ErrorType::MissingBitsClient}, + string::nsCString_to_OsString, +}; + +use bits_client::BitsClient; +use nsstring::nsCString; +use std::cell::Cell; + +thread_local! { + // This is used to store the `BitsClient` on the Command thread. + // Keeping it here solves the problem of how to allow multiple runnables to + // be simultaneously queued on the Command thread while giving them all + // access to the same `BitsClient`. + static BITS_CLIENT: Cell<Option<BitsClient>> = Cell::new(None); +} + +/// This structure holds the data needed to initialize `BitsClient` and +/// `BitsMonitorClient`. +#[derive(Debug, Clone)] +pub struct ClientInitData { + pub job_name: nsCString, + pub save_path_prefix: nsCString, + pub monitor_timeout_ms: u32, +} + +impl ClientInitData { + pub fn new( + job_name: nsCString, + save_path_prefix: nsCString, + monitor_timeout_ms: u32, + ) -> ClientInitData { + ClientInitData { + job_name, + save_path_prefix, + monitor_timeout_ms, + } + } +} + +/// This function constructs a `BitsClient`, if one does not already exist. If +/// the `BitsClient` cannot be constructed, a `BitsTaskError` will be returned. +/// If the `BitsClient` could be obtained, then the function then calls the +/// closure passed to it, passing a mutable reference to the `BitsClient`. +/// This function will then return whatever the closure returned, which must be +/// a `Result<_, BitsTaskError>`. +pub fn with_maybe_new_bits_client<F, R>( + init_data: &ClientInitData, + action: Action, + closure: F, +) -> Result<R, BitsTaskError> +where + F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>, +{ + _with_bits_client(Some(init_data), action, closure) +} + +/// This function assumes that a `BitsClient` has already been constructed. If +/// there is not one available, a `BitsTaskError` will be returned. Otherwise, +/// the function calls the closure passed to it, passing a mutable reference to +/// the `BitsClient`. This function will then return whatever the closure +/// returned, which must be a `Result<_, BitsTaskError>`. +pub fn with_bits_client<F, R>(action: Action, closure: F) -> Result<R, BitsTaskError> +where + F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>, +{ + _with_bits_client(None, action, closure) +} + +fn _with_bits_client<F, R>( + maybe_init_data: Option<&ClientInitData>, + action: Action, + closure: F, +) -> Result<R, BitsTaskError> +where + F: FnOnce(&mut BitsClient) -> Result<R, BitsTaskError>, +{ + BITS_CLIENT.with(|cell| { + let maybe_client = cell.take(); + let mut client = match (maybe_client, maybe_init_data) { + (Some(r), _) => r, + (None, Some(init_data)) => { + // Immediately invoked function to allow for the ? operator + BitsClient::new( + nsCString_to_OsString(&init_data.job_name, action, CommandThread)?, + nsCString_to_OsString(&init_data.save_path_prefix, action, CommandThread)?, + ) + .map_err(|pipe_error| BitsTaskError::from_pipe(action, pipe_error))? + } + (None, None) => { + return Err(BitsTaskError::new(MissingBitsClient, action, CommandThread)); + } + }; + let result = closure(&mut client); + cell.set(Some(client)); + result + }) +} diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs b/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs new file mode 100644 index 0000000000..2cf2e9189a --- /dev/null +++ b/toolkit/components/bitsdownload/src/bits_interface/task/from_threadbound.rs @@ -0,0 +1,125 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use super::{ + action::Action, + error::{BitsTaskError, ErrorStage, ErrorType}, +}; +use log::warn; +use xpcom::{RefCounted, ThreadBoundRefPtr}; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum DataType { + Callback, + BitsService, + BitsRequest, + Observer, + Context, +} + +#[derive(Debug, PartialEq, Clone, Copy)] +enum GetThreadboundError { + Missing, + WrongThread, +} + +impl DataType { + fn error_type(&self, error: GetThreadboundError) -> ErrorType { + match self { + DataType::Callback => match error { + GetThreadboundError::Missing => ErrorType::MissingCallback, + GetThreadboundError::WrongThread => ErrorType::CallbackOnWrongThread, + }, + DataType::BitsService => match error { + GetThreadboundError::Missing => ErrorType::MissingBitsService, + GetThreadboundError::WrongThread => ErrorType::BitsServiceOnWrongThread, + }, + DataType::BitsRequest => match error { + GetThreadboundError::Missing => ErrorType::MissingBitsRequest, + GetThreadboundError::WrongThread => ErrorType::BitsRequestOnWrongThread, + }, + DataType::Observer => match error { + GetThreadboundError::Missing => ErrorType::MissingObserver, + GetThreadboundError::WrongThread => ErrorType::ObserverOnWrongThread, + }, + DataType::Context => match error { + GetThreadboundError::Missing => ErrorType::MissingContext, + GetThreadboundError::WrongThread => ErrorType::ContextOnWrongThread, + }, + } + } + + fn name(&self) -> &'static str { + match self { + DataType::Callback => "Callback", + DataType::BitsService => "BITS Service", + DataType::BitsRequest => "BITS Request", + DataType::Observer => "Observer", + DataType::Context => "Context", + } + } +} + +/// Given a reference to a threadbound option +/// (i.e. `&Option<ThreadBoundRefPtr<_>>`), this function will attempt to +/// retrieve a reference to the value stored within. If it is not available +/// (option is `None` or value is on the wrong thread), `None` is returned +/// instead. +pub fn get_from_threadbound_option<T>( + maybe_threadbound: &Option<ThreadBoundRefPtr<T>>, + data_type: DataType, + action: Action, +) -> Option<&T> +where + T: RefCounted + 'static, +{ + maybe_threadbound.as_ref().and_then(|threadbound| { + let maybe_reference = threadbound.get_ref(); + if maybe_reference.is_none() { + warn!( + "Unexpected error {}: {} is on the wrong thread", + action.description(), + data_type.name(), + ); + } + maybe_reference + }) +} + +/// Given a reference to a threadbound option +/// (i.e. `&Option<ThreadBoundRefPtr<_>>`), this function will attempt to +/// retrieve a reference to the value stored within. If it is not available +/// (option is `None` or value is on the wrong thread), a `BitsTaskError` is +/// returned instead. +pub fn expect_from_threadbound_option<T>( + maybe_threadbound: &Option<ThreadBoundRefPtr<T>>, + data_type: DataType, + action: Action, +) -> Result<&T, BitsTaskError> +where + T: RefCounted + 'static, +{ + match maybe_threadbound.as_ref() { + Some(threadbound) => { + match threadbound.get_ref() { + Some(reference) => Ok(reference), + None => Err(BitsTaskError::new( + data_type.error_type(GetThreadboundError::WrongThread), + action, + // Retrieving data from threadbounds all happens on the main thread. + // No data is ever bound to other threads so there would be no + // reason to retrieve it there. + ErrorStage::MainThread, + )), + } + } + None => Err(BitsTaskError::new( + data_type.error_type(GetThreadboundError::Missing), + action, + // Retrieving data from threadbounds all happens on the main thread. + // No data is ever bound to other threads so there would be no + // reason to retrieve it there. + ErrorStage::MainThread, + )), + } +} diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs b/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs new file mode 100644 index 0000000000..b6b96d887b --- /dev/null +++ b/toolkit/components/bitsdownload/src/bits_interface/task/mod.rs @@ -0,0 +1,18 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +mod from_threadbound; + +use super::{action, dispatch_callback, error, request::BitsRequest, string, BitsService}; + +mod client; +pub use self::client::ClientInitData; + +mod service_task; +pub use self::service_task::{MonitorDownloadTask, StartDownloadTask}; + +mod request_task; +pub use self::request_task::{ + CancelTask, ChangeMonitorIntervalTask, CompleteTask, Priority, ResumeTask, + SetNoProgressTimeoutTask, SetPriorityTask, SuspendTask, +}; diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs b/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs new file mode 100644 index 0000000000..f0fd331ed0 --- /dev/null +++ b/toolkit/components/bitsdownload/src/bits_interface/task/request_task.rs @@ -0,0 +1,425 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use super::{ + action::{Action, RequestAction}, + client::with_bits_client, + dispatch_callback::{ + maybe_dispatch_via_callback, CallbackExpected, CallbackOptional, IsCallbackExpected, + }, + error::BitsTaskError, + from_threadbound::{expect_from_threadbound_option, DataType}, + BitsRequest, +}; + +use bits_client::{BitsClient, Guid}; +use crossbeam_utils::atomic::AtomicCell; +use log::info; +use moz_task::Task; +use nserror::nsresult; +use xpcom::{interfaces::nsIBitsCallback, RefPtr, ThreadBoundRefPtr}; + +type RunFn<D> = fn(Guid, &D, &mut BitsClient) -> Result<(), BitsTaskError>; +type DoneFn = fn(&BitsRequest, bool) -> Result<(), BitsTaskError>; + +pub struct RequestTask<D> { + request: AtomicCell<Option<ThreadBoundRefPtr<BitsRequest>>>, + guid: Guid, + action: RequestAction, + task_data: D, + run_fn: RunFn<D>, + maybe_done_fn: Option<DoneFn>, + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIBitsCallback>>>, + callback_presence: IsCallbackExpected, + result: AtomicCell<Option<Result<(), BitsTaskError>>>, +} + +impl<D> RequestTask<D> +where + D: Sync + Send, +{ + pub fn new( + request: RefPtr<BitsRequest>, + guid: Guid, + action: RequestAction, + task_data: D, + run_fn: RunFn<D>, + maybe_done_fn: Option<DoneFn>, + callback: Option<RefPtr<nsIBitsCallback>>, + callback_presence: IsCallbackExpected, + ) -> RequestTask<D> { + RequestTask { + request: AtomicCell::new(Some(ThreadBoundRefPtr::new(request))), + guid, + action, + task_data, + run_fn, + maybe_done_fn, + callback: AtomicCell::new(callback.map(ThreadBoundRefPtr::new)), + result: AtomicCell::new(None), + callback_presence, + } + } +} + +impl<D> Task for RequestTask<D> { + fn run(&self) { + let result = with_bits_client(self.action.into(), |client| { + (self.run_fn)(self.guid.clone(), &self.task_data, client) + }); + self.result.store(Some(result)); + } + + fn done(&self) -> Result<(), nsresult> { + // If TaskRunnable.run() calls Task.done() to return a result + // on the main thread before TaskRunnable.run() returns on the worker + // thread, then the Task will get dropped on the worker thread. + // + // But the callback is an nsXPCWrappedJS that isn't safe to release + // on the worker thread. So we move it out of the Task here to ensure + // it gets released on the main thread. + let maybe_tb_callback = self.callback.swap(None); + // It also isn't safe to drop the BitsRequest RefPtr off-thread, + // because BitsRequest refcounting is non-atomic + let maybe_tb_request = self.request.swap(None); + + let action: Action = self.action.into(); + let maybe_callback = + expect_from_threadbound_option(&maybe_tb_callback, DataType::Callback, action); + + // Immediately invoked function expression to allow for the ? operator + let result: Result<(), BitsTaskError> = (|| { + let request = + expect_from_threadbound_option(&maybe_tb_request, DataType::BitsRequest, action)?; + + let maybe_result = self.result.swap(None); + + let success = if let Some(result) = maybe_result.as_ref() { + result.is_ok() + } else { + false + }; + + if let Some(done_fn) = self.maybe_done_fn { + done_fn(request, success)?; + } + + maybe_result.ok_or_else(|| BitsTaskError::missing_result(action))? + })(); + info!("BITS Request Task completed: {:?}", result); + maybe_dispatch_via_callback(result, maybe_callback, self.callback_presence) + } +} + +pub struct CompleteTask(RequestTask<()>); + +impl Task for CompleteTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl CompleteTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + callback: RefPtr<nsIBitsCallback>, + ) -> CompleteTask { + CompleteTask(RequestTask::new( + request, + id, + RequestAction::Complete, + (), + CompleteTask::run_fn, + Some(CompleteTask::done_fn), + Some(callback), + CallbackExpected, + )) + } + + fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .complete_job(id) + .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Complete, pipe_error))??; + Ok(()) + } + + fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> { + if success { + request.on_finished(); + } + Ok(()) + } +} + +pub struct CancelTask(RequestTask<()>); + +impl Task for CancelTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl CancelTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + callback: Option<RefPtr<nsIBitsCallback>>, + ) -> CancelTask { + let callback_presence = if callback.is_some() { + CallbackExpected + } else { + CallbackOptional + }; + + CancelTask(RequestTask::new( + request, + id, + RequestAction::Cancel, + (), + CancelTask::run_fn, + Some(CancelTask::done_fn), + callback, + callback_presence, + )) + } + + fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .cancel_job(id) + .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Cancel, pipe_error))??; + Ok(()) + } + + fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> { + request.finish_cancel_action(success); + Ok(()) + } +} + +pub struct SuspendTask(RequestTask<()>); + +impl Task for SuspendTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl SuspendTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + callback: Option<RefPtr<nsIBitsCallback>>, + ) -> SuspendTask { + let callback_presence = if callback.is_some() { + CallbackExpected + } else { + CallbackOptional + }; + + SuspendTask(RequestTask::new( + request, + id, + RequestAction::Suspend, + (), + SuspendTask::run_fn, + None, + callback, + callback_presence, + )) + } + + fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .suspend_job(id) + .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Suspend, pipe_error))??; + Ok(()) + } +} + +pub struct ResumeTask(RequestTask<()>); + +impl Task for ResumeTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl ResumeTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + callback: Option<RefPtr<nsIBitsCallback>>, + ) -> ResumeTask { + let callback_presence = if callback.is_some() { + CallbackExpected + } else { + CallbackOptional + }; + + ResumeTask(RequestTask::new( + request, + id, + RequestAction::Resume, + (), + ResumeTask::run_fn, + None, + callback, + callback_presence, + )) + } + + fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .resume_job(id) + .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Resume, pipe_error))??; + Ok(()) + } +} + +pub struct ChangeMonitorIntervalTask(RequestTask<u32>); + +impl Task for ChangeMonitorIntervalTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl ChangeMonitorIntervalTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + update_interval_ms: u32, + callback: RefPtr<nsIBitsCallback>, + ) -> ChangeMonitorIntervalTask { + ChangeMonitorIntervalTask(RequestTask::new( + request, + id, + RequestAction::SetMonitorInterval, + update_interval_ms, + ChangeMonitorIntervalTask::run_fn, + None, + Some(callback), + CallbackExpected, + )) + } + + fn run_fn( + id: Guid, + update_interval_ms: &u32, + client: &mut BitsClient, + ) -> Result<(), BitsTaskError> { + client + .set_update_interval(id, *update_interval_ms) + .map_err(|pipe_error| { + BitsTaskError::from_pipe(Action::SetMonitorInterval, pipe_error) + })??; + Ok(()) + } +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum Priority { + High, + Low, +} + +pub struct SetPriorityTask(RequestTask<Priority>); + +impl Task for SetPriorityTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl SetPriorityTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + priority: Priority, + callback: RefPtr<nsIBitsCallback>, + ) -> SetPriorityTask { + SetPriorityTask(RequestTask::new( + request, + id, + RequestAction::SetPriority, + priority, + SetPriorityTask::run_fn, + None, + Some(callback), + CallbackExpected, + )) + } + + fn run_fn(id: Guid, priority: &Priority, client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .set_job_priority(id, *priority == Priority::High) + .map_err(|pipe_error| BitsTaskError::from_pipe(Action::SetPriority, pipe_error))??; + Ok(()) + } +} + +pub struct SetNoProgressTimeoutTask(RequestTask<u32>); + +impl Task for SetNoProgressTimeoutTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl SetNoProgressTimeoutTask { + pub fn new( + request: RefPtr<BitsRequest>, + id: Guid, + timeout_secs: u32, + callback: RefPtr<nsIBitsCallback>, + ) -> SetNoProgressTimeoutTask { + SetNoProgressTimeoutTask(RequestTask::new( + request, + id, + RequestAction::SetNoProgressTimeout, + timeout_secs, + SetNoProgressTimeoutTask::run_fn, + None, + Some(callback), + CallbackExpected, + )) + } + + fn run_fn(id: Guid, timeout_secs: &u32, client: &mut BitsClient) -> Result<(), BitsTaskError> { + client + .set_no_progress_timeout(id, *timeout_secs) + .map_err(|pipe_error| { + BitsTaskError::from_pipe(Action::SetNoProgressTimeout, pipe_error) + })??; + Ok(()) + } +} diff --git a/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs b/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs new file mode 100644 index 0000000000..c66f127f7a --- /dev/null +++ b/toolkit/components/bitsdownload/src/bits_interface/task/service_task.rs @@ -0,0 +1,332 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +use super::{ + action::{ + Action, + Action::{MonitorDownload, StartDownload}, + ServiceAction, + }, + client::{with_maybe_new_bits_client, ClientInitData}, + dispatch_callback::{maybe_dispatch_request_via_callback, CallbackExpected}, + error::{BitsTaskError, ErrorStage::CommandThread}, + from_threadbound::{expect_from_threadbound_option, get_from_threadbound_option, DataType}, + string::nsCString_to_OsString, + BitsRequest, BitsService, +}; + +use bits_client::{BitsClient, BitsMonitorClient, BitsProxyUsage, Guid}; +use crossbeam_utils::atomic::AtomicCell; +use log::{info, warn}; +use moz_task::Task; +use nserror::nsresult; +use nsstring::nsCString; +use xpcom::{ + interfaces::{nsIBitsNewRequestCallback, nsIRequestObserver, nsISupports}, + RefPtr, ThreadBoundRefPtr, +}; + +// D is the Data Type that the RunFn function needs to make S. +// S is the Success Type that the RunFn returns on success and that the +// DoneFn needs to make the BitsRequest. +type RunFn<D, S> = fn(&D, &mut BitsClient) -> Result<S, BitsTaskError>; +type DoneFn<D, S> = fn( + &D, + S, + &ClientInitData, + &BitsService, + &nsIRequestObserver, + Option<&nsISupports>, +) -> Result<RefPtr<BitsRequest>, BitsTaskError>; + +pub struct ServiceTask<D, S> { + client_init_data: ClientInitData, + action: ServiceAction, + task_data: D, + run_fn: RunFn<D, S>, + done_fn: DoneFn<D, S>, + bits_service: AtomicCell<Option<ThreadBoundRefPtr<BitsService>>>, + observer: AtomicCell<Option<ThreadBoundRefPtr<nsIRequestObserver>>>, + context: AtomicCell<Option<ThreadBoundRefPtr<nsISupports>>>, + callback: AtomicCell<Option<ThreadBoundRefPtr<nsIBitsNewRequestCallback>>>, + result: AtomicCell<Option<Result<S, BitsTaskError>>>, +} + +impl<D, S> ServiceTask<D, S> +where + D: Sync + Send, + S: Sync + Send, +{ + pub fn new( + client_init_data: ClientInitData, + action: ServiceAction, + task_data: D, + run_fn: RunFn<D, S>, + done_fn: DoneFn<D, S>, + bits_service: RefPtr<BitsService>, + observer: RefPtr<nsIRequestObserver>, + context: Option<RefPtr<nsISupports>>, + callback: RefPtr<nsIBitsNewRequestCallback>, + ) -> ServiceTask<D, S> { + ServiceTask { + client_init_data, + action, + task_data, + run_fn, + done_fn, + bits_service: AtomicCell::new(Some(ThreadBoundRefPtr::new(bits_service))), + observer: AtomicCell::new(Some(ThreadBoundRefPtr::new(observer))), + context: AtomicCell::new(context.map(ThreadBoundRefPtr::new)), + callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))), + result: AtomicCell::new(None), + } + } +} + +impl<D, S> Task for ServiceTask<D, S> { + fn run(&self) { + let result = + with_maybe_new_bits_client(&self.client_init_data, self.action.into(), |client| { + (self.run_fn)(&self.task_data, client) + }); + self.result.store(Some(result)); + } + + fn done(&self) -> Result<(), nsresult> { + // If TaskRunnable.run() calls Task.done() to return a result + // on the main thread before TaskRunnable.run() returns on the worker + // thread, then the Task will get dropped on the worker thread. + // + // But the callback is an nsXPCWrappedJS that isn't safe to release + // on the worker thread. So we move it out of the Task here to ensure + // it gets released on the main thread. + let maybe_tb_callback = self.callback.swap(None); + // It also isn't safe to drop the BitsService RefPtr off-thread, + // because BitsService refcounting is non-atomic + let maybe_tb_service = self.bits_service.swap(None); + // The observer and context are also an nsXPCWrappedJS that aren't safe + // to release on the worker thread. + let maybe_tb_observer = self.observer.swap(None); + let maybe_tb_context = self.context.swap(None); + + let action: Action = self.action.into(); + let maybe_callback = + expect_from_threadbound_option(&maybe_tb_callback, DataType::Callback, action); + + // Immediately invoked function expression to allow for the ? operator + let result: Result<RefPtr<BitsRequest>, BitsTaskError> = (|| { + let bits_service = + expect_from_threadbound_option(&maybe_tb_service, DataType::BitsService, action)?; + let observer = + expect_from_threadbound_option(&maybe_tb_observer, DataType::Observer, action)?; + let maybe_context = + get_from_threadbound_option(&maybe_tb_context, DataType::Context, action); + let success = self + .result + .swap(None) + .ok_or_else(|| BitsTaskError::missing_result(action))??; + + (self.done_fn)( + &self.task_data, + success, + &self.client_init_data, + bits_service, + observer, + maybe_context, + ) + })(); + info!("BITS Interface Task completed: {:?}", result); + // We incremented the request count when we dispatched an action to + // start the job. Now we will decrement since the action completed. + // See the declaration of InitBitsService::request_count for details. + let bits_service_result = + expect_from_threadbound_option(&maybe_tb_service, DataType::BitsService, action); + match bits_service_result { + Ok(bits_service) => { + bits_service.dec_request_count(); + } + Err(error) => { + warn!( + concat!( + "Unable to decrement the request count when finishing ServiceTask. ", + "The command thread may not be shut down. Error: {:?}" + ), + error + ); + } + } + + maybe_dispatch_request_via_callback(result, maybe_callback, CallbackExpected) + } +} + +struct StartDownloadData { + download_url: nsCString, + save_rel_path: nsCString, + proxy: BitsProxyUsage, + no_progress_timeout_secs: u32, + update_interval_ms: u32, +} + +struct StartDownloadSuccess { + guid: Guid, + monitor_client: BitsMonitorClient, +} + +pub struct StartDownloadTask(ServiceTask<StartDownloadData, StartDownloadSuccess>); + +impl Task for StartDownloadTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl StartDownloadTask { + pub fn new( + client_init_data: ClientInitData, + download_url: nsCString, + save_rel_path: nsCString, + proxy: BitsProxyUsage, + no_progress_timeout_secs: u32, + update_interval_ms: u32, + bits_service: RefPtr<BitsService>, + observer: RefPtr<nsIRequestObserver>, + context: Option<RefPtr<nsISupports>>, + callback: RefPtr<nsIBitsNewRequestCallback>, + ) -> StartDownloadTask { + StartDownloadTask(ServiceTask::new( + client_init_data, + ServiceAction::StartDownload, + StartDownloadData { + download_url, + save_rel_path, + proxy, + no_progress_timeout_secs, + update_interval_ms, + }, + StartDownloadTask::run_fn, + StartDownloadTask::done_fn, + bits_service, + observer, + context, + callback, + )) + } + + fn run_fn( + data: &StartDownloadData, + client: &mut BitsClient, + ) -> Result<StartDownloadSuccess, BitsTaskError> { + let url = nsCString_to_OsString(&data.download_url, StartDownload, CommandThread)?; + let path = nsCString_to_OsString(&data.save_rel_path, StartDownload, CommandThread)?; + let (success, monitor_client) = client + .start_job( + url, + path, + data.proxy, + data.no_progress_timeout_secs, + data.update_interval_ms, + ) + .map_err(|pipe_error| BitsTaskError::from_pipe(StartDownload, pipe_error))??; + Ok(StartDownloadSuccess { + guid: success.guid, + monitor_client, + }) + } + + fn done_fn( + _data: &StartDownloadData, + success: StartDownloadSuccess, + client_init_data: &ClientInitData, + bits_service: &BitsService, + observer: &nsIRequestObserver, + maybe_context: Option<&nsISupports>, + ) -> Result<RefPtr<BitsRequest>, BitsTaskError> { + BitsRequest::new( + success.guid.clone(), + RefPtr::new(bits_service), + client_init_data.monitor_timeout_ms, + RefPtr::new(&observer), + maybe_context.map(RefPtr::new), + success.monitor_client, + ServiceAction::StartDownload, + ) + } +} + +struct MonitorDownloadData { + guid: Guid, + update_interval_ms: u32, +} + +pub struct MonitorDownloadTask(ServiceTask<MonitorDownloadData, BitsMonitorClient>); + +impl Task for MonitorDownloadTask { + fn run(&self) { + self.0.run(); + } + + fn done(&self) -> Result<(), nsresult> { + self.0.done() + } +} + +impl MonitorDownloadTask { + pub fn new( + client_init_data: ClientInitData, + guid: Guid, + update_interval_ms: u32, + bits_service: RefPtr<BitsService>, + observer: RefPtr<nsIRequestObserver>, + context: Option<RefPtr<nsISupports>>, + callback: RefPtr<nsIBitsNewRequestCallback>, + ) -> MonitorDownloadTask { + MonitorDownloadTask(ServiceTask::new( + client_init_data, + ServiceAction::MonitorDownload, + MonitorDownloadData { + guid, + update_interval_ms, + }, + MonitorDownloadTask::run_fn, + MonitorDownloadTask::done_fn, + bits_service, + observer, + context, + callback, + )) + } + + fn run_fn( + data: &MonitorDownloadData, + client: &mut BitsClient, + ) -> Result<BitsMonitorClient, BitsTaskError> { + let result = client + .monitor_job(data.guid.clone(), data.update_interval_ms) + .map_err(|pipe_error| BitsTaskError::from_pipe(MonitorDownload, pipe_error)); + Ok(result??) + } + + fn done_fn( + data: &MonitorDownloadData, + monitor_client: BitsMonitorClient, + client_init_data: &ClientInitData, + bits_service: &BitsService, + observer: &nsIRequestObserver, + maybe_context: Option<&nsISupports>, + ) -> Result<RefPtr<BitsRequest>, BitsTaskError> { + BitsRequest::new( + data.guid.clone(), + RefPtr::new(bits_service), + client_init_data.monitor_timeout_ms, + RefPtr::new(&observer), + maybe_context.map(RefPtr::new), + monitor_client, + ServiceAction::MonitorDownload, + ) + } +} |