summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-timer/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-timer/tests
parentInitial commit. (diff)
downloadfirefox-upstream.tar.xz
firefox-upstream.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-timer/tests')
-rw-r--r--third_party/rust/tokio-timer/tests/clock.rs51
-rw-r--r--third_party/rust/tokio-timer/tests/deadline.rs106
-rw-r--r--third_party/rust/tokio-timer/tests/delay.rs499
-rw-r--r--third_party/rust/tokio-timer/tests/hammer.rs241
-rw-r--r--third_party/rust/tokio-timer/tests/interval.rs46
-rw-r--r--third_party/rust/tokio-timer/tests/queue.rs406
-rw-r--r--third_party/rust/tokio-timer/tests/support/mod.rs261
-rw-r--r--third_party/rust/tokio-timer/tests/throttle.rs51
-rw-r--r--third_party/rust/tokio-timer/tests/timeout.rs179
9 files changed, 1840 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/tests/clock.rs b/third_party/rust/tokio-timer/tests/clock.rs
new file mode 100644
index 0000000000..d8bcaaebd7
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/clock.rs
@@ -0,0 +1,51 @@
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+use tokio_timer::clock;
+use tokio_timer::clock::*;
+
+use std::time::Instant;
+
+struct ConstNow(Instant);
+
+impl Now for ConstNow {
+ fn now(&self) -> Instant {
+ self.0
+ }
+}
+
+#[test]
+fn default_clock() {
+ let a = Instant::now();
+ let b = clock::now();
+ let c = Clock::new().now();
+
+ assert!(a <= b);
+ assert!(b <= c);
+}
+
+#[test]
+fn custom_clock() {
+ let now = ConstNow(Instant::now());
+ let clock = Clock::new_with_now(now);
+
+ let a = Instant::now();
+ let b = clock.now();
+
+ assert!(b <= a);
+}
+
+#[test]
+fn execution_context() {
+ let now = ConstNow(Instant::now());
+ let clock = Clock::new_with_now(now);
+
+ let mut enter = tokio_executor::enter().unwrap();
+
+ with_default(&clock, &mut enter, |_| {
+ let a = Instant::now();
+ let b = clock::now();
+
+ assert!(b <= a);
+ });
+}
diff --git a/third_party/rust/tokio-timer/tests/deadline.rs b/third_party/rust/tokio-timer/tests/deadline.rs
new file mode 100644
index 0000000000..8eec79ae7f
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/deadline.rs
@@ -0,0 +1,106 @@
+#![allow(deprecated)]
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use tokio_timer::*;
+
+use futures::sync::oneshot;
+use futures::{future, Future};
+
+#[test]
+fn simultaneous_deadline_future_completion() {
+ mocked(|_, time| {
+ // Create a future that is immediately ready
+ let fut = future::ok::<_, ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(fut, time.now());
+
+ // Ready!
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn completed_future_past_deadline() {
+ mocked(|_, time| {
+ // Create a future that is immediately ready
+ let fut = future::ok::<_, ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(fut, time.now() - ms(1000));
+
+ // Ready!
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn future_and_deadline_in_future() {
+ mocked(|timer, time| {
+ // Not yet complete
+ let (tx, rx) = oneshot::channel();
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(rx, time.now() + ms(100));
+
+ // Ready!
+ assert_not_ready!(fut);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(90));
+
+ assert_not_ready!(fut);
+
+ // Complete the future
+ tx.send(()).unwrap();
+
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn deadline_now_elapses() {
+ mocked(|_, time| {
+ let fut = future::empty::<(), ()>();
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(fut, time.now());
+
+ assert_elapsed!(fut);
+ });
+}
+
+#[test]
+fn deadline_future_elapses() {
+ mocked(|timer, time| {
+ let fut = future::empty::<(), ()>();
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(fut, time.now() + ms(300));
+
+ assert_not_ready!(fut);
+
+ advance(timer, ms(300));
+
+ assert_elapsed!(fut);
+ });
+}
+
+#[test]
+fn future_errors_first() {
+ mocked(|_, time| {
+ let fut = future::err::<(), ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Deadline::new(fut, time.now() + ms(100));
+
+ // Ready!
+ assert!(fut.poll().unwrap_err().is_inner());
+ });
+}
diff --git a/third_party/rust/tokio-timer/tests/delay.rs b/third_party/rust/tokio-timer/tests/delay.rs
new file mode 100644
index 0000000000..d21106c630
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/delay.rs
@@ -0,0 +1,499 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use tokio_timer::timer::Handle;
+use tokio_timer::*;
+
+use futures::Future;
+
+use std::time::{Duration, Instant};
+
+#[test]
+fn immediate_delay() {
+ mocked(|timer, time| {
+ // Create `Delay` that elapsed immediately.
+ let mut delay = Delay::new(time.now());
+
+ // Ready!
+ assert_ready!(delay);
+
+ // Turn the timer, it runs for the elapsed time
+ turn(timer, ms(1000));
+
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(time.advanced(), ms(1000));
+ });
+}
+
+#[test]
+fn delayed_delay_level_0() {
+ for &i in &[1, 10, 60] {
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(i));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(i));
+
+ assert_ready!(delay);
+ });
+ }
+}
+
+#[test]
+fn sub_ms_delayed_delay() {
+ mocked(|timer, time| {
+ for _ in 0..5 {
+ let deadline = time.now() + Duration::from_millis(1) + Duration::new(0, 1);
+
+ let mut delay = Delay::new(deadline);
+
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_ready!(delay);
+
+ assert!(time.now() >= deadline);
+
+ time.advance(Duration::new(0, 1));
+ }
+ });
+}
+
+#[test]
+fn delayed_delay_wrapping_level_0() {
+ mocked(|timer, time| {
+ turn(timer, ms(5));
+ assert_eq!(time.advanced(), ms(5));
+
+ let mut delay = Delay::new(time.now() + ms(60));
+
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(64));
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(65));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn timer_wrapping_with_higher_levels() {
+ mocked(|timer, time| {
+ // Set delay to hit level 1
+ let mut s1 = Delay::new(time.now() + ms(64));
+ assert_not_ready!(s1);
+
+ // Turn a bit
+ turn(timer, ms(5));
+
+ // Set timeout such that it will hit level 0, but wrap
+ let mut s2 = Delay::new(time.now() + ms(60));
+ assert_not_ready!(s2);
+
+ // This should result in s1 firing
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(64));
+
+ assert_ready!(s1);
+ assert_not_ready!(s2);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(65));
+
+ assert_ready!(s2);
+ });
+}
+
+#[test]
+fn delay_with_deadline_in_past() {
+ mocked(|timer, time| {
+ // Create `Delay` that elapsed immediately.
+ let mut delay = Delay::new(time.now() - ms(100));
+
+ // Even though the delay expires in the past, it is not ready yet
+ // because the timer must observe it.
+ assert_ready!(delay);
+
+ // Turn the timer, it runs for the elapsed time
+ turn(timer, ms(1000));
+
+ // The time has not advanced. The `turn` completed immediately.
+ assert_eq!(time.advanced(), ms(1000));
+ });
+}
+
+#[test]
+fn delayed_delay_level_1() {
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(234));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ // Turn the timer, this will wake up to cascade the timer down.
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(192));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ // Turn the timer again
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(234));
+
+ // The delay has elapsed.
+ assert_ready!(delay);
+ });
+
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(234));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ // Turn the timer with a smaller timeout than the cascade.
+ turn(timer, ms(100));
+ assert_eq!(time.advanced(), ms(100));
+
+ assert_not_ready!(delay);
+
+ // Turn the timer, this will wake up to cascade the timer down.
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(192));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ // Turn the timer again
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(234));
+
+ // The delay has elapsed.
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn creating_delay_outside_of_context() {
+ let now = Instant::now();
+
+ // This creates a delay outside of the context of a mock timer. This tests
+ // that it will still expire.
+ let mut delay = Delay::new(now + ms(500));
+
+ mocked_with_now(now, |timer, time| {
+ // This registers the delay with the timer
+ assert_not_ready!(delay);
+
+ // Wait some time... the timer is cascading
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(448));
+
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(500));
+
+ // The delay has elapsed
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn concurrently_set_two_timers_second_one_shorter() {
+ mocked(|timer, time| {
+ let mut delay1 = Delay::new(time.now() + ms(500));
+ let mut delay2 = Delay::new(time.now() + ms(200));
+
+ // The delay has not elapsed
+ assert_not_ready!(delay1);
+ assert_not_ready!(delay2);
+
+ // Delay until a cascade
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(192));
+
+ // Delay until the second timer.
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(200));
+
+ // The shorter delay fires
+ assert_ready!(delay2);
+ assert_not_ready!(delay1);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(448));
+
+ assert_not_ready!(delay1);
+
+ // Turn again, this time the time will advance to the second delay
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(500));
+
+ assert_ready!(delay1);
+ })
+}
+
+#[test]
+fn short_delay() {
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(1));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ // Turn the timer, but not enough time will go by.
+ turn(timer, None);
+
+ // The delay has elapsed.
+ assert_ready!(delay);
+
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(time.advanced(), ms(1));
+ })
+}
+
+#[test]
+fn sorta_long_delay() {
+ const MIN_5: u64 = 5 * 60 * 1000;
+
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(MIN_5));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ let cascades = &[262_144, 262_144 + 9 * 4096, 262_144 + 9 * 4096 + 15 * 64];
+
+ for &elapsed in cascades {
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(elapsed));
+
+ assert_not_ready!(delay);
+ }
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(MIN_5));
+
+ // The delay has elapsed.
+ assert_ready!(delay);
+ })
+}
+
+#[test]
+fn very_long_delay() {
+ const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000;
+
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(MO_5));
+
+ // The delay has not elapsed.
+ assert_not_ready!(delay);
+
+ let cascades = &[
+ 12_884_901_888,
+ 12_952_010_752,
+ 12_959_875_072,
+ 12_959_997_952,
+ ];
+
+ for &elapsed in cascades {
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(elapsed));
+
+ assert_not_ready!(delay);
+ }
+
+ // Turn the timer, but not enough time will go by.
+ turn(timer, None);
+
+ // The time has advanced to the point of the delay elapsing.
+ assert_eq!(time.advanced(), ms(MO_5));
+
+ // The delay has elapsed.
+ assert_ready!(delay);
+ })
+}
+
+#[test]
+fn greater_than_max() {
+ const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000;
+
+ mocked(|timer, time| {
+ // Create a `Delay` that elapses in the future
+ let mut delay = Delay::new(time.now() + ms(YR_5));
+
+ assert_not_ready!(delay);
+
+ turn(timer, ms(0));
+
+ assert!(delay.poll().is_err());
+ })
+}
+
+#[test]
+fn unpark_is_delayed() {
+ mocked(|timer, time| {
+ let mut delay1 = Delay::new(time.now() + ms(100));
+ let mut delay2 = Delay::new(time.now() + ms(101));
+ let mut delay3 = Delay::new(time.now() + ms(200));
+
+ assert_not_ready!(delay1);
+ assert_not_ready!(delay2);
+ assert_not_ready!(delay3);
+
+ time.park_for(ms(500));
+
+ turn(timer, None);
+
+ assert_eq!(time.advanced(), ms(500));
+
+ assert_ready!(delay1);
+ assert_ready!(delay2);
+ assert_ready!(delay3);
+ })
+}
+
+#[test]
+fn set_timeout_at_deadline_greater_than_max_timer() {
+ const YR_1: u64 = 365 * 24 * 60 * 60 * 1000;
+ const YR_5: u64 = 5 * YR_1;
+
+ mocked(|timer, time| {
+ for _ in 0..5 {
+ turn(timer, ms(YR_1));
+ }
+
+ let mut delay = Delay::new(time.now() + ms(1));
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), Duration::from_millis(YR_5) + ms(1));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn reset_future_delay_before_fire() {
+ mocked(|timer, time| {
+ let mut delay = Delay::new(time.now() + ms(100));
+
+ assert_not_ready!(delay);
+
+ delay.reset(time.now() + ms(200));
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(192));
+
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(200));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn reset_past_delay_before_turn() {
+ mocked(|timer, time| {
+ let mut delay = Delay::new(time.now() + ms(100));
+
+ assert_not_ready!(delay);
+
+ delay.reset(time.now() + ms(80));
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(64));
+
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(80));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn reset_past_delay_before_fire() {
+ mocked(|timer, time| {
+ let mut delay = Delay::new(time.now() + ms(100));
+
+ assert_not_ready!(delay);
+ turn(timer, ms(10));
+
+ assert_not_ready!(delay);
+ delay.reset(time.now() + ms(80));
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(64));
+
+ assert_not_ready!(delay);
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(90));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn reset_future_delay_after_fire() {
+ mocked(|timer, time| {
+ let mut delay = Delay::new(time.now() + ms(100));
+
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(64));
+
+ turn(timer, None);
+ assert_eq!(time.advanced(), ms(100));
+
+ assert_ready!(delay);
+
+ delay.reset(time.now() + ms(10));
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1000));
+ assert_eq!(time.advanced(), ms(110));
+
+ assert_ready!(delay);
+ });
+}
+
+#[test]
+fn delay_with_default_handle() {
+ let handle = Handle::default();
+ let now = Instant::now();
+
+ let mut delay = handle.delay(now + ms(1));
+
+ mocked_with_now(now, |timer, _time| {
+ assert_not_ready!(delay);
+
+ turn(timer, ms(1));
+
+ assert_ready!(delay);
+ });
+}
diff --git a/third_party/rust/tokio-timer/tests/hammer.rs b/third_party/rust/tokio-timer/tests/hammer.rs
new file mode 100644
index 0000000000..9986a9e659
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/hammer.rs
@@ -0,0 +1,241 @@
+extern crate futures;
+extern crate rand;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+use tokio_executor::park::{Park, Unpark, UnparkThread};
+use tokio_timer::*;
+
+use futures::stream::FuturesUnordered;
+use futures::{Future, Stream};
+use rand::Rng;
+
+use std::cmp;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::{Arc, Barrier};
+use std::thread;
+use std::time::{Duration, Instant};
+
+struct Signal {
+ rem: AtomicUsize,
+ unpark: UnparkThread,
+}
+
+#[test]
+fn hammer_complete() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 5_000;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ exec.push({
+ handle.delay(deadline).and_then(move |_| {
+ let now = Instant::now();
+ assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.for_each(|_| Ok(())).wait().unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}
+
+#[test]
+fn hammer_cancel() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 5_000;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline1 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline2 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline = cmp::min(deadline1, deadline2);
+
+ let delay = handle.delay(deadline1);
+ let join = handle.timeout(delay, deadline2);
+
+ exec.push({
+ join.and_then(move |_| {
+ let now = Instant::now();
+ assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.or_else(|e| {
+ assert!(e.is_elapsed());
+ Ok::<_, ()>(())
+ })
+ .for_each(|_| Ok(()))
+ .wait()
+ .unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}
+
+#[test]
+fn hammer_reset() {
+ const ITERS: usize = 5;
+ const THREADS: usize = 4;
+ const PER_THREAD: usize = 40;
+ const MIN_DELAY: u64 = 1;
+ const MAX_DELAY: u64 = 250;
+
+ for _ in 0..ITERS {
+ let mut timer = Timer::default();
+ let handle = timer.handle();
+ let barrier = Arc::new(Barrier::new(THREADS));
+
+ let done = Arc::new(Signal {
+ rem: AtomicUsize::new(THREADS),
+ unpark: timer.get_park().unpark(),
+ });
+
+ for _ in 0..THREADS {
+ let handle = handle.clone();
+ let barrier = barrier.clone();
+ let done = done.clone();
+
+ thread::spawn(move || {
+ let mut exec = FuturesUnordered::new();
+ let mut rng = rand::thread_rng();
+
+ barrier.wait();
+
+ for _ in 0..PER_THREAD {
+ let deadline1 =
+ Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline2 =
+ deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ let deadline3 =
+ deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));
+
+ exec.push({
+ handle
+ .delay(deadline1)
+ // Select over a second delay
+ .select2(handle.delay(deadline2))
+ .map_err(|e| panic!("boom; err={:?}", e))
+ .and_then(move |res| {
+ use futures::future::Either::*;
+
+ let now = Instant::now();
+ assert!(
+ now >= deadline1,
+ "deadline greater by {:?}",
+ deadline1 - now
+ );
+
+ let mut other = match res {
+ A((_, other)) => other,
+ B((_, other)) => other,
+ };
+
+ other.reset(deadline3);
+ other
+ })
+ .and_then(move |_| {
+ let now = Instant::now();
+ assert!(
+ now >= deadline3,
+ "deadline greater by {:?}",
+ deadline3 - now
+ );
+ Ok(())
+ })
+ });
+ }
+
+ // Run the logic
+ exec.for_each(|_| Ok(())).wait().unwrap();
+
+ if 1 == done.rem.fetch_sub(1, SeqCst) {
+ done.unpark.unpark();
+ }
+ });
+ }
+
+ while done.rem.load(SeqCst) > 0 {
+ timer.turn(None).unwrap();
+ }
+ }
+}
diff --git a/third_party/rust/tokio-timer/tests/interval.rs b/third_party/rust/tokio-timer/tests/interval.rs
new file mode 100644
index 0000000000..27828fc83a
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/interval.rs
@@ -0,0 +1,46 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use tokio_timer::*;
+
+use futures::Stream;
+
+#[test]
+#[should_panic]
+fn interval_zero_duration() {
+ mocked(|_, time| {
+ let _ = Interval::new(time.now(), ms(0));
+ });
+}
+
+#[test]
+fn usage() {
+ mocked(|timer, time| {
+ let start = time.now();
+ let mut int = Interval::new(start, ms(300));
+
+ assert_ready_eq!(int, Some(start));
+ assert_not_ready!(int);
+
+ advance(timer, ms(100));
+ assert_not_ready!(int);
+
+ advance(timer, ms(200));
+ assert_ready_eq!(int, Some(start + ms(300)));
+ assert_not_ready!(int);
+
+ advance(timer, ms(400));
+ assert_ready_eq!(int, Some(start + ms(600)));
+ assert_not_ready!(int);
+
+ advance(timer, ms(500));
+ assert_ready_eq!(int, Some(start + ms(900)));
+ assert_ready_eq!(int, Some(start + ms(1200)));
+ assert_not_ready!(int);
+ });
+}
diff --git a/third_party/rust/tokio-timer/tests/queue.rs b/third_party/rust/tokio-timer/tests/queue.rs
new file mode 100644
index 0000000000..eeba610414
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/queue.rs
@@ -0,0 +1,406 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_mock_task;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use tokio_mock_task::MockTask;
+use tokio_timer::*;
+
+use futures::Stream;
+
+#[test]
+fn single_immediate_delay() {
+ mocked(|_timer, time| {
+ let mut queue = DelayQueue::new();
+ let _key = queue.insert_at("foo", time.now());
+
+ let entry = assert_ready!(queue).unwrap();
+ assert_eq!(*entry.get_ref(), "foo");
+
+ let entry = assert_ready!(queue);
+ assert!(entry.is_none())
+ });
+}
+
+#[test]
+fn multi_immediate_delays() {
+ mocked(|_timer, time| {
+ let mut queue = DelayQueue::new();
+
+ let _k = queue.insert_at("1", time.now());
+ let _k = queue.insert_at("2", time.now());
+ let _k = queue.insert_at("3", time.now());
+
+ let mut res = vec![];
+
+ while res.len() < 3 {
+ let entry = assert_ready!(queue).unwrap();
+ res.push(entry.into_inner());
+ }
+
+ let entry = assert_ready!(queue);
+ assert!(entry.is_none());
+
+ res.sort();
+
+ assert_eq!("1", res[0]);
+ assert_eq!("2", res[1]);
+ assert_eq!("3", res[2]);
+ });
+}
+
+#[test]
+fn single_short_delay() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let _key = queue.insert_at("foo", time.now() + ms(5));
+
+ let mut task = MockTask::new();
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(1));
+
+ assert!(!task.is_notified());
+
+ turn(timer, ms(5));
+
+ assert!(task.is_notified());
+
+ let entry = assert_ready!(queue).unwrap();
+ assert_eq!(*entry.get_ref(), "foo");
+
+ let entry = assert_ready!(queue);
+ assert!(entry.is_none());
+ });
+}
+
+#[test]
+fn multi_delay_at_start() {
+ let long = 262_144 + 9 * 4096;
+ let delays = &[1000, 2, 234, long, 60, 10];
+
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ // Setup the delays
+ for &i in delays {
+ let _key = queue.insert_at(i, time.now() + ms(i));
+ }
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ assert!(!task.is_notified());
+
+ for elapsed in 0..1200 {
+ turn(timer, ms(1));
+ let elapsed = elapsed + 1;
+
+ if delays.contains(&elapsed) {
+ assert!(task.is_notified());
+
+ task.enter(|| {
+ assert_ready!(queue);
+ assert_not_ready!(queue);
+ });
+ } else {
+ if task.is_notified() {
+ let cascade = &[192, 960];
+ assert!(cascade.contains(&elapsed), "elapsed={}", elapsed);
+
+ task.enter(|| {
+ assert_not_ready!(queue, "elapsed={}", elapsed);
+ });
+ }
+ }
+ }
+ });
+}
+
+#[test]
+fn insert_in_past_fires_immediately() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+
+ let now = time.now();
+
+ turn(timer, ms(10));
+
+ queue.insert_at("foo", now);
+
+ assert_ready!(queue);
+ });
+}
+
+#[test]
+fn remove_entry() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let key = queue.insert_at("foo", time.now() + ms(5));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ let entry = queue.remove(&key);
+ assert_eq!(entry.into_inner(), "foo");
+
+ turn(timer, ms(10));
+
+ task.enter(|| {
+ let entry = assert_ready!(queue);
+ assert!(entry.is_none());
+ });
+ });
+}
+
+#[test]
+fn reset_entry() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let now = time.now();
+ let key = queue.insert_at("foo", now + ms(5));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(1));
+
+ queue.reset_at(&key, now + ms(10));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(7));
+
+ assert!(!task.is_notified());
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(3));
+
+ assert!(task.is_notified());
+
+ let entry = assert_ready!(queue).unwrap();
+ assert_eq!(*entry.get_ref(), "foo");
+
+ let entry = assert_ready!(queue);
+ assert!(entry.is_none())
+ });
+}
+
+#[test]
+fn reset_much_later() {
+ // Reproduces tokio-rs/tokio#849.
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ turn(timer, ms(1));
+
+ let key = queue.insert_at("foo", epoch + ms(200));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(3));
+
+ queue.reset_at(&key, epoch + ms(5));
+
+ turn(timer, ms(20));
+
+ assert!(task.is_notified());
+ });
+}
+
+#[test]
+fn reset_twice() {
+ // Reproduces tokio-rs/tokio#849.
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ turn(timer, ms(1));
+
+ let key = queue.insert_at("foo", epoch + ms(200));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ turn(timer, ms(3));
+
+ queue.reset_at(&key, epoch + ms(50));
+
+ turn(timer, ms(20));
+
+ queue.reset_at(&key, epoch + ms(40));
+
+ turn(timer, ms(20));
+
+ assert!(task.is_notified());
+ });
+}
+
+#[test]
+fn remove_expired_item() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+
+ let now = time.now();
+
+ turn(timer, ms(10));
+
+ let key = queue.insert_at("foo", now);
+
+ let entry = queue.remove(&key);
+ assert_eq!(entry.into_inner(), "foo");
+ })
+}
+
+#[test]
+fn expires_before_last_insert() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ queue.insert_at("foo", epoch + ms(10_000));
+
+ // Delay should be set to 8.192s here.
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ // Delay should be set to the delay of the new item here
+ queue.insert_at("bar", epoch + ms(600));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ advance(timer, ms(600));
+
+ assert!(task.is_notified());
+ let entry = assert_ready!(queue).unwrap().into_inner();
+ assert_eq!(entry, "bar");
+ })
+}
+
+#[test]
+fn multi_reset() {
+ mocked(|_, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ let foo = queue.insert_at("foo", epoch + ms(200));
+ let bar = queue.insert_at("bar", epoch + ms(250));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ queue.reset_at(&foo, epoch + ms(300));
+ queue.reset_at(&bar, epoch + ms(350));
+ queue.reset_at(&foo, epoch + ms(400));
+ })
+}
+
+#[test]
+fn expire_first_key_when_reset_to_expire_earlier() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ let foo = queue.insert_at("foo", epoch + ms(200));
+ queue.insert_at("bar", epoch + ms(250));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ queue.reset_at(&foo, epoch + ms(100));
+
+ advance(timer, ms(100));
+
+ assert!(task.is_notified());
+ let entry = assert_ready!(queue).unwrap().into_inner();
+ assert_eq!(entry, "foo");
+ })
+}
+
+#[test]
+fn expire_second_key_when_reset_to_expire_earlier() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ queue.insert_at("foo", epoch + ms(200));
+ let bar = queue.insert_at("bar", epoch + ms(250));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ queue.reset_at(&bar, epoch + ms(100));
+
+ advance(timer, ms(100));
+
+ assert!(task.is_notified());
+ let entry = assert_ready!(queue).unwrap().into_inner();
+ assert_eq!(entry, "bar");
+ })
+}
+
+#[test]
+fn reset_first_expiring_item_to_expire_later() {
+ mocked(|timer, time| {
+ let mut queue = DelayQueue::new();
+ let mut task = MockTask::new();
+
+ let epoch = time.now();
+
+ let foo = queue.insert_at("foo", epoch + ms(200));
+ let bar = queue.insert_at("bar", epoch + ms(250));
+
+ task.enter(|| {
+ assert_not_ready!(queue);
+ });
+
+ queue.reset_at(&foo, epoch + ms(300));
+ advance(timer, ms(250));
+
+ assert!(task.is_notified());
+ let entry = assert_ready!(queue).unwrap().into_inner();
+ assert_eq!(entry, "bar");
+ })
+}
diff --git a/third_party/rust/tokio-timer/tests/support/mod.rs b/third_party/rust/tokio-timer/tests/support/mod.rs
new file mode 100644
index 0000000000..244d56b819
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/support/mod.rs
@@ -0,0 +1,261 @@
+#![allow(unused_macros, unused_imports, dead_code, deprecated)]
+
+use tokio_executor::park::{Park, Unpark};
+use tokio_timer::clock::Now;
+use tokio_timer::timer::Timer;
+
+use futures::future::{lazy, Future};
+
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+macro_rules! assert_ready {
+ ($f:expr) => {{
+ use ::futures::Async::*;
+
+ match $f.poll().unwrap() {
+ Ready(v) => v,
+ NotReady => panic!("NotReady"),
+ }
+ }};
+ ($f:expr, $($msg:expr),+) => {{
+ use ::futures::Async::*;
+
+ match $f.poll().unwrap() {
+ Ready(v) => v,
+ NotReady => {
+ let msg = format!($($msg),+);
+ panic!("NotReady; {}", msg)
+ }
+ }
+ }}
+}
+
+macro_rules! assert_ready_eq {
+ ($f:expr, $expect:expr) => {
+ assert_eq!($f.poll().unwrap(), ::futures::Async::Ready($expect));
+ };
+}
+
+macro_rules! assert_not_ready {
+ ($f:expr) => {{
+ let res = $f.poll().unwrap();
+ assert!(!res.is_ready(), "actual={:?}", res)
+ }};
+ ($f:expr, $($msg:expr),+) => {{
+ let res = $f.poll().unwrap();
+ if res.is_ready() {
+ let msg = format!($($msg),+);
+ panic!("actual={:?}; {}", res, msg);
+ }
+ }};
+}
+
+macro_rules! assert_elapsed {
+ ($f:expr) => {
+ assert!($f.poll().unwrap_err().is_elapsed());
+ };
+}
+
+#[derive(Debug)]
+pub struct MockTime {
+ inner: Inner,
+ _p: PhantomData<Rc<()>>,
+}
+
+#[derive(Debug)]
+pub struct MockNow {
+ inner: Inner,
+}
+
+#[derive(Debug)]
+pub struct MockPark {
+ inner: Inner,
+ _p: PhantomData<Rc<()>>,
+}
+
+#[derive(Debug)]
+pub struct MockUnpark {
+ inner: Inner,
+}
+
+type Inner = Arc<Mutex<State>>;
+
+#[derive(Debug)]
+struct State {
+ base: Instant,
+ advance: Duration,
+ unparked: bool,
+ park_for: Option<Duration>,
+}
+
+pub fn ms(num: u64) -> Duration {
+ Duration::from_millis(num)
+}
+
+pub trait IntoTimeout {
+ fn into_timeout(self) -> Option<Duration>;
+}
+
+impl IntoTimeout for Option<Duration> {
+ fn into_timeout(self) -> Self {
+ self
+ }
+}
+
+impl IntoTimeout for Duration {
+ fn into_timeout(self) -> Option<Duration> {
+ Some(self)
+ }
+}
+
+/// Turn the timer state once
+pub fn turn<T: IntoTimeout>(timer: &mut Timer<MockPark>, duration: T) {
+ timer.turn(duration.into_timeout()).unwrap();
+}
+
+/// Advance the timer the specified amount
+pub fn advance(timer: &mut Timer<MockPark>, duration: Duration) {
+ let inner = timer.get_park().inner.clone();
+ let deadline = inner.lock().unwrap().now() + duration;
+
+ while inner.lock().unwrap().now() < deadline {
+ let dur = deadline - inner.lock().unwrap().now();
+ turn(timer, dur);
+ }
+}
+
+pub fn mocked<F, R>(f: F) -> R
+where
+ F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
+{
+ mocked_with_now(Instant::now(), f)
+}
+
+pub fn mocked_with_now<F, R>(now: Instant, f: F) -> R
+where
+ F: FnOnce(&mut Timer<MockPark>, &mut MockTime) -> R,
+{
+ let mut time = MockTime::new(now);
+ let park = time.mock_park();
+ let now = ::tokio_timer::clock::Clock::new_with_now(time.mock_now());
+
+ let mut enter = ::tokio_executor::enter().unwrap();
+
+ ::tokio_timer::clock::with_default(&now, &mut enter, |enter| {
+ let mut timer = Timer::new(park);
+ let handle = timer.handle();
+
+ ::tokio_timer::with_default(&handle, enter, |_| {
+ lazy(|| Ok::<_, ()>(f(&mut timer, &mut time)))
+ .wait()
+ .unwrap()
+ })
+ })
+}
+
+impl MockTime {
+ pub fn new(now: Instant) -> MockTime {
+ let state = State {
+ base: now,
+ advance: Duration::default(),
+ unparked: false,
+ park_for: None,
+ };
+
+ MockTime {
+ inner: Arc::new(Mutex::new(state)),
+ _p: PhantomData,
+ }
+ }
+
+ pub fn mock_now(&self) -> MockNow {
+ let inner = self.inner.clone();
+ MockNow { inner }
+ }
+
+ pub fn mock_park(&self) -> MockPark {
+ let inner = self.inner.clone();
+ MockPark {
+ inner,
+ _p: PhantomData,
+ }
+ }
+
+ pub fn now(&self) -> Instant {
+ self.inner.lock().unwrap().now()
+ }
+
+ /// Returns the total amount of time the time has been advanced.
+ pub fn advanced(&self) -> Duration {
+ self.inner.lock().unwrap().advance
+ }
+
+ pub fn advance(&self, duration: Duration) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.advance(duration);
+ }
+
+ /// The next call to park_timeout will be for this duration, regardless of
+ /// the timeout passed to `park_timeout`.
+ pub fn park_for(&self, duration: Duration) {
+ self.inner.lock().unwrap().park_for = Some(duration);
+ }
+}
+
+impl Park for MockPark {
+ type Unpark = MockUnpark;
+ type Error = ();
+
+ fn unpark(&self) -> Self::Unpark {
+ let inner = self.inner.clone();
+ MockUnpark { inner }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ let mut inner = self.inner.lock().map_err(|_| ())?;
+
+ let duration = inner.park_for.take().expect("call park_for first");
+
+ inner.advance(duration);
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ let mut inner = self.inner.lock().unwrap();
+
+ if let Some(duration) = inner.park_for.take() {
+ inner.advance(duration);
+ } else {
+ inner.advance(duration);
+ }
+
+ Ok(())
+ }
+}
+
+impl Unpark for MockUnpark {
+ fn unpark(&self) {
+ if let Ok(mut inner) = self.inner.lock() {
+ inner.unparked = true;
+ }
+ }
+}
+
+impl Now for MockNow {
+ fn now(&self) -> Instant {
+ self.inner.lock().unwrap().now()
+ }
+}
+
+impl State {
+ fn now(&self) -> Instant {
+ self.base + self.advance
+ }
+
+ fn advance(&mut self, duration: Duration) {
+ self.advance += duration;
+ }
+}
diff --git a/third_party/rust/tokio-timer/tests/throttle.rs b/third_party/rust/tokio-timer/tests/throttle.rs
new file mode 100644
index 0000000000..253309c988
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/throttle.rs
@@ -0,0 +1,51 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use futures::{prelude::*, sync::mpsc};
+use tokio_timer::throttle::Throttle;
+
+#[test]
+fn throttle() {
+ mocked(|timer, _| {
+ let (tx, rx) = mpsc::unbounded();
+ let mut stream = Throttle::new(rx, ms(1));
+
+ assert_not_ready!(stream);
+
+ for i in 0..3 {
+ tx.unbounded_send(i).unwrap();
+ }
+ for i in 0..3 {
+ assert_ready_eq!(stream, Some(i));
+ assert_not_ready!(stream);
+
+ advance(timer, ms(1));
+ }
+
+ assert_not_ready!(stream);
+ });
+}
+
+#[test]
+fn throttle_dur_0() {
+ mocked(|_, _| {
+ let (tx, rx) = mpsc::unbounded();
+ let mut stream = Throttle::new(rx, ms(0));
+
+ assert_not_ready!(stream);
+
+ for i in 0..3 {
+ tx.unbounded_send(i).unwrap();
+ }
+ for i in 0..3 {
+ assert_ready_eq!(stream, Some(i));
+ }
+
+ assert_not_ready!(stream);
+ });
+}
diff --git a/third_party/rust/tokio-timer/tests/timeout.rs b/third_party/rust/tokio-timer/tests/timeout.rs
new file mode 100644
index 0000000000..8e10776ae7
--- /dev/null
+++ b/third_party/rust/tokio-timer/tests/timeout.rs
@@ -0,0 +1,179 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_timer;
+
+#[macro_use]
+mod support;
+use support::*;
+
+use tokio_timer::*;
+
+use futures::sync::{mpsc, oneshot};
+use futures::{future, Future, Stream};
+
+#[test]
+fn simultaneous_deadline_future_completion() {
+ mocked(|_, time| {
+ // Create a future that is immediately ready
+ let fut = future::ok::<_, ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(fut, time.now());
+
+ // Ready!
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn completed_future_past_deadline() {
+ mocked(|_, time| {
+ // Create a future that is immediately ready
+ let fut = future::ok::<_, ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(fut, time.now() - ms(1000));
+
+ // Ready!
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn future_and_deadline_in_future() {
+ mocked(|timer, time| {
+ // Not yet complete
+ let (tx, rx) = oneshot::channel();
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(rx, time.now() + ms(100));
+
+ // Ready!
+ assert_not_ready!(fut);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(90));
+
+ assert_not_ready!(fut);
+
+ // Complete the future
+ tx.send(()).unwrap();
+
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn future_and_timeout_in_future() {
+ mocked(|timer, _time| {
+ // Not yet complete
+ let (tx, rx) = oneshot::channel();
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new(rx, ms(100));
+
+ // Ready!
+ assert_not_ready!(fut);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(90));
+
+ assert_not_ready!(fut);
+
+ // Complete the future
+ tx.send(()).unwrap();
+
+ assert_ready!(fut);
+ });
+}
+
+#[test]
+fn deadline_now_elapses() {
+ mocked(|_, time| {
+ let fut = future::empty::<(), ()>();
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(fut, time.now());
+
+ assert_elapsed!(fut);
+ });
+}
+
+#[test]
+fn deadline_future_elapses() {
+ mocked(|timer, time| {
+ let fut = future::empty::<(), ()>();
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(fut, time.now() + ms(300));
+
+ assert_not_ready!(fut);
+
+ advance(timer, ms(300));
+
+ assert_elapsed!(fut);
+ });
+}
+
+#[test]
+fn future_errors_first() {
+ mocked(|_, time| {
+ let fut = future::err::<(), ()>(());
+
+ // Wrap it with a deadline
+ let mut fut = Timeout::new_at(fut, time.now() + ms(100));
+
+ // Ready!
+ assert!(fut.poll().unwrap_err().is_inner());
+ });
+}
+
+#[test]
+fn stream_and_timeout_in_future() {
+ mocked(|timer, _time| {
+ // Not yet complete
+ let (tx, rx) = mpsc::unbounded();
+
+ // Wrap it with a deadline
+ let mut stream = Timeout::new(rx, ms(100));
+
+ // Not ready
+ assert_not_ready!(stream);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(90));
+
+ assert_not_ready!(stream);
+
+ // Complete the future
+ tx.unbounded_send(()).unwrap();
+
+ let item = assert_ready!(stream);
+ assert!(item.is_some());
+ });
+}
+
+#[test]
+fn idle_stream_timesout_periodically() {
+ mocked(|timer, _time| {
+ // Not yet complete
+ let (_tx, rx) = mpsc::unbounded::<()>();
+
+ // Wrap it with a deadline
+ let mut stream = Timeout::new(rx, ms(100));
+
+ // Not ready
+ assert_not_ready!(stream);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(100));
+
+ assert_elapsed!(stream);
+ // Stream's timeout should reset
+ assert_not_ready!(stream);
+
+ // Turn the timer, it runs for the elapsed time
+ advance(timer, ms(100));
+ assert_elapsed!(stream);
+ });
+}