diff options
Diffstat (limited to 'third_party/rust/tokio-util/tests/time_delay_queue.rs')
-rw-r--r-- | third_party/rust/tokio-util/tests/time_delay_queue.rs | 818 |
1 files changed, 818 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/tests/time_delay_queue.rs b/third_party/rust/tokio-util/tests/time_delay_queue.rs new file mode 100644 index 0000000000..cb163adf3a --- /dev/null +++ b/third_party/rust/tokio-util/tests/time_delay_queue.rs @@ -0,0 +1,818 @@ +#![allow(clippy::blacklisted_name)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, sleep, sleep_until, Duration, Instant}; +use tokio_test::{assert_pending, assert_ready, task}; +use tokio_util::time::DelayQueue; + +macro_rules! poll { + ($queue:ident) => { + $queue.enter(|cx, mut queue| queue.poll_expired(cx)) + }; +} + +macro_rules! assert_ready_some { + ($e:expr) => {{ + match assert_ready!($e) { + Some(v) => v, + None => panic!("None"), + } + }}; +} + +#[tokio::test] +async fn single_immediate_delay() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let _key = queue.insert_at("foo", Instant::now()); + + // Advance time by 1ms to handle thee rounding + sleep(ms(1)).await; + + assert_ready_some!(poll!(queue)); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +#[tokio::test] +async fn multi_immediate_delays() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let _k = queue.insert_at("1", Instant::now()); + let _k = queue.insert_at("2", Instant::now()); + let _k = queue.insert_at("3", Instant::now()); + + sleep(ms(1)).await; + + let mut res = vec![]; + + while res.len() < 3 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); + + res.sort_unstable(); + + assert_eq!("1", res[0]); + assert_eq!("2", res[1]); + assert_eq!("3", res[2]); +} + +#[tokio::test] +async fn single_short_delay() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let _key = queue.insert_at("foo", Instant::now() + ms(5)); + + assert_pending!(poll!(queue)); + + sleep(ms(1)).await; + + assert!(!queue.is_woken()); + + sleep(ms(5)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test] +async fn multi_delay_at_start() { + time::pause(); + + let long = 262_144 + 9 * 4096; + let delays = &[1000, 2, 234, long, 60, 10]; + + let mut queue = task::spawn(DelayQueue::new()); + + // Setup the delays + for &i in delays { + let _key = queue.insert_at(i, Instant::now() + ms(i)); + } + + assert_pending!(poll!(queue)); + assert!(!queue.is_woken()); + + let start = Instant::now(); + for elapsed in 0..1200 { + println!("elapsed: {:?}", elapsed); + let elapsed = elapsed + 1; + tokio::time::sleep_until(start + ms(elapsed)).await; + + if delays.contains(&elapsed) { + assert!(queue.is_woken()); + assert_ready!(poll!(queue)); + assert_pending!(poll!(queue)); + } else if queue.is_woken() { + let cascade = &[192, 960]; + assert!( + cascade.contains(&elapsed), + "elapsed={} dt={:?}", + elapsed, + Instant::now() - start + ); + + assert_pending!(poll!(queue)); + } + } + println!("finished multi_delay_start"); +} + +#[tokio::test] +async fn insert_in_past_fires_immediately() { + println!("running insert_in_past_fires_immediately"); + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + sleep(ms(10)).await; + + queue.insert_at("foo", now); + + assert_ready!(poll!(queue)); + println!("finished insert_in_past_fires_immediately"); +} + +#[tokio::test] +async fn remove_entry() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let key = queue.insert_at("foo", Instant::now() + ms(5)); + + assert_pending!(poll!(queue)); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); + + sleep(ms(10)).await; + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test] +async fn reset_entry() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + let key = queue.insert_at("foo", now + ms(5)); + + assert_pending!(poll!(queue)); + sleep(ms(1)).await; + + queue.reset_at(&key, now + ms(10)); + + assert_pending!(poll!(queue)); + + sleep(ms(7)).await; + + assert!(!queue.is_woken()); + + assert_pending!(poll!(queue)); + + sleep(ms(3)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +// Reproduces tokio-rs/tokio#849. +#[tokio::test] +async fn reset_much_later() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + sleep(ms(1)).await; + + let key = queue.insert_at("foo", now + ms(200)); + assert_pending!(poll!(queue)); + + sleep(ms(3)).await; + + queue.reset_at(&key, now + ms(10)); + + sleep(ms(20)).await; + + assert!(queue.is_woken()); +} + +// Reproduces tokio-rs/tokio#849. +#[tokio::test] +async fn reset_twice() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + sleep(ms(1)).await; + + let key = queue.insert_at("foo", now + ms(200)); + + assert_pending!(poll!(queue)); + + sleep(ms(3)).await; + + queue.reset_at(&key, now + ms(50)); + + sleep(ms(20)).await; + + queue.reset_at(&key, now + ms(40)); + + sleep(ms(20)).await; + + assert!(queue.is_woken()); +} + +/// Regression test: Given an entry inserted with a deadline in the past, so +/// that it is placed directly on the expired queue, reset the entry to a +/// deadline in the future. Validate that this leaves the entry and queue in an +/// internally consistent state by running an additional reset on the entry +/// before polling it to completion. +#[tokio::test] +async fn repeatedly_reset_entry_inserted_as_expired() { + time::pause(); + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + let key = queue.insert_at("foo", now - ms(100)); + + queue.reset_at(&key, now + ms(100)); + queue.reset_at(&key, now + ms(50)); + + assert_pending!(poll!(queue)); + + time::sleep_until(now + ms(60)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test] +async fn remove_expired_item() { + time::pause(); + + let mut queue = DelayQueue::new(); + + let now = Instant::now(); + + sleep(ms(10)).await; + + let key = queue.insert_at("foo", now); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); +} + +/// Regression test: it should be possible to remove entries which fall in the +/// 0th slot of the internal timer wheel — that is, entries whose expiration +/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b) +/// is equal to the wheel's current elapsed time. +#[tokio::test] +async fn remove_at_timer_wheel_threshold() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let key1 = queue.insert_at("foo", now + ms(64)); + let key2 = queue.insert_at("bar", now + ms(64)); + + sleep(ms(80)).await; + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + + match entry { + "foo" => { + let entry = queue.remove(&key2).into_inner(); + assert_eq!(entry, "bar"); + } + "bar" => { + let entry = queue.remove(&key1).into_inner(); + assert_eq!(entry, "foo"); + } + other => panic!("other: {:?}", other), + } +} + +#[tokio::test] +async fn expires_before_last_insert() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("foo", now + ms(10_000)); + + // Delay should be set to 8.192s here. + assert_pending!(poll!(queue)); + + // Delay should be set to the delay of the new item here + queue.insert_at("bar", now + ms(600)); + + assert_pending!(poll!(queue)); + + sleep(ms(600)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "bar"); +} + +#[tokio::test] +async fn multi_reset() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + let two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(300)); + queue.reset_at(&two, now + ms(350)); + queue.reset_at(&one, now + ms(400)); + + sleep(ms(310)).await; + + assert_pending!(poll!(queue)); + + sleep(ms(50)).await; + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(*entry.get_ref(), "two"); + + assert_pending!(poll!(queue)); + + sleep(ms(50)).await; + + let entry = assert_ready_some!(poll!(queue)); + assert_eq!(*entry.get_ref(), "one"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +#[tokio::test] +async fn expire_first_key_when_reset_to_expire_earlier() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(100)); + + sleep(ms(100)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "one"); +} + +#[tokio::test] +async fn expire_second_key_when_reset_to_expire_earlier() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("one", now + ms(200)); + let two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&two, now + ms(100)); + + sleep(ms(100)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn reset_first_expiring_item_to_expire_later() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + let _two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(300)); + sleep(ms(250)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn insert_before_first_after_poll() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let _one = queue.insert_at("one", now + ms(200)); + + assert_pending!(poll!(queue)); + + let _two = queue.insert_at("two", now + ms(100)); + + sleep(ms(99)).await; + + assert_pending!(poll!(queue)); + + sleep(ms(1)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn insert_after_ready_poll() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("1", now + ms(100)); + queue.insert_at("2", now + ms(100)); + queue.insert_at("3", now + ms(100)); + + assert_pending!(poll!(queue)); + + sleep(ms(100)).await; + + assert!(queue.is_woken()); + + let mut res = vec![]; + + while res.len() < 3 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + queue.insert_at("foo", now + ms(500)); + } + + res.sort_unstable(); + + assert_eq!("1", res[0]); + assert_eq!("2", res[1]); + assert_eq!("3", res[2]); +} + +#[tokio::test] +async fn reset_later_after_slot_starts() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let foo = queue.insert_at("foo", now + ms(100)); + + assert_pending!(poll!(queue)); + + sleep_until(now + Duration::from_millis(80)).await; + + assert!(!queue.is_woken()); + + // At this point the queue hasn't been polled, so `elapsed` on the wheel + // for the queue is still at 0 and hence the 1ms resolution slots cover + // [0-64). Resetting the time on the entry to 120 causes it to get put in + // the [64-128) slot. As the queue knows that the first entry is within + // that slot, but doesn't know when, it must wake immediately to advance + // the wheel. + queue.reset_at(&foo, now + ms(120)); + assert!(queue.is_woken()); + + assert_pending!(poll!(queue)); + + sleep_until(now + Duration::from_millis(119)).await; + assert!(!queue.is_woken()); + + sleep(ms(1)).await; + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "foo"); +} + +#[tokio::test] +async fn reset_inserted_expired() { + time::pause(); + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + let key = queue.insert_at("foo", now - ms(100)); + + // this causes the panic described in #2473 + queue.reset_at(&key, now + ms(100)); + + assert_eq!(1, queue.len()); + + sleep(ms(200)).await; + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "foo"); + + assert_eq!(queue.len(), 0); +} + +#[tokio::test] +async fn reset_earlier_after_slot_starts() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let foo = queue.insert_at("foo", now + ms(200)); + + assert_pending!(poll!(queue)); + + sleep_until(now + Duration::from_millis(80)).await; + + assert!(!queue.is_woken()); + + // At this point the queue hasn't been polled, so `elapsed` on the wheel + // for the queue is still at 0 and hence the 1ms resolution slots cover + // [0-64). Resetting the time on the entry to 120 causes it to get put in + // the [64-128) slot. As the queue knows that the first entry is within + // that slot, but doesn't know when, it must wake immediately to advance + // the wheel. + queue.reset_at(&foo, now + ms(120)); + assert!(queue.is_woken()); + + assert_pending!(poll!(queue)); + + sleep_until(now + Duration::from_millis(119)).await; + assert!(!queue.is_woken()); + + sleep(ms(1)).await; + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "foo"); +} + +#[tokio::test] +async fn insert_in_past_after_poll_fires_immediately() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("foo", now + ms(200)); + + assert_pending!(poll!(queue)); + + sleep(ms(80)).await; + + assert!(!queue.is_woken()); + queue.insert_at("bar", now + ms(40)); + + assert!(queue.is_woken()); + + let entry = assert_ready_some!(poll!(queue)).into_inner(); + assert_eq!(entry, "bar"); +} + +#[tokio::test] +async fn delay_queue_poll_expired_when_empty() { + let mut delay_queue = task::spawn(DelayQueue::new()); + let key = delay_queue.insert(0, std::time::Duration::from_secs(10)); + assert_pending!(poll!(delay_queue)); + + delay_queue.remove(&key); + assert!(assert_ready!(poll!(delay_queue)).is_none()); +} + +#[tokio::test(start_paused = true)] +async fn compact_expire_empty() { + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("foo1", now + ms(10)); + queue.insert_at("foo2", now + ms(10)); + + sleep(ms(10)).await; + + let mut res = vec![]; + while res.len() < 2 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + queue.compact(); + + assert_eq!(queue.len(), 0); + assert_eq!(queue.capacity(), 0); +} + +#[tokio::test(start_paused = true)] +async fn compact_remove_empty() { + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let key1 = queue.insert_at("foo1", now + ms(10)); + let key2 = queue.insert_at("foo2", now + ms(10)); + + queue.remove(&key1); + queue.remove(&key2); + + queue.compact(); + + assert_eq!(queue.len(), 0); + assert_eq!(queue.capacity(), 0); +} + +#[tokio::test(start_paused = true)] +// Trigger a re-mapping of keys in the slab due to a `compact` call and +// test removal of re-mapped keys +async fn compact_remove_remapped_keys() { + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("foo1", now + ms(10)); + queue.insert_at("foo2", now + ms(10)); + + // should be assigned indices 3 and 4 + let key3 = queue.insert_at("foo3", now + ms(20)); + let key4 = queue.insert_at("foo4", now + ms(20)); + + sleep(ms(10)).await; + + let mut res = vec![]; + while res.len() < 2 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + // items corresponding to `foo3` and `foo4` will be assigned + // new indices here + queue.compact(); + + queue.insert_at("foo5", now + ms(10)); + + // test removal of re-mapped keys + let expired3 = queue.remove(&key3); + let expired4 = queue.remove(&key4); + + assert_eq!(expired3.into_inner(), "foo3"); + assert_eq!(expired4.into_inner(), "foo4"); + + queue.compact(); + assert_eq!(queue.len(), 1); + assert_eq!(queue.capacity(), 1); +} + +#[tokio::test(start_paused = true)] +async fn compact_change_deadline() { + let mut queue = task::spawn(DelayQueue::new()); + + let mut now = Instant::now(); + + queue.insert_at("foo1", now + ms(10)); + queue.insert_at("foo2", now + ms(10)); + + // should be assigned indices 3 and 4 + queue.insert_at("foo3", now + ms(20)); + let key4 = queue.insert_at("foo4", now + ms(20)); + + sleep(ms(10)).await; + + let mut res = vec![]; + while res.len() < 2 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + // items corresponding to `foo3` and `foo4` should be assigned + // new indices + queue.compact(); + + now = Instant::now(); + + queue.insert_at("foo5", now + ms(10)); + let key6 = queue.insert_at("foo6", now + ms(10)); + + queue.reset_at(&key4, now + ms(20)); + queue.reset_at(&key6, now + ms(20)); + + // foo3 and foo5 will expire + sleep(ms(10)).await; + + while res.len() < 4 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + sleep(ms(10)).await; + + while res.len() < 6 { + let entry = assert_ready_some!(poll!(queue)); + res.push(entry.into_inner()); + } + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test(start_paused = true)] +async fn remove_after_compact() { + let now = Instant::now(); + let mut queue = DelayQueue::new(); + + let foo_key = queue.insert_at("foo", now + ms(10)); + queue.insert_at("bar", now + ms(20)); + queue.remove(&foo_key); + queue.compact(); + + let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + queue.remove(&foo_key); + })); + assert!(panic.is_err()); +} + +#[tokio::test(start_paused = true)] +async fn remove_after_compact_poll() { + let now = Instant::now(); + let mut queue = task::spawn(DelayQueue::new()); + + let foo_key = queue.insert_at("foo", now + ms(10)); + queue.insert_at("bar", now + ms(20)); + + sleep(ms(10)).await; + assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key); + + queue.compact(); + + let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + queue.remove(&foo_key); + })); + assert!(panic.is_err()); +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} |