summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-timer/tests/hammer.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-timer/tests/hammer.rs')
-rw-r--r--third_party/rust/tokio-timer/tests/hammer.rs241
1 files changed, 241 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/tests/hammer.rs b/third_party/rust/tokio-timer/tests/hammer.rs
new file mode 100644
index 0000000000..9986a9e659
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/hammer.rs
@@ -0,0 +1,241 @@
+extern crate futures;
+extern crate rand;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+use tokio_executor::park::{Park, Unpark, UnparkThread};
+use tokio_timer::*;
+
+use futures::stream::FuturesUnordered;
+use futures::{Future, Stream};
+use rand::Rng;
+
+use std::cmp;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Barrier};
+use std::thread;
+use std::time::{Duration, Instant};
+
+struct Signal {
+ rem: AtomicUsize,
+ unpark: UnparkThread,
+}
+
+#[test]
+fn hammer_complete() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 5_000;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ exec.push({
+ handle.delay(deadline).and_then(move |_| {
+ let now = Instant::now();
+ assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.for_each(|_| Ok(())).wait().unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}
+
+#[test]
+fn hammer_cancel() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 5_000;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline1 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline2 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline = cmp::min(deadline1, deadline2);
+
+ let delay = handle.delay(deadline1);
+ let join = handle.timeout(delay, deadline2);
+
+ exec.push({
+ join.and_then(move |_| {
+ let now = Instant::now();
+ assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.or_else(|e| {
+ assert!(e.is_elapsed());
+ Ok::<_, ()>(())
+ })
+ .for_each(|_| Ok(()))
+ .wait()
+ .unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}
+
+#[test]
+fn hammer_reset() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 250;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline1 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline2 =
+ deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline3 =
+ deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ exec.push({
+ handle
+ .delay(deadline1)
+ // Select over a second delay
+ .select2(handle.delay(deadline2))
+ .map_err(|e| panic!("boom; err={:?}", e))
+ .and_then(move |res| {
+ use futures::future::Either::*;
+
+ let now = Instant::now();
+ assert!(
+ now >= deadline1,
+ "deadline greater by {:?}",
+ deadline1 - now
+ );
+
+ let mut other = match res {
+ A((_, other)) => other,
+ B((_, other)) => other,
+ };
+
+ other.reset(deadline3);
+ other
+ })
+ .and_then(move |_| {
+ let now = Instant::now();
+ assert!(
+ now >= deadline3,
+ "deadline greater by {:?}",
+ deadline3 - now
+ );
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.for_each(|_| Ok(())).wait().unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}