extern crate futures; extern crate rand; extern crate tokio_executor; extern crate tokio_timer; use tokio_executor::park::{Park, Unpark, UnparkThread}; use tokio_timer::*; use futures::stream::FuturesUnordered; use futures::{Future, Stream}; use rand::Rng; use std::cmp; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Barrier}; use std::thread; use std::time::{Duration, Instant}; struct Signal { rem: AtomicUsize, unpark: UnparkThread, } #[test] fn hammer_complete() { const ITERS: usize = 5; const THREADS: usize = 4; const PER_THREAD: usize = 40; const MIN_DELAY: u64 = 1; const MAX_DELAY: u64 = 5_000; for _ in 0..ITERS { let mut timer = Timer::default(); let handle = timer.handle(); let barrier = Arc::new(Barrier::new(THREADS)); let done = Arc::new(Signal { rem: AtomicUsize::new(THREADS), unpark: timer.get_park().unpark(), }); for _ in 0..THREADS { let handle = handle.clone(); let barrier = barrier.clone(); let done = done.clone(); thread::spawn(move || { let mut exec = FuturesUnordered::new(); let mut rng = rand::thread_rng(); barrier.wait(); for _ in 0..PER_THREAD { let deadline = Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); exec.push({ handle.delay(deadline).and_then(move |_| { let now = Instant::now(); assert!(now >= deadline, "deadline greater by {:?}", deadline - now); Ok(()) }) }); } // Run the logic exec.for_each(|_| Ok(())).wait().unwrap(); if 1 == done.rem.fetch_sub(1, SeqCst) { done.unpark.unpark(); } }); } while done.rem.load(SeqCst) > 0 { timer.turn(None).unwrap(); } } } #[test] fn hammer_cancel() { const ITERS: usize = 5; const THREADS: usize = 4; const PER_THREAD: usize = 40; const MIN_DELAY: u64 = 1; const MAX_DELAY: u64 = 5_000; for _ in 0..ITERS { let mut timer = Timer::default(); let handle = timer.handle(); let barrier = Arc::new(Barrier::new(THREADS)); let done = Arc::new(Signal { rem: AtomicUsize::new(THREADS), unpark: timer.get_park().unpark(), }); for _ in 0..THREADS { let handle = handle.clone(); let barrier = barrier.clone(); let done = done.clone(); thread::spawn(move || { let mut exec = FuturesUnordered::new(); let mut rng = rand::thread_rng(); barrier.wait(); for _ in 0..PER_THREAD { let deadline1 = Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); let deadline2 = Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); let deadline = cmp::min(deadline1, deadline2); let delay = handle.delay(deadline1); let join = handle.timeout(delay, deadline2); exec.push({ join.and_then(move |_| { let now = Instant::now(); assert!(now >= deadline, "deadline greater by {:?}", deadline - now); Ok(()) }) }); } // Run the logic exec.or_else(|e| { assert!(e.is_elapsed()); Ok::<_, ()>(()) }) .for_each(|_| Ok(())) .wait() .unwrap(); if 1 == done.rem.fetch_sub(1, SeqCst) { done.unpark.unpark(); } }); } while done.rem.load(SeqCst) > 0 { timer.turn(None).unwrap(); } } } #[test] fn hammer_reset() { const ITERS: usize = 5; const THREADS: usize = 4; const PER_THREAD: usize = 40; const MIN_DELAY: u64 = 1; const MAX_DELAY: u64 = 250; for _ in 0..ITERS { let mut timer = Timer::default(); let handle = timer.handle(); let barrier = Arc::new(Barrier::new(THREADS)); let done = Arc::new(Signal { rem: AtomicUsize::new(THREADS), unpark: timer.get_park().unpark(), }); for _ in 0..THREADS { let handle = handle.clone(); let barrier = barrier.clone(); let done = done.clone(); thread::spawn(move || { let mut exec = FuturesUnordered::new(); let mut rng = rand::thread_rng(); barrier.wait(); for _ in 0..PER_THREAD { let deadline1 = Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); let deadline2 = deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); let deadline3 = deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY)); exec.push({ handle .delay(deadline1) // Select over a second delay .select2(handle.delay(deadline2)) .map_err(|e| panic!("boom; err={:?}", e)) .and_then(move |res| { use futures::future::Either::*; let now = Instant::now(); assert!( now >= deadline1, "deadline greater by {:?}", deadline1 - now ); let mut other = match res { A((_, other)) => other, B((_, other)) => other, }; other.reset(deadline3); other }) .and_then(move |_| { let now = Instant::now(); assert!( now >= deadline3, "deadline greater by {:?}", deadline3 - now ); Ok(()) }) }); } // Run the logic exec.for_each(|_| Ok(())).wait().unwrap(); if 1 == done.rem.fetch_sub(1, SeqCst) { done.unpark.unpark(); } }); } while done.rem.load(SeqCst) > 0 { timer.turn(None).unwrap(); } } }