summaryrefslogtreecommitdiffstats
path: root/toolkit/components/bitsdownload/bits_client/src/in_process
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/bitsdownload/bits_client/src/in_process')
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs504
-rw-r--r--toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs628
2 files changed, 1132 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
new file mode 100644
index 0000000000..c7f6869167
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs
@@ -0,0 +1,504 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+use std::cmp;
+use std::collections::{hash_map, HashMap};
+use std::ffi;
+use std::path;
+use std::sync::{Arc, Condvar, Mutex, Weak};
+use std::time::{Duration, Instant};
+
+use bits::{
+ BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL,
+};
+use guid_win::Guid;
+
+use bits_protocol::*;
+
+use super::Error;
+
+// This is a macro in order to use the NotFound and GetJob variants from whatever enum is in scope.
+macro_rules! get_job {
+ ($bcm:ident, $guid:expr, $name:expr) => {{
+ $bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ $bcm.find_job_by_guid_and_name($guid, $name)
+ .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))?
+ .ok_or(NotFound)?
+ }};
+}
+
+fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage {
+ let bits_description = bcm.get_error_description(error.code()).ok();
+
+ HResultMessage {
+ hr: error.code(),
+ message: if let Some(desc) = bits_description {
+ format!("{}: {}", error, desc)
+ } else {
+ format!("{}", error)
+ },
+ }
+}
+
+// The in-process client uses direct BITS calls via the `bits` crate.
+// See the corresponding functions in BitsClient.
+pub struct InProcessClient {
+ job_name: ffi::OsString,
+ save_path_prefix: path::PathBuf,
+ monitors: HashMap<Guid, InProcessMonitorControl>,
+}
+
+impl InProcessClient {
+ pub fn new(
+ job_name: ffi::OsString,
+ save_path_prefix: ffi::OsString,
+ ) -> Result<InProcessClient, Error> {
+ Ok(InProcessClient {
+ job_name,
+ save_path_prefix: path::PathBuf::from(save_path_prefix),
+ monitors: HashMap::new(),
+ })
+ }
+
+ pub fn start_job(
+ &mut self,
+ url: ffi::OsString,
+ save_path: ffi::OsString,
+ proxy_usage: BitsProxyUsage,
+ no_progress_timeout_secs: u32,
+ monitor_interval_millis: u32,
+ ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> {
+ use StartJobFailure::*;
+
+ let full_path = self.save_path_prefix.join(save_path);
+
+ // Verify that `full_path` is under the directory called `save_path_prefix`.
+ {
+ let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| {
+ ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e))
+ })?;
+ // Full path minus file name, canonicalize() fails with nonexistent files, but the
+ // parent directory ought to exist.
+ let canonical_full_path = full_path
+ .parent()
+ .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))?
+ .canonicalize()
+ .map_err(|e| {
+ ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e))
+ })?;
+
+ if !canonical_full_path.starts_with(&canonical_prefix) {
+ return Err(ArgumentValidation(format!(
+ "{:?} is not within {:?}",
+ canonical_full_path, canonical_prefix
+ )));
+ }
+ }
+
+ // TODO: Should the job be explicitly cleaned up if this fn can't return success?
+ // If the job is dropped before `AddFile` succeeds, I think it automatically gets
+ // deleted from the queue. There is only one fallible call after that (`Resume`).
+
+ let bcm = BackgroundCopyManager::connect().map_err(|e| {
+ ConnectBcm(HResultMessage {
+ hr: e.code(),
+ message: e.to_string(),
+ })
+ })?;
+ let mut job = bcm
+ .create_job(&self.job_name)
+ .map_err(|e| Create(format_error(&bcm, e)))?;
+
+ let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ (|| {
+ job.set_proxy_usage(proxy_usage)?;
+ job.set_minimum_retry_delay(60)?;
+ job.set_no_progress_timeout(no_progress_timeout_secs)?;
+ job.set_redirect_report()?;
+
+ job.set_priority(BitsJobPriority::Foreground)?;
+
+ Ok(())
+ })()
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ job.add_file(&url, &full_path.into_os_string())
+ .map_err(|e| AddFile(format_error(&bcm, e)))?;
+
+ job.resume().map_err(|e| Resume(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid.clone(), control);
+
+ Ok((StartJobSuccess { guid }, client))
+ }
+
+ pub fn monitor_job(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<InProcessMonitor, MonitorJobFailure> {
+ use MonitorJobFailure::*;
+
+ // Stop any preexisting monitor for the same guid.
+ let _ = self.stop_update(guid.clone());
+
+ let bcm;
+ let (client, control) =
+ InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis)
+ .map_err(|e| OtherBITS(format_error(&bcm, e)))?;
+
+ self.monitors.insert(guid, control);
+
+ Ok(client)
+ }
+
+ pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> {
+ use SuspendJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .suspend()
+ .map_err(|e| SuspendJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> {
+ use ResumeJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .resume()
+ .map_err(|e| ResumeJob(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_job_priority(
+ &mut self,
+ guid: Guid,
+ foreground: bool,
+ ) -> Result<(), SetJobPriorityFailure> {
+ use SetJobPriorityFailure::*;
+
+ let priority = if foreground {
+ BitsJobPriority::Foreground
+ } else {
+ BitsJobPriority::Normal
+ };
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_priority(priority)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ pub fn set_no_progress_timeout(
+ &mut self,
+ guid: Guid,
+ timeout_secs: u32,
+ ) -> Result<(), SetNoProgressTimeoutFailure> {
+ use SetNoProgressTimeoutFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .set_no_progress_timeout(timeout_secs)
+ .map_err(|e| ApplySettings(format_error(&bcm, e)))?;
+
+ Ok(())
+ }
+
+ fn get_monitor_control_sender(&mut self, guid: Guid) -> Option<Arc<ControlPair>> {
+ if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) {
+ if let Some(sender) = occ.get().0.upgrade() {
+ Some(sender)
+ } else {
+ // Remove dangling Weak
+ occ.remove_entry();
+ None
+ }
+ } else {
+ None
+ }
+ }
+
+ pub fn set_update_interval(
+ &mut self,
+ guid: Guid,
+ interval_millis: u32,
+ ) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ let mut s = sender.1.lock().unwrap();
+ s.interval_millis = interval_millis;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> {
+ use SetUpdateIntervalFailure::*;
+
+ if let Some(sender) = self.get_monitor_control_sender(guid) {
+ sender.1.lock().unwrap().shutdown = true;
+ sender.0.notify_all();
+ Ok(())
+ } else {
+ Err(NotFound)
+ }
+ }
+
+ pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> {
+ use CompleteJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .complete()
+ .map_err(|e| CompleteJob(format_error(&bcm, e)))
+ .and_then(|hr| {
+ if hr == BG_S_PARTIAL_COMPLETE as i32 {
+ Err(PartialComplete)
+ } else {
+ Ok(())
+ }
+ })?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+
+ pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> {
+ use CancelJobFailure::*;
+
+ let bcm;
+ get_job!(bcm, &guid, &self.job_name)
+ .cancel()
+ .map_err(|e| CancelJob(format_error(&bcm, e)))?;
+
+ let _ = self.stop_update(guid);
+
+ Ok(())
+ }
+}
+
+// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to
+// control a blocked `get_status` call from another thread.
+pub struct InProcessMonitor {
+ vars: Arc<ControlPair>,
+ guid: Guid,
+ last_status_time: Option<Instant>,
+ last_url: Option<ffi::OsString>,
+}
+
+// The `Condvar` is notified when `InProcessMonitorVars` changes.
+type ControlPair = (Condvar, Mutex<InProcessMonitorVars>);
+struct InProcessMonitorControl(Weak<ControlPair>);
+
+// RefUnwindSafe is not impl'd for Condvar but likely should be,
+// see https://github.com/rust-lang/rust/issues/54768
+impl std::panic::RefUnwindSafe for InProcessMonitorControl {}
+
+struct InProcessMonitorVars {
+ interval_millis: u32,
+ notified: bool,
+ shutdown: bool,
+}
+
+impl InProcessMonitor {
+ fn new(
+ job: &mut BitsJob,
+ interval_millis: u32,
+ ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> {
+ let guid = job.guid()?;
+
+ let vars = Arc::new((
+ Condvar::new(),
+ Mutex::new(InProcessMonitorVars {
+ interval_millis,
+ notified: false,
+ shutdown: false,
+ }),
+ ));
+
+ let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let transferred_cb = Box::new(move || {
+ if let Some(control) = transferred_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ let error_control = InProcessMonitorControl(Arc::downgrade(&vars));
+ let error_cb = Box::new(move || {
+ if let Some(control) = error_control.0.upgrade() {
+ if let Ok(mut vars) = control.1.lock() {
+ vars.notified = true;
+ control.0.notify_all();
+ return Ok(());
+ }
+ }
+ Err(E_FAIL)
+ });
+
+ // Note: These callbacks are never explicitly cleared. They will be freed when the
+ // job is deleted from BITS, and they will be cleared if an attempt is made to call them
+ // when they are no longer valid (e.g. after the process exits). This is done mostly for
+ // simplicity and should be safe.
+
+ job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?;
+
+ let control = InProcessMonitorControl(Arc::downgrade(&vars));
+
+ let monitor = InProcessMonitor {
+ guid,
+ vars,
+ last_status_time: None,
+ last_url: None,
+ };
+
+ Ok((monitor, control))
+ }
+
+ pub fn get_status(
+ &mut self,
+ timeout_millis: u32,
+ ) -> Result<Result<JobStatus, HResultMessage>, Error> {
+ let timeout = Duration::from_millis(u64::from(timeout_millis));
+
+ let started = Instant::now();
+ let timeout_end = started + timeout;
+
+ {
+ let mut s = self.vars.1.lock().unwrap();
+ loop {
+ let wait_start = Instant::now();
+
+ if s.shutdown {
+ // Disconnected, immediately return error.
+ // Note: Shutdown takes priority over simultaneous notification.
+ return Err(Error::NotConnected);
+ }
+
+ if wait_start >= timeout_end {
+ // Timed out, immediately return timeout error.
+ // This should not normally happen with the in-process monitor, but the
+ // monitor interval could be longer than the timeout.
+ s.shutdown = true;
+ return Err(Error::Timeout);
+ }
+
+ // Get the interval every pass through the loop, in case it has changed.
+ let interval = Duration::from_millis(u64::from(s.interval_millis));
+
+ let wait_until = self
+ .last_status_time
+ .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end));
+
+ if s.notified {
+ // Notified, exit loop to get status.
+ s.notified = false;
+ break;
+ }
+
+ if wait_until.is_none() {
+ // First status report, no waiting, exit loop to get status.
+ break;
+ }
+
+ let wait_until = wait_until.unwrap();
+
+ if wait_until <= wait_start {
+ // No time left to wait. This can't be due to timeout because
+ // `wait_until <= wait_start < timeout_end`.
+ // Status report due, exit loop to get status.
+ break;
+ }
+
+ // Wait.
+ // Do not attempt to recover from poisoned Mutex.
+ s = self
+ .vars
+ .0
+ .wait_timeout(s, wait_until - wait_start)
+ .unwrap()
+ .0;
+
+ // Mutex re-acquired, loop.
+ }
+ }
+
+ // No error yet, start getting status now.
+ self.last_status_time = Some(Instant::now());
+
+ let bcm = match BackgroundCopyManager::connect() {
+ Ok(bcm) => bcm,
+ Err(e) => {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+
+ // Errors below can use the BCM to do `format_error()`, but this one just gets the
+ // basic `comedy::HResult` treatment.
+ return Ok(Err(HResultMessage {
+ hr: e.code(),
+ message: format!("{}", e),
+ }));
+ }
+ };
+
+ Ok((|| {
+ let mut job = bcm.get_job_by_guid(&self.guid)?;
+
+ let status = job.get_status()?;
+ let url = job.get_first_file()?.get_remote_name()?;
+
+ Ok(JobStatus {
+ state: status.state,
+ progress: status.progress,
+ error_count: status.error_count,
+ error: status.error.map(|e| JobError {
+ context: e.context,
+ context_str: e.context_str,
+ error: HResultMessage {
+ hr: e.error,
+ message: e.error_str,
+ },
+ }),
+ times: status.times,
+ url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url {
+ None
+ } else {
+ self.last_url = Some(url);
+ self.last_url.clone()
+ },
+ })
+ })()
+ .map_err(|e| {
+ // On any error, disconnect.
+ self.vars.1.lock().unwrap().shutdown = true;
+ format_error(&bcm, e)
+ }))
+ }
+}
+
+#[cfg(test)]
+mod tests;
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
new file mode 100644
index 0000000000..5545f6bd74
--- /dev/null
+++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs
@@ -0,0 +1,628 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
+
+// These are full integration tests that use the BITS service.
+
+// TODO
+// It may make sense to restrict how many tests can run at once. BITS is only supposed to support
+// four simultaneous notifications per user, it is not impossible that this test suite could
+// exceed that.
+
+#![cfg(test)]
+extern crate bits;
+extern crate lazy_static;
+extern crate rand;
+extern crate regex;
+extern crate tempfile;
+
+use std::ffi::{OsStr, OsString};
+use std::fs::{self, File};
+use std::io::{Read, Write};
+use std::net::{SocketAddr, TcpListener, TcpStream};
+use std::panic;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{self, JoinHandle};
+use std::time::{Duration, Instant};
+
+use self::{
+ bits::BackgroundCopyManager,
+ lazy_static::lazy_static,
+ rand::{thread_rng, Rng},
+ regex::bytes::Regex,
+ tempfile::{Builder, TempDir},
+};
+use super::{
+ super::{BitsJobState, Error},
+ BitsProxyUsage, InProcessClient, StartJobSuccess,
+};
+
+static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1];
+
+lazy_static! {
+ static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
+}
+
+fn format_job_name(name: &str) -> OsString {
+ format!("InProcessClient Test {}", name).into()
+}
+
+fn format_dir_prefix(tmp_dir: &TempDir) -> OsString {
+ let mut dir = tmp_dir.path().to_path_buf().into_os_string();
+ dir.push("\\");
+ dir
+}
+
+fn cancel_jobs(name: &OsStr) {
+ BackgroundCopyManager::connect()
+ .unwrap()
+ .cancel_jobs_by_name(name)
+ .unwrap();
+}
+
+struct HttpServerResponses {
+ body: Box<[u8]>,
+ delay: u64,
+ //error: Box<[u8]>,
+}
+
+struct MockHttpServerHandle {
+ port: u16,
+ join: Option<JoinHandle<Result<(), ()>>>,
+ shutdown: Arc<(Mutex<bool>, Condvar)>,
+}
+
+impl MockHttpServerHandle {
+ fn shutdown(&mut self) {
+ if self.join.is_none() {
+ return;
+ }
+
+ {
+ let &(ref lock, ref cvar) = &*self.shutdown;
+ let mut shutdown = lock.lock().unwrap();
+
+ if !*shutdown {
+ *shutdown = true;
+ cvar.notify_all();
+ }
+ }
+ // Wake up the server from `accept()`. Will fail if the server wasn't listening.
+ let _ = TcpStream::connect_timeout(
+ &(SERVER_ADDRESS, self.port).into(),
+ Duration::from_millis(10_000),
+ );
+
+ match self.join.take().unwrap().join() {
+ Ok(_) => {}
+ Err(p) => panic::resume_unwind(p),
+ }
+ }
+
+ fn format_url(&self, name: &str) -> OsString {
+ format!(
+ "http://{}/{}",
+ SocketAddr::from((SERVER_ADDRESS, self.port)),
+ name
+ )
+ .into()
+ }
+}
+
+fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle {
+ let mut bind_retries = 10;
+ let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
+ let caller_shutdown = shutdown.clone();
+
+ let (listener, port) = loop {
+ let port = thread_rng().gen_range(1024..0x1_0000u32) as u16;
+ match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) {
+ Ok(listener) => {
+ break (listener, port);
+ }
+ r @ Err(_) => {
+ if bind_retries == 0 {
+ r.unwrap();
+ }
+ bind_retries -= 1;
+ continue;
+ }
+ }
+ };
+
+ let join = thread::Builder::new()
+ .name(format!("mock_http_server {}", name))
+ .spawn(move || {
+ // returns Err(()) if server should shut down immediately
+ fn check_shutdown(shutdown: &Arc<(Mutex<bool>, Condvar)>) -> Result<(), ()> {
+ if *shutdown.0.lock().unwrap() {
+ Err(())
+ } else {
+ Ok(())
+ }
+ }
+ fn sleep(shutdown: &Arc<(Mutex<bool>, Condvar)>, delay_millis: u64) -> Result<(), ()> {
+ let sleep_start = Instant::now();
+ let sleep_end = sleep_start + Duration::from_millis(delay_millis);
+
+ let (ref lock, ref cvar) = **shutdown;
+ let mut shutdown_requested = lock.lock().unwrap();
+ loop {
+ if *shutdown_requested {
+ return Err(());
+ }
+
+ let before_wait = Instant::now();
+ if before_wait >= sleep_end {
+ return Ok(());
+ }
+ let wait_dur = sleep_end - before_wait;
+ shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0;
+ }
+ }
+
+ let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap();
+ let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap();
+
+ loop {
+ let (mut socket, _addr) = listener.accept().expect("accept should succeed");
+
+ socket
+ .set_read_timeout(Some(Duration::from_millis(10_000)))
+ .unwrap();
+ let mut s = Vec::new();
+ for b in Read::by_ref(&mut socket).bytes() {
+ if b.is_err() {
+ eprintln!("read error {:?}", b);
+ break;
+ }
+ let b = b.unwrap();
+ s.push(b);
+ if s.ends_with(b"\r\n\r\n") {
+ break;
+ }
+ check_shutdown(&shutdown)?;
+ }
+
+ // request received
+
+ check_shutdown(&shutdown)?;
+
+ // Special error URIs
+ if error_404.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 404 Not Found\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 404 header {:?}", e);
+ }
+ continue;
+ }
+
+ if error_500.is_match(&s) {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket
+ .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n")
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing 500 header {:?}", e);
+ }
+ continue;
+ }
+
+ // Response with a body.
+ if s.starts_with(b"HEAD") || s.starts_with(b"GET") {
+ let result = socket
+ .write(
+ format!(
+ "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
+ responses.body.len()
+ )
+ .as_bytes(),
+ )
+ .and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing header {:?}", e);
+ continue;
+ }
+ }
+
+ if s.starts_with(b"GET") {
+ sleep(&shutdown, responses.delay)?;
+ let result = socket.write(&responses.body).and_then(|_| socket.flush());
+ if let Err(e) = result {
+ eprintln!("error writing content {:?}", e);
+ continue;
+ }
+ }
+ }
+ });
+
+ MockHttpServerHandle {
+ port,
+ join: Some(join.unwrap()),
+ shutdown: caller_shutdown,
+ }
+}
+
+// Test wrapper to ensure jobs are canceled, set up name strings
+macro_rules! test {
+ (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => {
+ #[test]
+ fn $name() {
+ let $param = stringify!($name);
+ let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap();
+
+ let result = panic::catch_unwind(|| $body);
+
+ cancel_jobs(&format_job_name($param));
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+ }
+ };
+}
+
+test! {
+ fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 10_000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 10_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // cancel in ~250ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.cancel_job(guid).unwrap();
+ });
+
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get
+ // an update until 10s when the transfer completes or the interval expires)
+ match monitor.get_status(timeout) {
+ Err(Error::NotConnected) => {},
+ Ok(r) => panic!("unexpected success from get_status() {:?}", r),
+ Err(e) => panic!("unexpected failure from get_status() {:?}", e),
+ }
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+
+ server.shutdown();
+ }
+}
+
+test! {
+ fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) {
+ let file_path = tmp_dir.path().join(name);
+
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 500,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 100;
+ let timeout = 10_000;
+
+ let (StartJobSuccess {guid}, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // Get status reports until transfer finishes (~500ms)
+ let mut completed = false;
+ loop {
+ match monitor.get_status(timeout) {
+ Err(e) => {
+ if completed {
+ break;
+ } else {
+ panic!("monitor failed before completion {:?}", e);
+ }
+ }
+ Ok(Ok(status)) => match BitsJobState::from(status.state) {
+ BitsJobState::Queued | BitsJobState::Connecting
+ | BitsJobState::Transferring => {
+ //eprintln!("{:?}", BitsJobState::from(status.state));
+ //eprintln!("{:?}", status);
+
+ // As long as there is no error, setting the timeout to 0 will not
+ // fail an active transfer.
+ client.set_no_progress_timeout(guid.clone(), 0).unwrap();
+ }
+ BitsJobState::Transferred => {
+ client.complete_job(guid.clone()).unwrap();
+ completed = true;
+ }
+ _ => {
+ panic!("{:?}", status);
+ }
+ }
+ Ok(Err(e)) => panic!("{:?}", e),
+ }
+
+ // Timeout to prevent waiting forever
+ assert!(start.elapsed() < Duration::from_millis(60_000));
+ }
+
+
+ // Verify file contents
+ let result = panic::catch_unwind(|| {
+ let mut file = File::open(file_path.clone()).unwrap();
+ let mut v = Vec::new();
+ file.read_to_end(&mut v).unwrap();
+ assert_eq!(v, name.as_bytes());
+ });
+
+ let _ = fs::remove_file(file_path);
+
+ if let Err(e) = result {
+ panic::resume_unwind(e);
+ }
+
+ // Save this for last to ensure the file is removed.
+ server.shutdown();
+ }
+}
+
+test! {
+ fn async_transferred_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 250,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First report, immediate
+ let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap();
+ let elapsed_to_report_1 = start.elapsed();
+
+ // Transferred notification should come when the job completes in ~250 ms, otherwise we
+ // will be stuck until timeout.
+ let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap();
+ let elapsed_to_report_2 = start.elapsed();
+ assert!(elapsed_to_report_2 < Duration::from_millis(9_000));
+ assert_eq!(report_2.state, BitsJobState::Transferred);
+
+ let short_timeout = 500;
+ let report_3 = monitor.get_status(short_timeout);
+ let elapsed_to_report_3 = start.elapsed();
+
+ if let Ok(report_3) = report_3 {
+ panic!("should be disconnected\n\
+ report_1 ({}.{:03}): {:?}\n\
+ report_2 ({}.{:03}): {:?}\n\
+ report_3 ({}.{:03}): {:?}",
+ elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1,
+ elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2,
+ elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3,
+ );
+ }
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn change_interval(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 1000,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url(name),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ let start = Instant::now();
+
+ // reduce monitor interval in ~250ms to 500ms
+ let _join = thread::Builder::new()
+ .spawn(move || {
+ thread::sleep(Duration::from_millis(250));
+ client.set_update_interval(guid, 500).unwrap();
+ });
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Next report should be rescheduled to 500ms by the spawned thread, otherwise no status
+ // until the original 10s interval.
+ monitor.get_status(timeout).expect("expected second status").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert!(start.elapsed() > Duration::from_millis(400));
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn async_error_notification(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 60_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_404"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Error notification should come with HEAD response in 100ms, otherwise no status until
+ // 10s interval or timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 60;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (StartJobSuccess { guid }, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // Transient error notification should come when the interval expires in ~1s.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() > Duration::from_millis(900));
+ assert!(start.elapsed() < Duration::from_millis(9_000));
+ assert_eq!(status.state, BitsJobState::TransientError);
+
+ // Lower no progress timeout to 0
+ let set_timeout_at = Instant::now();
+ client.set_no_progress_timeout(guid, 0).unwrap();
+
+ // Should convert the transient error to a permanent error immediately.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(set_timeout_at.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}
+
+test! {
+ fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) {
+ let mut server = mock_http_server(name, HttpServerResponses {
+ body: name.to_owned().into_boxed_str().into_boxed_bytes(),
+ delay: 100,
+ });
+
+ let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap();
+
+ let no_progress_timeout_secs = 0;
+ let interval = 1_000;
+ let timeout = 10_000;
+
+ let (_, mut monitor) =
+ client.start_job(
+ server.format_url("error_500"),
+ name.into(),
+ BitsProxyUsage::Preconfig,
+ no_progress_timeout_secs,
+ interval,
+ ).unwrap();
+
+ // Start the timer now, the initial job creation may be delayed by BITS service startup.
+ let start = Instant::now();
+
+ // First immediate report
+ monitor.get_status(timeout).expect("should initially be ok").unwrap();
+
+ // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately
+ // produce an error notification with the HEAD response in 100ms. Otherwise no status
+ // until 10s interval or timeout.
+ let status = monitor.get_status(timeout).expect("should get status update").unwrap();
+ assert!(start.elapsed() < Duration::from_millis(500));
+ assert_eq!(status.state, BitsJobState::Error);
+
+ server.shutdown();
+
+ // job will be cancelled by macro
+ }
+}