summaryrefslogtreecommitdiffstats
path: root/toolkit/components/bitsdownload/bits_client
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/bitsdownload/bits_client')
-rw-r--r--toolkit/components/bitsdownload/bits_client/.gitignore3
-rw-r--r--toolkit/components/bitsdownload/bits_client/Cargo.toml19
-rw-r--r--toolkit/components/bitsdownload/bits_client/README.md23
-rw-r--r--toolkit/components/bitsdownload/bits_client/bits/Cargo.toml31
-rw-r--r--toolkit/components/bitsdownload/bits_client/bits/src/callback.rs205
-rw-r--r--toolkit/components/bitsdownload/bits_client/bits/src/lib.rs592
-rw-r--r--toolkit/components/bitsdownload/bits_client/bits/src/status.rs118
-rw-r--r--toolkit/components/bitsdownload/bits_client/bits/src/wide.rs38
-rw-r--r--toolkit/components/bitsdownload/bits_client/examples/test_client.rs285
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs380
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs504
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs628
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/lib.rs258
13 files changed, 3084 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/bits_client/.gitignore b/toolkit/components/bitsdownload/bits_client/.gitignore
new file mode 100644
index 0000000000..d78faf4575
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/.gitignore
@@ -0,0 +1,3 @@
+/target
+**/*.rs.bk
+**/.*.swp
diff --git a/toolkit/components/bitsdownload/bits_client/Cargo.toml b/toolkit/components/bitsdownload/bits_client/Cargo.toml
new file mode 100644
index 0000000000..eb0083d8e2
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "bits_client"
+version = "0.2.0"
+authors = ["Adam Gashlin <agashlin@mozilla.com>"]
+license = "MPL-2.0"
+publish = false
+
+[dependencies]
+bits = { path = "./bits" }
+comedy = "0.2.0"
+guid_win = "0.2.0"
+thiserror = "1"
+
+[dev-dependencies]
+#ctrlc = "3.1.1"
+lazy_static = "1.0.1"
+rand = "0.8"
+regex = { version = "1", default_features = false, features = ["perf", "std"] }
+tempfile = "3"
diff --git a/toolkit/components/bitsdownload/bits_client/README.md b/toolkit/components/bitsdownload/bits_client/README.md
new file mode 100644
index 0000000000..0c21a5b68c
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/README.md
@@ -0,0 +1,23 @@
+bits\_client
+============
+
+Interfaces for BITS.
+
+bits\_client lib
+---------------
+
+`bits_client` is the primary target and provides `BitsClient`, an API for creating and monitoring BITS jobs.
+
+`bits_client::new()` creates a `BitsClient` that does all operations within the current process, as the current user.
+
+bits crate
+----------
+
+`bits` is a safe interface to BITS, providing connections to the
+Background Copy Manager, some basic operations on Background Copy Jobs, and
+methods for implementing `IBackgroundCopyCallback`s in Rust.
+
+test\_client example
+-------------------
+
+`examples/test_client.rs` shows how to use the API.
diff --git a/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml b/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml
new file mode 100644
index 0000000000..36b7b5ff89
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml
@@ -0,0 +1,31 @@
+[package]
+name = "bits"
+version = "0.2.0"
+authors = ["Adam Gashlin <agashlin@mozilla.com>"]
+license = "MIT/Apache-2.0"
+publish = false
+
+[features]
+status_serde = ["serde", "serde_derive"]
+
+[dependencies]
+comedy = "0.2.0"
+filetime_win = "0.2.0"
+guid_win = "0.2.0"
+serde = { version = "1.0.80", optional = true }
+serde_derive = { version = "1.0.80", optional = true }
+
+[dependencies.winapi]
+version = "0.3.7"
+features = ["basetsd",
+ "bits",
+ "bits2_5",
+ "bitsmsg",
+ "guiddef",
+ "minwindef",
+ "ntdef",
+ "rpcndr",
+ "unknwnbase",
+ "winerror",
+ "winnls",
+ ]
diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs b/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs
new file mode 100644
index 0000000000..6dace83be8
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs
@@ -0,0 +1,205 @@
+// Licensed under the Apache License, Version 2.0
+// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// All files in the project carrying such notice may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::panic::{catch_unwind, RefUnwindSafe};
+use std::ptr::NonNull;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use comedy::{com::ComRef, HResult};
+use guid_win::Guid;
+use winapi::ctypes::c_void;
+use winapi::shared::guiddef::REFIID;
+use winapi::shared::minwindef::DWORD;
+use winapi::shared::ntdef::ULONG;
+use winapi::shared::winerror::{E_FAIL, E_NOINTERFACE, HRESULT, NOERROR, S_OK};
+use winapi::um::bits::{
+ IBackgroundCopyCallback, IBackgroundCopyCallbackVtbl, IBackgroundCopyError, IBackgroundCopyJob,
+};
+use winapi::um::unknwnbase::{IUnknown, IUnknownVtbl};
+use winapi::Interface;
+
+use BitsJob;
+
+/// The type of a notification callback.
+///
+/// The callbacks must be `Fn()` to be called arbitrarily many times, `RefUnwindSafe` to have a
+/// panic unwind safely caught, `Send`, `Sync` and `'static` to run on any thread COM invokes us on
+/// any time.
+///
+/// If the callback returns a non-success `HRESULT`, the notification may pass to other BITS
+/// mechanisms such as `IBackgroundCopyJob2::SetNotifyCmdLine`.
+pub type TransferredCallback =
+ dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static;
+pub type ErrorCallback = dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static;
+pub type ModificationCallback =
+ dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static;
+
+#[repr(C)]
+pub struct BackgroundCopyCallback {
+ // Everything assumes that the interface vtable is the first member of this struct.
+ interface: IBackgroundCopyCallback,
+ rc: AtomicUsize,
+ transferred_cb: Option<Box<TransferredCallback>>,
+ error_cb: Option<Box<ErrorCallback>>,
+ modification_cb: Option<Box<ModificationCallback>>,
+}
+
+impl BackgroundCopyCallback {
+ /// Construct the callback object and register it with a job.
+ ///
+ /// Only one notify interface can be present on a job at once, so this will release BITS'
+ /// ref to any previously registered interface.
+ pub fn register(
+ job: &mut BitsJob,
+ transferred_cb: Option<Box<TransferredCallback>>,
+ error_cb: Option<Box<ErrorCallback>>,
+ modification_cb: Option<Box<ModificationCallback>>,
+ ) -> Result<(), HResult> {
+ let cb = Box::new(BackgroundCopyCallback {
+ interface: IBackgroundCopyCallback { lpVtbl: &VTBL },
+ rc: AtomicUsize::new(1),
+ transferred_cb,
+ error_cb,
+ modification_cb,
+ });
+
+ // Leak the callback, it has no Rust owner until we need to drop it later.
+ // The ComRef will Release when it goes out of scope.
+ unsafe {
+ let cb = ComRef::from_raw(NonNull::new_unchecked(Box::into_raw(cb) as *mut IUnknown));
+
+ job.set_notify_interface(cb.as_raw_ptr())?;
+ }
+
+ Ok(())
+ }
+}
+
+extern "system" fn query_interface(
+ this: *mut IUnknown,
+ riid: REFIID,
+ obj: *mut *mut c_void,
+) -> HRESULT {
+ unsafe {
+ // `IBackgroundCopyCallback` is the first (currently only) interface on the
+ // `BackgroundCopyCallback` object, so we can return `this` either as
+ // `IUnknown` or `IBackgroundCopyCallback`.
+ if Guid(*riid) == Guid(IUnknown::uuidof())
+ || Guid(*riid) == Guid(IBackgroundCopyCallback::uuidof())
+ {
+ addref(this);
+ // Cast first to `IBackgroundCopyCallback` to be clear which `IUnknown`
+ // we are pointing at.
+ *obj = this as *mut IBackgroundCopyCallback as *mut c_void;
+ NOERROR
+ } else {
+ E_NOINTERFACE
+ }
+ }
+}
+
+extern "system" fn addref(raw_this: *mut IUnknown) -> ULONG {
+ unsafe {
+ let this = raw_this as *const BackgroundCopyCallback;
+
+ // Forge a reference for just this statement.
+ let old_rc = (*this).rc.fetch_add(1, Ordering::SeqCst);
+ (old_rc + 1) as ULONG
+ }
+}
+
+extern "system" fn release(raw_this: *mut IUnknown) -> ULONG {
+ unsafe {
+ {
+ let this = raw_this as *const BackgroundCopyCallback;
+
+ // Forge a reference for just this statement.
+ let old_rc = (*this).rc.fetch_sub(1, Ordering::SeqCst);
+
+ let rc = old_rc - 1;
+ if rc > 0 {
+ return rc as ULONG;
+ }
+ }
+
+ // rc will have been 0 for us to get here, and we're out of scope of the reference above,
+ // so there should be no references or pointers left (besides `this`).
+ // Re-Box and to drop immediately.
+ let _ = Box::from_raw(raw_this as *mut BackgroundCopyCallback);
+
+ 0
+ }
+}
+
+extern "system" fn transferred_stub(
+ raw_this: *mut IBackgroundCopyCallback,
+ _job: *mut IBackgroundCopyJob,
+) -> HRESULT {
+ unsafe {
+ let this = raw_this as *const BackgroundCopyCallback;
+ // Forge a reference just for this statement.
+ if let Some(ref cb) = (*this).transferred_cb {
+ match catch_unwind(|| cb()) {
+ Ok(Ok(())) => S_OK,
+ Ok(Err(hr)) => hr,
+ Err(_) => E_FAIL,
+ }
+ } else {
+ S_OK
+ }
+ }
+}
+
+extern "system" fn error_stub(
+ raw_this: *mut IBackgroundCopyCallback,
+ _job: *mut IBackgroundCopyJob,
+ _error: *mut IBackgroundCopyError,
+) -> HRESULT {
+ unsafe {
+ let this = raw_this as *const BackgroundCopyCallback;
+ // Forge a reference just for this statement.
+ if let Some(ref cb) = (*this).error_cb {
+ match catch_unwind(|| cb()) {
+ Ok(Ok(())) => S_OK,
+ Ok(Err(hr)) => hr,
+ Err(_) => E_FAIL,
+ }
+ } else {
+ S_OK
+ }
+ }
+}
+
+extern "system" fn modification_stub(
+ raw_this: *mut IBackgroundCopyCallback,
+ _job: *mut IBackgroundCopyJob,
+ _reserved: DWORD,
+) -> HRESULT {
+ unsafe {
+ let this = raw_this as *const BackgroundCopyCallback;
+ // Forge a reference just for this statement.
+ if let Some(ref cb) = (*this).modification_cb {
+ match catch_unwind(|| cb()) {
+ Ok(Ok(())) => S_OK,
+ Ok(Err(hr)) => hr,
+ Err(_) => E_FAIL,
+ }
+ } else {
+ S_OK
+ }
+ }
+}
+
+pub static VTBL: IBackgroundCopyCallbackVtbl = IBackgroundCopyCallbackVtbl {
+ parent: IUnknownVtbl {
+ QueryInterface: query_interface,
+ AddRef: addref,
+ Release: release,
+ },
+ JobTransferred: transferred_stub,
+ JobError: error_stub,
+ JobModification: modification_stub,
+};
diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs b/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs
new file mode 100644
index 0000000000..09da2b0349
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs
@@ -0,0 +1,592 @@
+// Licensed under the Apache License, Version 2.0
+// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// All files in the project carrying such notice may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A safe interface for BITS
+//!
+//! The primary entry point into BITS is the
+//! [`BackgroundCopyManager`](struct.BackgroundCopyManager.html) struct.
+//!
+//! Functionality is only provided by this crate on an as-needed basis for
+//! [bits_client](../bits_client/index.html), so there are vast swathes of the BITS API
+//! unsupported.
+
+extern crate comedy;
+extern crate filetime_win;
+extern crate guid_win;
+extern crate winapi;
+
+#[cfg(feature = "status_serde")]
+extern crate serde;
+#[cfg(feature = "status_serde")]
+extern crate serde_derive;
+
+mod callback;
+pub mod status;
+mod wide;
+
+use std::ffi::{OsStr, OsString};
+use std::mem;
+use std::os::windows::ffi::OsStringExt;
+use std::ptr;
+use std::result;
+
+use comedy::com::{create_instance_local_server, CoTaskMem, ComRef, INIT_MTA};
+use comedy::error::{HResult, ResultExt};
+use comedy::{com_call, com_call_getter, com_call_taskmem_getter};
+use filetime_win::FileTime;
+use guid_win::Guid;
+use winapi::shared::minwindef::DWORD;
+use winapi::shared::ntdef::{HRESULT, LANGIDFROMLCID, ULONG};
+use winapi::shared::winerror::S_FALSE;
+use winapi::um::bits::{
+ IBackgroundCopyError, IBackgroundCopyFile, IBackgroundCopyJob, IBackgroundCopyManager,
+ IEnumBackgroundCopyFiles, IEnumBackgroundCopyJobs, BG_JOB_PRIORITY, BG_JOB_PRIORITY_FOREGROUND,
+ BG_JOB_PRIORITY_HIGH, BG_JOB_PRIORITY_LOW, BG_JOB_PRIORITY_NORMAL, BG_JOB_PROXY_USAGE,
+ BG_JOB_PROXY_USAGE_AUTODETECT, BG_JOB_PROXY_USAGE_NO_PROXY, BG_JOB_PROXY_USAGE_PRECONFIG,
+ BG_JOB_STATE_ERROR, BG_JOB_STATE_TRANSIENT_ERROR, BG_JOB_TYPE_DOWNLOAD, BG_NOTIFY_DISABLE,
+ BG_NOTIFY_JOB_ERROR, BG_NOTIFY_JOB_MODIFICATION, BG_NOTIFY_JOB_TRANSFERRED, BG_SIZE_UNKNOWN,
+};
+use winapi::um::bits2_5::{IBackgroundCopyJobHttpOptions, BG_HTTP_REDIRECT_POLICY_ALLOW_REPORT};
+use winapi::um::bitsmsg::BG_E_NOT_FOUND;
+use winapi::um::unknwnbase::IUnknown;
+use winapi::um::winnls::GetThreadLocale;
+
+pub use winapi::um::bits::{BG_ERROR_CONTEXT, BG_JOB_STATE};
+pub use winapi::um::bitsmsg::{BG_S_PARTIAL_COMPLETE, BG_S_UNABLE_TO_DELETE_FILES};
+
+pub use status::{
+ BitsErrorContext, BitsJobError, BitsJobProgress, BitsJobState, BitsJobStatus, BitsJobTimes,
+};
+use wide::ToWideNull;
+
+pub use winapi::shared::winerror::E_FAIL;
+
+#[repr(u32)]
+#[derive(Copy, Clone, Debug)]
+pub enum BitsJobPriority {
+ Foreground = BG_JOB_PRIORITY_FOREGROUND,
+ High = BG_JOB_PRIORITY_HIGH,
+ /// Default
+ Normal = BG_JOB_PRIORITY_NORMAL,
+ Low = BG_JOB_PRIORITY_LOW,
+}
+
+#[repr(u32)]
+#[derive(Copy, Clone, Debug)]
+pub enum BitsProxyUsage {
+ /// Directly access the network.
+ NoProxy = BG_JOB_PROXY_USAGE_NO_PROXY,
+ /// Use Internet Explorer proxy settings. This is the default.
+ Preconfig = BG_JOB_PROXY_USAGE_PRECONFIG,
+ /// Attempt to auto-detect the connection's proxy settings.
+ AutoDetect = BG_JOB_PROXY_USAGE_AUTODETECT,
+}
+
+type Result<T> = result::Result<T, HResult>;
+
+pub struct BackgroundCopyManager(ComRef<IBackgroundCopyManager>);
+
+impl BackgroundCopyManager {
+ /// Get access to the local BITS service.
+ ///
+ /// # COM Initialization and Threading Model #
+ ///
+ /// This method uses a thread local variable to initialize COM with a multithreaded apartment
+ /// model for this thread, and leaves it this way until the thread local is dropped.
+ /// If the thread was in a single-threaded apartment, `connect()` will fail gracefully.
+ ///
+ /// # Safety #
+ ///
+ /// If there are mismatched `CoUninitialize` calls on this thread which lead to COM shutting
+ /// down before this thread ends, unsafe behavior may result.
+ pub fn connect() -> Result<BackgroundCopyManager> {
+ INIT_MTA.with(|com| {
+ if let Err(e) = com {
+ return Err(e.clone());
+ }
+ Ok(())
+ })?;
+
+ // Assuming no mismatched CoUninitialize calls, methods do not have to check for
+ // successfully initialized COM once the object is constructed: `BackgroundCopyManager`
+ // is not `Send` or `Sync` so it must be used on the thread it was constructed on,
+ // which has now successfully inited MTA for the lifetime of thread local `INIT_MTA`.
+ // This also holds for any functions using pointers only derived from these methods, like
+ // the `BitsJob` methods.
+
+ Ok(BackgroundCopyManager(create_instance_local_server::<
+ winapi::um::bits::BackgroundCopyManager,
+ IBackgroundCopyManager,
+ >()?))
+ }
+
+ /// Create a new download job with the given name.
+ pub fn create_job(&self, display_name: &OsStr) -> Result<BitsJob> {
+ unsafe {
+ let mut guid = mem::zeroed();
+ Ok(BitsJob(com_call_getter!(
+ |job| self.0,
+ IBackgroundCopyManager::CreateJob(
+ display_name.to_wide_null().as_ptr(),
+ BG_JOB_TYPE_DOWNLOAD,
+ &mut guid,
+ job,
+ )
+ )?))
+ }
+ }
+
+ /// Cancel all jobs with the given name.
+ ///
+ /// This only attempts to cancel jobs owned by the current user.
+ /// No errors are returned for jobs that failed to cancel.
+ pub fn cancel_jobs_by_name(&self, match_name: &OsStr) -> Result<()> {
+ let jobs =
+ unsafe { com_call_getter!(|jobs| self.0, IBackgroundCopyManager::EnumJobs(0, jobs))? };
+
+ loop {
+ let result = unsafe {
+ com_call_getter!(
+ |job| jobs,
+ IEnumBackgroundCopyJobs::Next(1, job, ptr::null_mut())
+ )
+ };
+ match result {
+ Ok(job) => {
+ if job_name_eq(&job, match_name)? {
+ unsafe {
+ let _ = com_call!(job, IBackgroundCopyJob::Cancel());
+ }
+ }
+ }
+ Err(e) => {
+ if e.code() == S_FALSE {
+ // Ran out of jobs to enumerate
+ return Ok(());
+ } else {
+ return Err(e);
+ }
+ }
+ }
+ }
+ }
+
+ /// Get the job with the given GUID.
+ ///
+ /// Returns Err if the job was not found.
+ pub fn get_job_by_guid(&self, guid: &Guid) -> Result<BitsJob> {
+ unsafe { com_call_getter!(|job| self.0, IBackgroundCopyManager::GetJob(&guid.0, job)) }
+ .map(BitsJob)
+ }
+
+ /// Try to find a job with a given GUID.
+ ///
+ /// Returns Ok(None) if the job was not found but there was no other error.
+ pub fn find_job_by_guid(&self, guid: &Guid) -> Result<Option<BitsJob>> {
+ Ok(self
+ .get_job_by_guid(guid)
+ .map(Some)
+ .allow_err(BG_E_NOT_FOUND as i32, None)?)
+ }
+
+ /// Try to find a job with a given GUID and name.
+ ///
+ /// Returns Ok(None) if the job was not found, or if it had the wrong name, as long as there
+ /// was no other error.
+ pub fn find_job_by_guid_and_name(
+ &self,
+ guid: &Guid,
+ match_name: &OsStr,
+ ) -> Result<Option<BitsJob>> {
+ Ok(match self.find_job_by_guid(guid)? {
+ None => None,
+ Some(BitsJob(ref job)) if !job_name_eq(job, match_name)? => None,
+ result => result,
+ })
+ }
+
+ /// Translate a BITS `HRESULT` to a textual description.
+ ///
+ /// This uses the current thread's locale to look up the message associated with a BITS
+ /// error. It should only be used for `HRESULT`s returned from BITS COM interfaces.
+ pub fn get_error_description(&self, hr: HRESULT) -> Result<String> {
+ unsafe {
+ let language_id = DWORD::from(LANGIDFROMLCID(GetThreadLocale()));
+
+ Ok(taskmem_into_lossy_string(com_call_taskmem_getter!(
+ |desc| self.0,
+ IBackgroundCopyManager::GetErrorDescription(hr, language_id, desc)
+ )?))
+ }
+ }
+}
+
+unsafe fn taskmem_into_lossy_string(taskmem: CoTaskMem<u16>) -> String {
+ OsString::from_wide(taskmem.as_slice_until_null())
+ .to_string_lossy()
+ .into_owned()
+}
+
+fn job_name_eq(job: &ComRef<IBackgroundCopyJob>, match_name: &OsStr) -> Result<bool> {
+ let job_name = unsafe {
+ OsString::from_wide(
+ com_call_taskmem_getter!(|name| job, IBackgroundCopyJob::GetDisplayName(name))?
+ .as_slice_until_null(),
+ )
+ };
+
+ Ok(job_name == match_name)
+}
+
+pub struct BitsJob(ComRef<IBackgroundCopyJob>);
+
+impl BitsJob {
+ /// Get the job's GUID.
+ pub fn guid(&self) -> Result<Guid> {
+ // TODO: cache on create or retrieved by GUID?
+ unsafe {
+ let mut guid = mem::zeroed();
+ com_call!(self.0, IBackgroundCopyJob::GetId(&mut guid))?;
+ Ok(Guid(guid))
+ }
+ }
+
+ /// Add a file to the job.
+ pub fn add_file(&mut self, remote_url: &OsStr, local_file: &OsStr) -> Result<()> {
+ unsafe {
+ com_call!(
+ self.0,
+ IBackgroundCopyJob::AddFile(
+ remote_url.to_wide_null().as_ptr(),
+ local_file.to_wide_null().as_ptr(),
+ )
+ )
+ }?;
+ Ok(())
+ }
+
+ /// Get the first file in the job.
+ ///
+ /// This is provided for collecting the redirected remote name of single file jobs.
+ pub fn get_first_file(&mut self) -> Result<BitsFile> {
+ let files = unsafe { com_call_getter!(|e| self.0, IBackgroundCopyJob::EnumFiles(e))? };
+
+ let file = unsafe {
+ com_call_getter!(
+ |file| files,
+ IEnumBackgroundCopyFiles::Next(1, file, ptr::null_mut())
+ )?
+ };
+
+ Ok(BitsFile(file))
+ }
+
+ /// Set the job's description string.
+ ///
+ /// This is different from the display name set when creating the job.
+ pub fn set_description(&mut self, description: &OsStr) -> Result<()> {
+ unsafe {
+ com_call!(
+ self.0,
+ IBackgroundCopyJob::SetDescription(description.to_wide_null().as_ptr())
+ )
+ }?;
+ Ok(())
+ }
+
+ /// Change the job's proxy usage setting.
+ ///
+ /// The default is `BitsProxyUsage::Preconfig`.
+ pub fn set_proxy_usage(&mut self, usage: BitsProxyUsage) -> Result<()> {
+ use BitsProxyUsage::*;
+
+ match usage {
+ Preconfig | NoProxy | AutoDetect => {
+ unsafe {
+ com_call!(
+ self.0,
+ IBackgroundCopyJob::SetProxySettings(
+ usage as BG_JOB_PROXY_USAGE,
+ ptr::null_mut(),
+ ptr::null_mut(),
+ )
+ )
+ }?;
+ Ok(())
+ }
+ }
+ }
+
+ /// Change the job's priority.
+ ///
+ /// The default is `BitsJobPriority::Normal`.
+ pub fn set_priority(&mut self, priority: BitsJobPriority) -> Result<()> {
+ unsafe {
+ com_call!(
+ self.0,
+ IBackgroundCopyJob::SetPriority(priority as BG_JOB_PRIORITY)
+ )
+ }?;
+ Ok(())
+ }
+
+ pub fn set_minimum_retry_delay(&mut self, seconds: ULONG) -> Result<()> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::SetMinimumRetryDelay(seconds)) }?;
+ Ok(())
+ }
+
+ pub fn set_no_progress_timeout(&mut self, seconds: ULONG) -> Result<()> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::SetNoProgressTimeout(seconds)) }?;
+ Ok(())
+ }
+
+ /// Enable HTTP redirect reporting.
+ ///
+ /// The default setting is to allow HTTP redirects, but to not report them in any way. With
+ /// this setting enabled, the remote name of a file will be updated to reflect the redirect.
+ ///
+ /// # Compatibility #
+ ///
+ /// First available in Windows Vista.
+ pub fn set_redirect_report(&mut self) -> Result<()> {
+ unsafe {
+ com_call!(
+ self.0.cast()?,
+ IBackgroundCopyJobHttpOptions::SetSecurityFlags(
+ BG_HTTP_REDIRECT_POLICY_ALLOW_REPORT
+ )
+ )
+ }?;
+
+ Ok(())
+ }
+
+ /// Resume the job. This must be done at least once to initially enqueue the job.
+ pub fn resume(&mut self) -> Result<()> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::Resume()) }?;
+ Ok(())
+ }
+
+ pub fn suspend(&mut self) -> Result<()> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::Suspend()) }?;
+ Ok(())
+ }
+
+ /// Complete the job, moving the local files to their final names.
+ ///
+ /// Has two interesting success `HRESULT`s: `BG_S_PARTIAL_COMPLETE` and
+ /// `BG_S_UNABLE_TO_DELETE_FILES`.
+ pub fn complete(&mut self) -> Result<HRESULT> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::Complete()) }
+ }
+
+ /// Cancel the job, deleting any temporary files.
+ ///
+ /// Has an interesting success `HRESULT`: `BG_S_UNABLE_TO_DELETE_FILES`.
+ pub fn cancel(&mut self) -> Result<HRESULT> {
+ unsafe { com_call!(self.0, IBackgroundCopyJob::Cancel()) }
+ }
+
+ /// Set the notification callbacks to use with this job.
+ ///
+ /// This will replace any previously set callbacks.
+ pub fn register_callbacks(
+ &mut self,
+ transferred_cb: Option<Box<callback::TransferredCallback>>,
+ error_cb: Option<Box<callback::ErrorCallback>>,
+ modification_cb: Option<Box<callback::ModificationCallback>>,
+ ) -> Result<()> {
+ let mut flags = 0;
+ if transferred_cb.is_some() {
+ flags |= BG_NOTIFY_JOB_TRANSFERRED;
+ }
+ if error_cb.is_some() {
+ flags |= BG_NOTIFY_JOB_ERROR;
+ }
+ if modification_cb.is_some() {
+ flags |= BG_NOTIFY_JOB_MODIFICATION;
+ }
+
+ callback::BackgroundCopyCallback::register(
+ self,
+ transferred_cb,
+ error_cb,
+ modification_cb,
+ )?;
+
+ unsafe { com_call!(self.0, IBackgroundCopyJob::SetNotifyFlags(flags)) }?;
+
+ Ok(())
+ }
+
+ fn _clear_callbacks(&mut self) -> Result<()> {
+ unsafe {
+ com_call!(
+ self.0,
+ IBackgroundCopyJob::SetNotifyFlags(BG_NOTIFY_DISABLE)
+ )?;
+
+ self.set_notify_interface(ptr::null_mut() as *mut IUnknown)
+ }
+ }
+
+ /// Collect the current status of the job, including errors.
+ pub fn get_status(&self) -> Result<BitsJobStatus> {
+ let mut state = 0;
+ let mut progress = unsafe { mem::zeroed() };
+ let mut error_count = 0;
+ let mut times = unsafe { mem::zeroed() };
+
+ unsafe {
+ com_call!(self.0, IBackgroundCopyJob::GetState(&mut state))?;
+ com_call!(self.0, IBackgroundCopyJob::GetProgress(&mut progress))?;
+ com_call!(self.0, IBackgroundCopyJob::GetErrorCount(&mut error_count))?;
+ com_call!(self.0, IBackgroundCopyJob::GetTimes(&mut times))?;
+ }
+
+ Ok(BitsJobStatus {
+ state: BitsJobState::from(state),
+ progress: BitsJobProgress {
+ total_bytes: if progress.BytesTotal == BG_SIZE_UNKNOWN {
+ None
+ } else {
+ Some(progress.BytesTotal)
+ },
+ transferred_bytes: progress.BytesTransferred,
+ total_files: progress.FilesTotal,
+ transferred_files: progress.FilesTransferred,
+ },
+ error_count,
+ error: if state == BG_JOB_STATE_ERROR || state == BG_JOB_STATE_TRANSIENT_ERROR {
+ let error_obj =
+ unsafe { com_call_getter!(|e| self.0, IBackgroundCopyJob::GetError(e)) }?;
+
+ Some(BitsJob::get_error(error_obj)?)
+ } else {
+ None
+ },
+ times: BitsJobTimes {
+ creation: FileTime(times.CreationTime),
+ modification: FileTime(times.ModificationTime),
+ transfer_completion: if times.TransferCompletionTime.dwLowDateTime == 0
+ && times.TransferCompletionTime.dwHighDateTime == 0
+ {
+ None
+ } else {
+ Some(FileTime(times.TransferCompletionTime))
+ },
+ },
+ })
+ }
+
+ fn get_error(error_obj: ComRef<IBackgroundCopyError>) -> Result<BitsJobError> {
+ let mut context = 0;
+ let mut hresult = 0;
+ unsafe {
+ com_call!(
+ error_obj,
+ IBackgroundCopyError::GetError(&mut context, &mut hresult)
+ )?;
+
+ let language_id = DWORD::from(LANGIDFROMLCID(GetThreadLocale()));
+
+ let context = BitsErrorContext::from(context);
+ let context_str = com_call_taskmem_getter!(
+ |desc| error_obj,
+ IBackgroundCopyError::GetErrorContextDescription(language_id, desc)
+ )
+ .map(|s| taskmem_into_lossy_string(s))
+ .unwrap_or_else(|_| format!("{:?}", context));
+ let error_str = com_call_taskmem_getter!(
+ |desc| error_obj,
+ IBackgroundCopyError::GetErrorDescription(language_id, desc)
+ )
+ .map(|s| taskmem_into_lossy_string(s))
+ .unwrap_or_else(|_| format!("{:#08x}", hresult));
+
+ Ok(BitsJobError {
+ context,
+ context_str,
+ error: hresult,
+ error_str,
+ })
+ }
+ }
+
+ unsafe fn set_notify_interface(&self, interface: *mut IUnknown) -> Result<()> {
+ com_call!(self.0, IBackgroundCopyJob::SetNotifyInterface(interface))?;
+ Ok(())
+ }
+}
+
+pub struct BitsFile(ComRef<IBackgroundCopyFile>);
+
+/// A single file in a BITS job.
+///
+/// This is provided for collecting the redirected remote name.
+impl BitsFile {
+ /// Get the remote name from which the file is being downloaded.
+ ///
+ /// If [`BitsJob::set_redirect_report()`](struct.BitsJob.html#method.set_redirect_report)
+ /// hasn't been called on the job, this won't be
+ /// updated as HTTP redirects are processed.
+ pub fn get_remote_name(&self) -> Result<OsString> {
+ unsafe {
+ Ok(OsString::from_wide(
+ com_call_taskmem_getter!(|name| self.0, IBackgroundCopyFile::GetRemoteName(name))?
+ .as_slice_until_null(),
+ ))
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::BackgroundCopyManager;
+ use std::ffi::OsString;
+ use std::mem;
+
+ #[test]
+ #[ignore]
+ fn test_find_job() {
+ let bcm = BackgroundCopyManager::connect().unwrap();
+ let name = OsString::from("bits test job");
+ let wrong_name = OsString::from("bits test jobbo");
+
+ let mut job = bcm.create_job(&name).unwrap();
+ let guid = job.guid().unwrap();
+
+ assert_eq!(
+ bcm.find_job_by_guid(&guid)
+ .unwrap()
+ .unwrap()
+ .guid()
+ .unwrap(),
+ guid
+ );
+ assert_eq!(
+ bcm.find_job_by_guid_and_name(&guid, &name)
+ .unwrap()
+ .unwrap()
+ .guid()
+ .unwrap(),
+ guid
+ );
+ assert!(bcm
+ .find_job_by_guid_and_name(&guid, &wrong_name)
+ .unwrap()
+ .is_none());
+
+ job.cancel().unwrap();
+ mem::drop(job);
+
+ assert!(bcm.find_job_by_guid(&guid).unwrap().is_none());
+ assert!(bcm
+ .find_job_by_guid_and_name(&guid, &name)
+ .unwrap()
+ .is_none());
+ }
+}
diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/status.rs b/toolkit/components/bitsdownload/bits_client/bits/src/status.rs
new file mode 100644
index 0000000000..648a26866e
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/bits/src/status.rs
@@ -0,0 +1,118 @@
+// Licensed under the Apache License, Version 2.0
+// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// All files in the project carrying such notice may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Data types for reporting a job's status
+
+use filetime_win::FileTime;
+use winapi::shared::winerror::HRESULT;
+use winapi::um::bits::{BG_ERROR_CONTEXT, BG_JOB_STATE};
+
+#[cfg(feature = "status_serde")]
+use serde_derive::{Deserialize, Serialize};
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub struct BitsJobStatus {
+ pub state: BitsJobState,
+ pub progress: BitsJobProgress,
+ pub error_count: u32,
+ pub error: Option<BitsJobError>,
+ pub times: BitsJobTimes,
+}
+
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub struct BitsJobError {
+ pub context: BitsErrorContext,
+ pub context_str: String,
+ pub error: HRESULT,
+ pub error_str: String,
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub enum BitsErrorContext {
+ None,
+ Unknown,
+ GeneralQueueManager,
+ QueueManagerNotification,
+ LocalFile,
+ RemoteFile,
+ GeneralTransport,
+ RemoteApplication,
+ /// No other values are documented
+ Other(BG_ERROR_CONTEXT),
+}
+
+impl From<BG_ERROR_CONTEXT> for BitsErrorContext {
+ fn from(ec: BG_ERROR_CONTEXT) -> BitsErrorContext {
+ use self::BitsErrorContext::*;
+ use winapi::um::bits;
+ match ec {
+ bits::BG_ERROR_CONTEXT_NONE => None,
+ bits::BG_ERROR_CONTEXT_UNKNOWN => Unknown,
+ bits::BG_ERROR_CONTEXT_GENERAL_QUEUE_MANAGER => GeneralQueueManager,
+ bits::BG_ERROR_CONTEXT_QUEUE_MANAGER_NOTIFICATION => QueueManagerNotification,
+ bits::BG_ERROR_CONTEXT_LOCAL_FILE => LocalFile,
+ bits::BG_ERROR_CONTEXT_REMOTE_FILE => RemoteFile,
+ bits::BG_ERROR_CONTEXT_GENERAL_TRANSPORT => GeneralTransport,
+ bits::BG_ERROR_CONTEXT_REMOTE_APPLICATION => RemoteApplication,
+ ec => Other(ec),
+ }
+ }
+}
+
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub enum BitsJobState {
+ Queued,
+ Connecting,
+ Transferring,
+ Suspended,
+ Error,
+ TransientError,
+ Transferred,
+ Acknowledged,
+ Cancelled,
+ /// No other values are documented
+ Other(BG_JOB_STATE),
+}
+
+impl From<BG_JOB_STATE> for BitsJobState {
+ fn from(s: BG_JOB_STATE) -> BitsJobState {
+ use self::BitsJobState::*;
+ use winapi::um::bits;
+ match s {
+ bits::BG_JOB_STATE_QUEUED => Queued,
+ bits::BG_JOB_STATE_CONNECTING => Connecting,
+ bits::BG_JOB_STATE_TRANSFERRING => Transferring,
+ bits::BG_JOB_STATE_SUSPENDED => Suspended,
+ bits::BG_JOB_STATE_ERROR => Error,
+ bits::BG_JOB_STATE_TRANSIENT_ERROR => TransientError,
+ bits::BG_JOB_STATE_TRANSFERRED => Transferred,
+ bits::BG_JOB_STATE_ACKNOWLEDGED => Acknowledged,
+ bits::BG_JOB_STATE_CANCELLED => Cancelled,
+ s => Other(s),
+ }
+ }
+}
+
+#[derive(Copy, Clone, Debug)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub struct BitsJobProgress {
+ pub total_bytes: Option<u64>,
+ pub transferred_bytes: u64,
+ pub total_files: u32,
+ pub transferred_files: u32,
+}
+
+#[derive(Copy, Clone, Debug)]
+#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))]
+pub struct BitsJobTimes {
+ pub creation: FileTime,
+ pub modification: FileTime,
+ pub transfer_completion: Option<FileTime>,
+}
diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs b/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs
new file mode 100644
index 0000000000..c108f8d629
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs
@@ -0,0 +1,38 @@
+// Licensed under the Apache License, Version 2.0
+// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// All files in the project carrying such notice may not be copied, modified, or distributed
+// except according to those terms.
+
+// Minimal null-terminated wide string support from wio.
+
+use std::ffi::{OsStr, OsString};
+use std::os::windows::ffi::{OsStrExt, OsStringExt};
+use std::slice;
+
+pub trait ToWideNull {
+ fn to_wide_null(&self) -> Vec<u16>;
+}
+
+impl<T: AsRef<OsStr>> ToWideNull for T {
+ fn to_wide_null(&self) -> Vec<u16> {
+ self.as_ref().encode_wide().chain(Some(0)).collect()
+ }
+}
+
+pub trait FromWidePtrNull {
+ unsafe fn from_wide_ptr_null(wide: *const u16) -> Self;
+}
+
+impl FromWidePtrNull for OsString {
+ unsafe fn from_wide_ptr_null(wide: *const u16) -> Self {
+ assert!(!wide.is_null());
+
+ for i in 0.. {
+ if *wide.offset(i) == 0 {
+ return Self::from_wide(&slice::from_raw_parts(wide, i as usize));
+ }
+ }
+ unreachable!()
+ }
+}
diff --git a/toolkit/components/bitsdownload/bits_client/examples/test_client.rs b/toolkit/components/bitsdownload/bits_client/examples/test_client.rs
new file mode 100644
index 0000000000..25d7a0d82e
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/examples/test_client.rs
@@ -0,0 +1,285 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+extern crate bits_client;
+extern crate comedy;
+//extern crate ctrlc;
+extern crate guid_win;
+extern crate thiserror;
+
+use std::env;
+use std::ffi::{OsStr, OsString};
+use std::process;
+use std::str::FromStr;
+use std::sync::{Arc, Mutex};
+
+use thiserror::Error;
+
+use bits_client::bits_protocol::HResultMessage;
+use bits_client::{BitsClient, BitsJobState, BitsMonitorClient, BitsProxyUsage, Guid, PipeError};
+
+#[derive(Debug, Error)]
+enum MyError {
+ #[error("{0}")]
+ Msg(String),
+ #[error("HResult")]
+ HResult(#[from] comedy::HResult),
+ #[error("Win32Error")]
+ Win32Error(#[from] comedy::Win32Error),
+ #[error("PipeError")]
+ PipeError(#[from] PipeError),
+ #[error("HResultMessage")]
+ HResultMessage(#[from] HResultMessage),
+}
+
+macro_rules! bail {
+ ($e:expr) => {
+ return Err($crate::MyError::Msg($e.to_string()))
+ };
+ ($fmt:expr, $($arg:tt)*) => {
+ return Err($crate::MyError::Msg(format!($fmt, $($arg)*)))
+ };
+}
+
+type Result = std::result::Result<(), MyError>;
+
+pub fn main() {
+ if let Err(err) = entry() {
+ eprintln!("{}", err);
+ let mut err: &dyn std::error::Error = &err;
+ while let Some(source) = err.source() {
+ eprintln!("caused by {}", source);
+ err = source;
+ }
+
+ process::exit(1);
+ } else {
+ println!("OK");
+ }
+}
+
+const EXE_NAME: &'static str = "test_client";
+
+fn usage() -> String {
+ format!(
+ concat!(
+ "Usage {0} <command> ",
+ "[local-service] ",
+ "[args...]\n",
+ "Commands:\n",
+ " bits-start <URL> <local file>\n",
+ " bits-monitor <GUID>\n",
+ " bits-bg <GUID>\n",
+ " bits-fg <GUID>\n",
+ " bits-suspend <GUID>\n",
+ " bits-resume <GUID>\n",
+ " bits-complete <GUID>\n",
+ " bits-cancel <GUID> ...\n"
+ ),
+ EXE_NAME
+ )
+}
+
+fn entry() -> Result {
+ let args: Vec<_> = env::args_os().collect();
+
+ let mut client = BitsClient::new(
+ OsString::from("bits_client test"),
+ OsString::from("C:\\ProgramData"),
+ )?;
+
+ if args.len() < 2 {
+ eprintln!("{}", usage());
+ bail!("not enough arguments");
+ }
+
+ let cmd = &*args[1].to_string_lossy();
+ let cmd_args = &args[2..];
+
+ match cmd {
+ // command line client for testing
+ "bits-start" if cmd_args.len() == 2 => bits_start(
+ Arc::new(Mutex::new(client)),
+ cmd_args[0].clone(),
+ cmd_args[1].clone(),
+ BitsProxyUsage::Preconfig,
+ ),
+ "bits-monitor" if cmd_args.len() == 1 => {
+ bits_monitor(Arc::new(Mutex::new(client)), &cmd_args[0])
+ }
+ // TODO: some way of testing set update interval
+ "bits-bg" if cmd_args.len() == 1 => bits_bg(&mut client, &cmd_args[0]),
+ "bits-fg" if cmd_args.len() == 1 => bits_fg(&mut client, &cmd_args[0]),
+ "bits-suspend" if cmd_args.len() == 1 => bits_suspend(&mut client, &cmd_args[0]),
+ "bits-resume" if cmd_args.len() == 1 => bits_resume(&mut client, &cmd_args[0]),
+ "bits-complete" if cmd_args.len() == 1 => bits_complete(&mut client, &cmd_args[0]),
+ "bits-cancel" if cmd_args.len() >= 1 => {
+ for guid in cmd_args {
+ bits_cancel(&mut client, guid)?;
+ }
+ Ok(())
+ }
+ _ => {
+ eprintln!("{}", usage());
+ bail!("usage error");
+ }
+ }
+}
+
+fn bits_start(
+ client: Arc<Mutex<BitsClient>>,
+ url: OsString,
+ save_path: OsString,
+ proxy_usage: BitsProxyUsage,
+) -> Result {
+ //let interval = 10 * 60 * 1000;
+ let no_progress_timeout_secs = 60;
+ let interval = 1000;
+
+ let result = client.lock().unwrap().start_job(
+ url,
+ save_path,
+ proxy_usage,
+ no_progress_timeout_secs,
+ interval,
+ )?;
+
+ match result {
+ Ok((r, monitor_client)) => {
+ println!("start success, guid = {}", r.guid);
+ client
+ .lock()
+ .unwrap()
+ .set_update_interval(r.guid.clone(), interval)?
+ .unwrap();
+ monitor_loop(client, monitor_client, r.guid.clone(), interval)?;
+ Ok(())
+ }
+ Err(e) => {
+ let _ = e.clone();
+ bail!("error from server {}", e)
+ }
+ }
+}
+
+fn bits_monitor(client: Arc<Mutex<BitsClient>>, guid: &OsStr) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ let result = client.lock().unwrap().monitor_job(guid.clone(), 1000)?;
+ match result {
+ Ok(monitor_client) => {
+ println!("monitor success");
+ monitor_loop(client, monitor_client, guid, 1000)?;
+ Ok(())
+ }
+ Err(e) => bail!("error from server {}", e),
+ }
+}
+
+fn _check_client_send()
+where
+ BitsClient: Send,
+{
+}
+fn _check_monitor_send()
+where
+ BitsMonitorClient: Send,
+{
+}
+
+fn monitor_loop(
+ _client: Arc<Mutex<BitsClient>>,
+ mut monitor_client: BitsMonitorClient,
+ _guid: Guid,
+ wait_millis: u32,
+) -> Result {
+ /*
+ // Commented out to avoid vendoring ctrlc.
+ // This could also possibly be done with `exclude` in the mozilla-central `Cargo.toml`.
+ let client_for_handler = _client.clone();
+ ctrlc::set_handler(move || {
+ eprintln!("Ctrl-C!");
+ let _ = client_for_handler.lock().unwrap().stop_update(_guid.clone());
+ })
+ .expect("Error setting Ctrl-C handler");
+ */
+
+ loop {
+ let status = monitor_client.get_status(wait_millis * 10)??;
+
+ println!("{:?} {:?}", BitsJobState::from(status.state), status);
+
+ //println!("{}", job.get_first_file()?.get_remote_name()?.into_string().unwrap());
+ let transfer_completion_time = if let Some(ft) = status.times.transfer_completion {
+ format!("Some({})", ft.to_system_time_utc()?)
+ } else {
+ String::from("None")
+ };
+ println!(
+ "creation: {}, modification: {}, transfer completion: {}",
+ status.times.creation.to_system_time_utc()?,
+ status.times.modification.to_system_time_utc()?,
+ transfer_completion_time
+ );
+
+ match BitsJobState::from(status.state) {
+ BitsJobState::Connecting
+ | BitsJobState::Transferring
+ | BitsJobState::TransientError => {}
+ _ => break,
+ }
+ }
+ println!("monitor loop ending");
+ println!("sleeping...");
+ std::thread::sleep(std::time::Duration::from_secs(1));
+
+ Ok(())
+}
+
+fn bits_bg(client: &mut BitsClient, guid: &OsStr) -> Result {
+ bits_set_priority(client, guid, false)
+}
+
+fn bits_fg(client: &mut BitsClient, guid: &OsStr) -> Result {
+ bits_set_priority(client, guid, true)
+}
+
+fn bits_set_priority(client: &mut BitsClient, guid: &OsStr, foreground: bool) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ match client.set_job_priority(guid, foreground)? {
+ Ok(()) => Ok(()),
+ Err(e) => bail!("error from server {}", e),
+ }
+}
+
+fn bits_suspend(client: &mut BitsClient, guid: &OsStr) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ match client.suspend_job(guid)? {
+ Ok(()) => Ok(()),
+ Err(e) => bail!("error from server {}", e),
+ }
+}
+
+fn bits_resume(client: &mut BitsClient, guid: &OsStr) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ match client.resume_job(guid)? {
+ Ok(()) => Ok(()),
+ Err(e) => bail!("error from server {}", e),
+ }
+}
+
+fn bits_complete(client: &mut BitsClient, guid: &OsStr) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ match client.complete_job(guid)? {
+ Ok(()) => Ok(()),
+ Err(e) => bail!("error from server {}", e),
+ }
+}
+
+fn bits_cancel(client: &mut BitsClient, guid: &OsStr) -> Result {
+ let guid = Guid::from_str(&guid.to_string_lossy())?;
+ match client.cancel_job(guid)? {
+ Ok(()) => Ok(()),
+ Err(e) => bail!("error from server {}", e),
+ }
+}
diff --git a/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs
new file mode 100644
index 0000000000..6f19f5ef23
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs
@@ -0,0 +1,380 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+//! Command, response, and status types.
+
+use std::error::Error as StdError;
+use std::ffi::OsString;
+use std::fmt;
+use std::result;
+
+use guid_win::Guid;
+use thiserror::Error;
+
+use super::{BitsErrorContext, BitsJobProgress, BitsJobState, BitsJobTimes, BitsProxyUsage};
+
+type HRESULT = i32;
+
+/// An HRESULT with a descriptive message
+#[derive(Clone, Debug)]
+pub struct HResultMessage {
+ pub hr: HRESULT,
+ pub message: String,
+}
+
+impl fmt::Display for HResultMessage {
+ fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
+ self.message.fmt(f)
+ }
+}
+
+impl StdError for HResultMessage {}
+
+/// Commands which can be sent to the server.
+///
+/// This is currently unused as the out-of-process Local Service server is not finished.
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub enum Command {
+ StartJob(StartJobCommand),
+ MonitorJob(MonitorJobCommand),
+ SuspendJob(SuspendJobCommand),
+ ResumeJob(ResumeJobCommand),
+ SetJobPriority(SetJobPriorityCommand),
+ SetNoProgressTimeout(SetNoProgressTimeoutCommand),
+ SetUpdateInterval(SetUpdateIntervalCommand),
+ CompleteJob(CompleteJobCommand),
+ CancelJob(CancelJobCommand),
+}
+
+/// Combine a [`Command`](enum.Command.html) with its success and failure result types.
+#[doc(hidden)]
+pub trait CommandType {
+ type Success;
+ type Failure: StdError;
+ fn wrap(command: Self) -> Command;
+}
+
+// Start Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct StartJobCommand {
+ pub url: OsString,
+ pub save_path: OsString,
+ pub proxy_usage: BitsProxyUsage,
+ pub no_progress_timeout_secs: u32,
+ pub monitor: Option<MonitorConfig>,
+}
+
+impl CommandType for StartJobCommand {
+ type Success = StartJobSuccess;
+ type Failure = StartJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::StartJob(cmd)
+ }
+}
+
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct MonitorConfig {
+ pub pipe_name: OsString,
+ pub interval_millis: u32,
+}
+
+#[derive(Clone, Debug)]
+pub struct StartJobSuccess {
+ pub guid: Guid,
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum StartJobFailure {
+ #[error("Argument validation failed: {0}")]
+ ArgumentValidation(String),
+ #[error("Create job: {0}")]
+ Create(HResultMessage),
+ #[error("Add file to job: {0}")]
+ AddFile(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Resume job: {0}")]
+ Resume(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Monitor Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct MonitorJobCommand {
+ pub guid: Guid,
+ pub monitor: MonitorConfig,
+}
+
+impl CommandType for MonitorJobCommand {
+ type Success = ();
+ type Failure = MonitorJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::MonitorJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum MonitorJobFailure {
+ #[error("Argument validation failed: {0}")]
+ ArgumentValidation(String),
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Suspend Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SuspendJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for SuspendJobCommand {
+ type Success = ();
+ type Failure = SuspendJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SuspendJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SuspendJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Suspend job: {0}")]
+ SuspendJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Resume Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct ResumeJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for ResumeJobCommand {
+ type Success = ();
+ type Failure = ResumeJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::ResumeJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum ResumeJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Resume job: {0}")]
+ ResumeJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set Job Priority
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetJobPriorityCommand {
+ pub guid: Guid,
+ pub foreground: bool,
+}
+
+impl CommandType for SetJobPriorityCommand {
+ type Success = ();
+ type Failure = SetJobPriorityFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetJobPriority(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetJobPriorityFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set No Progress Timeout
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetNoProgressTimeoutCommand {
+ pub guid: Guid,
+ pub timeout_secs: u32,
+}
+
+impl CommandType for SetNoProgressTimeoutCommand {
+ type Success = ();
+ type Failure = SetNoProgressTimeoutFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetNoProgressTimeout(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetNoProgressTimeoutFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Apply settings to job: {0}")]
+ ApplySettings(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Set Update Interval
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct SetUpdateIntervalCommand {
+ pub guid: Guid,
+ pub interval_millis: u32,
+}
+
+impl CommandType for SetUpdateIntervalCommand {
+ type Success = ();
+ type Failure = SetUpdateIntervalFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::SetUpdateInterval(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum SetUpdateIntervalFailure {
+ #[error("Argument validation: {0}")]
+ ArgumentValidation(String),
+ #[error("Monitor not found")]
+ NotFound,
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Complete Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct CompleteJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for CompleteJobCommand {
+ type Success = ();
+ type Failure = CompleteJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::CompleteJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum CompleteJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Complete job: {0}")]
+ CompleteJob(HResultMessage),
+ #[error("Job only partially completed")]
+ PartialComplete,
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+// Cancel Job
+#[doc(hidden)]
+#[derive(Clone, Debug)]
+pub struct CancelJobCommand {
+ pub guid: Guid,
+}
+
+impl CommandType for CancelJobCommand {
+ type Success = ();
+ type Failure = CancelJobFailure;
+ fn wrap(cmd: Self) -> Command {
+ Command::CancelJob(cmd)
+ }
+}
+
+#[derive(Clone, Debug, Error)]
+pub enum CancelJobFailure {
+ #[error("Job not found")]
+ NotFound,
+ #[error("Get job: {0}")]
+ GetJob(HResultMessage),
+ #[error("Cancel job: {0}")]
+ CancelJob(HResultMessage),
+ #[error("Connect to BackgroundCopyManager: {0}")]
+ ConnectBcm(HResultMessage),
+ #[error("BITS error: {0}")]
+ OtherBITS(HResultMessage),
+ #[error("Other failure: {0}")]
+ Other(String),
+}
+
+/// Job status report
+///
+/// This includes a URL which updates with redirect but is otherwise the same as
+/// `bits::status::BitsJobStatus`.
+#[derive(Clone, Debug)]
+pub struct JobStatus {
+ pub state: BitsJobState,
+ pub progress: BitsJobProgress,
+ pub error_count: u32,
+ pub error: Option<JobError>,
+ pub times: BitsJobTimes,
+ /// None means same as last time
+ pub url: Option<OsString>,
+}
+
+/// Job error report
+#[derive(Clone, Debug, Error)]
+#[error("Job error in context {context_str}: {error}")]
+pub struct JobError {
+ pub context: BitsErrorContext,
+ pub context_str: String,
+ pub error: HResultMessage,
+}
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
new file mode 100644
index 0000000000..c7f6869167
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
@@ -0,0 +1,504 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cmp;
+use std::collections::{hash_map, HashMap};
+use std::ffi;
+use std::path;
+use std::sync::{Arc, Condvar, Mutex, Weak};
+use std::time::{Duration, Instant};
+
+use bits::{
+ BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL,
+};
+use guid_win::Guid;
+
+use bits_protocol::*;
+
+use super::Error;
+
+// This is a macro in order to use the NotFound and GetJob variants from whatever enum is in scope.
+macro_rules! get_job {
+ ($bcm:ident, $guid:expr, $name:expr) => {{
+ $bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ $bcm.find_job_by_guid_and_name($guid, $name)
+ .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))?
+ .ok_or(NotFound)?
+ }};
+}
+
+fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage {
+ let bits_description = bcm.get_error_description(error.code()).ok();
+
+ HResultMessage {
+ hr: error.code(),
+ message: if let Some(desc) = bits_description {
+ format!("{}: {}", error, desc)
+ } else {
+ format!("{}", error)
+ },
+ }
+}
+
+// The in-process client uses direct BITS calls via the `bits` crate.
+// See the corresponding functions in BitsClient.
+pub struct InProcessClient {
+ job_name: ffi::OsString,
+ save_path_prefix: path::PathBuf,
+ monitors: HashMap<Guid, InProcessMonitorControl>,
+}
+
+impl InProcessClient {
+ pub fn new(
+ job_name: ffi::OsString,
+ save_path_prefix: ffi::OsString,
+ ) -> Result<InProcessClient, Error> {
+ Ok(InProcessClient {
+ job_name,
+ save_path_prefix: path::PathBuf::from(save_path_prefix),
+ monitors: HashMap::new(),
+ })
+ }
+
+ pub fn start_job(
+ &mut self,
+ url: ffi::OsString,
+ save_path: ffi::OsString,
+ proxy_usage: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ monitor_interval_millis: u32,
+ ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> {
+ use StartJobFailure::*;
+
+ let full_path = self.save_path_prefix.join(save_path);
+
+ // Verify that `full_path` is under the directory called `save_path_prefix`.
+ {
+ let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| {
+ ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e))
+ })?;
+ // Full path minus file name, canonicalize() fails with nonexistent files, but the
+ // parent directory ought to exist.
+ let canonical_full_path = full_path
+ .parent()
+ .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))?
+ .canonicalize()
+ .map_err(|e| {
+ ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e))
+ })?;
+
+ if !canonical_full_path.starts_with(&canonical_prefix) {
+ return Err(ArgumentValidation(format!(
+ "{:?} is not within {:?}",
+ canonical_full_path, canonical_prefix
+ )));
+ }
+ }
+
+ // TODO: Should the job be explicitly cleaned up if this fn can't return success?
+ // If the job is dropped before `AddFile` succeeds, I think it automatically gets
+ // deleted from the queue. There is only one fallible call after that (`Resume`).
+
+ let bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ let mut job = bcm
+ .create_job(&self.job_name)
+ .map_err(|e| Create(format_error(&bcm, e)))?;
+
+ let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ (|| {
+ job.set_proxy_usage(proxy_usage)?;
+ job.set_minimum_retry_delay(60)?;
+ job.set_no_progress_timeout(no_progress_timeout_secs)?;
+ job.set_redirect_report()?;
+
+ job.set_priority(BitsJobPriority::Foreground)?;
+
+ Ok(())
+ })()
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ job.add_file(&url, &full_path.into_os_string())
+ .map_err(|e| AddFile(format_error(&bcm, e)))?;
+
+ job.resume().map_err(|e| Resume(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid.clone(), control);
+
+ Ok((StartJobSuccess { guid }, client))
+ }
+
+ pub fn monitor_job(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<InProcessMonitor, MonitorJobFailure> {
+ use MonitorJobFailure::*;
+
+ // Stop any preexisting monitor for the same guid.
+ let _ = self.stop_update(guid.clone());
+
+ let bcm;
+ let (client, control) =
+ InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid, control);
+
+ Ok(client)
+ }
+
+ pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> {
+ use SuspendJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .suspend()
+ .map_err(|e| SuspendJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> {
+ use ResumeJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .resume()
+ .map_err(|e| ResumeJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_job_priority(
+ &mut self,
+ guid: Guid,
+ foreground: bool,
+ ) -> Result<(), SetJobPriorityFailure> {
+ use SetJobPriorityFailure::*;
+
+ let priority = if foreground {
+ BitsJobPriority::Foreground
+ } else {
+ BitsJobPriority::Normal
+ };
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_priority(priority)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_no_progress_timeout(
+ &mut self,
+ guid: Guid,
+ timeout_secs: u32,
+ ) -> Result<(), SetNoProgressTimeoutFailure> {
+ use SetNoProgressTimeoutFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_no_progress_timeout(timeout_secs)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ fn get_monitor_control_sender(&mut self, guid: Guid) -> Option<Arc<ControlPair>> {
+ if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) {
+ if let Some(sender) = occ.get().0.upgrade() {
+ Some(sender)
+ } else {
+ // Remove dangling Weak
+ occ.remove_entry();
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+ pub fn set_update_interval(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ let mut s = sender.1.lock().unwrap();
+ s.interval_millis = interval_millis;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ sender.1.lock().unwrap().shutdown = true;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> {
+ use CompleteJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .complete()
+ .map_err(|e| CompleteJob(format_error(&bcm, e)))
+ .and_then(|hr| {
+ if hr == BG_S_PARTIAL_COMPLETE as i32 {
+ Err(PartialComplete)
+ } else {
+ Ok(())
+ }
+ })?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+
+ pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> {
+ use CancelJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .cancel()
+ .map_err(|e| CancelJob(format_error(&bcm, e)))?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+}
+
+// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to
+// control a blocked `get_status` call from another thread.
+pub struct InProcessMonitor {
+ vars: Arc<ControlPair>,
+ guid: Guid,
+ last_status_time: Option<Instant>,
+ last_url: Option<ffi::OsString>,
+}
+
+// The `Condvar` is notified when `InProcessMonitorVars` changes.
+type ControlPair = (Condvar, Mutex<InProcessMonitorVars>);
+struct InProcessMonitorControl(Weak<ControlPair>);
+
+// RefUnwindSafe is not impl'd for Condvar but likely should be,
+// see https://github.com/rust-lang/rust/issues/54768
+impl std::panic::RefUnwindSafe for InProcessMonitorControl {}
+
+struct InProcessMonitorVars {
+ interval_millis: u32,
+ notified: bool,
+ shutdown: bool,
+}
+
+impl InProcessMonitor {
+ fn new(
+ job: &mut BitsJob,
+ interval_millis: u32,
+ ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> {
+ let guid = job.guid()?;
+
+ let vars = Arc::new((
+ Condvar::new(),
+ Mutex::new(InProcessMonitorVars {
+ interval_millis,
+ notified: false,
+ shutdown: false,
+ }),
+ ));
+
+ let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let transferred_cb = Box::new(move || {
+ if let Some(control) = transferred_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ let error_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let error_cb = Box::new(move || {
+ if let Some(control) = error_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ // Note: These callbacks are never explicitly cleared. They will be freed when the
+ // job is deleted from BITS, and they will be cleared if an attempt is made to call them
+ // when they are no longer valid (e.g. after the process exits). This is done mostly for
+ // simplicity and should be safe.
+
+ job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?;
+
+ let control = InProcessMonitorControl(Arc::downgrade(&vars));
+
+ let monitor = InProcessMonitor {
+ guid,
+ vars,
+ last_status_time: None,
+ last_url: None,
+ };
+
+ Ok((monitor, control))
+ }
+
+ pub fn get_status(
+ &mut self,
+ timeout_millis: u32,
+ ) -> Result<Result<JobStatus, HResultMessage>, Error> {
+ let timeout = Duration::from_millis(u64::from(timeout_millis));
+
+ let started = Instant::now();
+ let timeout_end = started + timeout;
+
+ {
+ let mut s = self.vars.1.lock().unwrap();
+ loop {
+ let wait_start = Instant::now();
+
+ if s.shutdown {
+ // Disconnected, immediately return error.
+ // Note: Shutdown takes priority over simultaneous notification.
+ return Err(Error::NotConnected);
+ }
+
+ if wait_start >= timeout_end {
+ // Timed out, immediately return timeout error.
+ // This should not normally happen with the in-process monitor, but the
+ // monitor interval could be longer than the timeout.
+ s.shutdown = true;
+ return Err(Error::Timeout);
+ }
+
+ // Get the interval every pass through the loop, in case it has changed.
+ let interval = Duration::from_millis(u64::from(s.interval_millis));
+
+ let wait_until = self
+ .last_status_time
+ .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end));
+
+ if s.notified {
+ // Notified, exit loop to get status.
+ s.notified = false;
+ break;
+ }
+
+ if wait_until.is_none() {
+ // First status report, no waiting, exit loop to get status.
+ break;
+ }
+
+ let wait_until = wait_until.unwrap();
+
+ if wait_until <= wait_start {
+ // No time left to wait. This can't be due to timeout because
+ // `wait_until <= wait_start < timeout_end`.
+ // Status report due, exit loop to get status.
+ break;
+ }
+
+ // Wait.
+ // Do not attempt to recover from poisoned Mutex.
+ s = self
+ .vars
+ .0
+ .wait_timeout(s, wait_until - wait_start)
+ .unwrap()
+ .0;
+
+ // Mutex re-acquired, loop.
+ }
+ }
+
+ // No error yet, start getting status now.
+ self.last_status_time = Some(Instant::now());
+
+ let bcm = match BackgroundCopyManager::connect() {
+ Ok(bcm) => bcm,
+ Err(e) => {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+
+ // Errors below can use the BCM to do `format_error()`, but this one just gets the
+ // basic `comedy::HResult` treatment.
+ return Ok(Err(HResultMessage {
+ hr: e.code(),
+ message: format!("{}", e),
+ }));
+ }
+ };
+
+ Ok((|| {
+ let mut job = bcm.get_job_by_guid(&self.guid)?;
+
+ let status = job.get_status()?;
+ let url = job.get_first_file()?.get_remote_name()?;
+
+ Ok(JobStatus {
+ state: status.state,
+ progress: status.progress,
+ error_count: status.error_count,
+ error: status.error.map(|e| JobError {
+ context: e.context,
+ context_str: e.context_str,
+ error: HResultMessage {
+ hr: e.error,
+ message: e.error_str,
+ },
+ }),
+ times: status.times,
+ url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url {
+ None
+ } else {
+ self.last_url = Some(url);
+ self.last_url.clone()
+ },
+ })
+ })()
+ .map_err(|e| {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+ format_error(&bcm, e)
+ }))
+ }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
new file mode 100644
index 0000000000..5545f6bd74
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
@@ -0,0 +1,628 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// These are full integration tests that use the BITS service.
+
+// TODO
+// It may make sense to restrict how many tests can run at once. BITS is only supposed to support
+// four simultaneous notifications per user, it is not impossible that this test suite could
+// exceed that.
+
+#![cfg(test)]
+extern crate bits;
+extern crate lazy_static;
+extern crate rand;
+extern crate regex;
+extern crate tempfile;
+
+use std::ffi::{OsStr, OsString};
+use std::fs::{self, File};
+use std::io::{Read, Write};
+use std::net::{SocketAddr, TcpListener, TcpStream};
+use std::panic;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{self, JoinHandle};
+use std::time::{Duration, Instant};
+
+use self::{
+ bits::BackgroundCopyManager,
+ lazy_static::lazy_static,
+ rand::{thread_rng, Rng},
+ regex::bytes::Regex,
+ tempfile::{Builder, TempDir},
+};
+use super::{
+ super::{BitsJobState, Error},
+ BitsProxyUsage, InProcessClient, StartJobSuccess,
+};
+
+static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1];
+
+lazy_static! {
+ static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
+}
+
+fn format_job_name(name: &str) -> OsString {
+ format!("InProcessClient Test {}", name).into()
+}
+
+fn format_dir_prefix(tmp_dir: &TempDir) -> OsString {
+ let mut dir = tmp_dir.path().to_path_buf().into_os_string();
+ dir.push("\\");
+ dir
+}
+
+fn cancel_jobs(name: &OsStr) {
+ BackgroundCopyManager::connect()
+ .unwrap()
+ .cancel_jobs_by_name(name)
+ .unwrap();
+}
+
+struct HttpServerResponses {
+ body: Box<[u8]>,
+ delay: u64,
+ //error: Box<[u8]>,
+}
+
+struct MockHttpServerHandle {
+ port: u16,
+ join: Option<JoinHandle<Result<(), ()>>>,
+ shutdown: Arc<(Mutex<bool>, Condvar)>,
+}
+
+impl MockHttpServerHandle {
+ fn shutdown(&mut self) {
+ if self.join.is_none() {
+ return;
+ }
+
+ {
+ let &(ref lock, ref cvar) = &*self.shutdown;
+ let mut shutdown = lock.lock().unwrap();
+
+ if !*shutdown {
+ *shutdown = true;
+ cvar.notify_all();
+ }
+ }
+ // Wake up the server from `accept()`. Will fail if the server wasn't listening.
+ let _ = TcpStream::connect_timeout(
+ &(SERVER_ADDRESS, self.port).into(),
+ Duration::from_millis(10_000),
+ );
+
+ match self.join.take().unwrap().join() {
+ Ok(_) => {}
+ Err(p) => panic::resume_unwind(p),
+ }
+ }
+
+ fn format_url(&self, name: &str) -> OsString {
+ format!(
+ "http://{}/{}",
+ SocketAddr::from((SERVER_ADDRESS, self.port)),
+ name
+ )
+ .into()
+ }
+}
+
+fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle {
+ let mut bind_retries = 10;
+ let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
+ let caller_shutdown = shutdown.clone();
+
+ let (listener, port) = loop {
+ let port = thread_rng().gen_range(1024..0x1_0000u32) as u16;
+ match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) {
+ Ok(listener) => {
+ break (listener, port);
+ }
+ r @ Err(_) => {
+ if bind_retries == 0 {
+ r.unwrap();
+ }
+ bind_retries -= 1;
+ continue;
+ }
+ }
+ };
+
+ let join = thread::Builder::new()
+ .name(format!("mock_http_server {}", name))
+ .spawn(move || {
+ // returns Err(()) if server should shut down immediately
+ fn check_shutdown(shutdown: &Arc<(Mutex<bool>, Condvar)>) -> Result<(), ()> {
+ if *shutdown.0.lock().unwrap() {
+ Err(())
+ } else {
+ Ok(())
+ }
+ }
+ fn sleep(shutdown: &Arc<(Mutex<bool>, Condvar)>, delay_millis: u64) -> Result<(), ()> {
+ let sleep_start = Instant::now();
+ let sleep_end = sleep_start + Duration::from_millis(delay_millis);
+
+ let (ref lock, ref cvar) = **shutdown;
+ let mut shutdown_requested = lock.lock().unwrap();
+ loop {
+ if *shutdown_requested {
+ return Err(());
+ }
+
+ let before_wait = Instant::now();
+ if before_wait >= sleep_end {
+ return Ok(());
+ }
+ let wait_dur = sleep_end - before_wait;
+ shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0;
+ }
+ }
+
+ let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap();
+ let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap();
+
+ loop {
+ let (mut socket, _addr) = listener.accept().expect("accept should succeed");
+
+ socket
+ .set_read_timeout(Some(Duration::from_millis(10_000)))
+ .unwrap();
+ let mut s = Vec::new();
+ for b in Read::by_ref(&mut socket).bytes() {
+ if b.is_err() {
+ eprintln!("read error {:?}", b);
+ break;
+ }
+ let b = b.unwrap();
+ s.push(b);
+ if s.ends_with(b"\r\n\r\n") {
+ break;
+ }
+ check_shutdown(&shutdown)?;
+ }
+
+ // request received
+
+ check_shutdown(&shutdown)?;
+
+ // Special error URIs
+ if error_404.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 404 Not Found\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 404 header {:?}", e);
+ }
+ continue;
+ }
+
+ if error_500.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 500 header {:?}", e);
+ }
+ continue;
+ }
+
+ // Response with a body.
+ if s.starts_with(b"HEAD") || s.starts_with(b"GET") {
+ let result = socket
+ .write(
+ format!(
+ "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
+ responses.body.len()
+ )
+ .as_bytes(),
+ )
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing header {:?}", e);
+ continue;
+ }
+ }
+
+ if s.starts_with(b"GET") {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket.write(&responses.body).and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing content {:?}", e);
+ continue;
+ }
+ }
+ }
+ });
+
+ MockHttpServerHandle {
+ port,
+ join: Some(join.unwrap()),
+ shutdown: caller_shutdown,
+ }
+}
+
+// Test wrapper to ensure jobs are canceled, set up name strings
+macro_rules! test {
+ (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => {
+ #[test]
+ fn $name() {
+ let $param = stringify!($name);
+ let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap();
+
+ let result = panic::catch_unwind(|| $body);
+
+ cancel_jobs(&format_job_name($param));
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+ }
+ };
+}
+
+test! {
+ fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 10_000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 10_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // cancel in ~250ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.cancel_job(guid).unwrap();
+ });
+
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get
+ // an update until 10s when the transfer completes or the interval expires)
+ match monitor.get_status(timeout) {
+ Err(Error::NotConnected) => {},
+ Ok(r) => panic!("unexpected success from get_status() {:?}", r),
+ Err(e) => panic!("unexpected failure from get_status() {:?}", e),
+ }
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+
+ server.shutdown();
+ }
+}
+
+test! {
+ fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) {
+ let file_path = tmp_dir.path().join(name);
+
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 500,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 100;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // Get status reports until transfer finishes (~500ms)
+ let mut completed = false;
+ loop {
+ match monitor.get_status(timeout) {
+ Err(e) => {
+ if completed {
+ break;
+ } else {
+ panic!("monitor failed before completion {:?}", e);
+ }
+ }
+ Ok(Ok(status)) => match BitsJobState::from(status.state) {
+ BitsJobState::Queued | BitsJobState::Connecting
+ | BitsJobState::Transferring => {
+ //eprintln!("{:?}", BitsJobState::from(status.state));
+ //eprintln!("{:?}", status);
+
+ // As long as there is no error, setting the timeout to 0 will not
+ // fail an active transfer.
+ client.set_no_progress_timeout(guid.clone(), 0).unwrap();
+ }
+ BitsJobState::Transferred => {
+ client.complete_job(guid.clone()).unwrap();
+ completed = true;
+ }
+ _ => {
+ panic!("{:?}", status);
+ }
+ }
+ Ok(Err(e)) => panic!("{:?}", e),
+ }
+
+ // Timeout to prevent waiting forever
+ assert!(start.elapsed() < Duration::from_millis(60_000));
+ }
+
+
+ // Verify file contents
+ let result = panic::catch_unwind(|| {
+ let mut file = File::open(file_path.clone()).unwrap();
+ let mut v = Vec::new();
+ file.read_to_end(&mut v).unwrap();
+ assert_eq!(v, name.as_bytes());
+ });
+
+ let _ = fs::remove_file(file_path);
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+
+ // Save this for last to ensure the file is removed.
+ server.shutdown();
+ }
+}
+
+test! {
+ fn async_transferred_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 250,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First report, immediate
+ let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap();
+ let elapsed_to_report_1 = start.elapsed();
+
+ // Transferred notification should come when the job completes in ~250 ms, otherwise we
+ // will be stuck until timeout.
+ let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap();
+ let elapsed_to_report_2 = start.elapsed();
+ assert!(elapsed_to_report_2 < Duration::from_millis(9_000));
+ assert_eq!(report_2.state, BitsJobState::Transferred);
+
+ let short_timeout = 500;
+ let report_3 = monitor.get_status(short_timeout);
+ let elapsed_to_report_3 = start.elapsed();
+
+ if let Ok(report_3) = report_3 {
+ panic!("should be disconnected\n\
+ report_1 ({}.{:03}): {:?}\n\
+ report_2 ({}.{:03}): {:?}\n\
+ report_3 ({}.{:03}): {:?}",
+ elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1,
+ elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2,
+ elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3,
+ );
+ }
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn change_interval(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 1000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // reduce monitor interval in ~250ms to 500ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.set_update_interval(guid, 500).unwrap();
+ });
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Next report should be rescheduled to 500ms by the spawned thread, otherwise no status
+ // until the original 10s interval.
+ monitor.get_status(timeout).expect("expected second status").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert!(start.elapsed() > Duration::from_millis(400));
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn async_error_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_404"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Error notification should come with HEAD response in 100ms, otherwise no status until
+ // 10s interval or timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Transient error notification should come when the interval expires in ~1s.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() > Duration::from_millis(900));
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::TransientError);
+
+ // Lower no progress timeout to 0
+ let set_timeout_at = Instant::now();
+ client.set_no_progress_timeout(guid, 0).unwrap();
+
+ // Should convert the transient error to a permanent error immediately.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(set_timeout_at.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately
+ // produce an error notification with the HEAD response in 100ms. Otherwise no status
+ // until 10s interval or timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
diff --git a/toolkit/components/bitsdownload/bits_client/src/lib.rs b/toolkit/components/bitsdownload/bits_client/src/lib.rs
new file mode 100644
index 0000000000..7db887bbc5
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/lib.rs
@@ -0,0 +1,258 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+//! An interface for managing and monitoring BITS jobs.
+//!
+//! BITS is a Windows service for performing downloads in the background, independent from an
+//! application, usually via HTTP/HTTPS.
+//!
+//! [`BitsClient`](enum.BitsClient.html) is the main interface, used to issue commands.
+//!
+//! [`BitsMonitorClient`](enum.BitsMonitorClient.html) delivers periodic status reports about a
+//! job.
+//!
+//! Microsoft's documentation for BITS can be found at
+//! <https://docs.microsoft.com/en-us/windows/desktop/Bits/background-intelligent-transfer-service-portal>
+
+extern crate bits;
+extern crate comedy;
+extern crate guid_win;
+extern crate thiserror;
+
+pub mod bits_protocol;
+
+mod in_process;
+
+use std::ffi;
+
+use bits_protocol::*;
+use thiserror::Error;
+
+pub use bits::status::{BitsErrorContext, BitsJobState, BitsJobTimes};
+pub use bits::{BitsJobProgress, BitsJobStatus, BitsProxyUsage};
+pub use bits_protocol::{JobError, JobStatus};
+pub use comedy::HResult;
+pub use guid_win::Guid;
+
+// These errors would come from a Local Service client but are mostly unused currently.
+// PipeError properly lives in the crate that deals with named pipes, but it isn't in use now.
+#[derive(Clone, Debug, Eq, Error, PartialEq)]
+pub enum PipeError {
+ #[error("Pipe is not connected")]
+ NotConnected,
+ #[error("Operation timed out")]
+ Timeout,
+ #[error("Should have written {0} bytes, wrote {1}")]
+ WriteCount(usize, u32),
+ #[error("Windows API error")]
+ Api(#[from] HResult),
+}
+
+pub use PipeError as Error;
+
+/// A client for interacting with BITS.
+///
+/// Methods on `BitsClient` return a `Result<Result<_, XyzFailure>, Error>`. The outer `Result`
+/// is `Err` if there was a communication error in sending the associated command or receiving
+/// its response. Currently this is always `Ok` as all clients are in-process. The inner
+/// `Result` is `Err` if there was an error executing the command.
+///
+/// A single `BitsClient` can be used with multiple BITS jobs simultaneously; generally a job
+/// is not bound tightly to a client.
+///
+/// A `BitsClient` tracks all [`BitsMonitorClient`s](enum.BitsMonitorClient.html) that it started
+/// with `start_job()` or `monitor_job()`, so that the monitor can be stopped or modified.
+pub enum BitsClient {
+ // The `InProcess` variant does all BITS calls directly.
+ #[doc(hidden)]
+ InProcess(in_process::InProcessClient),
+ // Space is reserved here for the LocalService variant, which will work through an external
+ // process running as Local Service.
+}
+
+use BitsClient::InProcess;
+
+impl BitsClient {
+ /// Create an in-process `BitsClient`.
+ ///
+ /// `job_name` will be used when creating jobs, and this `BitsClient` can only be used to
+ /// manipulate jobs with that name.
+ ///
+ /// `save_path_prefix` will be prepended to the local `save_path` given to `start_job()`, it
+ /// must name an existing directory.
+ pub fn new(
+ job_name: ffi::OsString,
+ save_path_prefix: ffi::OsString,
+ ) -> Result<BitsClient, Error> {
+ Ok(InProcess(in_process::InProcessClient::new(
+ job_name,
+ save_path_prefix,
+ )?))
+ }
+
+ /// Start a job to download a single file at `url` to local path `save_path` (relative to the
+ /// `save_path_prefix` given when constructing the `BitsClient`).
+ ///
+ /// `save_path_prefix` combined with `save_path` must name a file (existing or not) in an
+ /// existing directory, which must be under the directory named by `save_path_prefix`.
+ ///
+ /// `proxy_usage` determines what proxy will be used.
+ ///
+ /// When a successful result `Ok(result)` is returned, `result.0.guid` is the id for the
+ /// new job, and `result.1` is a monitor client that can be polled for periodic updates,
+ /// returning a result approximately once per `monitor_interval_millis` milliseconds.
+ pub fn start_job(
+ &mut self,
+ url: ffi::OsString,
+ save_path: ffi::OsString,
+ proxy_usage: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ monitor_interval_millis: u32,
+ ) -> Result<Result<(StartJobSuccess, BitsMonitorClient), StartJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client
+ .start_job(
+ url,
+ save_path,
+ proxy_usage,
+ no_progress_timeout_secs,
+ monitor_interval_millis,
+ )
+ .map(|(success, monitor)| (success, BitsMonitorClient::InProcess(monitor)))),
+ }
+ }
+
+ /// Start monitoring the job with id `guid` approximately once per `monitor_interval_millis`
+ /// milliseconds.
+ ///
+ /// The returned `Ok(monitor)` is a monitor client to be polled for periodic updates.
+ ///
+ /// There can only be one ongoing `BitsMonitorClient` for each job associated with a given
+ /// `BitsClient`. If a monitor client already exists for the specified job, it will be stopped.
+ pub fn monitor_job(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<Result<BitsMonitorClient, MonitorJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client
+ .monitor_job(guid, interval_millis)
+ .map(BitsMonitorClient::InProcess)),
+ }
+ }
+
+ /// Suspend job `guid`.
+ pub fn suspend_job(&mut self, guid: Guid) -> Result<Result<(), SuspendJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.suspend_job(guid)),
+ }
+ }
+
+ /// Resume job `guid`.
+ pub fn resume_job(&mut self, guid: Guid) -> Result<Result<(), ResumeJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.resume_job(guid)),
+ }
+ }
+
+ /// Set the priority of job `guid`.
+ ///
+ /// `foreground == true` will set the priority to `BG_JOB_PRIORITY_FOREGROUND`,
+ /// `false` will use the default `BG_JOB_PRIORITY_NORMAL`.
+ /// See the Microsoft documentation for `BG_JOB_PRIORITY` for details.
+ ///
+ /// A job created by `start_job()` will be foreground priority, by default.
+ pub fn set_job_priority(
+ &mut self,
+ guid: Guid,
+ foreground: bool,
+ ) -> Result<Result<(), SetJobPriorityFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_job_priority(guid, foreground)),
+ }
+ }
+
+ /// Set the "no progress timeout" of job `guid`.
+ pub fn set_no_progress_timeout(
+ &mut self,
+ guid: Guid,
+ timeout_secs: u32,
+ ) -> Result<Result<(), SetNoProgressTimeoutFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_no_progress_timeout(guid, timeout_secs)),
+ }
+ }
+
+ /// Change the update interval for an ongoing monitor of job `guid`.
+ pub fn set_update_interval(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.set_update_interval(guid, interval_millis)),
+ }
+ }
+
+ /// Stop any ongoing monitor for job `guid`.
+ pub fn stop_update(
+ &mut self,
+ guid: Guid,
+ ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.stop_update(guid)),
+ }
+ }
+
+ /// Complete the job `guid`.
+ ///
+ /// This also stops any ongoing monitor for the job.
+ pub fn complete_job(&mut self, guid: Guid) -> Result<Result<(), CompleteJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.complete_job(guid)),
+ }
+ }
+
+ /// Cancel the job `guid`.
+ ///
+ /// This also stops any ongoing monitor for the job.
+ pub fn cancel_job(&mut self, guid: Guid) -> Result<Result<(), CancelJobFailure>, Error> {
+ match self {
+ InProcess(client) => Ok(client.cancel_job(guid)),
+ }
+ }
+}
+
+/// The client side of a monitor for a BITS job.
+///
+/// It is intended to be used by calling `get_status` in a loop to receive notifications about
+/// the status of a job. Because `get_status` blocks, it is recommended to run this loop on its
+/// own thread.
+pub enum BitsMonitorClient {
+ InProcess(in_process::InProcessMonitor),
+}
+
+impl BitsMonitorClient {
+ /// `get_status` will return a result approximately every `monitor_interval_millis`
+ /// milliseconds, but in case a result isn't available within `timeout_millis` milliseconds
+ /// this will return `Err(Error::Timeout)`. Any `Err` returned, including timeout, indicates
+ /// that the monitor has been stopped; the `BitsMonitorClient` should then be discarded.
+ ///
+ /// As with methods on `BitsClient`, `BitsMonitorClient::get_status()` has an inner `Result`
+ /// type which indicates an error returned from the server. Any `Err` here also indicates that
+ /// the monitor has stopped after yielding the result.
+ ///
+ /// The first time `get_status` is called it will return a status without any delay.
+ ///
+ /// If there is an error or the transfer completes, a result may be available sooner than
+ /// the monitor interval.
+ pub fn get_status(
+ &mut self,
+ timeout_millis: u32,
+ ) -> Result<Result<JobStatus, HResultMessage>, Error> {
+ match self {
+ BitsMonitorClient::InProcess(client) => client.get_status(timeout_millis),
+ }
+ }
+}