Diffstat (limited to 'toolkit/components/bitsdownload/bits_client/src/in_process')
-rw-r--r--   toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs   | 504
-rw-r--r--   toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs | 628
2 files changed, 1132 insertions, 0 deletions
diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs new file mode 100644 index 0000000000..c7f6869167 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/mod.rs @@ -0,0 +1,504 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +use std::cmp; +use std::collections::{hash_map, HashMap}; +use std::ffi; +use std::path; +use std::sync::{Arc, Condvar, Mutex, Weak}; +use std::time::{Duration, Instant}; + +use bits::{ + BackgroundCopyManager, BitsJob, BitsJobPriority, BitsProxyUsage, BG_S_PARTIAL_COMPLETE, E_FAIL, +}; +use guid_win::Guid; + +use bits_protocol::*; + +use super::Error; + +// This is a macro in order to use the NotFound and GetJob variants from whatever enum is in scope. +macro_rules! get_job { + ($bcm:ident, $guid:expr, $name:expr) => {{ + $bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + $bcm.find_job_by_guid_and_name($guid, $name) + .map_err(|e| GetJob($crate::in_process::format_error(&$bcm, e)))? + .ok_or(NotFound)? + }}; +} + +fn format_error(bcm: &BackgroundCopyManager, error: comedy::HResult) -> HResultMessage { + let bits_description = bcm.get_error_description(error.code()).ok(); + + HResultMessage { + hr: error.code(), + message: if let Some(desc) = bits_description { + format!("{}: {}", error, desc) + } else { + format!("{}", error) + }, + } +} + +// The in-process client uses direct BITS calls via the `bits` crate. +// See the corresponding functions in BitsClient. +pub struct InProcessClient { + job_name: ffi::OsString, + save_path_prefix: path::PathBuf, + monitors: HashMap<Guid, InProcessMonitorControl>, +} + +impl InProcessClient { + pub fn new( + job_name: ffi::OsString, + save_path_prefix: ffi::OsString, + ) -> Result<InProcessClient, Error> { + Ok(InProcessClient { + job_name, + save_path_prefix: path::PathBuf::from(save_path_prefix), + monitors: HashMap::new(), + }) + } + + pub fn start_job( + &mut self, + url: ffi::OsString, + save_path: ffi::OsString, + proxy_usage: BitsProxyUsage, + no_progress_timeout_secs: u32, + monitor_interval_millis: u32, + ) -> Result<(StartJobSuccess, InProcessMonitor), StartJobFailure> { + use StartJobFailure::*; + + let full_path = self.save_path_prefix.join(save_path); + + // Verify that `full_path` is under the directory called `save_path_prefix`. + { + let canonical_prefix = self.save_path_prefix.canonicalize().map_err(|e| { + ArgumentValidation(format!("save_path_prefix.canonicalize(): {}", e)) + })?; + // Full path minus file name, canonicalize() fails with nonexistent files, but the + // parent directory ought to exist. + let canonical_full_path = full_path + .parent() + .ok_or_else(|| ArgumentValidation("full_path.parent(): None".into()))? + .canonicalize() + .map_err(|e| { + ArgumentValidation(format!("full_path.parent().canonicalize(): {}", e)) + })?; + + if !canonical_full_path.starts_with(&canonical_prefix) { + return Err(ArgumentValidation(format!( + "{:?} is not within {:?}", + canonical_full_path, canonical_prefix + ))); + } + } + + // TODO: Should the job be explicitly cleaned up if this fn can't return success? + // If the job is dropped before `AddFile` succeeds, I think it automatically gets + // deleted from the queue. 
There is only one fallible call after that (`Resume`). + + let bcm = BackgroundCopyManager::connect().map_err(|e| { + ConnectBcm(HResultMessage { + hr: e.code(), + message: e.to_string(), + }) + })?; + let mut job = bcm + .create_job(&self.job_name) + .map_err(|e| Create(format_error(&bcm, e)))?; + + let guid = job.guid().map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + (|| { + job.set_proxy_usage(proxy_usage)?; + job.set_minimum_retry_delay(60)?; + job.set_no_progress_timeout(no_progress_timeout_secs)?; + job.set_redirect_report()?; + + job.set_priority(BitsJobPriority::Foreground)?; + + Ok(()) + })() + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + let (client, control) = InProcessMonitor::new(&mut job, monitor_interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + job.add_file(&url, &full_path.into_os_string()) + .map_err(|e| AddFile(format_error(&bcm, e)))?; + + job.resume().map_err(|e| Resume(format_error(&bcm, e)))?; + + self.monitors.insert(guid.clone(), control); + + Ok((StartJobSuccess { guid }, client)) + } + + pub fn monitor_job( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<InProcessMonitor, MonitorJobFailure> { + use MonitorJobFailure::*; + + // Stop any preexisting monitor for the same guid. + let _ = self.stop_update(guid.clone()); + + let bcm; + let (client, control) = + InProcessMonitor::new(&mut get_job!(bcm, &guid, &self.job_name), interval_millis) + .map_err(|e| OtherBITS(format_error(&bcm, e)))?; + + self.monitors.insert(guid, control); + + Ok(client) + } + + pub fn suspend_job(&mut self, guid: Guid) -> Result<(), SuspendJobFailure> { + use SuspendJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .suspend() + .map_err(|e| SuspendJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn resume_job(&mut self, guid: Guid) -> Result<(), ResumeJobFailure> { + use ResumeJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .resume() + .map_err(|e| ResumeJob(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_job_priority( + &mut self, + guid: Guid, + foreground: bool, + ) -> Result<(), SetJobPriorityFailure> { + use SetJobPriorityFailure::*; + + let priority = if foreground { + BitsJobPriority::Foreground + } else { + BitsJobPriority::Normal + }; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_priority(priority) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + pub fn set_no_progress_timeout( + &mut self, + guid: Guid, + timeout_secs: u32, + ) -> Result<(), SetNoProgressTimeoutFailure> { + use SetNoProgressTimeoutFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .set_no_progress_timeout(timeout_secs) + .map_err(|e| ApplySettings(format_error(&bcm, e)))?; + + Ok(()) + } + + fn get_monitor_control_sender(&mut self, guid: Guid) -> Option<Arc<ControlPair>> { + if let hash_map::Entry::Occupied(occ) = self.monitors.entry(guid) { + if let Some(sender) = occ.get().0.upgrade() { + Some(sender) + } else { + // Remove dangling Weak + occ.remove_entry(); + None + } + } else { + None + } + } + + pub fn set_update_interval( + &mut self, + guid: Guid, + interval_millis: u32, + ) -> Result<(), SetUpdateIntervalFailure> { + use SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + let mut s = sender.1.lock().unwrap(); + s.interval_millis = interval_millis; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn stop_update(&mut self, guid: Guid) -> Result<(), SetUpdateIntervalFailure> { + use 
SetUpdateIntervalFailure::*; + + if let Some(sender) = self.get_monitor_control_sender(guid) { + sender.1.lock().unwrap().shutdown = true; + sender.0.notify_all(); + Ok(()) + } else { + Err(NotFound) + } + } + + pub fn complete_job(&mut self, guid: Guid) -> Result<(), CompleteJobFailure> { + use CompleteJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .complete() + .map_err(|e| CompleteJob(format_error(&bcm, e))) + .and_then(|hr| { + if hr == BG_S_PARTIAL_COMPLETE as i32 { + Err(PartialComplete) + } else { + Ok(()) + } + })?; + + let _ = self.stop_update(guid); + + Ok(()) + } + + pub fn cancel_job(&mut self, guid: Guid) -> Result<(), CancelJobFailure> { + use CancelJobFailure::*; + + let bcm; + get_job!(bcm, &guid, &self.job_name) + .cancel() + .map_err(|e| CancelJob(format_error(&bcm, e)))?; + + let _ = self.stop_update(guid); + + Ok(()) + } +} + +// InProcessMonitor can be used on any thread, and `ControlPair` can be synchronously modified to +// control a blocked `get_status` call from another thread. +pub struct InProcessMonitor { + vars: Arc<ControlPair>, + guid: Guid, + last_status_time: Option<Instant>, + last_url: Option<ffi::OsString>, +} + +// The `Condvar` is notified when `InProcessMonitorVars` changes. +type ControlPair = (Condvar, Mutex<InProcessMonitorVars>); +struct InProcessMonitorControl(Weak<ControlPair>); + +// RefUnwindSafe is not impl'd for Condvar but likely should be, +// see https://github.com/rust-lang/rust/issues/54768 +impl std::panic::RefUnwindSafe for InProcessMonitorControl {} + +struct InProcessMonitorVars { + interval_millis: u32, + notified: bool, + shutdown: bool, +} + +impl InProcessMonitor { + fn new( + job: &mut BitsJob, + interval_millis: u32, + ) -> Result<(InProcessMonitor, InProcessMonitorControl), comedy::HResult> { + let guid = job.guid()?; + + let vars = Arc::new(( + Condvar::new(), + Mutex::new(InProcessMonitorVars { + interval_millis, + notified: false, + shutdown: false, + }), + )); + + let transferred_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let transferred_cb = Box::new(move || { + if let Some(control) = transferred_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + let error_control = InProcessMonitorControl(Arc::downgrade(&vars)); + let error_cb = Box::new(move || { + if let Some(control) = error_control.0.upgrade() { + if let Ok(mut vars) = control.1.lock() { + vars.notified = true; + control.0.notify_all(); + return Ok(()); + } + } + Err(E_FAIL) + }); + + // Note: These callbacks are never explicitly cleared. They will be freed when the + // job is deleted from BITS, and they will be cleared if an attempt is made to call them + // when they are no longer valid (e.g. after the process exits). This is done mostly for + // simplicity and should be safe. 
+ + job.register_callbacks(Some(transferred_cb), Some(error_cb), None)?; + + let control = InProcessMonitorControl(Arc::downgrade(&vars)); + + let monitor = InProcessMonitor { + guid, + vars, + last_status_time: None, + last_url: None, + }; + + Ok((monitor, control)) + } + + pub fn get_status( + &mut self, + timeout_millis: u32, + ) -> Result<Result<JobStatus, HResultMessage>, Error> { + let timeout = Duration::from_millis(u64::from(timeout_millis)); + + let started = Instant::now(); + let timeout_end = started + timeout; + + { + let mut s = self.vars.1.lock().unwrap(); + loop { + let wait_start = Instant::now(); + + if s.shutdown { + // Disconnected, immediately return error. + // Note: Shutdown takes priority over simultaneous notification. + return Err(Error::NotConnected); + } + + if wait_start >= timeout_end { + // Timed out, immediately return timeout error. + // This should not normally happen with the in-process monitor, but the + // monitor interval could be longer than the timeout. + s.shutdown = true; + return Err(Error::Timeout); + } + + // Get the interval every pass through the loop, in case it has changed. + let interval = Duration::from_millis(u64::from(s.interval_millis)); + + let wait_until = self + .last_status_time + .map(|last_status_time| cmp::min(last_status_time + interval, timeout_end)); + + if s.notified { + // Notified, exit loop to get status. + s.notified = false; + break; + } + + if wait_until.is_none() { + // First status report, no waiting, exit loop to get status. + break; + } + + let wait_until = wait_until.unwrap(); + + if wait_until <= wait_start { + // No time left to wait. This can't be due to timeout because + // `wait_until <= wait_start < timeout_end`. + // Status report due, exit loop to get status. + break; + } + + // Wait. + // Do not attempt to recover from poisoned Mutex. + s = self + .vars + .0 + .wait_timeout(s, wait_until - wait_start) + .unwrap() + .0; + + // Mutex re-acquired, loop. + } + } + + // No error yet, start getting status now. + self.last_status_time = Some(Instant::now()); + + let bcm = match BackgroundCopyManager::connect() { + Ok(bcm) => bcm, + Err(e) => { + // On any error, disconnect. + self.vars.1.lock().unwrap().shutdown = true; + + // Errors below can use the BCM to do `format_error()`, but this one just gets the + // basic `comedy::HResult` treatment. + return Ok(Err(HResultMessage { + hr: e.code(), + message: format!("{}", e), + })); + } + }; + + Ok((|| { + let mut job = bcm.get_job_by_guid(&self.guid)?; + + let status = job.get_status()?; + let url = job.get_first_file()?.get_remote_name()?; + + Ok(JobStatus { + state: status.state, + progress: status.progress, + error_count: status.error_count, + error: status.error.map(|e| JobError { + context: e.context, + context_str: e.context_str, + error: HResultMessage { + hr: e.error, + message: e.error_str, + }, + }), + times: status.times, + url: if self.last_url.is_some() && *self.last_url.as_ref().unwrap() == url { + None + } else { + self.last_url = Some(url); + self.last_url.clone() + }, + }) + })() + .map_err(|e| { + // On any error, disconnect. 
+ self.vars.1.lock().unwrap().shutdown = true; + format_error(&bcm, e) + })) + } +} + +#[cfg(test)] +mod tests; diff --git a/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs new file mode 100644 index 0000000000..5545f6bd74 --- /dev/null +++ b/toolkit/components/bitsdownload/bits_client/src/in_process/tests.rs @@ -0,0 +1,628 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +// These are full integration tests that use the BITS service. + +// TODO +// It may make sense to restrict how many tests can run at once. BITS is only supposed to support +// four simultaneous notifications per user, it is not impossible that this test suite could +// exceed that. + +#![cfg(test)] +extern crate bits; +extern crate lazy_static; +extern crate rand; +extern crate regex; +extern crate tempfile; + +use std::ffi::{OsStr, OsString}; +use std::fs::{self, File}; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::panic; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use self::{ + bits::BackgroundCopyManager, + lazy_static::lazy_static, + rand::{thread_rng, Rng}, + regex::bytes::Regex, + tempfile::{Builder, TempDir}, +}; +use super::{ + super::{BitsJobState, Error}, + BitsProxyUsage, InProcessClient, StartJobSuccess, +}; + +static SERVER_ADDRESS: [u8; 4] = [127, 0, 0, 1]; + +lazy_static! { + static ref TEST_MUTEX: Mutex<()> = Mutex::new(()); +} + +fn format_job_name(name: &str) -> OsString { + format!("InProcessClient Test {}", name).into() +} + +fn format_dir_prefix(tmp_dir: &TempDir) -> OsString { + let mut dir = tmp_dir.path().to_path_buf().into_os_string(); + dir.push("\\"); + dir +} + +fn cancel_jobs(name: &OsStr) { + BackgroundCopyManager::connect() + .unwrap() + .cancel_jobs_by_name(name) + .unwrap(); +} + +struct HttpServerResponses { + body: Box<[u8]>, + delay: u64, + //error: Box<[u8]>, +} + +struct MockHttpServerHandle { + port: u16, + join: Option<JoinHandle<Result<(), ()>>>, + shutdown: Arc<(Mutex<bool>, Condvar)>, +} + +impl MockHttpServerHandle { + fn shutdown(&mut self) { + if self.join.is_none() { + return; + } + + { + let &(ref lock, ref cvar) = &*self.shutdown; + let mut shutdown = lock.lock().unwrap(); + + if !*shutdown { + *shutdown = true; + cvar.notify_all(); + } + } + // Wake up the server from `accept()`. Will fail if the server wasn't listening. 
+ let _ = TcpStream::connect_timeout( + &(SERVER_ADDRESS, self.port).into(), + Duration::from_millis(10_000), + ); + + match self.join.take().unwrap().join() { + Ok(_) => {} + Err(p) => panic::resume_unwind(p), + } + } + + fn format_url(&self, name: &str) -> OsString { + format!( + "http://{}/{}", + SocketAddr::from((SERVER_ADDRESS, self.port)), + name + ) + .into() + } +} + +fn mock_http_server(name: &'static str, responses: HttpServerResponses) -> MockHttpServerHandle { + let mut bind_retries = 10; + let shutdown = Arc::new((Mutex::new(false), Condvar::new())); + let caller_shutdown = shutdown.clone(); + + let (listener, port) = loop { + let port = thread_rng().gen_range(1024..0x1_0000u32) as u16; + match TcpListener::bind(SocketAddr::from((SERVER_ADDRESS, port))) { + Ok(listener) => { + break (listener, port); + } + r @ Err(_) => { + if bind_retries == 0 { + r.unwrap(); + } + bind_retries -= 1; + continue; + } + } + }; + + let join = thread::Builder::new() + .name(format!("mock_http_server {}", name)) + .spawn(move || { + // returns Err(()) if server should shut down immediately + fn check_shutdown(shutdown: &Arc<(Mutex<bool>, Condvar)>) -> Result<(), ()> { + if *shutdown.0.lock().unwrap() { + Err(()) + } else { + Ok(()) + } + } + fn sleep(shutdown: &Arc<(Mutex<bool>, Condvar)>, delay_millis: u64) -> Result<(), ()> { + let sleep_start = Instant::now(); + let sleep_end = sleep_start + Duration::from_millis(delay_millis); + + let (ref lock, ref cvar) = **shutdown; + let mut shutdown_requested = lock.lock().unwrap(); + loop { + if *shutdown_requested { + return Err(()); + } + + let before_wait = Instant::now(); + if before_wait >= sleep_end { + return Ok(()); + } + let wait_dur = sleep_end - before_wait; + shutdown_requested = cvar.wait_timeout(shutdown_requested, wait_dur).unwrap().0; + } + } + + let error_404 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_404 ").unwrap(); + let error_500 = Regex::new(r"^((GET)|(HEAD)) [[:print:]]*/error_500 ").unwrap(); + + loop { + let (mut socket, _addr) = listener.accept().expect("accept should succeed"); + + socket + .set_read_timeout(Some(Duration::from_millis(10_000))) + .unwrap(); + let mut s = Vec::new(); + for b in Read::by_ref(&mut socket).bytes() { + if b.is_err() { + eprintln!("read error {:?}", b); + break; + } + let b = b.unwrap(); + s.push(b); + if s.ends_with(b"\r\n\r\n") { + break; + } + check_shutdown(&shutdown)?; + } + + // request received + + check_shutdown(&shutdown)?; + + // Special error URIs + if error_404.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 404 Not Found\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 404 header {:?}", e); + } + continue; + } + + if error_500.is_match(&s) { + sleep(&shutdown, responses.delay)?; + let result = socket + .write(b"HTTP/1.1 500 Internal Server Error\r\n\r\n") + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing 500 header {:?}", e); + } + continue; + } + + // Response with a body. 
+ if s.starts_with(b"HEAD") || s.starts_with(b"GET") { + let result = socket + .write( + format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + responses.body.len() + ) + .as_bytes(), + ) + .and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing header {:?}", e); + continue; + } + } + + if s.starts_with(b"GET") { + sleep(&shutdown, responses.delay)?; + let result = socket.write(&responses.body).and_then(|_| socket.flush()); + if let Err(e) = result { + eprintln!("error writing content {:?}", e); + continue; + } + } + } + }); + + MockHttpServerHandle { + port, + join: Some(join.unwrap()), + shutdown: caller_shutdown, + } +} + +// Test wrapper to ensure jobs are canceled, set up name strings +macro_rules! test { + (fn $name:ident($param:ident : &str, $tmpdir:ident : &TempDir) $body:block) => { + #[test] + fn $name() { + let $param = stringify!($name); + let $tmpdir = &Builder::new().prefix($param).tempdir().unwrap(); + + let result = panic::catch_unwind(|| $body); + + cancel_jobs(&format_job_name($param)); + + if let Err(e) = result { + panic::resume_unwind(e); + } + } + }; +} + +test! { + fn start_monitor_and_cancel(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 10_000, + }); + + let mut client = InProcessClient::new(format_job_name(name), tmp_dir.path().into()).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 10_000; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // cancel in ~250ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.cancel_job(guid).unwrap(); + }); + + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // ~250ms the cancel should cause an immediate disconnect (otherwise we wouldn't get + // an update until 10s when the transfer completes or the interval expires) + match monitor.get_status(timeout) { + Err(Error::NotConnected) => {}, + Ok(r) => panic!("unexpected success from get_status() {:?}", r), + Err(e) => panic!("unexpected failure from get_status() {:?}", e), + } + assert!(start.elapsed() < Duration::from_millis(9_000)); + + server.shutdown(); + } +} + +test! 
{ + fn start_monitor_and_complete(name: &str, tmp_dir: &TempDir) { + let file_path = tmp_dir.path().join(name); + + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 500, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 100; + let timeout = 10_000; + + let (StartJobSuccess {guid}, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // Get status reports until transfer finishes (~500ms) + let mut completed = false; + loop { + match monitor.get_status(timeout) { + Err(e) => { + if completed { + break; + } else { + panic!("monitor failed before completion {:?}", e); + } + } + Ok(Ok(status)) => match BitsJobState::from(status.state) { + BitsJobState::Queued | BitsJobState::Connecting + | BitsJobState::Transferring => { + //eprintln!("{:?}", BitsJobState::from(status.state)); + //eprintln!("{:?}", status); + + // As long as there is no error, setting the timeout to 0 will not + // fail an active transfer. + client.set_no_progress_timeout(guid.clone(), 0).unwrap(); + } + BitsJobState::Transferred => { + client.complete_job(guid.clone()).unwrap(); + completed = true; + } + _ => { + panic!("{:?}", status); + } + } + Ok(Err(e)) => panic!("{:?}", e), + } + + // Timeout to prevent waiting forever + assert!(start.elapsed() < Duration::from_millis(60_000)); + } + + + // Verify file contents + let result = panic::catch_unwind(|| { + let mut file = File::open(file_path.clone()).unwrap(); + let mut v = Vec::new(); + file.read_to_end(&mut v).unwrap(); + assert_eq!(v, name.as_bytes()); + }); + + let _ = fs::remove_file(file_path); + + if let Err(e) = result { + panic::resume_unwind(e); + } + + // Save this for last to ensure the file is removed. + server.shutdown(); + } +} + +test! { + fn async_transferred_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 250, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First report, immediate + let report_1 = monitor.get_status(timeout).expect("should initially be ok").unwrap(); + let elapsed_to_report_1 = start.elapsed(); + + // Transferred notification should come when the job completes in ~250 ms, otherwise we + // will be stuck until timeout. 
+ let report_2 = monitor.get_status(timeout).expect("should get status update").unwrap(); + let elapsed_to_report_2 = start.elapsed(); + assert!(elapsed_to_report_2 < Duration::from_millis(9_000)); + assert_eq!(report_2.state, BitsJobState::Transferred); + + let short_timeout = 500; + let report_3 = monitor.get_status(short_timeout); + let elapsed_to_report_3 = start.elapsed(); + + if let Ok(report_3) = report_3 { + panic!("should be disconnected\n\ + report_1 ({}.{:03}): {:?}\n\ + report_2 ({}.{:03}): {:?}\n\ + report_3 ({}.{:03}): {:?}", + elapsed_to_report_1.as_secs(), elapsed_to_report_1.subsec_millis(), report_1, + elapsed_to_report_2.as_secs(), elapsed_to_report_2.subsec_millis(), report_2, + elapsed_to_report_3.as_secs(), elapsed_to_report_3.subsec_millis(), report_3, + ); + } + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn change_interval(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 1000, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 60_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url(name), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + let start = Instant::now(); + + // reduce monitor interval in ~250ms to 500ms + let _join = thread::Builder::new() + .spawn(move || { + thread::sleep(Duration::from_millis(250)); + client.set_update_interval(guid, 500).unwrap(); + }); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Next report should be rescheduled to 500ms by the spawned thread, otherwise no status + // until the original 10s interval. + monitor.get_status(timeout).expect("expected second status").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert!(start.elapsed() > Duration::from_millis(400)); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn async_error_notification(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 60_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_404"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Error notification should come with HEAD response in 100ms, otherwise no status until + // 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! 
{ + fn transient_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 60; + let interval = 1_000; + let timeout = 10_000; + + let (StartJobSuccess { guid }, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // Transient error notification should come when the interval expires in ~1s. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() > Duration::from_millis(900)); + assert!(start.elapsed() < Duration::from_millis(9_000)); + assert_eq!(status.state, BitsJobState::TransientError); + + // Lower no progress timeout to 0 + let set_timeout_at = Instant::now(); + client.set_no_progress_timeout(guid, 0).unwrap(); + + // Should convert the transient error to a permanent error immediately. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(set_timeout_at.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} + +test! { + fn transient_to_permanent_error(name: &str, tmp_dir: &TempDir) { + let mut server = mock_http_server(name, HttpServerResponses { + body: name.to_owned().into_boxed_str().into_boxed_bytes(), + delay: 100, + }); + + let mut client = InProcessClient::new(format_job_name(name), format_dir_prefix(tmp_dir)).unwrap(); + + let no_progress_timeout_secs = 0; + let interval = 1_000; + let timeout = 10_000; + + let (_, mut monitor) = + client.start_job( + server.format_url("error_500"), + name.into(), + BitsProxyUsage::Preconfig, + no_progress_timeout_secs, + interval, + ).unwrap(); + + // Start the timer now, the initial job creation may be delayed by BITS service startup. + let start = Instant::now(); + + // First immediate report + monitor.get_status(timeout).expect("should initially be ok").unwrap(); + + // 500 is a transient error, but with no_progress_timeout_secs = 0 it should immediately + // produce an error notification with the HEAD response in 100ms. Otherwise no status + // until 10s interval or timeout. + let status = monitor.get_status(timeout).expect("should get status update").unwrap(); + assert!(start.elapsed() < Duration::from_millis(500)); + assert_eq!(status.state, BitsJobState::Error); + + server.shutdown(); + + // job will be cancelled by macro + } +} |
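For readers skimming the patch, a minimal usage sketch of the `InProcessClient` / `InProcessMonitor` API that `mod.rs` introduces, mirroring the flow exercised by `start_monitor_and_complete` above. The import paths, job name, URL, and save location are illustrative placeholders (not taken from the patch), and error handling is collapsed to `unwrap()`:

use std::ffi::OsString;

use bits::BitsProxyUsage;
// Illustrative paths; the real re-exports depend on how bits_client is consumed.
use bits_client::in_process::{InProcessClient, StartJobSuccess};
use bits_client::BitsJobState;

fn download_example() {
    // Jobs are named "Example"; downloads must land under C:\Downloads
    // (enforced by the canonicalized-prefix check in start_job).
    let mut client =
        InProcessClient::new(OsString::from("Example"), OsString::from("C:\\Downloads")).unwrap();

    // 60 s no-progress timeout, status reports every 500 ms.
    let (StartJobSuccess { guid }, mut monitor) = client
        .start_job(
            OsString::from("http://example.com/file.bin"), // placeholder URL
            OsString::from("file.bin"),
            BitsProxyUsage::Preconfig,
            60,
            500,
        )
        .unwrap();

    // Poll until BITS reports the job as Transferred, then commit it to disk.
    loop {
        match monitor.get_status(10_000) {
            Ok(Ok(status)) => match BitsJobState::from(status.state) {
                BitsJobState::Transferred => {
                    client.complete_job(guid.clone()).unwrap();
                    break;
                }
                // Queued / Connecting / Transferring: keep polling.
                _ => continue,
            },
            Ok(Err(bits_error)) => panic!("BITS job error: {:?}", bits_error),
            Err(monitor_error) => panic!("monitor error: {:?}", monitor_error),
        }
    }
}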
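The subtlest piece of `mod.rs` is the wait loop inside `InProcessMonitor::get_status`, which multiplexes three wake-up sources over a single `Condvar`: a BITS callback notification, a shutdown/disconnect request, and the periodic report interval. A stripped-down sketch of that pattern, with the BITS calls omitted and names shortened (a simplification of the loop above, not a drop-in replacement):

use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

struct Vars {
    interval_millis: u32,
    notified: bool,
    shutdown: bool,
}

// Ok(()) means "a status report is due now"; Err(()) means shutdown or timeout.
fn wait_for_report(
    pair: &Arc<(Condvar, Mutex<Vars>)>,
    last_report: Option<Instant>,
    timeout: Duration,
) -> Result<(), ()> {
    let timeout_end = Instant::now() + timeout;
    let mut vars = pair.1.lock().unwrap();
    loop {
        let now = Instant::now();
        if vars.shutdown || now >= timeout_end {
            return Err(()); // disconnected or timed out
        }
        if vars.notified {
            vars.notified = false; // consume the callback notification
            return Ok(());
        }
        // Re-read the interval each pass; set_update_interval may have changed it.
        let interval = Duration::from_millis(u64::from(vars.interval_millis));
        let due = match last_report {
            None => return Ok(()), // first report is immediate
            Some(last) => (last + interval).min(timeout_end),
        };
        if due <= now {
            return Ok(()); // interval already elapsed
        }
        // Block until notified (callback, shutdown, interval change) or until `due`.
        // The mutex is released while waiting and re-acquired on wake-up.
        vars = pair.0.wait_timeout(vars, due - now).unwrap().0;
    }
}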
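Finally, the `test!` wrapper macro in `tests.rs` exists so that a panicking test still cancels the BITS jobs it created; roughly, each `test! { fn foo(name: &str, tmp_dir: &TempDir) { ... } }` expands to something like the following (a sketch of the expansion, not compiler output):

#[test]
fn foo() {
    let name = "foo";
    let tmp_dir = &Builder::new().prefix(name).tempdir().unwrap();

    // Run the test body, trapping any panic so the cleanup below still happens.
    let result = panic::catch_unwind(|| {
        // ... original test body, using `name` and `tmp_dir` ...
    });

    // Always cancel jobs created under this test's job name.
    cancel_jobs(&format_job_name(name));

    // Re-raise the original panic, if any, so the test still fails.
    if let Err(e) = result {
        panic::resume_unwind(e);
    }
}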