diff options
Diffstat (limited to 'third_party/rust/tokio-timer/tests/hammer.rs')
-rw-r--r-- | third_party/rust/tokio-timer/tests/hammer.rs | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/tests/hammer.rs b/third_party/rust/tokio-timer/tests/hammer.rs new file mode 100644 index 0000000000..9986a9e659 --- /dev/null +++ b/third_party/rust/tokio-timer/tests/hammer.rs @@ -0,0 +1,241 @@ +extern crate futures; +extern crate rand; +extern crate tokio_executor; +extern crate tokio_timer; + +use tokio_executor::park::{Park, Unpark, UnparkThread}; +use tokio_timer::*; + +use futures::stream::FuturesUnordered; +use futures::{Future, Stream}; +use rand::Rng; + +use std::cmp; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Barrier}; +use std::thread; +use std::time::{Duration, Instant}; + +struct Signal { + rem: AtomicUsize, + unpark: UnparkThread, +} + +#[test] +fn hammer_complete() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 5_000; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline = + Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + exec.push({ + handle.delay(deadline).and_then(move |_| { + let now = Instant::now(); + assert!(now >= deadline, "deadline greater by {:?}", deadline - now); + Ok(()) + }) + }); + } + + // Run the logic + exec.for_each(|_| Ok(())).wait().unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} + +#[test] +fn hammer_cancel() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 5_000; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline1 = + Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline2 = + Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline = cmp::min(deadline1, deadline2); + + let delay = handle.delay(deadline1); + let join = handle.timeout(delay, deadline2); + + exec.push({ + join.and_then(move |_| { + let now = Instant::now(); + assert!(now >= deadline, "deadline greater by {:?}", deadline - now); + Ok(()) + }) + }); + } + + // Run the logic + exec.or_else(|e| { + assert!(e.is_elapsed()); + Ok::<_, ()>(()) + }) + .for_each(|_| Ok(())) + .wait() + .unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} + +#[test] +fn hammer_reset() { + const ITERS: usize = 5; + const THREADS: usize = 4; + const PER_THREAD: usize = 40; + const MIN_DELAY: u64 = 1; + const MAX_DELAY: u64 = 250; + + for _ in 0..ITERS { + let mut timer = Timer::default(); + let handle = timer.handle(); + let barrier = Arc::new(Barrier::new(THREADS)); + + let done = Arc::new(Signal { + rem: AtomicUsize::new(THREADS), + unpark: timer.get_park().unpark(), + }); + + for _ in 0..THREADS { + let handle = handle.clone(); + let barrier = barrier.clone(); + let done = done.clone(); + + thread::spawn(move || { + let mut exec = FuturesUnordered::new(); + let mut rng = rand::thread_rng(); + + barrier.wait(); + + for _ in 0..PER_THREAD { + let deadline1 = + Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline2 = + deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + let deadline3 = + deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); + + exec.push({ + handle + .delay(deadline1) + // Select over a second delay + .select2(handle.delay(deadline2)) + .map_err(|e| panic!("boom; err={:?}", e)) + .and_then(move |res| { + use futures::future::Either::*; + + let now = Instant::now(); + assert!( + now >= deadline1, + "deadline greater by {:?}", + deadline1 - now + ); + + let mut other = match res { + A((_, other)) => other, + B((_, other)) => other, + }; + + other.reset(deadline3); + other + }) + .and_then(move |_| { + let now = Instant::now(); + assert!( + now >= deadline3, + "deadline greater by {:?}", + deadline3 - now + ); + Ok(()) + }) + }); + } + + // Run the logic + exec.for_each(|_| Ok(())).wait().unwrap(); + + if 1 == done.rem.fetch_sub(1, SeqCst) { + done.unpark.unpark(); + } + }); + } + + while done.rem.load(SeqCst) > 0 { + timer.turn(None).unwrap(); + } + } +} |