From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../bitsdownload/bits_client/src/bits_protocol.rs | 380 +++++++++++++ .../bitsdownload/bits_client/src/in_process/mod.rs | 504 +++++++++++++++++ .../bits_client/src/in_process/tests.rs | 628 +++++++++++++++++++++ .../components/bitsdownload/bits_client/src/lib.rs | 258 +++++++++ 4 files changed, 1770 insertions(+) create mode 100644 toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs create mode 100644 toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs create mode 100644 toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs create mode 100644 toolkit/components/bitsdownload/bits_client/src/lib.rs (limited to 'toolkit/components/bitsdownload/bits_client/src') diff --git a/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs new file mode 100644 index 0000000000..6f19f5ef23 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/bits_protocol.rs @@ -0,0 +1,380 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +//! Command, response, and status types. + +use std::error::Error as StdError; +use std::ffi::OsString; +use std::fmt; +use std::result; + +use guid_win::Guid; +use thiserror::Error; + +use super::{BitsErrorContext, BitsJobProgress, BitsJobState, BitsJobTimes, BitsProxyUsage}; + +type HRESULT = i32; + +/// An HRESULT with a descriptive message +#[derive(Clone, Debug)] +pub struct HResultMessage { + pub hr: HRESULT, + pub message: String, +} + +impl fmt::Display for HResultMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { + self.message.fmt(f) + } +} + +impl StdError for HResultMessage {} + +/// Commands which can be sent to the server. +/// +/// This is currently unused as the out-of-process Local Service server is not finished. +#[doc(hidden)] +#[derive(Clone, Debug)] +pub enum Command { + StartJob(StartJobCommand), + MonitorJob(MonitorJobCommand), + SuspendJob(SuspendJobCommand), + ResumeJob(ResumeJobCommand), + SetJobPriority(SetJobPriorityCommand), + SetNoProgressTimeout(SetNoProgressTimeoutCommand), + SetUpdateInterval(SetUpdateIntervalCommand), + CompleteJob(CompleteJobCommand), + CancelJob(CancelJobCommand), +} + +/// Combine a [`Command`](enum.Command.html) with its success and failure result types. +#[doc(hidden)] +pub trait CommandType { + type Success; + type Failure: StdError; + fn wrap(command: Self) -> Command; +} + +// Start Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct StartJobCommand { + pub url: OsString, + pub save_path: OsString, + pub proxy_usage: BitsProxyUsage, + pub no_progress_timeout_secs: u32, + pub monitor: Option, +} + +impl CommandType for StartJobCommand { + type Success = StartJobSuccess; + type Failure = StartJobFailure; + fn wrap(cmd: Self) -> Command { + Command::StartJob(cmd) + } +} + +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct MonitorConfig { + pub pipe_name: OsString, + pub interval_millis: u32, +} + +#[derive(Clone, Debug)] +pub struct StartJobSuccess { + pub guid: Guid, +} + +#[derive(Clone, Debug, Error)] +pub enum StartJobFailure { + #[error("Argument validation failed: {0}")] + ArgumentValidation(String), + #[error("Create job: {0}")] + Create(HResultMessage), + #[error("Add file to job: {0}")] + AddFile(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Resume job: {0}")] + Resume(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Monitor Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct MonitorJobCommand { + pub guid: Guid, + pub monitor: MonitorConfig, +} + +impl CommandType for MonitorJobCommand { + type Success = (); + type Failure = MonitorJobFailure; + fn wrap(cmd: Self) -> Command { + Command::MonitorJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum MonitorJobFailure { + #[error("Argument validation failed: {0}")] + ArgumentValidation(String), + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Suspend Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SuspendJobCommand { + pub guid: Guid, +} + +impl CommandType for SuspendJobCommand { + type Success = (); + type Failure = SuspendJobFailure; + fn wrap(cmd: Self) -> Command { + Command::SuspendJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SuspendJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Suspend job: {0}")] + SuspendJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Resume Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct ResumeJobCommand { + pub guid: Guid, +} + +impl CommandType for ResumeJobCommand { + type Success = (); + type Failure = ResumeJobFailure; + fn wrap(cmd: Self) -> Command { + Command::ResumeJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum ResumeJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Resume job: {0}")] + ResumeJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set Job Priority +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetJobPriorityCommand { + pub guid: Guid, + pub foreground: bool, +} + +impl CommandType for SetJobPriorityCommand { + type Success = (); + type Failure = SetJobPriorityFailure; + fn wrap(cmd: Self) -> Command { + Command::SetJobPriority(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetJobPriorityFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set No Progress Timeout +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetNoProgressTimeoutCommand { + pub guid: Guid, + pub timeout_secs: u32, +} + +impl CommandType for SetNoProgressTimeoutCommand { + type Success = (); + type Failure = SetNoProgressTimeoutFailure; + fn wrap(cmd: Self) -> Command { + Command::SetNoProgressTimeout(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetNoProgressTimeoutFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Apply settings to job: {0}")] + ApplySettings(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Set Update Interval +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct SetUpdateIntervalCommand { + pub guid: Guid, + pub interval_millis: u32, +} + +impl CommandType for SetUpdateIntervalCommand { + type Success = (); + type Failure = SetUpdateIntervalFailure; + fn wrap(cmd: Self) -> Command { + Command::SetUpdateInterval(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum SetUpdateIntervalFailure { + #[error("Argument validation: {0}")] + ArgumentValidation(String), + #[error("Monitor not found")] + NotFound, + #[error("Other failure: {0}")] + Other(String), +} + +// Complete Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct CompleteJobCommand { + pub guid: Guid, +} + +impl CommandType for CompleteJobCommand { + type Success = (); + type Failure = CompleteJobFailure; + fn wrap(cmd: Self) -> Command { + Command::CompleteJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum CompleteJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Complete job: {0}")] + CompleteJob(HResultMessage), + #[error("Job only partially completed")] + PartialComplete, + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +// Cancel Job +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct CancelJobCommand { + pub guid: Guid, +} + +impl CommandType for CancelJobCommand { + type Success = (); + type Failure = CancelJobFailure; + fn wrap(cmd: Self) -> Command { + Command::CancelJob(cmd) + } +} + +#[derive(Clone, Debug, Error)] +pub enum CancelJobFailure { + #[error("Job not found")] + NotFound, + #[error("Get job: {0}")] + GetJob(HResultMessage), + #[error("Cancel job: {0}")] + CancelJob(HResultMessage), + #[error("Connect to BackgroundCopyManager: {0}")] + ConnectBcm(HResultMessage), + #[error("BITS error: {0}")] + OtherBITS(HResultMessage), + #[error("Other failure: {0}")] + Other(String), +} + +/// Job status report +/// +/// This includes a URL which updates with redirect but is otherwise the same as +/// `bits::status::BitsJobStatus`. +#[derive(Clone, Debug)] +pub struct JobStatus { + pub state: BitsJobState, + pub progress: BitsJobProgress, + pub error_count: u32, + pub error: Option, + pub times: BitsJobTimes, + /// None means same as last time + pub url: Option, +} + +/// Job error report +#[derive(Clone, Debug, Error)] +#[error("Job error in context {context_str}: {error}")] +pub struct JobError { + pub context: BitsErrorContext, + pub context_str: String, + pub error: HResultMessage, +} diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs new file mode 100644 index 0000000000..c7f6869167 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs @@ -0,0 +1,504 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cmp; +use std::collections::{hash_map, HashMap}; +use std::ffi; +use std::path; +use std::sync::{Arc, Condvar, Mutex, Weak}; +use std::time::{Duration, Instant}; + +use bits::{ + BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL, +}; +use guid_win::Guid; + +use bits_protocol::*; + +use super::Error; + +// This is a macro in order to use the NotFound and GetJob variants from whatever enum is in scope. +macro_rules! get_job { + ($bcm:ident, $guid:expr, $name:expr) => {{ + $bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + $bcm.find_job_by_guid_and_name($guid, $name) + .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))? + .ok_or(NotFound)? + }}; +} + +fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage { + let bits_description = bcm.get_error_description(error.code()).ok(); + + HResultMessage { + hr: error.code(), + message: if let Some(desc) = bits_description { + format!("{}: {}", error, desc) + } else { + format!("{}", error) + }, + } +} + +// The in-process client uses direct BITS calls via the `bits` crate. +// See the corresponding functions in BitsClient. +pub struct InProcessClient { + job_name: ffi::OsString, + save_path_prefix: path::PathBuf, + monitors: HashMap, +} + +impl InProcessClient { + pub fn new( + job_name: ffi::OsString, + save_path_prefix: ffi::OsString, + ) -> Result { + Ok(InProcessClient { + job_name, + save_path_prefix: path::PathBuf::from(save_path_prefix), + monitors: HashMap::new(), + }) + } + + pub fn start_job( + &mut self, + url: ffi::OsString, + save_path: ffi::OsString, + proxy_usage: BitsProxyUsage, + no_progress_timeout_secs: u32, + monitor_interval_millis: u32, + ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> { + use StartJobFailure::*; + + let full_path = self.save_path_prefix.join(save_path); + + // Verify that `full_path` is under the directory called `save_path_prefix`. + { + let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| { + ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e)) + })?; + // Full path minus file name, canonicalize() fails with nonexistent files, but the + // parent directory ought to exist. + let canonical_full_path = full_path + .parent() + .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))? + .canonicalize() + .map_err(|e| { + ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e)) + })?; + + if !canonical_full_path.starts_with(&canonical_prefix) { + return Err(ArgumentValidation(format!( + "{:?} is not within {:?}", + canonical_full_path, canonical_prefix + ))); + } + } + + // TODO: Should the job be explicitly cleaned up if this fn can't return success? + // If the job is dropped before `AddFile` succeeds, I think it automatically gets + // deleted from the queue. There is only one fallible call after that (`Resume`). + + let bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + let mut job = bcm + .create_job(&self.job_name) + .map_err(|e| Create(format_error(&bcm, e)))?; + + let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + (|| { + job.set_proxy_usage(proxy_usage)?; + job.set_minimum_retry_delay(60)?; + job.set_no_progress_timeout(no_progress_timeout_secs)?; + job.set_redirect_report()?; + + job.set_priority(BitsJobPriority::Foreground)?; + + Ok(()) + })() + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + job.add_file(&url, &full_path.into_os_string()) + .map_err(|e| AddFile(format_error(&bcm, e)))?; + + job.resume().map_err(|e| Resume(format_error(&bcm, e)))?; + + self.monitors.insert(guid.clone(), control); + + Ok((StartJobSuccess { guid }, client)) + } + + pub fn monitor_job( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result { + use MonitorJobFailure::*; + + // Stop any preexisting monitor for the same guid. + let _ = self.stop_update(guid.clone()); + + let bcm; + let (client, control) = + InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + self.monitors.insert(guid, control); + + Ok(client) + } + + pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> { + use SuspendJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .suspend() + .map_err(|e| SuspendJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> { + use ResumeJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .resume() + .map_err(|e| ResumeJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_job_priority( + &mut self, + guid: Guid, + foreground: bool, + ) -> Result<(), SetJobPriorityFailure> { + use SetJobPriorityFailure::*; + + let priority = if foreground { + BitsJobPriority::Foreground + } else { + BitsJobPriority::Normal + }; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_priority(priority) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_no_progress_timeout( + &mut self, + guid: Guid, + timeout_secs: u32, + ) -> Result<(), SetNoProgressTimeoutFailure> { + use SetNoProgressTimeoutFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_no_progress_timeout(timeout_secs) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + fn get_monitor_control_sender(&mut self, guid: Guid) -> Option> { + if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) { + if let Some(sender) = occ.get().0.upgrade() { + Some(sender) + } else { + // Remove dangling Weak + occ.remove_entry(); + None + } + } else { + None + } + } + + pub fn set_update_interval( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<(), SetUpdateIntervalFailure> { + use SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + let mut s = sender.1.lock().unwrap(); + s.interval_millis = interval_millis; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> { + use SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + sender.1.lock().unwrap().shutdown = true; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> { + use CompleteJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .complete() + .map_err(|e| CompleteJob(format_error(&bcm, e))) + .and_then(|hr| { + if hr == BG_S_PARTIAL_COMPLETE as i32 { + Err(PartialComplete) + } else { + Ok(()) + } + })?; + + let _ = self.stop_update(guid); + + Ok(()) + } + + pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> { + use CancelJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .cancel() + .map_err(|e| CancelJob(format_error(&bcm, e)))?; + + let _ = self.stop_update(guid); + + Ok(()) + } +} + +// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to +// control a blocked `get_status` call from another thread. +pub struct InProcessMonitor { + vars: Arc, + guid: Guid, + last_status_time: Option, + last_url: Option, +} + +// The `Condvar` is notified when `InProcessMonitorVars` changes. +type ControlPair = (Condvar, Mutex); +struct InProcessMonitorControl(Weak); + +// RefUnwindSafe is not impl'd for Condvar but likely should be, +// see https://github.com/rust-lang/rust/issues/54768 +impl std::panic::RefUnwindSafe for InProcessMonitorControl {} + +struct InProcessMonitorVars { + interval_millis: u32, + notified: bool, + shutdown: bool, +} + +impl InProcessMonitor { + fn new( + job: &mut BitsJob, + interval_millis: u32, + ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> { + let guid = job.guid()?; + + let vars = Arc::new(( + Condvar::new(), + Mutex::new(InProcessMonitorVars { + interval_millis, + notified: false, + shutdown: false, + }), + )); + + let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let transferred_cb = Box::new(move || { + if let Some(control) = transferred_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + let error_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let error_cb = Box::new(move || { + if let Some(control) = error_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + // Note: These callbacks are never explicitly cleared. They will be freed when the + // job is deleted from BITS, and they will be cleared if an attempt is made to call them + // when they are no longer valid (e.g. after the process exits). This is done mostly for + // simplicity and should be safe. + + job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?; + + let control = InProcessMonitorControl(Arc::downgrade(&vars)); + + let monitor = InProcessMonitor { + guid, + vars, + last_status_time: None, + last_url: None, + }; + + Ok((monitor, control)) + } + + pub fn get_status( + &mut self, + timeout_millis: u32, + ) -> Result, Error> { + let timeout = Duration::from_millis(u64::from(timeout_millis)); + + let started = Instant::now(); + let timeout_end = started + timeout; + + { + let mut s = self.vars.1.lock().unwrap(); + loop { + let wait_start = Instant::now(); + + if s.shutdown { + // Disconnected, immediately return error. + // Note: Shutdown takes priority over simultaneous notification. + return Err(Error::NotConnected); + } + + if wait_start >= timeout_end { + // Timed out, immediately return timeout error. + // This should not normally happen with the in-process monitor, but the + // monitor interval could be longer than the timeout. + s.shutdown = true; + return Err(Error::Timeout); + } + + // Get the interval every pass through the loop, in case it has changed. + let interval = Duration::from_millis(u64::from(s.interval_millis)); + + let wait_until = self + .last_status_time + .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end)); + + if s.notified { + // Notified, exit loop to get status. + s.notified = false; + break; + } + + if wait_until.is_none() { + // First status report, no waiting, exit loop to get status. + break; + } + + let wait_until = wait_until.unwrap(); + + if wait_until <= wait_start { + // No time left to wait. This can't be due to timeout because + // `wait_until <= wait_start < timeout_end`. + // Status report due, exit loop to get status. + break; + } + + // Wait. + // Do not attempt to recover from poisoned Mutex. + s = self + .vars + .0 + .wait_timeout(s, wait_until - wait_start) + .unwrap() + .0; + + // Mutex re-acquired, loop. + } + } + + // No error yet, start getting status now. + self.last_status_time = Some(Instant::now()); + + let bcm = match BackgroundCopyManager::connect() { + Ok(bcm) => bcm, + Err(e) => { + // On any error, disconnect. + self.vars.1.lock().unwrap().shutdown = true; + + // Errors below can use the BCM to do `format_error()`, but this one just gets the + // basic `comedy::HResult` treatment. + return Ok(Err(HResultMessage { + hr: e.code(), + message: format!("{}", e), + })); + } + }; + + Ok((|| { + let mut job = bcm.get_job_by_guid(&self.guid)?; + + let status = job.get_status()?; + let url = job.get_first_file()?.get_remote_name()?; + + Ok(JobStatus { + state: status.state, + progress: status.progress, + error_count: status.error_count, + error: status.error.map(|e| JobError { + context: e.context, + context_str: e.context_str, + error: HResultMessage { + hr: e.error, + message: e.error_str, + }, + }), + times: status.times, + url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url { + None + } else { + self.last_url = Some(url); + self.last_url.clone() + }, + }) + })() + .map_err(|e| { + // On any error, disconnect. + self.vars.1.lock().unwrap().shutdown = true; + format_error(&bcm, e) + })) + } +} + +#[cfg(test)] +mod tests; diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs new file mode 100644 index 0000000000..5545f6bd74 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs @@ -0,0 +1,628 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// These are full integration tests that use the BITS service. + +// TODO +// It may make sense to restrict how many tests can run at once. BITS is only supposed to support +// four simultaneous notifications per user, it is not impossible that this test suite could +// exceed that. + +#![cfg(test)] +extern crate bits; +extern crate lazy_static; +extern crate rand; +extern crate regex; +extern crate tempfile; + +use std::ffi::{OsStr, OsString}; +use std::fs::{self, File}; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::panic; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use self::{ + bits::BackgroundCopyManager, + lazy_static::lazy_static, + rand::{thread_rng, Rng}, + regex::bytes::Regex, + tempfile::{Builder, TempDir}, +}; +use super::{ + super::{BitsJobState, Error}, + BitsProxyUsage, InProcessClient, StartJobSuccess, +}; + +static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1]; + +lazy_static! { + static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); +} + +fn format_job_name(name: &str) -> OsString { + format!("InProcessClient Test {}", name).into() +} + +fn format_dir_prefix(tmp_dir: &TempDir) -> OsString { + let mut dir = tmp_dir.path().to_path_buf().into_os_string(); + dir.push("\\"); + dir +} + +fn cancel_jobs(name: &OsStr) { + BackgroundCopyManager::connect() + .unwrap() + .cancel_jobs_by_name(name) + .unwrap(); +} + +struct HttpServerResponses { + body: Box<[u8]>, + delay: u64, + //error: Box<[u8]>, +} + +struct MockHttpServerHandle { + port: u16, + join: Option>>, + shutdown: Arc<(Mutex, Condvar)>, +} + +impl MockHttpServerHandle { + fn shutdown(&mut self) { + if self.join.is_none() { + return; + } + + { + let &(ref lock, ref cvar) = &*self.shutdown; + let mut shutdown = lock.lock().unwrap(); + + if !*shutdown { + *shutdown = true; + cvar.notify_all(); + } + } + // Wake up the server from `accept()`. Will fail if the server wasn't listening. + let _ = TcpStream::connect_timeout( + &(SERVER_ADDRESS, self.port).into(), + Duration::from_millis(10_000), + ); + + match self.join.take().unwrap().join() { + Ok(_) => {} + Err(p) => panic::resume_unwind(p), + } + } + + fn format_url(&self, name: &str) -> OsString { + format!( + "http://{}/{}", + SocketAddr::from((SERVER_ADDRESS, self.port)), + name + ) + .into() + } +} + +fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle { + let mut bind_retries = 10; + let shutdown = Arc::new((Mutex::new(false), Condvar::new())); + let caller_shutdown = shutdown.clone(); + + let (listener, port) = loop { + let port = thread_rng().gen_range(1024..0x1_0000u32) as u16; + match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) { + Ok(listener) => { + break (listener, port); + } + r @ Err(_) => { + if bind_retries == 0 { + r.unwrap(); + } + bind_retries -= 1; + continue; + } + } + }; + + let join = thread::Builder::new() + .name(format!("mock_http_server {}", name)) + .spawn(move || { + // returns Err(()) if server should shut down immediately + fn check_shutdown(shutdown: &Arc<(Mutex, Condvar)>) -> Result<(), ()> { + if *shutdown.0.lock().unwrap() { + Err(()) + } else { + Ok(()) + } + } + fn sleep(shutdown: &Arc<(Mutex, Condvar)>, delay_millis: u64) -> Result<(), ()> { + let sleep_start = Instant::now(); + let sleep_end = sleep_start + Duration::from_millis(delay_millis); + + let (ref lock, ref cvar) = **shutdown; + let mut shutdown_requested = lock.lock().unwrap(); + loop { + if *shutdown_requested { + return Err(()); + } + + let before_wait = Instant::now(); + if before_wait >= sleep_end { + return Ok(()); + } + let wait_dur = sleep_end - before_wait; + shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0; + } + } + + let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap(); + let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap(); + + loop { + let (mut socket, _addr) = listener.accept().expect("accept should succeed"); + + socket + .set_read_timeout(Some(Duration::from_millis(10_000))) + .unwrap(); + let mut s = Vec::new(); + for b in Read::by_ref(&mut socket).bytes() { + if b.is_err() { + eprintln!("read error {:?}", b); + break; + } + let b = b.unwrap(); + s.push(b); + if s.ends_with(b"\r\n\r\n") { + break; + } + check_shutdown(&shutdown)?; + } + + // request received + + check_shutdown(&shutdown)?; + + // Special error URIs + if error_404.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 404 Not Found\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 404 header {:?}", e); + } + continue; + } + + if error_500.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 500 header {:?}", e); + } + continue; + } + + // Response with a body. + if s.starts_with(b"HEAD") || s.starts_with(b"GET") { + let result = socket + .write( + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + responses.body.len() + ) + .as_bytes(), + ) + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing header {:?}", e); + continue; + } + } + + if s.starts_with(b"GET") { + sleep(&shutdown, responses.delay)?; + let result = socket.write(&responses.body).and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing content {:?}", e); + continue; + } + } + } + }); + + MockHttpServerHandle { + port, + join: Some(join.unwrap()), + shutdown: caller_shutdown, + } +} + +// Test wrapper to ensure jobs are canceled, set up name strings +macro_rules! test { + (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => { + #[test] + fn $name() { + let $param = stringify!($name); + let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap(); + + let result = panic::catch_unwind(|| $body); + + cancel_jobs(&format_job_name($param)); + + if let Err(e) = result { + panic::resume_unwind(e); + } + } + }; +} + +test! { + fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 10_000, + }); + + let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 10_000; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // cancel in ~250ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.cancel_job(guid).unwrap(); + }); + + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get + // an update until 10s when the transfer completes or the interval expires) + match monitor.get_status(timeout) { + Err(Error::NotConnected) => {}, + Ok(r) => panic!("unexpected success from get_status() {:?}", r), + Err(e) => panic!("unexpected failure from get_status() {:?}", e), + } + assert!(start.elapsed() < Duration::from_millis(9_000)); + + server.shutdown(); + } +} + +test! { + fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) { + let file_path = tmp_dir.path().join(name); + + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 500, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 100; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // Get status reports until transfer finishes (~500ms) + let mut completed = false; + loop { + match monitor.get_status(timeout) { + Err(e) => { + if completed { + break; + } else { + panic!("monitor failed before completion {:?}", e); + } + } + Ok(Ok(status)) => match BitsJobState::from(status.state) { + BitsJobState::Queued | BitsJobState::Connecting + | BitsJobState::Transferring => { + //eprintln!("{:?}", BitsJobState::from(status.state)); + //eprintln!("{:?}", status); + + // As long as there is no error, setting the timeout to 0 will not + // fail an active transfer. + client.set_no_progress_timeout(guid.clone(), 0).unwrap(); + } + BitsJobState::Transferred => { + client.complete_job(guid.clone()).unwrap(); + completed = true; + } + _ => { + panic!("{:?}", status); + } + } + Ok(Err(e)) => panic!("{:?}", e), + } + + // Timeout to prevent waiting forever + assert!(start.elapsed() < Duration::from_millis(60_000)); + } + + + // Verify file contents + let result = panic::catch_unwind(|| { + let mut file = File::open(file_path.clone()).unwrap(); + let mut v = Vec::new(); + file.read_to_end(&mut v).unwrap(); + assert_eq!(v, name.as_bytes()); + }); + + let _ = fs::remove_file(file_path); + + if let Err(e) = result { + panic::resume_unwind(e); + } + + // Save this for last to ensure the file is removed. + server.shutdown(); + } +} + +test! { + fn async_transferred_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 250, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First report, immediate + let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap(); + let elapsed_to_report_1 = start.elapsed(); + + // Transferred notification should come when the job completes in ~250 ms, otherwise we + // will be stuck until timeout. + let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap(); + let elapsed_to_report_2 = start.elapsed(); + assert!(elapsed_to_report_2 < Duration::from_millis(9_000)); + assert_eq!(report_2.state, BitsJobState::Transferred); + + let short_timeout = 500; + let report_3 = monitor.get_status(short_timeout); + let elapsed_to_report_3 = start.elapsed(); + + if let Ok(report_3) = report_3 { + panic!("should be disconnected\n\ + report_1 ({}.{:03}): {:?}\n\ + report_2 ({}.{:03}): {:?}\n\ + report_3 ({}.{:03}): {:?}", + elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1, + elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2, + elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3, + ); + } + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn change_interval(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 1000, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 60_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // reduce monitor interval in ~250ms to 500ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.set_update_interval(guid, 500).unwrap(); + }); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Next report should be rescheduled to 500ms by the spawned thread, otherwise no status + // until the original 10s interval. + monitor.get_status(timeout).expect("expected second status").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert!(start.elapsed() > Duration::from_millis(400)); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn async_error_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_404"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Error notification should come with HEAD response in 100ms, otherwise no status until + // 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn transient_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 1_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Transient error notification should come when the interval expires in ~1s. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() > Duration::from_millis(900)); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::TransientError); + + // Lower no progress timeout to 0 + let set_timeout_at = Instant::now(); + client.set_no_progress_timeout(guid, 0).unwrap(); + + // Should convert the transient error to a permanent error immediately. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(set_timeout_at.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 1_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately + // produce an error notification with the HEAD response in 100ms. Otherwise no status + // until 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} diff --git a/toolkit/components/bitsdownload/bits_client/src/lib.rs b/toolkit/components/bitsdownload/bits_client/src/lib.rs new file mode 100644 index 0000000000..7db887bbc5 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/lib.rs @@ -0,0 +1,258 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +//! An interface for managing and monitoring BITS jobs. +//! +//! BITS is a Windows service for performing downloads in the background, independent from an +//! application, usually via HTTP/HTTPS. +//! +//! [`BitsClient`](enum.BitsClient.html) is the main interface, used to issue commands. +//! +//! [`BitsMonitorClient`](enum.BitsMonitorClient.html) delivers periodic status reports about a +//! job. +//! +//! Microsoft's documentation for BITS can be found at +//! + +extern crate bits; +extern crate comedy; +extern crate guid_win; +extern crate thiserror; + +pub mod bits_protocol; + +mod in_process; + +use std::ffi; + +use bits_protocol::*; +use thiserror::Error; + +pub use bits::status::{BitsErrorContext, BitsJobState, BitsJobTimes}; +pub use bits::{BitsJobProgress, BitsJobStatus, BitsProxyUsage}; +pub use bits_protocol::{JobError, JobStatus}; +pub use comedy::HResult; +pub use guid_win::Guid; + +// These errors would come from a Local Service client but are mostly unused currently. +// PipeError properly lives in the crate that deals with named pipes, but it isn't in use now. +#[derive(Clone, Debug, Eq, Error, PartialEq)] +pub enum PipeError { + #[error("Pipe is not connected")] + NotConnected, + #[error("Operation timed out")] + Timeout, + #[error("Should have written {0} bytes, wrote {1}")] + WriteCount(usize, u32), + #[error("Windows API error")] + Api(#[from] HResult), +} + +pub use PipeError as Error; + +/// A client for interacting with BITS. +/// +/// Methods on `BitsClient` return a `Result, Error>`. The outer `Result` +/// is `Err` if there was a communication error in sending the associated command or receiving +/// its response. Currently this is always `Ok` as all clients are in-process. The inner +/// `Result` is `Err` if there was an error executing the command. +/// +/// A single `BitsClient` can be used with multiple BITS jobs simultaneously; generally a job +/// is not bound tightly to a client. +/// +/// A `BitsClient` tracks all [`BitsMonitorClient`s](enum.BitsMonitorClient.html) that it started +/// with `start_job()` or `monitor_job()`, so that the monitor can be stopped or modified. +pub enum BitsClient { + // The `InProcess` variant does all BITS calls directly. + #[doc(hidden)] + InProcess(in_process::InProcessClient), + // Space is reserved here for the LocalService variant, which will work through an external + // process running as Local Service. +} + +use BitsClient::InProcess; + +impl BitsClient { + /// Create an in-process `BitsClient`. + /// + /// `job_name` will be used when creating jobs, and this `BitsClient` can only be used to + /// manipulate jobs with that name. + /// + /// `save_path_prefix` will be prepended to the local `save_path` given to `start_job()`, it + /// must name an existing directory. + pub fn new( + job_name: ffi::OsString, + save_path_prefix: ffi::OsString, + ) -> Result { + Ok(InProcess(in_process::InProcessClient::new( + job_name, + save_path_prefix, + )?)) + } + + /// Start a job to download a single file at `url` to local path `save_path` (relative to the + /// `save_path_prefix` given when constructing the `BitsClient`). + /// + /// `save_path_prefix` combined with `save_path` must name a file (existing or not) in an + /// existing directory, which must be under the directory named by `save_path_prefix`. + /// + /// `proxy_usage` determines what proxy will be used. + /// + /// When a successful result `Ok(result)` is returned, `result.0.guid` is the id for the + /// new job, and `result.1` is a monitor client that can be polled for periodic updates, + /// returning a result approximately once per `monitor_interval_millis` milliseconds. + pub fn start_job( + &mut self, + url: ffi::OsString, + save_path: ffi::OsString, + proxy_usage: BitsProxyUsage, + no_progress_timeout_secs: u32, + monitor_interval_millis: u32, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client + .start_job( + url, + save_path, + proxy_usage, + no_progress_timeout_secs, + monitor_interval_millis, + ) + .map(|(success, monitor)| (success, BitsMonitorClient::InProcess(monitor)))), + } + } + + /// Start monitoring the job with id `guid` approximately once per `monitor_interval_millis` + /// milliseconds. + /// + /// The returned `Ok(monitor)` is a monitor client to be polled for periodic updates. + /// + /// There can only be one ongoing `BitsMonitorClient` for each job associated with a given + /// `BitsClient`. If a monitor client already exists for the specified job, it will be stopped. + pub fn monitor_job( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client + .monitor_job(guid, interval_millis) + .map(BitsMonitorClient::InProcess)), + } + } + + /// Suspend job `guid`. + pub fn suspend_job(&mut self, guid: Guid) -> Result, Error> { + match self { + InProcess(client) => Ok(client.suspend_job(guid)), + } + } + + /// Resume job `guid`. + pub fn resume_job(&mut self, guid: Guid) -> Result, Error> { + match self { + InProcess(client) => Ok(client.resume_job(guid)), + } + } + + /// Set the priority of job `guid`. + /// + /// `foreground == true` will set the priority to `BG_JOB_PRIORITY_FOREGROUND`, + /// `false` will use the default `BG_JOB_PRIORITY_NORMAL`. + /// See the Microsoft documentation for `BG_JOB_PRIORITY` for details. + /// + /// A job created by `start_job()` will be foreground priority, by default. + pub fn set_job_priority( + &mut self, + guid: Guid, + foreground: bool, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client.set_job_priority(guid, foreground)), + } + } + + /// Set the "no progress timeout" of job `guid`. + pub fn set_no_progress_timeout( + &mut self, + guid: Guid, + timeout_secs: u32, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client.set_no_progress_timeout(guid, timeout_secs)), + } + } + + /// Change the update interval for an ongoing monitor of job `guid`. + pub fn set_update_interval( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client.set_update_interval(guid, interval_millis)), + } + } + + /// Stop any ongoing monitor for job `guid`. + pub fn stop_update( + &mut self, + guid: Guid, + ) -> Result, Error> { + match self { + InProcess(client) => Ok(client.stop_update(guid)), + } + } + + /// Complete the job `guid`. + /// + /// This also stops any ongoing monitor for the job. + pub fn complete_job(&mut self, guid: Guid) -> Result, Error> { + match self { + InProcess(client) => Ok(client.complete_job(guid)), + } + } + + /// Cancel the job `guid`. + /// + /// This also stops any ongoing monitor for the job. + pub fn cancel_job(&mut self, guid: Guid) -> Result, Error> { + match self { + InProcess(client) => Ok(client.cancel_job(guid)), + } + } +} + +/// The client side of a monitor for a BITS job. +/// +/// It is intended to be used by calling `get_status` in a loop to receive notifications about +/// the status of a job. Because `get_status` blocks, it is recommended to run this loop on its +/// own thread. +pub enum BitsMonitorClient { + InProcess(in_process::InProcessMonitor), +} + +impl BitsMonitorClient { + /// `get_status` will return a result approximately every `monitor_interval_millis` + /// milliseconds, but in case a result isn't available within `timeout_millis` milliseconds + /// this will return `Err(Error::Timeout)`. Any `Err` returned, including timeout, indicates + /// that the monitor has been stopped; the `BitsMonitorClient` should then be discarded. + /// + /// As with methods on `BitsClient`, `BitsMonitorClient::get_status()` has an inner `Result` + /// type which indicates an error returned from the server. Any `Err` here also indicates that + /// the monitor has stopped after yielding the result. + /// + /// The first time `get_status` is called it will return a status without any delay. + /// + /// If there is an error or the transfer completes, a result may be available sooner than + /// the monitor interval. + pub fn get_status( + &mut self, + timeout_millis: u32, + ) -> Result, Error> { + match self { + BitsMonitorClient::InProcess(client) => client.get_status(timeout_millis), + } + } +} -- cgit v1.2.3