author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-19 00:47:55 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-19 00:47:55 +0000
commit    26a029d407be480d791972afb5975cf62c9360a6 (patch)
tree      f435a8308119effd964b339f76abb83a57c29483 /toolkit/components/bitsdownload/bits_client/src
parent    Initial commit. (diff)
Adding upstream version 124.0.1. (tag: upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'toolkit/components/bitsdownload/bits_client/src')
 toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs    | 380
 toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs   | 504
 toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs | 628
 toolkit/components/bitsdownload/bits_client/src/lib.rs              | 258
 4 files changed, 1770 insertions(+), 0 deletions(-)
diff --git a/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs
new file mode 100644
index 0000000000..6f19f5ef23
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs
@@ -0,0 +1,380 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+//! Command, response, and status types.
+
+use std::error::Error as StdError;
+use std::ffi::OsString;
+use std::fmt;
+use std::result;
+
+use guid_win::Guid;
+use thiserror::Error;
+
+use super::{BitsErrorContext, BitsJobProgress, BitsJobState, BitsJobTimes, BitsProxyUsage};
+
+type HRESULT = i32;
+
+/// An HRESULT with a descriptive message
+#[derive(Clone, Debug)]
+pub struct HResultMessage {
+ pub hr: HRESULT,
+ pub message: String,
+}
+
+impl fmt::Display for HResultMessage {
+ fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
+ self.message.fmt(f)
+ }
+}
+
+impl StdError for HResultMessage {}
+
+/// Commands which can be sent to the server.
+///
+/// This is currently unused as the out-of-process Local Service server is not finished.
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub enum Command {
+ StartJob(StartJobCommand),
+ MonitorJob(MonitorJobCommand),
+ SuspendJob(SuspendJobCommand),
+ ResumeJob(ResumeJobCommand),
+ SetJobPriority(SetJobPriorityCommand),
+ SetNoProgressTimeout(SetNoProgressTimeoutCommand),
+ SetUpdateInterval(SetUpdateIntervalCommand),
+ CompleteJob(CompleteJobCommand),
+ CancelJob(CancelJobCommand),
+}
+
+/// Combine a [`Command`](enum.Command.html) with its success and failure result types.
+#[doc(hidden)]
+pub trait CommandType {
+ type Success;
+ type Failure: StdError;
+ fn wrap(command: Self) -> Command;
+}
+
+// Start Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct StartJobCommand {
+ pub url: OsString,
+ pub save_path: OsString,
+ pub proxy_usage: BitsProxyUsage,
+ pub no_progress_timeout_secs: u32,
+ pub monitor: Option<MonitorConfig>,
+}
+
+impl CommandType for StartJobCommand {
+ type Success = StartJobSuccess;
+ type Failure = StartJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::StartJob(cmd)
+ }
+}
+
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct MonitorConfig {
+ pub pipe_name: OsString,
+ pub interval_millis: u32,
+}
+
+#[derive(Clone, Debug)]
+pub struct StartJobSuccess {
+ pub guid: Guid,
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum StartJobFailure {
+ #[error("Argument validation failed: {0}")]
+ ArgumentValidation(String),
+ #[error("Create job: {0}")]
+ Create(HResultMessage),
+ #[error("Add file to job: {0}")]
+ AddFile(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Resume job: {0}")]
+ Resume(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Monitor Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct MonitorJobCommand {
+ pub guid: Guid,
+ pub monitor: MonitorConfig,
+}
+
+impl CommandType for MonitorJobCommand {
+ type Success = ();
+ type Failure = MonitorJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::MonitorJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum MonitorJobFailure {
+ #[error("Argument validation failed: {0}")]
+ ArgumentValidation(String),
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Suspend Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SuspendJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for SuspendJobCommand {
+ type Success = ();
+ type Failure = SuspendJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SuspendJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SuspendJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Suspend job: {0}")]
+ SuspendJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Resume Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct ResumeJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for ResumeJobCommand {
+ type Success = ();
+ type Failure = ResumeJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::ResumeJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum ResumeJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Resume job: {0}")]
+ ResumeJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set Job Priority
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetJobPriorityCommand {
+ pub guid: Guid,
+ pub foreground: bool,
+}
+
+impl CommandType for SetJobPriorityCommand {
+ type Success = ();
+ type Failure = SetJobPriorityFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetJobPriority(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetJobPriorityFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set No Progress Timeout
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetNoProgressTimeoutCommand {
+ pub guid: Guid,
+ pub timeout_secs: u32,
+}
+
+impl CommandType for SetNoProgressTimeoutCommand {
+ type Success = ();
+ type Failure = SetNoProgressTimeoutFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetNoProgressTimeout(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetNoProgressTimeoutFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set Update Interval
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetUpdateIntervalCommand {
+ pub guid: Guid,
+ pub interval_millis: u32,
+}
+
+impl CommandType for SetUpdateIntervalCommand {
+ type Success = ();
+ type Failure = SetUpdateIntervalFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetUpdateInterval(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetUpdateIntervalFailure {
+ #[error("Argument validation: {0}")]
+ ArgumentValidation(String),
+ #[error("Monitor not found")]
+ NotFound,
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Complete Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct CompleteJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for CompleteJobCommand {
+ type Success = ();
+ type Failure = CompleteJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::CompleteJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum CompleteJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Complete job: {0}")]
+ CompleteJob(HResultMessage),
+ #[error("Job only partially completed")]
+ PartialComplete,
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Cancel Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct CancelJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for CancelJobCommand {
+ type Success = ();
+ type Failure = CancelJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::CancelJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum CancelJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Cancel job: {0}")]
+ CancelJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+/// Job status report
+///
+/// This includes a URL, which is updated when the job is redirected, but is otherwise the same as
+/// `bits::status::BitsJobStatus`.
+#[derive(Clone, Debug)]
+pub struct JobStatus {
+ pub state: BitsJobState,
+ pub progress: BitsJobProgress,
+ pub error_count: u32,
+ pub error: Option<JobError>,
+ pub times: BitsJobTimes,
+ /// None means same as last time
+ pub url: Option<OsString>,
+}
+
+/// Job error report
+#[derive(Clone, Debug, Error)]
+#[error("Job error in context {context_str}: {error}")]
+pub struct JobError {
+ pub context: BitsErrorContext,
+ pub context_str: String,
+ pub error: HResultMessage,
+}
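
The hidden CommandType trait above exists so that a future client can expose a single generic
send path whose result types vary per command. A minimal sketch of that idea follows; the
send_command helper is hypothetical (nothing in the crate defines it, since the out-of-process
transport is unfinished) and it assumes this module's items are in scope.

fn send_command<C: CommandType>(cmd: C) -> Result<C::Success, C::Failure> {
    // A real implementation would serialize the wrapped command onto a named pipe
    // (the unfinished Local Service transport) and decode a command-specific response.
    let _wire_message: Command = C::wrap(cmd);
    unimplemented!("out-of-process transport is not implemented")
}

With such a helper, StartJobCommand would yield Result<StartJobSuccess, StartJobFailure>,
SuspendJobCommand would yield Result<(), SuspendJobFailure>, and so on.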
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
new file mode 100644
index 0000000000..c7f6869167
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
@@ -0,0 +1,504 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cmp;
+use std::collections::{hash_map, HashMap};
+use std::ffi;
+use std::path;
+use std::sync::{Arc, Condvar, Mutex, Weak};
+use std::time::{Duration, Instant};
+
+use bits::{
+ BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL,
+};
+use guid_win::Guid;
+
+use bits_protocol::*;
+
+use super::Error;
+
+// This is a macro so that it can use the NotFound and GetJob variants from whatever failure enum is in scope.
+macro_rules! get_job {
+ ($bcm:ident, $guid:expr, $name:expr) => {{
+ $bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ $bcm.find_job_by_guid_and_name($guid, $name)
+ .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))?
+ .ok_or(NotFound)?
+ }};
+}
+
+fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage {
+ let bits_description = bcm.get_error_description(error.code()).ok();
+
+ HResultMessage {
+ hr: error.code(),
+ message: if let Some(desc) = bits_description {
+ format!("{}: {}", error, desc)
+ } else {
+ format!("{}", error)
+ },
+ }
+}
+
+// The in-process client uses direct BITS calls via the `bits` crate.
+// See the corresponding functions in BitsClient.
+pub struct InProcessClient {
+ job_name: ffi::OsString,
+ save_path_prefix: path::PathBuf,
+ monitors: HashMap<Guid, InProcessMonitorControl>,
+}
+
+impl InProcessClient {
+ pub fn new(
+ job_name: ffi::OsString,
+ save_path_prefix: ffi::OsString,
+ ) -> Result<InProcessClient, Error> {
+ Ok(InProcessClient {
+ job_name,
+ save_path_prefix: path::PathBuf::from(save_path_prefix),
+ monitors: HashMap::new(),
+ })
+ }
+
+ pub fn start_job(
+ &mut self,
+ url: ffi::OsString,
+ save_path: ffi::OsString,
+ proxy_usage: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ monitor_interval_millis: u32,
+ ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> {
+ use StartJobFailure::*;
+
+ let full_path = self.save_path_prefix.join(save_path);
+
+ // Verify that `full_path` is under the directory called `save_path_prefix`.
+ {
+ let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| {
+ ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e))
+ })?;
+ // Use the full path minus the file name: canonicalize() fails on nonexistent files,
+ // but the parent directory ought to exist.
+ let canonical_full_path = full_path
+ .parent()
+ .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))?
+ .canonicalize()
+ .map_err(|e| {
+ ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e))
+ })?;
+
+ if !canonical_full_path.starts_with(&canonical_prefix) {
+ return Err(ArgumentValidation(format!(
+ "{:?} is not within {:?}",
+ canonical_full_path, canonical_prefix
+ )));
+ }
+ }
+
+ // TODO: Should the job be explicitly cleaned up if this fn can't return success?
+ // If the job is dropped before `AddFile` succeeds, I think it automatically gets
+ // deleted from the queue. There is only one fallible call after that (`Resume`).
+
+ let bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ let mut job = bcm
+ .create_job(&self.job_name)
+ .map_err(|e| Create(format_error(&bcm, e)))?;
+
+ let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ (|| {
+ job.set_proxy_usage(proxy_usage)?;
+ job.set_minimum_retry_delay(60)?;
+ job.set_no_progress_timeout(no_progress_timeout_secs)?;
+ job.set_redirect_report()?;
+
+ job.set_priority(BitsJobPriority::Foreground)?;
+
+ Ok(())
+ })()
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ job.add_file(&url, &full_path.into_os_string())
+ .map_err(|e| AddFile(format_error(&bcm, e)))?;
+
+ job.resume().map_err(|e| Resume(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid.clone(), control);
+
+ Ok((StartJobSuccess { guid }, client))
+ }
+
+ pub fn monitor_job(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<InProcessMonitor, MonitorJobFailure> {
+ use MonitorJobFailure::*;
+
+ // Stop any preexisting monitor for the same guid.
+ let _ = self.stop_update(guid.clone());
+
+ let bcm;
+ let (client, control) =
+ InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid, control);
+
+ Ok(client)
+ }
+
+ pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> {
+ use SuspendJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .suspend()
+ .map_err(|e| SuspendJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> {
+ use ResumeJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .resume()
+ .map_err(|e| ResumeJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_job_priority(
+ &mut self,
+ guid: Guid,
+ foreground: bool,
+ ) -> Result<(), SetJobPriorityFailure> {
+ use SetJobPriorityFailure::*;
+
+ let priority = if foreground {
+ BitsJobPriority::Foreground
+ } else {
+ BitsJobPriority::Normal
+ };
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_priority(priority)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_no_progress_timeout(
+ &mut self,
+ guid: Guid,
+ timeout_secs: u32,
+ ) -> Result<(), SetNoProgressTimeoutFailure> {
+ use SetNoProgressTimeoutFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_no_progress_timeout(timeout_secs)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ fn get_monitor_control_sender(&mut self, guid: Guid) -> Option<Arc<ControlPair>> {
+ if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) {
+ if let Some(sender) = occ.get().0.upgrade() {
+ Some(sender)
+ } else {
+ // Remove dangling Weak
+ occ.remove_entry();
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+ pub fn set_update_interval(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ let mut s = sender.1.lock().unwrap();
+ s.interval_millis = interval_millis;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ sender.1.lock().unwrap().shutdown = true;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> {
+ use CompleteJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .complete()
+ .map_err(|e| CompleteJob(format_error(&bcm, e)))
+ .and_then(|hr| {
+ if hr == BG_S_PARTIAL_COMPLETE as i32 {
+ Err(PartialComplete)
+ } else {
+ Ok(())
+ }
+ })?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+
+ pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> {
+ use CancelJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .cancel()
+ .map_err(|e| CancelJob(format_error(&bcm, e)))?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+}
+
+// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to
+// control a blocked `get_status` call from another thread.
+pub struct InProcessMonitor {
+ vars: Arc<ControlPair>,
+ guid: Guid,
+ last_status_time: Option<Instant>,
+ last_url: Option<ffi::OsString>,
+}
+
+// The `Condvar` is notified when `InProcessMonitorVars` changes.
+type ControlPair = (Condvar, Mutex<InProcessMonitorVars>);
+struct InProcessMonitorControl(Weak<ControlPair>);
+
+// RefUnwindSafe is not impl'd for Condvar but likely should be,
+// see https://github.com/rust-lang/rust/issues/54768
+impl std::panic::RefUnwindSafe for InProcessMonitorControl {}
+
+struct InProcessMonitorVars {
+ interval_millis: u32,
+ notified: bool,
+ shutdown: bool,
+}
+
+impl InProcessMonitor {
+ fn new(
+ job: &mut BitsJob,
+ interval_millis: u32,
+ ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> {
+ let guid = job.guid()?;
+
+ let vars = Arc::new((
+ Condvar::new(),
+ Mutex::new(InProcessMonitorVars {
+ interval_millis,
+ notified: false,
+ shutdown: false,
+ }),
+ ));
+
+ let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let transferred_cb = Box::new(move || {
+ if let Some(control) = transferred_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ let error_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let error_cb = Box::new(move || {
+ if let Some(control) = error_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ // Note: These callbacks are never explicitly cleared. They will be freed when the
+ // job is deleted from BITS, and they will be cleared if an attempt is made to call them
+ // when they are no longer valid (e.g. after the process exits). This is done mostly for
+ // simplicity and should be safe.
+
+ job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?;
+
+ let control = InProcessMonitorControl(Arc::downgrade(&vars));
+
+ let monitor = InProcessMonitor {
+ guid,
+ vars,
+ last_status_time: None,
+ last_url: None,
+ };
+
+ Ok((monitor, control))
+ }
+
+ pub fn get_status(
+ &mut self,
+ timeout_millis: u32,
+ ) -> Result<Result<JobStatus, HResultMessage>, Error> {
+ let timeout = Duration::from_millis(u64::from(timeout_millis));
+
+ let started = Instant::now();
+ let timeout_end = started + timeout;
+
+ {
+ let mut s = self.vars.1.lock().unwrap();
+ loop {
+ let wait_start = Instant::now();
+
+ if s.shutdown {
+ // Disconnected, immediately return error.
+ // Note: Shutdown takes priority over simultaneous notification.
+ return Err(Error::NotConnected);
+ }
+
+ if wait_start >= timeout_end {
+ // Timed out, immediately return timeout error.
+ // This should not normally happen with the in-process monitor, but the
+ // monitor interval could be longer than the timeout.
+ s.shutdown = true;
+ return Err(Error::Timeout);
+ }
+
+ // Get the interval every pass through the loop, in case it has changed.
+ let interval = Duration::from_millis(u64::from(s.interval_millis));
+
+ let wait_until = self
+ .last_status_time
+ .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end));
+
+ if s.notified {
+ // Notified, exit loop to get status.
+ s.notified = false;
+ break;
+ }
+
+ if wait_until.is_none() {
+ // First status report, no waiting, exit loop to get status.
+ break;
+ }
+
+ let wait_until = wait_until.unwrap();
+
+ if wait_until <= wait_start {
+ // No time left to wait. This can't be due to timeout because
+ // `wait_until <= wait_start < timeout_end`.
+ // Status report due, exit loop to get status.
+ break;
+ }
+
+ // Wait.
+ // Do not attempt to recover from poisoned Mutex.
+ s = self
+ .vars
+ .0
+ .wait_timeout(s, wait_until - wait_start)
+ .unwrap()
+ .0;
+
+ // Mutex re-acquired, loop.
+ }
+ }
+
+ // No error yet, start getting status now.
+ self.last_status_time = Some(Instant::now());
+
+ let bcm = match BackgroundCopyManager::connect() {
+ Ok(bcm) => bcm,
+ Err(e) => {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+
+ // Errors below can use the BCM to do `format_error()`, but this one just gets the
+ // basic `comedy::HResult` treatment.
+ return Ok(Err(HResultMessage {
+ hr: e.code(),
+ message: format!("{}", e),
+ }));
+ }
+ };
+
+ Ok((|| {
+ let mut job = bcm.get_job_by_guid(&self.guid)?;
+
+ let status = job.get_status()?;
+ let url = job.get_first_file()?.get_remote_name()?;
+
+ Ok(JobStatus {
+ state: status.state,
+ progress: status.progress,
+ error_count: status.error_count,
+ error: status.error.map(|e| JobError {
+ context: e.context,
+ context_str: e.context_str,
+ error: HResultMessage {
+ hr: e.error,
+ message: e.error_str,
+ },
+ }),
+ times: status.times,
+ url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url {
+ None
+ } else {
+ self.last_url = Some(url);
+ self.last_url.clone()
+ },
+ })
+ })()
+ .map_err(|e| {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+ format_error(&bcm, e)
+ }))
+ }
+}
+
+#[cfg(test)]
+mod tests;
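
InProcessMonitor::get_status() above blocks on the Condvar half of ControlPair until a BITS
callback flips `notified`, a shutdown is requested, or the wait times out. Below is a
self-contained sketch of that wake-up pattern in isolation, with toy types standing in for the
real ones; as in the real code, the notifier holds only a Weak reference so a callback cannot
keep the monitor alive.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct Vars {
    notified: bool,
    shutdown: bool,
}

fn main() {
    let pair: Arc<(Condvar, Mutex<Vars>)> = Arc::new((
        Condvar::new(),
        Mutex::new(Vars { notified: false, shutdown: false }),
    ));

    // Stand-in for the transferred/error callback registered with BitsJob: it upgrades a
    // Weak reference, flips the flag, and notifies the waiter.
    let notifier = Arc::downgrade(&pair);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        if let Some(pair) = notifier.upgrade() {
            pair.1.lock().unwrap().notified = true;
            pair.0.notify_all();
        }
    });

    // Stand-in for the wait loop inside get_status().
    let mut vars = pair.1.lock().unwrap();
    while !vars.notified && !vars.shutdown {
        let (guard, result) = pair
            .0
            .wait_timeout(vars, Duration::from_millis(500))
            .unwrap();
        vars = guard;
        if result.timed_out() {
            break;
        }
    }
    println!("notified = {}, shutdown = {}", vars.notified, vars.shutdown);
}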
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
new file mode 100644
index 0000000000..5545f6bd74
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
@@ -0,0 +1,628 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// These are full integration tests that use the BITS service.
+
+// TODO
+// It may make sense to restrict how many tests can run at once. BITS is only supposed to
+// support four simultaneous notifications per user, and it is not impossible that this test
+// suite could exceed that.
+
+#![cfg(test)]
+extern crate bits;
+extern crate lazy_static;
+extern crate rand;
+extern crate regex;
+extern crate tempfile;
+
+use std::ffi::{OsStr, OsString};
+use std::fs::{self, File};
+use std::io::{Read, Write};
+use std::net::{SocketAddr, TcpListener, TcpStream};
+use std::panic;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{self, JoinHandle};
+use std::time::{Duration, Instant};
+
+use self::{
+ bits::BackgroundCopyManager,
+ lazy_static::lazy_static,
+ rand::{thread_rng, Rng},
+ regex::bytes::Regex,
+ tempfile::{Builder, TempDir},
+};
+use super::{
+ super::{BitsJobState, Error},
+ BitsProxyUsage, InProcessClient, StartJobSuccess,
+};
+
+static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1];
+
+lazy_static! {
+ static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
+}
+
+fn format_job_name(name: &str) -> OsString {
+ format!("InProcessClient Test {}", name).into()
+}
+
+fn format_dir_prefix(tmp_dir: &TempDir) -> OsString {
+ let mut dir = tmp_dir.path().to_path_buf().into_os_string();
+ dir.push("\\");
+ dir
+}
+
+fn cancel_jobs(name: &OsStr) {
+ BackgroundCopyManager::connect()
+ .unwrap()
+ .cancel_jobs_by_name(name)
+ .unwrap();
+}
+
+struct HttpServerResponses {
+ body: Box<[u8]>,
+ delay: u64,
+ //error: Box<[u8]>,
+}
+
+struct MockHttpServerHandle {
+ port: u16,
+ join: Option<JoinHandle<Result<(), ()>>>,
+ shutdown: Arc<(Mutex<bool>, Condvar)>,
+}
+
+impl MockHttpServerHandle {
+ fn shutdown(&mut self) {
+ if self.join.is_none() {
+ return;
+ }
+
+ {
+ let &(ref lock, ref cvar) = &*self.shutdown;
+ let mut shutdown = lock.lock().unwrap();
+
+ if !*shutdown {
+ *shutdown = true;
+ cvar.notify_all();
+ }
+ }
+ // Wake up the server from `accept()`. Will fail if the server wasn't listening.
+ let _ = TcpStream::connect_timeout(
+ &(SERVER_ADDRESS, self.port).into(),
+ Duration::from_millis(10_000),
+ );
+
+ match self.join.take().unwrap().join() {
+ Ok(_) => {}
+ Err(p) => panic::resume_unwind(p),
+ }
+ }
+
+ fn format_url(&self, name: &str) -> OsString {
+ format!(
+ "http://{}/{}",
+ SocketAddr::from((SERVER_ADDRESS, self.port)),
+ name
+ )
+ .into()
+ }
+}
+
+fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle {
+ let mut bind_retries = 10;
+ let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
+ let caller_shutdown = shutdown.clone();
+
+ let (listener, port) = loop {
+ let port = thread_rng().gen_range(1024..0x1_0000u32) as u16;
+ match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) {
+ Ok(listener) => {
+ break (listener, port);
+ }
+ r @ Err(_) => {
+ if bind_retries == 0 {
+ r.unwrap();
+ }
+ bind_retries -= 1;
+ continue;
+ }
+ }
+ };
+
+ let join = thread::Builder::new()
+ .name(format!("mock_http_server {}", name))
+ .spawn(move || {
+ // returns Err(()) if server should shut down immediately
+ fn check_shutdown(shutdown: &Arc<(Mutex<bool>, Condvar)>) -> Result<(), ()> {
+ if *shutdown.0.lock().unwrap() {
+ Err(())
+ } else {
+ Ok(())
+ }
+ }
+ fn sleep(shutdown: &Arc<(Mutex<bool>, Condvar)>, delay_millis: u64) -> Result<(), ()> {
+ let sleep_start = Instant::now();
+ let sleep_end = sleep_start + Duration::from_millis(delay_millis);
+
+ let (ref lock, ref cvar) = **shutdown;
+ let mut shutdown_requested = lock.lock().unwrap();
+ loop {
+ if *shutdown_requested {
+ return Err(());
+ }
+
+ let before_wait = Instant::now();
+ if before_wait >= sleep_end {
+ return Ok(());
+ }
+ let wait_dur = sleep_end - before_wait;
+ shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0;
+ }
+ }
+
+ let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap();
+ let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap();
+
+ loop {
+ let (mut socket, _addr) = listener.accept().expect("accept should succeed");
+
+ socket
+ .set_read_timeout(Some(Duration::from_millis(10_000)))
+ .unwrap();
+ let mut s = Vec::new();
+ for b in Read::by_ref(&mut socket).bytes() {
+ if b.is_err() {
+ eprintln!("read error {:?}", b);
+ break;
+ }
+ let b = b.unwrap();
+ s.push(b);
+ if s.ends_with(b"\r\n\r\n") {
+ break;
+ }
+ check_shutdown(&shutdown)?;
+ }
+
+ // request received
+
+ check_shutdown(&shutdown)?;
+
+ // Special error URIs
+ if error_404.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 404 Not Found\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 404 header {:?}", e);
+ }
+ continue;
+ }
+
+ if error_500.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 500 header {:?}", e);
+ }
+ continue;
+ }
+
+ // Response with a body.
+ if s.starts_with(b"HEAD") || s.starts_with(b"GET") {
+ let result = socket
+ .write(
+ format!(
+ "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
+ responses.body.len()
+ )
+ .as_bytes(),
+ )
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing header {:?}", e);
+ continue;
+ }
+ }
+
+ if s.starts_with(b"GET") {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket.write(&responses.body).and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing content {:?}", e);
+ continue;
+ }
+ }
+ }
+ });
+
+ MockHttpServerHandle {
+ port,
+ join: Some(join.unwrap()),
+ shutdown: caller_shutdown,
+ }
+}
+
+// Test wrapper to ensure jobs are canceled, set up name strings
+macro_rules! test {
+ (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => {
+ #[test]
+ fn $name() {
+ let $param = stringify!($name);
+ let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap();
+
+ let result = panic::catch_unwind(|| $body);
+
+ cancel_jobs(&format_job_name($param));
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+ }
+ };
+}
+
+test! {
+ fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 10_000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 10_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // cancel in ~250ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.cancel_job(guid).unwrap();
+ });
+
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // At ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get
+ // an update until ~10s, when the transfer completes or the interval expires)
+ match monitor.get_status(timeout) {
+ Err(Error::NotConnected) => {},
+ Ok(r) => panic!("unexpected success from get_status() {:?}", r),
+ Err(e) => panic!("unexpected failure from get_status() {:?}", e),
+ }
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+
+ server.shutdown();
+ }
+}
+
+test! {
+ fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) {
+ let file_path = tmp_dir.path().join(name);
+
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 500,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 100;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // Get status reports until transfer finishes (~500ms)
+ let mut completed = false;
+ loop {
+ match monitor.get_status(timeout) {
+ Err(e) => {
+ if completed {
+ break;
+ } else {
+ panic!("monitor failed before completion {:?}", e);
+ }
+ }
+ Ok(Ok(status)) => match BitsJobState::from(status.state) {
+ BitsJobState::Queued | BitsJobState::Connecting
+ | BitsJobState::Transferring => {
+ //eprintln!("{:?}", BitsJobState::from(status.state));
+ //eprintln!("{:?}", status);
+
+ // As long as there is no error, setting the timeout to 0 will not
+ // fail an active transfer.
+ client.set_no_progress_timeout(guid.clone(), 0).unwrap();
+ }
+ BitsJobState::Transferred => {
+ client.complete_job(guid.clone()).unwrap();
+ completed = true;
+ }
+ _ => {
+ panic!("{:?}", status);
+ }
+ }
+ Ok(Err(e)) => panic!("{:?}", e),
+ }
+
+ // Timeout to prevent waiting forever
+ assert!(start.elapsed() < Duration::from_millis(60_000));
+ }
+
+
+ // Verify file contents
+ let result = panic::catch_unwind(|| {
+ let mut file = File::open(file_path.clone()).unwrap();
+ let mut v = Vec::new();
+ file.read_to_end(&mut v).unwrap();
+ assert_eq!(v, name.as_bytes());
+ });
+
+ let _ = fs::remove_file(file_path);
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+
+ // Save this for last to ensure the file is removed.
+ server.shutdown();
+ }
+}
+
+test! {
+ fn async_transferred_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 250,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now; the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First report, immediate
+ let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap();
+ let elapsed_to_report_1 = start.elapsed();
+
+ // Transferred notification should come when the job completes in ~250 ms, otherwise we
+ // will be stuck until timeout.
+ let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap();
+ let elapsed_to_report_2 = start.elapsed();
+ assert!(elapsed_to_report_2 < Duration::from_millis(9_000));
+ assert_eq!(report_2.state, BitsJobState::Transferred);
+
+ let short_timeout = 500;
+ let report_3 = monitor.get_status(short_timeout);
+ let elapsed_to_report_3 = start.elapsed();
+
+ if let Ok(report_3) = report_3 {
+ panic!("should be disconnected\n\
+ report_1 ({}.{:03}): {:?}\n\
+ report_2 ({}.{:03}): {:?}\n\
+ report_3 ({}.{:03}): {:?}",
+ elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1,
+ elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2,
+ elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3,
+ );
+ }
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn change_interval(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 1000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // after ~250ms, reduce the monitor interval to 500ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.set_update_interval(guid, 500).unwrap();
+ });
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // The next report should be rescheduled to ~500ms by the spawned thread; otherwise there
+ // would be no status until the 10s timeout (the original interval is 60s).
+ monitor.get_status(timeout).expect("expected second status").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert!(start.elapsed() > Duration::from_millis(400));
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn async_error_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_404"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now; the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // The error notification should arrive with the HEAD response in ~100ms; otherwise there
+ // would be no status until the 60s interval or the 10s timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now; the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Transient error notification should come when the interval expires in ~1s.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() > Duration::from_millis(900));
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::TransientError);
+
+ // Lower no progress timeout to 0
+ let set_timeout_at = Instant::now();
+ client.set_no_progress_timeout(guid, 0).unwrap();
+
+ // Should convert the transient error to a permanent error immediately.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(set_timeout_at.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now; the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately
+ // produce an error notification with the HEAD response in ~100ms, rather than waiting for
+ // the 1s interval or the 10s timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
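
The `test!` macro near the top of this file wraps each test body so BITS jobs are cancelled
even on panic. For reference, a hand-expanded sketch of what one such test roughly becomes
(illustrative only, not generated output; the body is elided):

#[test]
fn start_monitor_and_cancel() {
    let name = "start_monitor_and_cancel";
    let tmp_dir = &Builder::new().prefix(name).tempdir().unwrap();

    let result = panic::catch_unwind(|| {
        // ... original test body, using `name` and `tmp_dir` ...
    });

    // Clean up the job regardless of the outcome, then re-raise any panic.
    cancel_jobs(&format_job_name(name));

    if let Err(e) = result {
        panic::resume_unwind(e);
    }
}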
diff --git a/toolkit/components/bitsdownload/bits_client/src/lib.rs b/toolkit/components/bitsdownload/bits_client/src/lib.rs
new file mode 100644
index 0000000000..7db887bbc5
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/lib.rs
@@ -0,0 +1,258 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+//! An interface for managing and monitoring BITS jobs.
+//!
+//! BITS is a Windows service for performing downloads in the background, independent from an
+//! application, usually via HTTP/HTTPS.
+//!
+//! [`BitsClient`](enum.BitsClient.html) is the main interface, used to issue commands.
+//!
+//! [`BitsMonitorClient`](enum.BitsMonitorClient.html) delivers periodic status reports about a
+//! job.
+//!
+//! Microsoft's documentation for BITS can be found at
+//! <https://docs.microsoft.com/en-us/windows/desktop/Bits/background-intelligent-transfer-service-portal>
+
+extern crate bits;
+extern crate comedy;
+extern crate guid_win;
+extern crate thiserror;
+
+pub mod bits_protocol;
+
+mod in_process;
+
+use std::ffi;
+
+use bits_protocol::*;
+use thiserror::Error;
+
+pub use bits::status::{BitsErrorContext, BitsJobState, BitsJobTimes};
+pub use bits::{BitsJobProgress, BitsJobStatus, BitsProxyUsage};
+pub use bits_protocol::{JobError, JobStatus};
+pub use comedy::HResult;
+pub use guid_win::Guid;
+
+// These errors would come from a Local Service client but are mostly unused currently.
+// PipeError properly lives in the crate that deals with named pipes, but it isn't in use now.
+#[derive(Clone, Debug, Eq, Error, PartialEq)]
+pub enum PipeError {
+ #[error("Pipe is not connected")]
+ NotConnected,
+ #[error("Operation timed out")]
+ Timeout,
+ #[error("Should have written {0} bytes, wrote {1}")]
+ WriteCount(usize, u32),
+ #[error("Windows API error")]
+ Api(#[from] HResult),
+}
+
+pub use PipeError as Error;
+
+/// A client for interacting with BITS.
+///
+/// Methods on `BitsClient` return a `Result<Result<_, XyzFailure>, Error>`. The outer `Result`
+/// is `Err` if there was a communication error in sending the associated command or receiving
+/// its response. Currently this is always `Ok` as all clients are in-process. The inner
+/// `Result` is `Err` if there was an error executing the command.
+///
+/// A single `BitsClient` can be used with multiple BITS jobs simultaneously; generally a job
+/// is not bound tightly to a client.
+///
+/// A `BitsClient` tracks all [`BitsMonitorClient`s](enum.BitsMonitorClient.html) that it started
+/// with `start_job()` or `monitor_job()`, so that the monitor can be stopped or modified.
+pub enum BitsClient {
+ // The `InProcess` variant does all BITS calls directly.
+ #[doc(hidden)]
+ InProcess(in_process::InProcessClient),
+ // Space is reserved here for the LocalService variant, which will work through an external
+ // process running as Local Service.
+}
+
+use BitsClient::InProcess;
+
+impl BitsClient {
+ /// Create an in-process `BitsClient`.
+ ///
+ /// `job_name` will be used when creating jobs, and this `BitsClient` can only be used to
+ /// manipulate jobs with that name.
+ ///
+ /// `save_path_prefix` will be prepended to the local `save_path` given to `start_job()`, it
+ /// must name an existing directory.
+ pub fn new(
+ job_name: ffi::OsString,
+ save_path_prefix: ffi::OsString,
+ ) -> Result<BitsClient, Error> {
+ Ok(InProcess(in_process::InProcessClient::new(
+ job_name,
+ save_path_prefix,
+ )?))
+ }
+
+ /// Start a job to download a single file at `url` to local path `save_path` (relative to the
+ /// `save_path_prefix` given when constructing the `BitsClient`).
+ ///
+ /// `save_path_prefix` combined with `save_path` must name a file (existing or not) in an
+ /// existing directory, which must be under the directory named by `save_path_prefix`.
+ ///
+ /// `proxy_usage` determines what proxy will be used.
+ ///
+ /// When a successful result `Ok(result)` is returned, `result.0.guid` is the id for the
+ /// new job, and `result.1` is a monitor client that can be polled for periodic updates,
+ /// returning a result approximately once per `monitor_interval_millis` milliseconds.
+ pub fn start_job(
+ &mut self,
+ url: ffi::OsString,
+ save_path: ffi::OsString,
+ proxy_usage: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ monitor_interval_millis: u32,
+ ) -> Result<Result<(StartJobSuccess, BitsMonitorClient), StartJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client
+ .start_job(
+ url,
+ save_path,
+ proxy_usage,
+ no_progress_timeout_secs,
+ monitor_interval_millis,
+ )
+ .map(|(success, monitor)| (success, BitsMonitorClient::InProcess(monitor)))),
+ }
+ }
+
+ /// Start monitoring the job with id `guid` approximately once per `monitor_interval_millis`
+ /// milliseconds.
+ ///
+ /// The returned `Ok(monitor)` is a monitor client to be polled for periodic updates.
+ ///
+ /// There can only be one ongoing `BitsMonitorClient` for each job associated with a given
+ /// `BitsClient`. If a monitor client already exists for the specified job, it will be stopped.
+ pub fn monitor_job(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<Result<BitsMonitorClient, MonitorJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client
+ .monitor_job(guid, interval_millis)
+ .map(BitsMonitorClient::InProcess)),
+ }
+ }
+
+ /// Suspend job `guid`.
+ pub fn suspend_job(&mut self, guid: Guid) -> Result<Result<(), SuspendJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.suspend_job(guid)),
+ }
+ }
+
+ /// Resume job `guid`.
+ pub fn resume_job(&mut self, guid: Guid) -> Result<Result<(), ResumeJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.resume_job(guid)),
+ }
+ }
+
+ /// Set the priority of job `guid`.
+ ///
+ /// `foreground == true` will set the priority to `BG_JOB_PRIORITY_FOREGROUND`,
+ /// `false` will use the default `BG_JOB_PRIORITY_NORMAL`.
+ /// See the Microsoft documentation for `BG_JOB_PRIORITY` for details.
+ ///
+ /// A job created by `start_job()` will be foreground priority by default.
+ pub fn set_job_priority(
+ &mut self,
+ guid: Guid,
+ foreground: bool,
+ ) -> Result<Result<(), SetJobPriorityFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_job_priority(guid, foreground)),
+ }
+ }
+
+ /// Set the "no progress timeout" of job `guid`.
+ pub fn set_no_progress_timeout(
+ &mut self,
+ guid: Guid,
+ timeout_secs: u32,
+ ) -> Result<Result<(), SetNoProgressTimeoutFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_no_progress_timeout(guid, timeout_secs)),
+ }
+ }
+
+ /// Change the update interval for an ongoing monitor of job `guid`.
+ pub fn set_update_interval(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_update_interval(guid, interval_millis)),
+ }
+ }
+
+ /// Stop any ongoing monitor for job `guid`.
+ pub fn stop_update(
+ &mut self,
+ guid: Guid,
+ ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.stop_update(guid)),
+ }
+ }
+
+ /// Complete the job `guid`.
+ ///
+ /// This also stops any ongoing monitor for the job.
+ pub fn complete_job(&mut self, guid: Guid) -> Result<Result<(), CompleteJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.complete_job(guid)),
+ }
+ }
+
+ /// Cancel the job `guid`.
+ ///
+ /// This also stops any ongoing monitor for the job.
+ pub fn cancel_job(&mut self, guid: Guid) -> Result<Result<(), CancelJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.cancel_job(guid)),
+ }
+ }
+}
+
+/// The client side of a monitor for a BITS job.
+///
+/// It is intended to be used by calling `get_status` in a loop to receive notifications about
+/// the status of a job. Because `get_status` blocks, it is recommended to run this loop on its
+/// own thread.
+pub enum BitsMonitorClient {
+ InProcess(in_process::InProcessMonitor),
+}
+
+impl BitsMonitorClient {
+ /// `get_status` will return a result approximately every `monitor_interval_millis`
+ /// milliseconds, but in case a result isn't available within `timeout_millis` milliseconds
+ /// this will return `Err(Error::Timeout)`. Any `Err` returned, including timeout, indicates
+ /// that the monitor has been stopped; the `BitsMonitorClient` should then be discarded.
+ ///
+ /// As with methods on `BitsClient`, `BitsMonitorClient::get_status()` has an inner `Result`
+ /// type which indicates an error returned from the server. Any `Err` here also indicates that
+ /// the monitor has stopped after yielding the result.
+ ///
+ /// The first time `get_status` is called it will return a status without any delay.
+ ///
+ /// If there is an error or the transfer completes, a result may be available sooner than
+ /// the monitor interval.
+ pub fn get_status(
+ &mut self,
+ timeout_millis: u32,
+ ) -> Result<Result<JobStatus, HResultMessage>, Error> {
+ match self {
+ BitsMonitorClient::InProcess(client) => client.get_status(timeout_millis),
+ }
+ }
+}
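
Taken together, a typical in-process download loop with this API looks roughly like the sketch
below. It is illustrative only: the job name, URL, paths, and timing values are placeholders,
and the outer `?` handles the communication-level `Error` while the inner `?` handles the
command-specific failure.

use std::ffi::OsString;

use bits_client::{BitsClient, BitsJobState, BitsProxyUsage};

fn download() -> Result<(), Box<dyn std::error::Error>> {
    // `save_path_prefix` must name an existing directory.
    let mut client = BitsClient::new(
        OsString::from("Example Job"),
        OsString::from("C:\\Downloads"),
    )?;

    let (success, mut monitor) = client.start_job(
        OsString::from("https://example.com/file.bin"),
        OsString::from("file.bin"),
        BitsProxyUsage::Preconfig,
        600,   // no_progress_timeout_secs
        1_000, // monitor_interval_millis
    )??;

    loop {
        // Blocks for up to 10 s waiting for the next periodic status report.
        let status = monitor.get_status(10_000)??;
        match status.state {
            BitsJobState::Transferred => {
                client.complete_job(success.guid.clone())??;
                return Ok(());
            }
            BitsJobState::Error => {
                client.cancel_job(success.guid.clone())??;
                return Err("download failed".into());
            }
            _ => {} // queued, connecting, or transferring; keep polling
        }
    }
}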