diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /toolkit/components/bitsdownload/bits_client | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'toolkit/components/bitsdownload/bits_client')
13 files changed, 3084 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/bits_client/.gitignore b/toolkit/components/bitsdownload/bits_client/.gitignore new file mode 100644 index 0000000000..d78faf4575 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +**/.*.swp diff --git a/toolkit/components/bitsdownload/bits_client/Cargo.toml b/toolkit/components/bitsdownload/bits_client/Cargo.toml new file mode 100644 index 0000000000..eb0083d8e2 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "bits_client" +version = "0.2.0" +authors = ["Adam Gashlin <agashlin@mozilla.com>"] +license = "MPL-2.0" +publish = false + +[dependencies] +bits = { path = "./bits" } +comedy = "0.2.0" +guid_win = "0.2.0" +thiserror = "1" + +[dev-dependencies] +#ctrlc = "3.1.1" +lazy_static = "1.0.1" +rand = "0.8" +regex = { version = "1", default_features = false, features = ["perf", "std"] } +tempfile = "3" diff --git a/toolkit/components/bitsdownload/bits_client/README.md b/toolkit/components/bitsdownload/bits_client/README.md new file mode 100644 index 0000000000..0c21a5b68c --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/README.md @@ -0,0 +1,23 @@ +bits\_client +============ + +Interfaces for BITS. + +bits\_client lib +--------------- + +`bits_client` is the primary target and provides `BitsClient`, an API for creating and monitoring BITS jobs. + +`bits_client::new()` creates a `BitsClient` that does all operations within the current process, as the current user. + +bits crate +---------- + +`bits` is a safe interface to BITS, providing connections to the +Background Copy Manager, some basic operations on Background Copy Jobs, and +methods for implementing `IBackgroundCopyCallback`s in Rust. + +test\_client example +------------------- + +`examples/test_client.rs` shows how to use the API. diff --git a/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml b/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml new file mode 100644 index 0000000000..36b7b5ff89 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/bits/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "bits" +version = "0.2.0" +authors = ["Adam Gashlin <agashlin@mozilla.com>"] +license = "MIT/Apache-2.0" +publish = false + +[features] +status_serde = ["serde", "serde_derive"] + +[dependencies] +comedy = "0.2.0" +filetime_win = "0.2.0" +guid_win = "0.2.0" +serde = { version = "1.0.80", optional = true } +serde_derive = { version = "1.0.80", optional = true } + +[dependencies.winapi] +version = "0.3.7" +features = ["basetsd", + "bits", + "bits2_5", + "bitsmsg", + "guiddef", + "minwindef", + "ntdef", + "rpcndr", + "unknwnbase", + "winerror", + "winnls", + ] diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs b/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs new file mode 100644 index 0000000000..6dace83be8 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/bits/src/callback.rs @@ -0,0 +1,205 @@ +// Licensed under the Apache License, Version 2.0 +// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. +// All files in the project carrying such notice may not be copied, modified, or distributed +// except according to those terms. + +use std::panic::{catch_unwind, RefUnwindSafe}; +use std::ptr::NonNull; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use comedy::{com::ComRef, HResult}; +use guid_win::Guid; +use winapi::ctypes::c_void; +use winapi::shared::guiddef::REFIID; +use winapi::shared::minwindef::DWORD; +use winapi::shared::ntdef::ULONG; +use winapi::shared::winerror::{E_FAIL, E_NOINTERFACE, HRESULT, NOERROR, S_OK}; +use winapi::um::bits::{ + IBackgroundCopyCallback, IBackgroundCopyCallbackVtbl, IBackgroundCopyError, IBackgroundCopyJob, +}; +use winapi::um::unknwnbase::{IUnknown, IUnknownVtbl}; +use winapi::Interface; + +use BitsJob; + +/// The type of a notification callback. +/// +/// The callbacks must be `Fn()` to be called arbitrarily many times, `RefUnwindSafe` to have a +/// panic unwind safely caught, `Send`, `Sync` and `'static` to run on any thread COM invokes us on +/// any time. +/// +/// If the callback returns a non-success `HRESULT`, the notification may pass to other BITS +/// mechanisms such as `IBackgroundCopyJob2::SetNotifyCmdLine`. +pub type TransferredCallback = + dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static; +pub type ErrorCallback = dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static; +pub type ModificationCallback = + dyn (Fn() -> Result<(), HRESULT>) + RefUnwindSafe + Send + Sync + 'static; + +#[repr(C)] +pub struct BackgroundCopyCallback { + // Everything assumes that the interface vtable is the first member of this struct. + interface: IBackgroundCopyCallback, + rc: AtomicUsize, + transferred_cb: Option<Box<TransferredCallback>>, + error_cb: Option<Box<ErrorCallback>>, + modification_cb: Option<Box<ModificationCallback>>, +} + +impl BackgroundCopyCallback { + /// Construct the callback object and register it with a job. + /// + /// Only one notify interface can be present on a job at once, so this will release BITS' + /// ref to any previously registered interface. + pub fn register( + job: &mut BitsJob, + transferred_cb: Option<Box<TransferredCallback>>, + error_cb: Option<Box<ErrorCallback>>, + modification_cb: Option<Box<ModificationCallback>>, + ) -> Result<(), HResult> { + let cb = Box::new(BackgroundCopyCallback { + interface: IBackgroundCopyCallback { lpVtbl: &VTBL }, + rc: AtomicUsize::new(1), + transferred_cb, + error_cb, + modification_cb, + }); + + // Leak the callback, it has no Rust owner until we need to drop it later. + // The ComRef will Release when it goes out of scope. + unsafe { + let cb = ComRef::from_raw(NonNull::new_unchecked(Box::into_raw(cb) as *mut IUnknown)); + + job.set_notify_interface(cb.as_raw_ptr())?; + } + + Ok(()) + } +} + +extern "system" fn query_interface( + this: *mut IUnknown, + riid: REFIID, + obj: *mut *mut c_void, +) -> HRESULT { + unsafe { + // `IBackgroundCopyCallback` is the first (currently only) interface on the + // `BackgroundCopyCallback` object, so we can return `this` either as + // `IUnknown` or `IBackgroundCopyCallback`. + if Guid(*riid) == Guid(IUnknown::uuidof()) + || Guid(*riid) == Guid(IBackgroundCopyCallback::uuidof()) + { + addref(this); + // Cast first to `IBackgroundCopyCallback` to be clear which `IUnknown` + // we are pointing at. + *obj = this as *mut IBackgroundCopyCallback as *mut c_void; + NOERROR + } else { + E_NOINTERFACE + } + } +} + +extern "system" fn addref(raw_this: *mut IUnknown) -> ULONG { + unsafe { + let this = raw_this as *const BackgroundCopyCallback; + + // Forge a reference for just this statement. + let old_rc = (*this).rc.fetch_add(1, Ordering::SeqCst); + (old_rc + 1) as ULONG + } +} + +extern "system" fn release(raw_this: *mut IUnknown) -> ULONG { + unsafe { + { + let this = raw_this as *const BackgroundCopyCallback; + + // Forge a reference for just this statement. + let old_rc = (*this).rc.fetch_sub(1, Ordering::SeqCst); + + let rc = old_rc - 1; + if rc > 0 { + return rc as ULONG; + } + } + + // rc will have been 0 for us to get here, and we're out of scope of the reference above, + // so there should be no references or pointers left (besides `this`). + // Re-Box and to drop immediately. + let _ = Box::from_raw(raw_this as *mut BackgroundCopyCallback); + + 0 + } +} + +extern "system" fn transferred_stub( + raw_this: *mut IBackgroundCopyCallback, + _job: *mut IBackgroundCopyJob, +) -> HRESULT { + unsafe { + let this = raw_this as *const BackgroundCopyCallback; + // Forge a reference just for this statement. + if let Some(ref cb) = (*this).transferred_cb { + match catch_unwind(|| cb()) { + Ok(Ok(())) => S_OK, + Ok(Err(hr)) => hr, + Err(_) => E_FAIL, + } + } else { + S_OK + } + } +} + +extern "system" fn error_stub( + raw_this: *mut IBackgroundCopyCallback, + _job: *mut IBackgroundCopyJob, + _error: *mut IBackgroundCopyError, +) -> HRESULT { + unsafe { + let this = raw_this as *const BackgroundCopyCallback; + // Forge a reference just for this statement. + if let Some(ref cb) = (*this).error_cb { + match catch_unwind(|| cb()) { + Ok(Ok(())) => S_OK, + Ok(Err(hr)) => hr, + Err(_) => E_FAIL, + } + } else { + S_OK + } + } +} + +extern "system" fn modification_stub( + raw_this: *mut IBackgroundCopyCallback, + _job: *mut IBackgroundCopyJob, + _reserved: DWORD, +) -> HRESULT { + unsafe { + let this = raw_this as *const BackgroundCopyCallback; + // Forge a reference just for this statement. + if let Some(ref cb) = (*this).modification_cb { + match catch_unwind(|| cb()) { + Ok(Ok(())) => S_OK, + Ok(Err(hr)) => hr, + Err(_) => E_FAIL, + } + } else { + S_OK + } + } +} + +pub static VTBL: IBackgroundCopyCallbackVtbl = IBackgroundCopyCallbackVtbl { + parent: IUnknownVtbl { + QueryInterface: query_interface, + AddRef: addref, + Release: release, + }, + JobTransferred: transferred_stub, + JobError: error_stub, + JobModification: modification_stub, +}; diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs b/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs new file mode 100644 index 0000000000..09da2b0349 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/bits/src/lib.rs @@ -0,0 +1,592 @@ +// Licensed under the Apache License, Version 2.0 +// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. +// All files in the project carrying such notice may not be copied, modified, or distributed +// except according to those terms. + +//! A safe interface for BITS +//! +//! The primary entry point into BITS is the +//! [`BackgroundCopyManager`](struct.BackgroundCopyManager.html) struct. +//! +//! Functionality is only provided by this crate on an as-needed basis for +//! [bits_client](../bits_client/index.html), so there are vast swathes of the BITS API +//! unsupported. + +extern crate comedy; +extern crate filetime_win; +extern crate guid_win; +extern crate winapi; + +#[cfg(feature = "status_serde")] +extern crate serde; +#[cfg(feature = "status_serde")] +extern crate serde_derive; + +mod callback; +pub mod status; +mod wide; + +use std::ffi::{OsStr, OsString}; +use std::mem; +use std::os::windows::ffi::OsStringExt; +use std::ptr; +use std::result; + +use comedy::com::{create_instance_local_server, CoTaskMem, ComRef, INIT_MTA}; +use comedy::error::{HResult, ResultExt}; +use comedy::{com_call, com_call_getter, com_call_taskmem_getter}; +use filetime_win::FileTime; +use guid_win::Guid; +use winapi::shared::minwindef::DWORD; +use winapi::shared::ntdef::{HRESULT, LANGIDFROMLCID, ULONG}; +use winapi::shared::winerror::S_FALSE; +use winapi::um::bits::{ + IBackgroundCopyError, IBackgroundCopyFile, IBackgroundCopyJob, IBackgroundCopyManager, + IEnumBackgroundCopyFiles, IEnumBackgroundCopyJobs, BG_JOB_PRIORITY, BG_JOB_PRIORITY_FOREGROUND, + BG_JOB_PRIORITY_HIGH, BG_JOB_PRIORITY_LOW, BG_JOB_PRIORITY_NORMAL, BG_JOB_PROXY_USAGE, + BG_JOB_PROXY_USAGE_AUTODETECT, BG_JOB_PROXY_USAGE_NO_PROXY, BG_JOB_PROXY_USAGE_PRECONFIG, + BG_JOB_STATE_ERROR, BG_JOB_STATE_TRANSIENT_ERROR, BG_JOB_TYPE_DOWNLOAD, BG_NOTIFY_DISABLE, + BG_NOTIFY_JOB_ERROR, BG_NOTIFY_JOB_MODIFICATION, BG_NOTIFY_JOB_TRANSFERRED, BG_SIZE_UNKNOWN, +}; +use winapi::um::bits2_5::{IBackgroundCopyJobHttpOptions, BG_HTTP_REDIRECT_POLICY_ALLOW_REPORT}; +use winapi::um::bitsmsg::BG_E_NOT_FOUND; +use winapi::um::unknwnbase::IUnknown; +use winapi::um::winnls::GetThreadLocale; + +pub use winapi::um::bits::{BG_ERROR_CONTEXT, BG_JOB_STATE}; +pub use winapi::um::bitsmsg::{BG_S_PARTIAL_COMPLETE, BG_S_UNABLE_TO_DELETE_FILES}; + +pub use status::{ + BitsErrorContext, BitsJobError, BitsJobProgress, BitsJobState, BitsJobStatus, BitsJobTimes, +}; +use wide::ToWideNull; + +pub use winapi::shared::winerror::E_FAIL; + +#[repr(u32)] +#[derive(Copy, Clone, Debug)] +pub enum BitsJobPriority { + Foreground = BG_JOB_PRIORITY_FOREGROUND, + High = BG_JOB_PRIORITY_HIGH, + /// Default + Normal = BG_JOB_PRIORITY_NORMAL, + Low = BG_JOB_PRIORITY_LOW, +} + +#[repr(u32)] +#[derive(Copy, Clone, Debug)] +pub enum BitsProxyUsage { + /// Directly access the network. + NoProxy = BG_JOB_PROXY_USAGE_NO_PROXY, + /// Use Internet Explorer proxy settings. This is the default. + Preconfig = BG_JOB_PROXY_USAGE_PRECONFIG, + /// Attempt to auto-detect the connection's proxy settings. + AutoDetect = BG_JOB_PROXY_USAGE_AUTODETECT, +} + +type Result<T> = result::Result<T, HResult>; + +pub struct BackgroundCopyManager(ComRef<IBackgroundCopyManager>); + +impl BackgroundCopyManager { + /// Get access to the local BITS service. + /// + /// # COM Initialization and Threading Model # + /// + /// This method uses a thread local variable to initialize COM with a multithreaded apartment + /// model for this thread, and leaves it this way until the thread local is dropped. + /// If the thread was in a single-threaded apartment, `connect()` will fail gracefully. + /// + /// # Safety # + /// + /// If there are mismatched `CoUninitialize` calls on this thread which lead to COM shutting + /// down before this thread ends, unsafe behavior may result. + pub fn connect() -> Result<BackgroundCopyManager> { + INIT_MTA.with(|com| { + if let Err(e) = com { + return Err(e.clone()); + } + Ok(()) + })?; + + // Assuming no mismatched CoUninitialize calls, methods do not have to check for + // successfully initialized COM once the object is constructed: `BackgroundCopyManager` + // is not `Send` or `Sync` so it must be used on the thread it was constructed on, + // which has now successfully inited MTA for the lifetime of thread local `INIT_MTA`. + // This also holds for any functions using pointers only derived from these methods, like + // the `BitsJob` methods. + + Ok(BackgroundCopyManager(create_instance_local_server::< + winapi::um::bits::BackgroundCopyManager, + IBackgroundCopyManager, + >()?)) + } + + /// Create a new download job with the given name. + pub fn create_job(&self, display_name: &OsStr) -> Result<BitsJob> { + unsafe { + let mut guid = mem::zeroed(); + Ok(BitsJob(com_call_getter!( + |job| self.0, + IBackgroundCopyManager::CreateJob( + display_name.to_wide_null().as_ptr(), + BG_JOB_TYPE_DOWNLOAD, + &mut guid, + job, + ) + )?)) + } + } + + /// Cancel all jobs with the given name. + /// + /// This only attempts to cancel jobs owned by the current user. + /// No errors are returned for jobs that failed to cancel. + pub fn cancel_jobs_by_name(&self, match_name: &OsStr) -> Result<()> { + let jobs = + unsafe { com_call_getter!(|jobs| self.0, IBackgroundCopyManager::EnumJobs(0, jobs))? }; + + loop { + let result = unsafe { + com_call_getter!( + |job| jobs, + IEnumBackgroundCopyJobs::Next(1, job, ptr::null_mut()) + ) + }; + match result { + Ok(job) => { + if job_name_eq(&job, match_name)? { + unsafe { + let _ = com_call!(job, IBackgroundCopyJob::Cancel()); + } + } + } + Err(e) => { + if e.code() == S_FALSE { + // Ran out of jobs to enumerate + return Ok(()); + } else { + return Err(e); + } + } + } + } + } + + /// Get the job with the given GUID. + /// + /// Returns Err if the job was not found. + pub fn get_job_by_guid(&self, guid: &Guid) -> Result<BitsJob> { + unsafe { com_call_getter!(|job| self.0, IBackgroundCopyManager::GetJob(&guid.0, job)) } + .map(BitsJob) + } + + /// Try to find a job with a given GUID. + /// + /// Returns Ok(None) if the job was not found but there was no other error. + pub fn find_job_by_guid(&self, guid: &Guid) -> Result<Option<BitsJob>> { + Ok(self + .get_job_by_guid(guid) + .map(Some) + .allow_err(BG_E_NOT_FOUND as i32, None)?) + } + + /// Try to find a job with a given GUID and name. + /// + /// Returns Ok(None) if the job was not found, or if it had the wrong name, as long as there + /// was no other error. + pub fn find_job_by_guid_and_name( + &self, + guid: &Guid, + match_name: &OsStr, + ) -> Result<Option<BitsJob>> { + Ok(match self.find_job_by_guid(guid)? { + None => None, + Some(BitsJob(ref job)) if !job_name_eq(job, match_name)? => None, + result => result, + }) + } + + /// Translate a BITS `HRESULT` to a textual description. + /// + /// This uses the current thread's locale to look up the message associated with a BITS + /// error. It should only be used for `HRESULT`s returned from BITS COM interfaces. + pub fn get_error_description(&self, hr: HRESULT) -> Result<String> { + unsafe { + let language_id = DWORD::from(LANGIDFROMLCID(GetThreadLocale())); + + Ok(taskmem_into_lossy_string(com_call_taskmem_getter!( + |desc| self.0, + IBackgroundCopyManager::GetErrorDescription(hr, language_id, desc) + )?)) + } + } +} + +unsafe fn taskmem_into_lossy_string(taskmem: CoTaskMem<u16>) -> String { + OsString::from_wide(taskmem.as_slice_until_null()) + .to_string_lossy() + .into_owned() +} + +fn job_name_eq(job: &ComRef<IBackgroundCopyJob>, match_name: &OsStr) -> Result<bool> { + let job_name = unsafe { + OsString::from_wide( + com_call_taskmem_getter!(|name| job, IBackgroundCopyJob::GetDisplayName(name))? + .as_slice_until_null(), + ) + }; + + Ok(job_name == match_name) +} + +pub struct BitsJob(ComRef<IBackgroundCopyJob>); + +impl BitsJob { + /// Get the job's GUID. + pub fn guid(&self) -> Result<Guid> { + // TODO: cache on create or retrieved by GUID? + unsafe { + let mut guid = mem::zeroed(); + com_call!(self.0, IBackgroundCopyJob::GetId(&mut guid))?; + Ok(Guid(guid)) + } + } + + /// Add a file to the job. + pub fn add_file(&mut self, remote_url: &OsStr, local_file: &OsStr) -> Result<()> { + unsafe { + com_call!( + self.0, + IBackgroundCopyJob::AddFile( + remote_url.to_wide_null().as_ptr(), + local_file.to_wide_null().as_ptr(), + ) + ) + }?; + Ok(()) + } + + /// Get the first file in the job. + /// + /// This is provided for collecting the redirected remote name of single file jobs. + pub fn get_first_file(&mut self) -> Result<BitsFile> { + let files = unsafe { com_call_getter!(|e| self.0, IBackgroundCopyJob::EnumFiles(e))? }; + + let file = unsafe { + com_call_getter!( + |file| files, + IEnumBackgroundCopyFiles::Next(1, file, ptr::null_mut()) + )? + }; + + Ok(BitsFile(file)) + } + + /// Set the job's description string. + /// + /// This is different from the display name set when creating the job. + pub fn set_description(&mut self, description: &OsStr) -> Result<()> { + unsafe { + com_call!( + self.0, + IBackgroundCopyJob::SetDescription(description.to_wide_null().as_ptr()) + ) + }?; + Ok(()) + } + + /// Change the job's proxy usage setting. + /// + /// The default is `BitsProxyUsage::Preconfig`. + pub fn set_proxy_usage(&mut self, usage: BitsProxyUsage) -> Result<()> { + use BitsProxyUsage::*; + + match usage { + Preconfig | NoProxy | AutoDetect => { + unsafe { + com_call!( + self.0, + IBackgroundCopyJob::SetProxySettings( + usage as BG_JOB_PROXY_USAGE, + ptr::null_mut(), + ptr::null_mut(), + ) + ) + }?; + Ok(()) + } + } + } + + /// Change the job's priority. + /// + /// The default is `BitsJobPriority::Normal`. + pub fn set_priority(&mut self, priority: BitsJobPriority) -> Result<()> { + unsafe { + com_call!( + self.0, + IBackgroundCopyJob::SetPriority(priority as BG_JOB_PRIORITY) + ) + }?; + Ok(()) + } + + pub fn set_minimum_retry_delay(&mut self, seconds: ULONG) -> Result<()> { + unsafe { com_call!(self.0, IBackgroundCopyJob::SetMinimumRetryDelay(seconds)) }?; + Ok(()) + } + + pub fn set_no_progress_timeout(&mut self, seconds: ULONG) -> Result<()> { + unsafe { com_call!(self.0, IBackgroundCopyJob::SetNoProgressTimeout(seconds)) }?; + Ok(()) + } + + /// Enable HTTP redirect reporting. + /// + /// The default setting is to allow HTTP redirects, but to not report them in any way. With + /// this setting enabled, the remote name of a file will be updated to reflect the redirect. + /// + /// # Compatibility # + /// + /// First available in Windows Vista. + pub fn set_redirect_report(&mut self) -> Result<()> { + unsafe { + com_call!( + self.0.cast()?, + IBackgroundCopyJobHttpOptions::SetSecurityFlags( + BG_HTTP_REDIRECT_POLICY_ALLOW_REPORT + ) + ) + }?; + + Ok(()) + } + + /// Resume the job. This must be done at least once to initially enqueue the job. + pub fn resume(&mut self) -> Result<()> { + unsafe { com_call!(self.0, IBackgroundCopyJob::Resume()) }?; + Ok(()) + } + + pub fn suspend(&mut self) -> Result<()> { + unsafe { com_call!(self.0, IBackgroundCopyJob::Suspend()) }?; + Ok(()) + } + + /// Complete the job, moving the local files to their final names. + /// + /// Has two interesting success `HRESULT`s: `BG_S_PARTIAL_COMPLETE` and + /// `BG_S_UNABLE_TO_DELETE_FILES`. + pub fn complete(&mut self) -> Result<HRESULT> { + unsafe { com_call!(self.0, IBackgroundCopyJob::Complete()) } + } + + /// Cancel the job, deleting any temporary files. + /// + /// Has an interesting success `HRESULT`: `BG_S_UNABLE_TO_DELETE_FILES`. + pub fn cancel(&mut self) -> Result<HRESULT> { + unsafe { com_call!(self.0, IBackgroundCopyJob::Cancel()) } + } + + /// Set the notification callbacks to use with this job. + /// + /// This will replace any previously set callbacks. + pub fn register_callbacks( + &mut self, + transferred_cb: Option<Box<callback::TransferredCallback>>, + error_cb: Option<Box<callback::ErrorCallback>>, + modification_cb: Option<Box<callback::ModificationCallback>>, + ) -> Result<()> { + let mut flags = 0; + if transferred_cb.is_some() { + flags |= BG_NOTIFY_JOB_TRANSFERRED; + } + if error_cb.is_some() { + flags |= BG_NOTIFY_JOB_ERROR; + } + if modification_cb.is_some() { + flags |= BG_NOTIFY_JOB_MODIFICATION; + } + + callback::BackgroundCopyCallback::register( + self, + transferred_cb, + error_cb, + modification_cb, + )?; + + unsafe { com_call!(self.0, IBackgroundCopyJob::SetNotifyFlags(flags)) }?; + + Ok(()) + } + + fn _clear_callbacks(&mut self) -> Result<()> { + unsafe { + com_call!( + self.0, + IBackgroundCopyJob::SetNotifyFlags(BG_NOTIFY_DISABLE) + )?; + + self.set_notify_interface(ptr::null_mut() as *mut IUnknown) + } + } + + /// Collect the current status of the job, including errors. + pub fn get_status(&self) -> Result<BitsJobStatus> { + let mut state = 0; + let mut progress = unsafe { mem::zeroed() }; + let mut error_count = 0; + let mut times = unsafe { mem::zeroed() }; + + unsafe { + com_call!(self.0, IBackgroundCopyJob::GetState(&mut state))?; + com_call!(self.0, IBackgroundCopyJob::GetProgress(&mut progress))?; + com_call!(self.0, IBackgroundCopyJob::GetErrorCount(&mut error_count))?; + com_call!(self.0, IBackgroundCopyJob::GetTimes(&mut times))?; + } + + Ok(BitsJobStatus { + state: BitsJobState::from(state), + progress: BitsJobProgress { + total_bytes: if progress.BytesTotal == BG_SIZE_UNKNOWN { + None + } else { + Some(progress.BytesTotal) + }, + transferred_bytes: progress.BytesTransferred, + total_files: progress.FilesTotal, + transferred_files: progress.FilesTransferred, + }, + error_count, + error: if state == BG_JOB_STATE_ERROR || state == BG_JOB_STATE_TRANSIENT_ERROR { + let error_obj = + unsafe { com_call_getter!(|e| self.0, IBackgroundCopyJob::GetError(e)) }?; + + Some(BitsJob::get_error(error_obj)?) + } else { + None + }, + times: BitsJobTimes { + creation: FileTime(times.CreationTime), + modification: FileTime(times.ModificationTime), + transfer_completion: if times.TransferCompletionTime.dwLowDateTime == 0 + && times.TransferCompletionTime.dwHighDateTime == 0 + { + None + } else { + Some(FileTime(times.TransferCompletionTime)) + }, + }, + }) + } + + fn get_error(error_obj: ComRef<IBackgroundCopyError>) -> Result<BitsJobError> { + let mut context = 0; + let mut hresult = 0; + unsafe { + com_call!( + error_obj, + IBackgroundCopyError::GetError(&mut context, &mut hresult) + )?; + + let language_id = DWORD::from(LANGIDFROMLCID(GetThreadLocale())); + + let context = BitsErrorContext::from(context); + let context_str = com_call_taskmem_getter!( + |desc| error_obj, + IBackgroundCopyError::GetErrorContextDescription(language_id, desc) + ) + .map(|s| taskmem_into_lossy_string(s)) + .unwrap_or_else(|_| format!("{:?}", context)); + let error_str = com_call_taskmem_getter!( + |desc| error_obj, + IBackgroundCopyError::GetErrorDescription(language_id, desc) + ) + .map(|s| taskmem_into_lossy_string(s)) + .unwrap_or_else(|_| format!("{:#08x}", hresult)); + + Ok(BitsJobError { + context, + context_str, + error: hresult, + error_str, + }) + } + } + + unsafe fn set_notify_interface(&self, interface: *mut IUnknown) -> Result<()> { + com_call!(self.0, IBackgroundCopyJob::SetNotifyInterface(interface))?; + Ok(()) + } +} + +pub struct BitsFile(ComRef<IBackgroundCopyFile>); + +/// A single file in a BITS job. +/// +/// This is provided for collecting the redirected remote name. +impl BitsFile { + /// Get the remote name from which the file is being downloaded. + /// + /// If [`BitsJob::set_redirect_report()`](struct.BitsJob.html#method.set_redirect_report) + /// hasn't been called on the job, this won't be + /// updated as HTTP redirects are processed. + pub fn get_remote_name(&self) -> Result<OsString> { + unsafe { + Ok(OsString::from_wide( + com_call_taskmem_getter!(|name| self.0, IBackgroundCopyFile::GetRemoteName(name))? + .as_slice_until_null(), + )) + } + } +} + +#[cfg(test)] +mod test { + use super::BackgroundCopyManager; + use std::ffi::OsString; + use std::mem; + + #[test] + #[ignore] + fn test_find_job() { + let bcm = BackgroundCopyManager::connect().unwrap(); + let name = OsString::from("bits test job"); + let wrong_name = OsString::from("bits test jobbo"); + + let mut job = bcm.create_job(&name).unwrap(); + let guid = job.guid().unwrap(); + + assert_eq!( + bcm.find_job_by_guid(&guid) + .unwrap() + .unwrap() + .guid() + .unwrap(), + guid + ); + assert_eq!( + bcm.find_job_by_guid_and_name(&guid, &name) + .unwrap() + .unwrap() + .guid() + .unwrap(), + guid + ); + assert!(bcm + .find_job_by_guid_and_name(&guid, &wrong_name) + .unwrap() + .is_none()); + + job.cancel().unwrap(); + mem::drop(job); + + assert!(bcm.find_job_by_guid(&guid).unwrap().is_none()); + assert!(bcm + .find_job_by_guid_and_name(&guid, &name) + .unwrap() + .is_none()); + } +} diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/status.rs b/toolkit/components/bitsdownload/bits_client/bits/src/status.rs new file mode 100644 index 0000000000..648a26866e --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/bits/src/status.rs @@ -0,0 +1,118 @@ +// Licensed under the Apache License, Version 2.0 +// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. +// All files in the project carrying such notice may not be copied, modified, or distributed +// except according to those terms. + +//! Data types for reporting a job's status + +use filetime_win::FileTime; +use winapi::shared::winerror::HRESULT; +use winapi::um::bits::{BG_ERROR_CONTEXT, BG_JOB_STATE}; + +#[cfg(feature = "status_serde")] +use serde_derive::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub struct BitsJobStatus { + pub state: BitsJobState, + pub progress: BitsJobProgress, + pub error_count: u32, + pub error: Option<BitsJobError>, + pub times: BitsJobTimes, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub struct BitsJobError { + pub context: BitsErrorContext, + pub context_str: String, + pub error: HRESULT, + pub error_str: String, +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub enum BitsErrorContext { + None, + Unknown, + GeneralQueueManager, + QueueManagerNotification, + LocalFile, + RemoteFile, + GeneralTransport, + RemoteApplication, + /// No other values are documented + Other(BG_ERROR_CONTEXT), +} + +impl From<BG_ERROR_CONTEXT> for BitsErrorContext { + fn from(ec: BG_ERROR_CONTEXT) -> BitsErrorContext { + use self::BitsErrorContext::*; + use winapi::um::bits; + match ec { + bits::BG_ERROR_CONTEXT_NONE => None, + bits::BG_ERROR_CONTEXT_UNKNOWN => Unknown, + bits::BG_ERROR_CONTEXT_GENERAL_QUEUE_MANAGER => GeneralQueueManager, + bits::BG_ERROR_CONTEXT_QUEUE_MANAGER_NOTIFICATION => QueueManagerNotification, + bits::BG_ERROR_CONTEXT_LOCAL_FILE => LocalFile, + bits::BG_ERROR_CONTEXT_REMOTE_FILE => RemoteFile, + bits::BG_ERROR_CONTEXT_GENERAL_TRANSPORT => GeneralTransport, + bits::BG_ERROR_CONTEXT_REMOTE_APPLICATION => RemoteApplication, + ec => Other(ec), + } + } +} + +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub enum BitsJobState { + Queued, + Connecting, + Transferring, + Suspended, + Error, + TransientError, + Transferred, + Acknowledged, + Cancelled, + /// No other values are documented + Other(BG_JOB_STATE), +} + +impl From<BG_JOB_STATE> for BitsJobState { + fn from(s: BG_JOB_STATE) -> BitsJobState { + use self::BitsJobState::*; + use winapi::um::bits; + match s { + bits::BG_JOB_STATE_QUEUED => Queued, + bits::BG_JOB_STATE_CONNECTING => Connecting, + bits::BG_JOB_STATE_TRANSFERRING => Transferring, + bits::BG_JOB_STATE_SUSPENDED => Suspended, + bits::BG_JOB_STATE_ERROR => Error, + bits::BG_JOB_STATE_TRANSIENT_ERROR => TransientError, + bits::BG_JOB_STATE_TRANSFERRED => Transferred, + bits::BG_JOB_STATE_ACKNOWLEDGED => Acknowledged, + bits::BG_JOB_STATE_CANCELLED => Cancelled, + s => Other(s), + } + } +} + +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub struct BitsJobProgress { + pub total_bytes: Option<u64>, + pub transferred_bytes: u64, + pub total_files: u32, + pub transferred_files: u32, +} + +#[derive(Copy, Clone, Debug)] +#[cfg_attr(feature = "status_serde", derive(Serialize, Deserialize))] +pub struct BitsJobTimes { + pub creation: FileTime, + pub modification: FileTime, + pub transfer_completion: Option<FileTime>, +} diff --git a/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs b/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs new file mode 100644 index 0000000000..c108f8d629 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/bits/src/wide.rs @@ -0,0 +1,38 @@ +// Licensed under the Apache License, Version 2.0 +// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option. +// All files in the project carrying such notice may not be copied, modified, or distributed +// except according to those terms. + +// Minimal null-terminated wide string support from wio. + +use std::ffi::{OsStr, OsString}; +use std::os::windows::ffi::{OsStrExt, OsStringExt}; +use std::slice; + +pub trait ToWideNull { + fn to_wide_null(&self) -> Vec<u16>; +} + +impl<T: AsRef<OsStr>> ToWideNull for T { + fn to_wide_null(&self) -> Vec<u16> { + self.as_ref().encode_wide().chain(Some(0)).collect() + } +} + +pub trait FromWidePtrNull { + unsafe fn from_wide_ptr_null(wide: *const u16) -> Self; +} + +impl FromWidePtrNull for OsString { + unsafe fn from_wide_ptr_null(wide: *const u16) -> Self { + assert!(!wide.is_null()); + + for i in 0.. { + if *wide.offset(i) == 0 { + return Self::from_wide(&slice::from_raw_parts(wide, i as usize)); + } + } + unreachable!() + } +} diff --git a/toolkit/components/bitsdownload/bits_client/examples/test_client.rs b/toolkit/components/bitsdownload/bits_client/examples/test_client.rs new file mode 100644 index 0000000000..25d7a0d82e --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/examples/test_client.rs @@ -0,0 +1,285 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +extern crate bits_client; +extern crate comedy; +//extern crate ctrlc; +extern crate guid_win; +extern crate thiserror; + +use std::env; +use std::ffi::{OsStr, OsString}; +use std::process; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; + +use thiserror::Error; + +use bits_client::bits_protocol::HResultMessage; +use bits_client::{BitsClient, BitsJobState, BitsMonitorClient, BitsProxyUsage, Guid, PipeError}; + +#[derive(Debug, Error)] +enum MyError { + #[error("{0}")] + Msg(String), + #[error("HResult")] + HResult(#[from] comedy::HResult), + #[error("Win32Error")] + Win32Error(#[from] comedy::Win32Error), + #[error("PipeError")] + PipeError(#[from] PipeError), + #[error("HResultMessage")] + HResultMessage(#[from] HResultMessage), +} + +macro_rules! bail { + ($e:expr) => { + return Err($crate::MyError::Msg($e.to_string())) + }; + ($fmt:expr, $($arg:tt)*) => { + return Err($crate::MyError::Msg(format!($fmt, $($arg)*))) + }; +} + +type Result = std::result::Result<(), MyError>; + +pub fn main() { + if let Err(err) = entry() { + eprintln!("{}", err); + let mut err: &dyn std::error::Error = &err; + while let Some(source) = err.source() { + eprintln!("caused by {}", source); + err = source; + } + + process::exit(1); + } else { + println!("OK"); + } +} + +const EXE_NAME: &'static str = "test_client"; + +fn usage() -> String { + format!( + concat!( + "Usage {0} <command> ", + "[local-service] ", + "[args...]\n", + "Commands:\n", + " bits-start <URL> <local file>\n", + " bits-monitor <GUID>\n", + " bits-bg <GUID>\n", + " bits-fg <GUID>\n", + " bits-suspend <GUID>\n", + " bits-resume <GUID>\n", + " bits-complete <GUID>\n", + " bits-cancel <GUID> ...\n" + ), + EXE_NAME + ) +} + +fn entry() -> Result { + let args: Vec<_> = env::args_os().collect(); + + let mut client = BitsClient::new( + OsString::from("bits_client test"), + OsString::from("C:\\ProgramData"), + )?; + + if args.len() < 2 { + eprintln!("{}", usage()); + bail!("not enough arguments"); + } + + let cmd = &*args[1].to_string_lossy(); + let cmd_args = &args[2..]; + + match cmd { + // command line client for testing + "bits-start" if cmd_args.len() == 2 => bits_start( + Arc::new(Mutex::new(client)), + cmd_args[0].clone(), + cmd_args[1].clone(), + BitsProxyUsage::Preconfig, + ), + "bits-monitor" if cmd_args.len() == 1 => { + bits_monitor(Arc::new(Mutex::new(client)), &cmd_args[0]) + } + // TODO: some way of testing set update interval + "bits-bg" if cmd_args.len() == 1 => bits_bg(&mut client, &cmd_args[0]), + "bits-fg" if cmd_args.len() == 1 => bits_fg(&mut client, &cmd_args[0]), + "bits-suspend" if cmd_args.len() == 1 => bits_suspend(&mut client, &cmd_args[0]), + "bits-resume" if cmd_args.len() == 1 => bits_resume(&mut client, &cmd_args[0]), + "bits-complete" if cmd_args.len() == 1 => bits_complete(&mut client, &cmd_args[0]), + "bits-cancel" if cmd_args.len() >= 1 => { + for guid in cmd_args { + bits_cancel(&mut client, guid)?; + } + Ok(()) + } + _ => { + eprintln!("{}", usage()); + bail!("usage error"); + } + } +} + +fn bits_start( + client: Arc<Mutex<BitsClient>>, + url: OsString, + save_path: OsString, + proxy_usage: BitsProxyUsage, +) -> Result { + //let interval = 10 * 60 * 1000; + let no_progress_timeout_secs = 60; + let interval = 1000; + + let result = client.lock().unwrap().start_job( + url, + save_path, + proxy_usage, + no_progress_timeout_secs, + interval, + )?; + + match result { + Ok((r, monitor_client)) => { + println!("start success, guid = {}", r.guid); + client + .lock() + .unwrap() + .set_update_interval(r.guid.clone(), interval)? + .unwrap(); + monitor_loop(client, monitor_client, r.guid.clone(), interval)?; + Ok(()) + } + Err(e) => { + let _ = e.clone(); + bail!("error from server {}", e) + } + } +} + +fn bits_monitor(client: Arc<Mutex<BitsClient>>, guid: &OsStr) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + let result = client.lock().unwrap().monitor_job(guid.clone(), 1000)?; + match result { + Ok(monitor_client) => { + println!("monitor success"); + monitor_loop(client, monitor_client, guid, 1000)?; + Ok(()) + } + Err(e) => bail!("error from server {}", e), + } +} + +fn _check_client_send() +where + BitsClient: Send, +{ +} +fn _check_monitor_send() +where + BitsMonitorClient: Send, +{ +} + +fn monitor_loop( + _client: Arc<Mutex<BitsClient>>, + mut monitor_client: BitsMonitorClient, + _guid: Guid, + wait_millis: u32, +) -> Result { + /* + // Commented out to avoid vendoring ctrlc. + // This could also possibly be done with `exclude` in the mozilla-central `Cargo.toml`. + let client_for_handler = _client.clone(); + ctrlc::set_handler(move || { + eprintln!("Ctrl-C!"); + let _ = client_for_handler.lock().unwrap().stop_update(_guid.clone()); + }) + .expect("Error setting Ctrl-C handler"); + */ + + loop { + let status = monitor_client.get_status(wait_millis * 10)??; + + println!("{:?} {:?}", BitsJobState::from(status.state), status); + + //println!("{}", job.get_first_file()?.get_remote_name()?.into_string().unwrap()); + let transfer_completion_time = if let Some(ft) = status.times.transfer_completion { + format!("Some({})", ft.to_system_time_utc()?) + } else { + String::from("None") + }; + println!( + "creation: {}, modification: {}, transfer completion: {}", + status.times.creation.to_system_time_utc()?, + status.times.modification.to_system_time_utc()?, + transfer_completion_time + ); + + match BitsJobState::from(status.state) { + BitsJobState::Connecting + | BitsJobState::Transferring + | BitsJobState::TransientError => {} + _ => break, + } + } + println!("monitor loop ending"); + println!("sleeping..."); + std::thread::sleep(std::time::Duration::from_secs(1)); + + Ok(()) +} + +fn bits_bg(client: &mut BitsClient, guid: &OsStr) -> Result { + bits_set_priority(client, guid, false) +} + +fn bits_fg(client: &mut BitsClient, guid: &OsStr) -> Result { + bits_set_priority(client, guid, true) +} + +fn bits_set_priority(client: &mut BitsClient, guid: &OsStr, foreground: bool) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + match client.set_job_priority(guid, foreground)? { + Ok(()) => Ok(()), + Err(e) => bail!("error from server {}", e), + } +} + +fn bits_suspend(client: &mut BitsClient, guid: &OsStr) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + match client.suspend_job(guid)? { + Ok(()) => Ok(()), + Err(e) => bail!("error from server {}", e), + } +} + +fn bits_resume(client: &mut BitsClient, guid: &OsStr) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + match client.resume_job(guid)? { + Ok(()) => Ok(()), + Err(e) => bail!("error from server {}", e), + } +} + +fn bits_complete(client: &mut BitsClient, guid: &OsStr) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + match client.complete_job(guid)? { + Ok(()) => Ok(()), + Err(e) => bail!("error from server {}", e), + } +} + +fn bits_cancel(client: &mut BitsClient, guid: &OsStr) -> Result { + let guid = Guid::from_str(&guid.to_string_lossy())?; + match client.cancel_job(guid)? { + Ok(()) => Ok(()), + Err(e) => bail!("error from server {}", e), + } +} diff --git a/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs new file mode 100644 index 0000000000..6f19f5ef23 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs @@ -0,0 +1,380 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +//! Command, response, and status types. + +use std::error::Error as StdError; +use std::ffi::OsString; +use std::fmt; +use std::result; + +use guid_win::Guid; +use thiserror::Error; + +use super::{BitsErrorContext, BitsJobProgress, BitsJobState, BitsJobTimes, BitsProxyUsage}; + +type HRESULT = i32; + +/// An HRESULT with a descriptive message +#[derive(Clone, Debug)] +pub struct HResultMessage { + pub hr: HRESULT, + pub message: String, +} + +impl fmt::Display for HResultMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + self.message.fmt(f) + } +} + +impl StdError for HResultMessage {} + +/// Commands which can be sent to the server. +/// +/// This is currently unused as the out-of-process Local Service server is not finished. +#[doc(hidden)] +#[derive(Clone, Debug)] +pub enum Command { + StartJob(StartJobCommand), + MonitorJob(MonitorJobCommand), + SuspendJob(SuspendJobCommand), + ResumeJob(ResumeJobCommand), + SetJobPriority(SetJobPriorityCommand), + SetNoProgressTimeout(SetNoProgressTimeoutCommand), + SetUpdateInterval(SetUpdateIntervalCommand), + CompleteJob(CompleteJobCommand), + CancelJob(CancelJobCommand), +} + +/// Combine a [`Command`](enum.Command.html) with its success and failure result types. +#[doc(hidden)] +pub trait CommandType { + type Success; + type Failure: StdError; + fn wrap(command: Self) -> Command; +} + +// Start Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct StartJobCommand { + pub url: OsString, + pub save_path: OsString, + pub proxy_usage: BitsProxyUsage, + pub no_progress_timeout_secs: u32, + pub monitor: Option<MonitorConfig>, +} + +impl CommandType for StartJobCommand { + type Success = StartJobSuccess; + type Failure = StartJobFailure; + fn wrap(cmd: Self) -> Command { + Command::StartJob(cmd) + } +} + +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct MonitorConfig { + pub pipe_name: OsString, + pub interval_millis: u32, +} + +#[derive(Clone, Debug)] +pub struct StartJobSuccess { + pub guid: Guid, +} + +#[derive(Clone, Debug, Error)] +pub enum StartJobFailure { + #[error("Argument validation failed: {0}")] + ArgumentValidation(String), + #[error("Create job: {0}")] + Create(HResultMessage), + #[error("Add file to job: {0}")] + AddFile(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Resume job: {0}")] + Resume(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Monitor Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct MonitorJobCommand { + pub guid: Guid, + pub monitor: MonitorConfig, +} + +impl CommandType for MonitorJobCommand { + type Success = (); + type Failure = MonitorJobFailure; + fn wrap(cmd: Self) -> Command { + Command::MonitorJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum MonitorJobFailure { + #[error("Argument validation failed: {0}")] + ArgumentValidation(String), + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Suspend Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SuspendJobCommand { + pub guid: Guid, +} + +impl CommandType for SuspendJobCommand { + type Success = (); + type Failure = SuspendJobFailure; + fn wrap(cmd: Self) -> Command { + Command::SuspendJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SuspendJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Suspend job: {0}")] + SuspendJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Resume Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct ResumeJobCommand { + pub guid: Guid, +} + +impl CommandType for ResumeJobCommand { + type Success = (); + type Failure = ResumeJobFailure; + fn wrap(cmd: Self) -> Command { + Command::ResumeJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum ResumeJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Resume job: {0}")] + ResumeJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set Job Priority +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetJobPriorityCommand { + pub guid: Guid, + pub foreground: bool, +} + +impl CommandType for SetJobPriorityCommand { + type Success = (); + type Failure = SetJobPriorityFailure; + fn wrap(cmd: Self) -> Command { + Command::SetJobPriority(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetJobPriorityFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set No Progress Timeout +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetNoProgressTimeoutCommand { + pub guid: Guid, + pub timeout_secs: u32, +} + +impl CommandType for SetNoProgressTimeoutCommand { + type Success = (); + type Failure = SetNoProgressTimeoutFailure; + fn wrap(cmd: Self) -> Command { + Command::SetNoProgressTimeout(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetNoProgressTimeoutFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set Update Interval +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetUpdateIntervalCommand { + pub guid: Guid, + pub interval_millis: u32, +} + +impl CommandType for SetUpdateIntervalCommand { + type Success = (); + type Failure = SetUpdateIntervalFailure; + fn wrap(cmd: Self) -> Command { + Command::SetUpdateInterval(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetUpdateIntervalFailure { + #[error("Argument validation: {0}")] + ArgumentValidation(String), + #[error("Monitor not found")] + NotFound, + #[error("Other failure: {0}")] + Other(String), +} + +// Complete Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct CompleteJobCommand { + pub guid: Guid, +} + +impl CommandType for CompleteJobCommand { + type Success = (); + type Failure = CompleteJobFailure; + fn wrap(cmd: Self) -> Command { + Command::CompleteJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum CompleteJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Complete job: {0}")] + CompleteJob(HResultMessage), + #[error("Job only partially completed")] + PartialComplete, + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Cancel Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct CancelJobCommand { + pub guid: Guid, +} + +impl CommandType for CancelJobCommand { + type Success = (); + type Failure = CancelJobFailure; + fn wrap(cmd: Self) -> Command { + Command::CancelJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum CancelJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Cancel job: {0}")] + CancelJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +/// Job status report +/// +/// This includes a URL which updates with redirect but is otherwise the same as +/// `bits::status::BitsJobStatus`. +#[derive(Clone, Debug)] +pub struct JobStatus { + pub state: BitsJobState, + pub progress: BitsJobProgress, + pub error_count: u32, + pub error: Option<JobError>, + pub times: BitsJobTimes, + /// None means same as last time + pub url: Option<OsString>, +} + +/// Job error report +#[derive(Clone, Debug, Error)] +#[error("Job error in context {context_str}: {error}")] +pub struct JobError { + pub context: BitsErrorContext, + pub context_str: String, + pub error: HResultMessage, +} diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs new file mode 100644 index 0000000000..c7f6869167 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs @@ -0,0 +1,504 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cmp; +use std::collections::{hash_map, HashMap}; +use std::ffi; +use std::path; +use std::sync::{Arc, Condvar, Mutex, Weak}; +use std::time::{Duration, Instant}; + +use bits::{ + BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL, +}; +use guid_win::Guid; + +use bits_protocol::*; + +use super::Error; + +// This is a macro in order to use the NotFound and GetJob variants from whatever enum is in scope. +macro_rules! get_job { + ($bcm:ident, $guid:expr, $name:expr) => {{ + $bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + $bcm.find_job_by_guid_and_name($guid, $name) + .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))? + .ok_or(NotFound)? + }}; +} + +fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage { + let bits_description = bcm.get_error_description(error.code()).ok(); + + HResultMessage { + hr: error.code(), + message: if let Some(desc) = bits_description { + format!("{}: {}", error, desc) + } else { + format!("{}", error) + }, + } +} + +// The in-process client uses direct BITS calls via the `bits` crate. +// See the corresponding functions in BitsClient. +pub struct InProcessClient { + job_name: ffi::OsString, + save_path_prefix: path::PathBuf, + monitors: HashMap<Guid, InProcessMonitorControl>, +} + +impl InProcessClient { + pub fn new( + job_name: ffi::OsString, + save_path_prefix: ffi::OsString, + ) -> Result<InProcessClient, Error> { + Ok(InProcessClient { + job_name, + save_path_prefix: path::PathBuf::from(save_path_prefix), + monitors: HashMap::new(), + }) + } + + pub fn start_job( + &mut self, + url: ffi::OsString, + save_path: ffi::OsString, + proxy_usage: BitsProxyUsage, + no_progress_timeout_secs: u32, + monitor_interval_millis: u32, + ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> { + use StartJobFailure::*; + + let full_path = self.save_path_prefix.join(save_path); + + // Verify that `full_path` is under the directory called `save_path_prefix`. + { + let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| { + ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e)) + })?; + // Full path minus file name, canonicalize() fails with nonexistent files, but the + // parent directory ought to exist. + let canonical_full_path = full_path + .parent() + .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))? + .canonicalize() + .map_err(|e| { + ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e)) + })?; + + if !canonical_full_path.starts_with(&canonical_prefix) { + return Err(ArgumentValidation(format!( + "{:?} is not within {:?}", + canonical_full_path, canonical_prefix + ))); + } + } + + // TODO: Should the job be explicitly cleaned up if this fn can't return success? + // If the job is dropped before `AddFile` succeeds, I think it automatically gets + // deleted from the queue. There is only one fallible call after that (`Resume`). + + let bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + let mut job = bcm + .create_job(&self.job_name) + .map_err(|e| Create(format_error(&bcm, e)))?; + + let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + (|| { + job.set_proxy_usage(proxy_usage)?; + job.set_minimum_retry_delay(60)?; + job.set_no_progress_timeout(no_progress_timeout_secs)?; + job.set_redirect_report()?; + + job.set_priority(BitsJobPriority::Foreground)?; + + Ok(()) + })() + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + job.add_file(&url, &full_path.into_os_string()) + .map_err(|e| AddFile(format_error(&bcm, e)))?; + + job.resume().map_err(|e| Resume(format_error(&bcm, e)))?; + + self.monitors.insert(guid.clone(), control); + + Ok((StartJobSuccess { guid }, client)) + } + + pub fn monitor_job( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<InProcessMonitor, MonitorJobFailure> { + use MonitorJobFailure::*; + + // Stop any preexisting monitor for the same guid. + let _ = self.stop_update(guid.clone()); + + let bcm; + let (client, control) = + InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + self.monitors.insert(guid, control); + + Ok(client) + } + + pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> { + use SuspendJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .suspend() + .map_err(|e| SuspendJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> { + use ResumeJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .resume() + .map_err(|e| ResumeJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_job_priority( + &mut self, + guid: Guid, + foreground: bool, + ) -> Result<(), SetJobPriorityFailure> { + use SetJobPriorityFailure::*; + + let priority = if foreground { + BitsJobPriority::Foreground + } else { + BitsJobPriority::Normal + }; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_priority(priority) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_no_progress_timeout( + &mut self, + guid: Guid, + timeout_secs: u32, + ) -> Result<(), SetNoProgressTimeoutFailure> { + use SetNoProgressTimeoutFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_no_progress_timeout(timeout_secs) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + fn get_monitor_control_sender(&mut self, guid: Guid) -> Option<Arc<ControlPair>> { + if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) { + if let Some(sender) = occ.get().0.upgrade() { + Some(sender) + } else { + // Remove dangling Weak + occ.remove_entry(); + None + } + } else { + None + } + } + + pub fn set_update_interval( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<(), SetUpdateIntervalFailure> { + use SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + let mut s = sender.1.lock().unwrap(); + s.interval_millis = interval_millis; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> { + use SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + sender.1.lock().unwrap().shutdown = true; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> { + use CompleteJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .complete() + .map_err(|e| CompleteJob(format_error(&bcm, e))) + .and_then(|hr| { + if hr == BG_S_PARTIAL_COMPLETE as i32 { + Err(PartialComplete) + } else { + Ok(()) + } + })?; + + let _ = self.stop_update(guid); + + Ok(()) + } + + pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> { + use CancelJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .cancel() + .map_err(|e| CancelJob(format_error(&bcm, e)))?; + + let _ = self.stop_update(guid); + + Ok(()) + } +} + +// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to +// control a blocked `get_status` call from another thread. +pub struct InProcessMonitor { + vars: Arc<ControlPair>, + guid: Guid, + last_status_time: Option<Instant>, + last_url: Option<ffi::OsString>, +} + +// The `Condvar` is notified when `InProcessMonitorVars` changes. +type ControlPair = (Condvar, Mutex<InProcessMonitorVars>); +struct InProcessMonitorControl(Weak<ControlPair>); + +// RefUnwindSafe is not impl'd for Condvar but likely should be, +// see https://github.com/rust-lang/rust/issues/54768 +impl std::panic::RefUnwindSafe for InProcessMonitorControl {} + +struct InProcessMonitorVars { + interval_millis: u32, + notified: bool, + shutdown: bool, +} + +impl InProcessMonitor { + fn new( + job: &mut BitsJob, + interval_millis: u32, + ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> { + let guid = job.guid()?; + + let vars = Arc::new(( + Condvar::new(), + Mutex::new(InProcessMonitorVars { + interval_millis, + notified: false, + shutdown: false, + }), + )); + + let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let transferred_cb = Box::new(move || { + if let Some(control) = transferred_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + let error_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let error_cb = Box::new(move || { + if let Some(control) = error_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + // Note: These callbacks are never explicitly cleared. They will be freed when the + // job is deleted from BITS, and they will be cleared if an attempt is made to call them + // when they are no longer valid (e.g. after the process exits). This is done mostly for + // simplicity and should be safe. + + job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?; + + let control = InProcessMonitorControl(Arc::downgrade(&vars)); + + let monitor = InProcessMonitor { + guid, + vars, + last_status_time: None, + last_url: None, + }; + + Ok((monitor, control)) + } + + pub fn get_status( + &mut self, + timeout_millis: u32, + ) -> Result<Result<JobStatus, HResultMessage>, Error> { + let timeout = Duration::from_millis(u64::from(timeout_millis)); + + let started = Instant::now(); + let timeout_end = started + timeout; + + { + let mut s = self.vars.1.lock().unwrap(); + loop { + let wait_start = Instant::now(); + + if s.shutdown { + // Disconnected, immediately return error. + // Note: Shutdown takes priority over simultaneous notification. + return Err(Error::NotConnected); + } + + if wait_start >= timeout_end { + // Timed out, immediately return timeout error. + // This should not normally happen with the in-process monitor, but the + // monitor interval could be longer than the timeout. + s.shutdown = true; + return Err(Error::Timeout); + } + + // Get the interval every pass through the loop, in case it has changed. + let interval = Duration::from_millis(u64::from(s.interval_millis)); + + let wait_until = self + .last_status_time + .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end)); + + if s.notified { + // Notified, exit loop to get status. + s.notified = false; + break; + } + + if wait_until.is_none() { + // First status report, no waiting, exit loop to get status. + break; + } + + let wait_until = wait_until.unwrap(); + + if wait_until <= wait_start { + // No time left to wait. This can't be due to timeout because + // `wait_until <= wait_start < timeout_end`. + // Status report due, exit loop to get status. + break; + } + + // Wait. + // Do not attempt to recover from poisoned Mutex. + s = self + .vars + .0 + .wait_timeout(s, wait_until - wait_start) + .unwrap() + .0; + + // Mutex re-acquired, loop. + } + } + + // No error yet, start getting status now. + self.last_status_time = Some(Instant::now()); + + let bcm = match BackgroundCopyManager::connect() { + Ok(bcm) => bcm, + Err(e) => { + // On any error, disconnect. + self.vars.1.lock().unwrap().shutdown = true; + + // Errors below can use the BCM to do `format_error()`, but this one just gets the + // basic `comedy::HResult` treatment. + return Ok(Err(HResultMessage { + hr: e.code(), + message: format!("{}", e), + })); + } + }; + + Ok((|| { + let mut job = bcm.get_job_by_guid(&self.guid)?; + + let status = job.get_status()?; + let url = job.get_first_file()?.get_remote_name()?; + + Ok(JobStatus { + state: status.state, + progress: status.progress, + error_count: status.error_count, + error: status.error.map(|e| JobError { + context: e.context, + context_str: e.context_str, + error: HResultMessage { + hr: e.error, + message: e.error_str, + }, + }), + times: status.times, + url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url { + None + } else { + self.last_url = Some(url); + self.last_url.clone() + }, + }) + })() + .map_err(|e| { + // On any error, disconnect. + self.vars.1.lock().unwrap().shutdown = true; + format_error(&bcm, e) + })) + } +} + +#[cfg(test)] +mod tests; diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs new file mode 100644 index 0000000000..5545f6bd74 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs @@ -0,0 +1,628 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// These are full integration tests that use the BITS service. + +// TODO +// It may make sense to restrict how many tests can run at once. BITS is only supposed to support +// four simultaneous notifications per user, it is not impossible that this test suite could +// exceed that. + +#![cfg(test)] +extern crate bits; +extern crate lazy_static; +extern crate rand; +extern crate regex; +extern crate tempfile; + +use std::ffi::{OsStr, OsString}; +use std::fs::{self, File}; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::panic; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use self::{ + bits::BackgroundCopyManager, + lazy_static::lazy_static, + rand::{thread_rng, Rng}, + regex::bytes::Regex, + tempfile::{Builder, TempDir}, +}; +use super::{ + super::{BitsJobState, Error}, + BitsProxyUsage, InProcessClient, StartJobSuccess, +}; + +static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1]; + +lazy_static! { + static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); +} + +fn format_job_name(name: &str) -> OsString { + format!("InProcessClient Test {}", name).into() +} + +fn format_dir_prefix(tmp_dir: &TempDir) -> OsString { + let mut dir = tmp_dir.path().to_path_buf().into_os_string(); + dir.push("\\"); + dir +} + +fn cancel_jobs(name: &OsStr) { + BackgroundCopyManager::connect() + .unwrap() + .cancel_jobs_by_name(name) + .unwrap(); +} + +struct HttpServerResponses { + body: Box<[u8]>, + delay: u64, + //error: Box<[u8]>, +} + +struct MockHttpServerHandle { + port: u16, + join: Option<JoinHandle<Result<(), ()>>>, + shutdown: Arc<(Mutex<bool>, Condvar)>, +} + +impl MockHttpServerHandle { + fn shutdown(&mut self) { + if self.join.is_none() { + return; + } + + { + let &(ref lock, ref cvar) = &*self.shutdown; + let mut shutdown = lock.lock().unwrap(); + + if !*shutdown { + *shutdown = true; + cvar.notify_all(); + } + } + // Wake up the server from `accept()`. Will fail if the server wasn't listening. + let _ = TcpStream::connect_timeout( + &(SERVER_ADDRESS, self.port).into(), + Duration::from_millis(10_000), + ); + + match self.join.take().unwrap().join() { + Ok(_) => {} + Err(p) => panic::resume_unwind(p), + } + } + + fn format_url(&self, name: &str) -> OsString { + format!( + "http://{}/{}", + SocketAddr::from((SERVER_ADDRESS, self.port)), + name + ) + .into() + } +} + +fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle { + let mut bind_retries = 10; + let shutdown = Arc::new((Mutex::new(false), Condvar::new())); + let caller_shutdown = shutdown.clone(); + + let (listener, port) = loop { + let port = thread_rng().gen_range(1024..0x1_0000u32) as u16; + match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) { + Ok(listener) => { + break (listener, port); + } + r @ Err(_) => { + if bind_retries == 0 { + r.unwrap(); + } + bind_retries -= 1; + continue; + } + } + }; + + let join = thread::Builder::new() + .name(format!("mock_http_server {}", name)) + .spawn(move || { + // returns Err(()) if server should shut down immediately + fn check_shutdown(shutdown: &Arc<(Mutex<bool>, Condvar)>) -> Result<(), ()> { + if *shutdown.0.lock().unwrap() { + Err(()) + } else { + Ok(()) + } + } + fn sleep(shutdown: &Arc<(Mutex<bool>, Condvar)>, delay_millis: u64) -> Result<(), ()> { + let sleep_start = Instant::now(); + let sleep_end = sleep_start + Duration::from_millis(delay_millis); + + let (ref lock, ref cvar) = **shutdown; + let mut shutdown_requested = lock.lock().unwrap(); + loop { + if *shutdown_requested { + return Err(()); + } + + let before_wait = Instant::now(); + if before_wait >= sleep_end { + return Ok(()); + } + let wait_dur = sleep_end - before_wait; + shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0; + } + } + + let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap(); + let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap(); + + loop { + let (mut socket, _addr) = listener.accept().expect("accept should succeed"); + + socket + .set_read_timeout(Some(Duration::from_millis(10_000))) + .unwrap(); + let mut s = Vec::new(); + for b in Read::by_ref(&mut socket).bytes() { + if b.is_err() { + eprintln!("read error {:?}", b); + break; + } + let b = b.unwrap(); + s.push(b); + if s.ends_with(b"\r\n\r\n") { + break; + } + check_shutdown(&shutdown)?; + } + + // request received + + check_shutdown(&shutdown)?; + + // Special error URIs + if error_404.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 404 Not Found\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 404 header {:?}", e); + } + continue; + } + + if error_500.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 500 header {:?}", e); + } + continue; + } + + // Response with a body. + if s.starts_with(b"HEAD") || s.starts_with(b"GET") { + let result = socket + .write( + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + responses.body.len() + ) + .as_bytes(), + ) + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing header {:?}", e); + continue; + } + } + + if s.starts_with(b"GET") { + sleep(&shutdown, responses.delay)?; + let result = socket.write(&responses.body).and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing content {:?}", e); + continue; + } + } + } + }); + + MockHttpServerHandle { + port, + join: Some(join.unwrap()), + shutdown: caller_shutdown, + } +} + +// Test wrapper to ensure jobs are canceled, set up name strings +macro_rules! test { + (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => { + #[test] + fn $name() { + let $param = stringify!($name); + let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap(); + + let result = panic::catch_unwind(|| $body); + + cancel_jobs(&format_job_name($param)); + + if let Err(e) = result { + panic::resume_unwind(e); + } + } + }; +} + +test! { + fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 10_000, + }); + + let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 10_000; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // cancel in ~250ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.cancel_job(guid).unwrap(); + }); + + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get + // an update until 10s when the transfer completes or the interval expires) + match monitor.get_status(timeout) { + Err(Error::NotConnected) => {}, + Ok(r) => panic!("unexpected success from get_status() {:?}", r), + Err(e) => panic!("unexpected failure from get_status() {:?}", e), + } + assert!(start.elapsed() < Duration::from_millis(9_000)); + + server.shutdown(); + } +} + +test! { + fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) { + let file_path = tmp_dir.path().join(name); + + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 500, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 100; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // Get status reports until transfer finishes (~500ms) + let mut completed = false; + loop { + match monitor.get_status(timeout) { + Err(e) => { + if completed { + break; + } else { + panic!("monitor failed before completion {:?}", e); + } + } + Ok(Ok(status)) => match BitsJobState::from(status.state) { + BitsJobState::Queued | BitsJobState::Connecting + | BitsJobState::Transferring => { + //eprintln!("{:?}", BitsJobState::from(status.state)); + //eprintln!("{:?}", status); + + // As long as there is no error, setting the timeout to 0 will not + // fail an active transfer. + client.set_no_progress_timeout(guid.clone(), 0).unwrap(); + } + BitsJobState::Transferred => { + client.complete_job(guid.clone()).unwrap(); + completed = true; + } + _ => { + panic!("{:?}", status); + } + } + Ok(Err(e)) => panic!("{:?}", e), + } + + // Timeout to prevent waiting forever + assert!(start.elapsed() < Duration::from_millis(60_000)); + } + + + // Verify file contents + let result = panic::catch_unwind(|| { + let mut file = File::open(file_path.clone()).unwrap(); + let mut v = Vec::new(); + file.read_to_end(&mut v).unwrap(); + assert_eq!(v, name.as_bytes()); + }); + + let _ = fs::remove_file(file_path); + + if let Err(e) = result { + panic::resume_unwind(e); + } + + // Save this for last to ensure the file is removed. + server.shutdown(); + } +} + +test! { + fn async_transferred_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 250, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First report, immediate + let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap(); + let elapsed_to_report_1 = start.elapsed(); + + // Transferred notification should come when the job completes in ~250 ms, otherwise we + // will be stuck until timeout. + let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap(); + let elapsed_to_report_2 = start.elapsed(); + assert!(elapsed_to_report_2 < Duration::from_millis(9_000)); + assert_eq!(report_2.state, BitsJobState::Transferred); + + let short_timeout = 500; + let report_3 = monitor.get_status(short_timeout); + let elapsed_to_report_3 = start.elapsed(); + + if let Ok(report_3) = report_3 { + panic!("should be disconnected\n\ + report_1 ({}.{:03}): {:?}\n\ + report_2 ({}.{:03}): {:?}\n\ + report_3 ({}.{:03}): {:?}", + elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1, + elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2, + elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3, + ); + } + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn change_interval(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 1000, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 60_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // reduce monitor interval in ~250ms to 500ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.set_update_interval(guid, 500).unwrap(); + }); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Next report should be rescheduled to 500ms by the spawned thread, otherwise no status + // until the original 10s interval. + monitor.get_status(timeout).expect("expected second status").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert!(start.elapsed() > Duration::from_millis(400)); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn async_error_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_404"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Error notification should come with HEAD response in 100ms, otherwise no status until + // 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn transient_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 1_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Transient error notification should come when the interval expires in ~1s. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() > Duration::from_millis(900)); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::TransientError); + + // Lower no progress timeout to 0 + let set_timeout_at = Instant::now(); + client.set_no_progress_timeout(guid, 0).unwrap(); + + // Should convert the transient error to a permanent error immediately. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(set_timeout_at.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 1_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately + // produce an error notification with the HEAD response in 100ms. Otherwise no status + // until 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} diff --git a/toolkit/components/bitsdownload/bits_client/src/lib.rs b/toolkit/components/bitsdownload/bits_client/src/lib.rs new file mode 100644 index 0000000000..7db887bbc5 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/lib.rs @@ -0,0 +1,258 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +//! An interface for managing and monitoring BITS jobs. +//! +//! BITS is a Windows service for performing downloads in the background, independent from an +//! application, usually via HTTP/HTTPS. +//! +//! [`BitsClient`](enum.BitsClient.html) is the main interface, used to issue commands. +//! +//! [`BitsMonitorClient`](enum.BitsMonitorClient.html) delivers periodic status reports about a +//! job. +//! +//! Microsoft's documentation for BITS can be found at +//! <https://docs.microsoft.com/en-us/windows/desktop/Bits/background-intelligent-transfer-service-portal> + +extern crate bits; +extern crate comedy; +extern crate guid_win; +extern crate thiserror; + +pub mod bits_protocol; + +mod in_process; + +use std::ffi; + +use bits_protocol::*; +use thiserror::Error; + +pub use bits::status::{BitsErrorContext, BitsJobState, BitsJobTimes}; +pub use bits::{BitsJobProgress, BitsJobStatus, BitsProxyUsage}; +pub use bits_protocol::{JobError, JobStatus}; +pub use comedy::HResult; +pub use guid_win::Guid; + +// These errors would come from a Local Service client but are mostly unused currently. +// PipeError properly lives in the crate that deals with named pipes, but it isn't in use now. +#[derive(Clone, Debug, Eq, Error, PartialEq)] +pub enum PipeError { + #[error("Pipe is not connected")] + NotConnected, + #[error("Operation timed out")] + Timeout, + #[error("Should have written {0} bytes, wrote {1}")] + WriteCount(usize, u32), + #[error("Windows API error")] + Api(#[from] HResult), +} + +pub use PipeError as Error; + +/// A client for interacting with BITS. +/// +/// Methods on `BitsClient` return a `Result<Result<_, XyzFailure>, Error>`. The outer `Result` +/// is `Err` if there was a communication error in sending the associated command or receiving +/// its response. Currently this is always `Ok` as all clients are in-process. The inner +/// `Result` is `Err` if there was an error executing the command. +/// +/// A single `BitsClient` can be used with multiple BITS jobs simultaneously; generally a job +/// is not bound tightly to a client. +/// +/// A `BitsClient` tracks all [`BitsMonitorClient`s](enum.BitsMonitorClient.html) that it started +/// with `start_job()` or `monitor_job()`, so that the monitor can be stopped or modified. +pub enum BitsClient { + // The `InProcess` variant does all BITS calls directly. + #[doc(hidden)] + InProcess(in_process::InProcessClient), + // Space is reserved here for the LocalService variant, which will work through an external + // process running as Local Service. +} + +use BitsClient::InProcess; + +impl BitsClient { + /// Create an in-process `BitsClient`. + /// + /// `job_name` will be used when creating jobs, and this `BitsClient` can only be used to + /// manipulate jobs with that name. + /// + /// `save_path_prefix` will be prepended to the local `save_path` given to `start_job()`, it + /// must name an existing directory. + pub fn new( + job_name: ffi::OsString, + save_path_prefix: ffi::OsString, + ) -> Result<BitsClient, Error> { + Ok(InProcess(in_process::InProcessClient::new( + job_name, + save_path_prefix, + )?)) + } + + /// Start a job to download a single file at `url` to local path `save_path` (relative to the + /// `save_path_prefix` given when constructing the `BitsClient`). + /// + /// `save_path_prefix` combined with `save_path` must name a file (existing or not) in an + /// existing directory, which must be under the directory named by `save_path_prefix`. + /// + /// `proxy_usage` determines what proxy will be used. + /// + /// When a successful result `Ok(result)` is returned, `result.0.guid` is the id for the + /// new job, and `result.1` is a monitor client that can be polled for periodic updates, + /// returning a result approximately once per `monitor_interval_millis` milliseconds. + pub fn start_job( + &mut self, + url: ffi::OsString, + save_path: ffi::OsString, + proxy_usage: BitsProxyUsage, + no_progress_timeout_secs: u32, + monitor_interval_millis: u32, + ) -> Result<Result<(StartJobSuccess, BitsMonitorClient), StartJobFailure>, Error> { + match self { + InProcess(client) => Ok(client + .start_job( + url, + save_path, + proxy_usage, + no_progress_timeout_secs, + monitor_interval_millis, + ) + .map(|(success, monitor)| (success, BitsMonitorClient::InProcess(monitor)))), + } + } + + /// Start monitoring the job with id `guid` approximately once per `monitor_interval_millis` + /// milliseconds. + /// + /// The returned `Ok(monitor)` is a monitor client to be polled for periodic updates. + /// + /// There can only be one ongoing `BitsMonitorClient` for each job associated with a given + /// `BitsClient`. If a monitor client already exists for the specified job, it will be stopped. + pub fn monitor_job( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<Result<BitsMonitorClient, MonitorJobFailure>, Error> { + match self { + InProcess(client) => Ok(client + .monitor_job(guid, interval_millis) + .map(BitsMonitorClient::InProcess)), + } + } + + /// Suspend job `guid`. + pub fn suspend_job(&mut self, guid: Guid) -> Result<Result<(), SuspendJobFailure>, Error> { + match self { + InProcess(client) => Ok(client.suspend_job(guid)), + } + } + + /// Resume job `guid`. + pub fn resume_job(&mut self, guid: Guid) -> Result<Result<(), ResumeJobFailure>, Error> { + match self { + InProcess(client) => Ok(client.resume_job(guid)), + } + } + + /// Set the priority of job `guid`. + /// + /// `foreground == true` will set the priority to `BG_JOB_PRIORITY_FOREGROUND`, + /// `false` will use the default `BG_JOB_PRIORITY_NORMAL`. + /// See the Microsoft documentation for `BG_JOB_PRIORITY` for details. + /// + /// A job created by `start_job()` will be foreground priority, by default. + pub fn set_job_priority( + &mut self, + guid: Guid, + foreground: bool, + ) -> Result<Result<(), SetJobPriorityFailure>, Error> { + match self { + InProcess(client) => Ok(client.set_job_priority(guid, foreground)), + } + } + + /// Set the "no progress timeout" of job `guid`. + pub fn set_no_progress_timeout( + &mut self, + guid: Guid, + timeout_secs: u32, + ) -> Result<Result<(), SetNoProgressTimeoutFailure>, Error> { + match self { + InProcess(client) => Ok(client.set_no_progress_timeout(guid, timeout_secs)), + } + } + + /// Change the update interval for an ongoing monitor of job `guid`. + pub fn set_update_interval( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> { + match self { + InProcess(client) => Ok(client.set_update_interval(guid, interval_millis)), + } + } + + /// Stop any ongoing monitor for job `guid`. + pub fn stop_update( + &mut self, + guid: Guid, + ) -> Result<Result<(), SetUpdateIntervalFailure>, Error> { + match self { + InProcess(client) => Ok(client.stop_update(guid)), + } + } + + /// Complete the job `guid`. + /// + /// This also stops any ongoing monitor for the job. + pub fn complete_job(&mut self, guid: Guid) -> Result<Result<(), CompleteJobFailure>, Error> { + match self { + InProcess(client) => Ok(client.complete_job(guid)), + } + } + + /// Cancel the job `guid`. + /// + /// This also stops any ongoing monitor for the job. + pub fn cancel_job(&mut self, guid: Guid) -> Result<Result<(), CancelJobFailure>, Error> { + match self { + InProcess(client) => Ok(client.cancel_job(guid)), + } + } +} + +/// The client side of a monitor for a BITS job. +/// +/// It is intended to be used by calling `get_status` in a loop to receive notifications about +/// the status of a job. Because `get_status` blocks, it is recommended to run this loop on its +/// own thread. +pub enum BitsMonitorClient { + InProcess(in_process::InProcessMonitor), +} + +impl BitsMonitorClient { + /// `get_status` will return a result approximately every `monitor_interval_millis` + /// milliseconds, but in case a result isn't available within `timeout_millis` milliseconds + /// this will return `Err(Error::Timeout)`. Any `Err` returned, including timeout, indicates + /// that the monitor has been stopped; the `BitsMonitorClient` should then be discarded. + /// + /// As with methods on `BitsClient`, `BitsMonitorClient::get_status()` has an inner `Result` + /// type which indicates an error returned from the server. Any `Err` here also indicates that + /// the monitor has stopped after yielding the result. + /// + /// The first time `get_status` is called it will return a status without any delay. + /// + /// If there is an error or the transfer completes, a result may be available sooner than + /// the monitor interval. + pub fn get_status( + &mut self, + timeout_millis: u32, + ) -> Result<Result<JobStatus, HResultMessage>, Error> { + match self { + BitsMonitorClient::InProcess(client) => client.get_status(timeout_millis), + } + } +} |