diff options
Diffstat (limited to 'third_party/rust/neqo-transport/tests/sim')
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/connection.rs | 311 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/delay.rs | 98 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/drop.rs | 71 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/mod.rs | 232 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/net.rs | 111 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/rng.rs | 81 | ||||
-rw-r--r-- | third_party/rust/neqo-transport/tests/sim/taildrop.rs | 184 |
7 files changed, 1088 insertions, 0 deletions
diff --git a/third_party/rust/neqo-transport/tests/sim/connection.rs b/third_party/rust/neqo-transport/tests/sim/connection.rs new file mode 100644 index 0000000000..eafb0d9a3f --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/connection.rs @@ -0,0 +1,311 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +use super::{Node, Rng}; +use neqo_common::{event::Provider, qdebug, qtrace, Datagram}; +use neqo_crypto::AuthenticationStatus; +use neqo_transport::{ + Connection, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType, +}; +use std::cmp::min; +use std::fmt::{self, Debug}; +use std::time::Instant; + +/// The status of the processing of an event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum GoalStatus { + /// The event didn't result in doing anything; the goal is waiting for something. + Waiting, + /// An action was taken as a result of the event. + Active, + /// The goal was accomplished. + Done, +} + +/// A goal for the connection. +/// Goals can be accomplished in any order. +pub trait ConnectionGoal { + fn init(&mut self, _c: &mut Connection, _now: Instant) {} + /// Perform some processing. + fn process(&mut self, _c: &mut Connection, _now: Instant) -> GoalStatus { + GoalStatus::Waiting + } + /// Handle an event from the provided connection, returning `true` when the + /// goal is achieved. + fn handle_event(&mut self, c: &mut Connection, e: &ConnectionEvent, now: Instant) + -> GoalStatus; +} + +pub struct ConnectionNode { + c: Connection, + goals: Vec<Box<dyn ConnectionGoal>>, +} + +impl ConnectionNode { + pub fn new_client( + params: ConnectionParameters, + goals: impl IntoIterator<Item = Box<dyn ConnectionGoal>>, + ) -> Self { + Self { + c: test_fixture::new_client(params), + goals: goals.into_iter().collect(), + } + } + + pub fn new_server( + params: ConnectionParameters, + goals: impl IntoIterator<Item = Box<dyn ConnectionGoal>>, + ) -> Self { + Self { + c: test_fixture::new_server(test_fixture::DEFAULT_ALPN, params), + goals: goals.into_iter().collect(), + } + } + + pub fn default_client(goals: impl IntoIterator<Item = Box<dyn ConnectionGoal>>) -> Self { + Self::new_client(ConnectionParameters::default(), goals) + } + + pub fn default_server(goals: impl IntoIterator<Item = Box<dyn ConnectionGoal>>) -> Self { + Self::new_server(ConnectionParameters::default(), goals) + } + + #[allow(dead_code)] + pub fn clear_goals(&mut self) { + self.goals.clear(); + } + + #[allow(dead_code)] + pub fn add_goal(&mut self, goal: Box<dyn ConnectionGoal>) { + self.goals.push(goal); + } + + /// Process all goals using the given closure and return whether any were active. + fn process_goals<F>(&mut self, mut f: F) -> bool + where + F: FnMut(&mut Box<dyn ConnectionGoal>, &mut Connection) -> GoalStatus, + { + // Waiting on drain_filter... + // self.goals.drain_filter(|g| f(g, &mut self.c, &e)).count(); + let mut active = false; + let mut i = 0; + while i < self.goals.len() { + let status = f(&mut self.goals[i], &mut self.c); + if status == GoalStatus::Done { + self.goals.remove(i); + active = true; + } else { + active |= status == GoalStatus::Active; + i += 1; + } + } + active + } +} + +impl Node for ConnectionNode { + fn init(&mut self, _rng: Rng, now: Instant) { + for g in &mut self.goals { + g.init(&mut self.c, now); + } + } + + fn process(&mut self, mut d: Option<Datagram>, now: Instant) -> Output { + let _ = self.process_goals(|goal, c| goal.process(c, now)); + loop { + let res = self.c.process(d.take(), now); + + let mut active = false; + while let Some(e) = self.c.next_event() { + qtrace!([self.c], "received event {:?}", e); + + // Perform authentication automatically. + if matches!(e, ConnectionEvent::AuthenticationNeeded) { + self.c.authenticated(AuthenticationStatus::Ok, now); + } + + active |= self.process_goals(|goal, c| goal.handle_event(c, &e, now)); + } + // Exit at this point if the connection produced a datagram. + // We also exit if none of the goals were active, as there is + // no point trying again if they did nothing. + if matches!(res, Output::Datagram(_)) || !active { + return res; + } + qdebug!([self.c], "no datagram and goal activity, looping"); + } + } + + fn done(&self) -> bool { + self.goals.is_empty() + } + + fn print_summary(&self, test_name: &str) { + println!("{}: {:?}", test_name, self.c.stats()); + } +} + +impl Debug for ConnectionNode { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.c, f) + } +} + +#[derive(Debug, Clone)] +pub struct ReachState { + target: State, +} + +impl ReachState { + pub fn new(target: State) -> Self { + Self { target } + } +} + +impl ConnectionGoal for ReachState { + fn handle_event( + &mut self, + _c: &mut Connection, + e: &ConnectionEvent, + _now: Instant, + ) -> GoalStatus { + if matches!(e, ConnectionEvent::StateChange(state) if *state == self.target) { + GoalStatus::Done + } else { + GoalStatus::Waiting + } + } +} + +#[derive(Debug)] +pub struct SendData { + remaining: usize, + stream_id: Option<StreamId>, +} + +impl SendData { + pub fn new(amount: usize) -> Self { + Self { + remaining: amount, + stream_id: None, + } + } + + fn make_stream(&mut self, c: &mut Connection) { + if self.stream_id.is_none() { + if let Ok(stream_id) = c.stream_create(StreamType::UniDi) { + qdebug!([c], "made stream {} for sending", stream_id); + self.stream_id = Some(stream_id); + } + } + } + + fn send(&mut self, c: &mut Connection, stream_id: StreamId) -> GoalStatus { + const DATA: &[u8] = &[0; 4096]; + let mut status = GoalStatus::Waiting; + loop { + let end = min(self.remaining, DATA.len()); + let sent = c.stream_send(stream_id, &DATA[..end]).unwrap(); + if sent == 0 { + return status; + } + self.remaining -= sent; + qtrace!("sent {} remaining {}", sent, self.remaining); + if self.remaining == 0 { + c.stream_close_send(stream_id).unwrap(); + return GoalStatus::Done; + } + status = GoalStatus::Active; + } + } +} + +impl ConnectionGoal for SendData { + fn init(&mut self, c: &mut Connection, _now: Instant) { + self.make_stream(c); + } + + fn process(&mut self, c: &mut Connection, _now: Instant) -> GoalStatus { + self.stream_id + .map_or(GoalStatus::Waiting, |stream_id| self.send(c, stream_id)) + } + + fn handle_event( + &mut self, + c: &mut Connection, + e: &ConnectionEvent, + _now: Instant, + ) -> GoalStatus { + match e { + ConnectionEvent::SendStreamCreatable { + stream_type: StreamType::UniDi, + } + // TODO(mt): remove the second condition when #842 is fixed. + | ConnectionEvent::StateChange(_) => { + self.make_stream(c); + GoalStatus::Active + } + + ConnectionEvent::SendStreamWritable { stream_id } + if Some(*stream_id) == self.stream_id => + { + self.send(c, *stream_id) + } + + // If we sent data in 0-RTT, then we didn't track how much we should + // have sent. This is trivial to fix if 0-RTT testing is ever needed. + ConnectionEvent::ZeroRttRejected => panic!("not supported"), + _ => GoalStatus::Waiting, + } + } +} + +/// Receive a prescribed amount of data from any stream. +#[derive(Debug)] +pub struct ReceiveData { + remaining: usize, +} + +impl ReceiveData { + pub fn new(amount: usize) -> Self { + Self { remaining: amount } + } + + fn recv(&mut self, c: &mut Connection, stream_id: StreamId) -> GoalStatus { + let mut buf = vec![0; 4096]; + let mut status = GoalStatus::Waiting; + loop { + let end = min(self.remaining, buf.len()); + let (recvd, _) = c.stream_recv(stream_id, &mut buf[..end]).unwrap(); + qtrace!("received {} remaining {}", recvd, self.remaining); + if recvd == 0 { + return status; + } + self.remaining -= recvd; + if self.remaining == 0 { + return GoalStatus::Done; + } + status = GoalStatus::Active; + } + } +} + +impl ConnectionGoal for ReceiveData { + fn handle_event( + &mut self, + c: &mut Connection, + e: &ConnectionEvent, + _now: Instant, + ) -> GoalStatus { + if let ConnectionEvent::RecvStreamReadable { stream_id } = e { + self.recv(c, *stream_id) + } else { + GoalStatus::Waiting + } + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/delay.rs b/third_party/rust/neqo-transport/tests/sim/delay.rs new file mode 100644 index 0000000000..95188c0562 --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/delay.rs @@ -0,0 +1,98 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +use super::{Node, Rng}; +use neqo_common::Datagram; +use neqo_transport::Output; +use std::collections::BTreeMap; +use std::convert::TryFrom; +use std::fmt::{self, Debug}; +use std::ops::Range; +use std::time::{Duration, Instant}; + +/// An iterator that shares a `Random` instance and produces uniformly +/// random `Duration`s within a specified range. +pub struct RandomDelay { + start: Duration, + max: u64, + rng: Option<Rng>, +} + +impl RandomDelay { + /// Make a new random `Duration` generator. This panics if the range provided + /// is inverted (i.e., `bounds.start > bounds.end`), or spans 2^64 + /// or more nanoseconds. + /// A zero-length range means that random values won't be taken from the Rng + pub fn new(bounds: Range<Duration>) -> Self { + let max = u64::try_from((bounds.end - bounds.start).as_nanos()).unwrap(); + Self { + start: bounds.start, + max, + rng: None, + } + } + + pub fn set_rng(&mut self, rng: Rng) { + self.rng = Some(rng); + } + + pub fn next(&mut self) -> Duration { + let mut rng = self.rng.as_ref().unwrap().borrow_mut(); + let r = rng.random_from(0..self.max); + self.start + Duration::from_nanos(r) + } +} + +pub struct Delay { + random: RandomDelay, + queue: BTreeMap<Instant, Datagram>, +} + +impl Delay { + pub fn new(bounds: Range<Duration>) -> Self { + Self { + random: RandomDelay::new(bounds), + queue: BTreeMap::default(), + } + } + + fn insert(&mut self, d: Datagram, now: Instant) { + let mut t = now + self.random.next(); + while self.queue.contains_key(&t) { + // This is a little inefficient, but it avoids drops on collisions, + // which are super-common for a fixed delay. + t += Duration::from_nanos(1); + } + self.queue.insert(t, d); + } +} + +impl Node for Delay { + fn init(&mut self, rng: Rng, _now: Instant) { + self.random.set_rng(rng); + } + + fn process(&mut self, d: Option<Datagram>, now: Instant) -> Output { + if let Some(dgram) = d { + self.insert(dgram, now); + } + if let Some((&k, _)) = self.queue.range(..=now).next() { + Output::Datagram(self.queue.remove(&k).unwrap()) + } else if let Some(&t) = self.queue.keys().next() { + Output::Callback(t - now) + } else { + Output::None + } + } +} + +impl Debug for Delay { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("delay") + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/drop.rs b/third_party/rust/neqo-transport/tests/sim/drop.rs new file mode 100644 index 0000000000..d42913d99d --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/drop.rs @@ -0,0 +1,71 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +use super::{Node, Rng}; +use neqo_common::{qtrace, Datagram}; +use neqo_transport::Output; +use std::fmt::{self, Debug}; +use std::time::Instant; + +/// A random dropper. +pub struct Drop { + threshold: u64, + max: u64, + rng: Option<Rng>, +} + +impl Drop { + /// Make a new random drop generator. Each `drop` is called, this generates a + /// random value between 0 and `max` (exclusive). If this value is less than + /// `threshold` a value of `true` is returned. + pub fn new(threshold: u64, max: u64) -> Self { + Self { + threshold, + max, + rng: None, + } + } + + /// Generate random drops with the given percentage. + pub fn percentage(pct: u8) -> Self { + // Multiply by 10 so that the random number generator works more efficiently. + Self::new(u64::from(pct) * 10, 1000) + } + + pub fn drop(&mut self) -> bool { + let mut rng = self.rng.as_ref().unwrap().borrow_mut(); + let r = rng.random_from(0..self.max); + r < self.threshold + } +} + +impl Node for Drop { + fn init(&mut self, rng: Rng, _now: Instant) { + self.rng = Some(rng); + } + + // Pass any datagram provided directly out, but drop some of them. + fn process(&mut self, d: Option<Datagram>, _now: Instant) -> Output { + if let Some(dgram) = d { + if self.drop() { + qtrace!("drop {}", dgram.len()); + Output::None + } else { + Output::Datagram(dgram) + } + } else { + Output::None + } + } +} + +impl Debug for Drop { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("drop") + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/mod.rs b/third_party/rust/neqo-transport/tests/sim/mod.rs new file mode 100644 index 0000000000..f7646aac56 --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/mod.rs @@ -0,0 +1,232 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// Tests with simulated network +#![cfg_attr(feature = "deny-warnings", deny(warnings))] +#![warn(clippy::pedantic)] + +pub mod connection; +mod delay; +mod drop; +pub mod rng; +mod taildrop; + +use neqo_common::{qdebug, qinfo, qtrace, Datagram, Encoder}; +use neqo_transport::Output; +use rng::Random; +use std::cell::RefCell; +use std::cmp::min; +use std::convert::TryFrom; +use std::fmt::Debug; +use std::rc::Rc; +use std::time::{Duration, Instant}; +use test_fixture::{self, now}; + +use NodeState::{Active, Idle, Waiting}; + +pub mod network { + pub use super::delay::Delay; + pub use super::drop::Drop; + pub use super::taildrop::TailDrop; +} + +type Rng = Rc<RefCell<Random>>; + +/// A macro that turns a list of values into boxed versions of the same. +#[macro_export] +macro_rules! boxed { + [$($v:expr),+ $(,)?] => { + vec![ $( Box::new($v) as _ ),+ ] + }; +} + +/// Create a simulation test case. This takes either two or three arguments. +/// The two argument form takes a bare name (`ident`), a comma, and an array of +/// items that implement `Node`. +/// The three argument form adds a setup block that can be used to construct a +/// complex value that is then shared between all nodes. The values in the +/// three-argument form have to be closures (or functions) that accept a reference +/// to the value returned by the setup. +#[macro_export] +macro_rules! simulate { + ($n:ident, [ $($v:expr),+ $(,)? ] $(,)?) => { + simulate!($n, (), [ $(|_| $v),+ ]); + }; + ($n:ident, $setup:expr, [ $( $v:expr ),+ $(,)? ] $(,)?) => { + #[test] + fn $n() { + let fixture = $setup; + let mut nodes: Vec<Box<dyn $crate::sim::Node>> = Vec::new(); + $( + let f: Box<dyn FnOnce(&_) -> _> = Box::new($v); + nodes.push(Box::new(f(&fixture))); + )* + let mut sim = Simulator::new(stringify!($n), nodes); + if let Ok(seed) = std::env::var("SIMULATION_SEED") { + sim.seed_str(seed); + } + sim.run(); + } + }; +} + +pub trait Node: Debug { + fn init(&mut self, _rng: Rng, _now: Instant) {} + /// Perform processing. This optionally takes a datagram and produces either + /// another data, a time that the simulator needs to wait, or nothing. + fn process(&mut self, d: Option<Datagram>, now: Instant) -> Output; + /// An node can report when it considers itself "done". + fn done(&self) -> bool { + true + } + fn print_summary(&self, _test_name: &str) {} +} + +/// The state of a single node. Nodes will be activated if they are `Active` +/// or if the previous node in the loop generated a datagram. Nodes that return +/// `true` from `Node::done` will be activated as normal. +#[derive(Debug, PartialEq)] +enum NodeState { + /// The node just produced a datagram. It should be activated again as soon as possible. + Active, + /// The node is waiting. + Waiting(Instant), + /// The node became idle. + Idle, +} + +#[derive(Debug)] +struct NodeHolder { + node: Box<dyn Node>, + state: NodeState, +} + +impl NodeHolder { + fn ready(&self, now: Instant) -> bool { + match self.state { + Active => true, + Waiting(t) => t >= now, + Idle => false, + } + } +} + +pub struct Simulator { + name: String, + nodes: Vec<NodeHolder>, + rng: Rng, +} + +impl Simulator { + pub fn new(name: impl AsRef<str>, nodes: impl IntoIterator<Item = Box<dyn Node>>) -> Self { + let name = String::from(name.as_ref()); + // The first node is marked as Active, the rest are idle. + let mut it = nodes.into_iter(); + let nodes = it + .next() + .map(|node| NodeHolder { + node, + state: Active, + }) + .into_iter() + .chain(it.map(|node| NodeHolder { node, state: Idle })) + .collect::<Vec<_>>(); + Self { + name, + nodes, + rng: Rc::default(), + } + } + + pub fn seed(&mut self, seed: [u8; 32]) { + self.rng = Rc::new(RefCell::new(Random::new(seed))); + } + + /// Seed from a hex string. + /// Though this is convenient, it panics if this isn't a 64 character hex string. + pub fn seed_str(&mut self, seed: impl AsRef<str>) { + let seed = Encoder::from_hex(seed); + self.seed(<[u8; 32]>::try_from(seed.as_ref()).unwrap()); + } + + fn next_time(&self, now: Instant) -> Instant { + let mut next = None; + for n in &self.nodes { + match n.state { + Idle => continue, + Active => return now, + Waiting(a) => next = Some(next.map_or(a, |b| min(a, b))), + } + } + next.expect("a node cannot be idle and not done") + } + + /// Runs the simulation. + pub fn run(mut self) -> Duration { + let start = now(); + let mut now = start; + let mut dgram = None; + + for n in &mut self.nodes { + n.node.init(self.rng.clone(), now); + } + println!("{}: seed {}", self.name, self.rng.borrow().seed_str()); + + let real_start = Instant::now(); + loop { + for n in &mut self.nodes { + if dgram.is_none() && !n.ready(now) { + qdebug!([self.name], "skipping {:?}", n.node); + continue; + } + + qdebug!([self.name], "processing {:?}", n.node); + let res = n.node.process(dgram.take(), now); + n.state = match res { + Output::Datagram(d) => { + qtrace!([self.name], " => datagram {}", d.len()); + dgram = Some(d); + Active + } + Output::Callback(delay) => { + qtrace!([self.name], " => callback {:?}", delay); + assert_ne!(delay, Duration::new(0, 0)); + Waiting(now + delay) + } + Output::None => { + qtrace!([self.name], " => nothing"); + assert!(n.node.done(), "nodes have to be done when they go idle"); + Idle + } + }; + } + + if self.nodes.iter().all(|n| n.node.done()) { + let real_elapsed = real_start.elapsed(); + println!("{}: real elapsed time: {:?}", self.name, real_elapsed); + let elapsed = now - start; + println!("{}: simulated elapsed time: {:?}", self.name, elapsed); + for n in &self.nodes { + n.node.print_summary(&self.name); + } + return elapsed; + } + + if dgram.is_none() { + let next = self.next_time(now); + if next > now { + qinfo!( + [self.name], + "advancing time by {:?} to {:?}", + next - now, + next - start + ); + now = next; + } + } + } + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/net.rs b/third_party/rust/neqo-transport/tests/sim/net.rs new file mode 100644 index 0000000000..754426f895 --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/net.rs @@ -0,0 +1,111 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use super::rng::RandomDuration; +use super::{Node, Rng}; +use neqo_common::Datagram; +use neqo_transport::Output; +use std::collections::BTreeMap; +use std::fmt::{self, Debug}; +use std::iter; +use std::ops::Range; +use std::time::{Duration, Instant}; + +/// +pub struct RandomDrop { + threshold: u64, + max: u64, + rng: Rng, +} + +impl RandomDuration { + /// Make a new random `Duration` generator. This asserts if the range provided + /// is inverted (i.e., `bounds.start > bounds.end`), or spans 2^64 + /// or more nanoseconds. + /// A zero-length range means that random values won't be taken from the Rng + pub fn new(bounds: Range<Duration>, rng: Rng) -> Self { + let max = u64::try_from((bounds.end - bounds.start).as_nanos()).unwrap(); + Self { + start: bounds.start, + max, + rng, + } + } + + fn next(&mut self) -> Duration { + let r = if self.max == 0 { + Duration::new(0, 0) + } else { + self.rng.borrow_mut().random_from(0..self.max) + } + self.start + Duration::from_nanos(r) + } +} + +enum DelayState { + New(Range<Duration>), + Ready(RandomDuration), +} + +pub struct Delay { + state: DelayState, + queue: BTreeMap<Instant, Datagram>, +} + +impl Delay +{ + pub fn new(bounds: Range<Duration>) -> Self + { + Self { + State: DelayState::New(bounds), + queue: BTreeMap::default(), + } + } + + fn insert(&mut self, d: Datagram, now: Instant) { + let mut t = if let State::Ready(r) = self.state { + now + self.source.next() + } else { + unreachable!(); + } + while self.queue.contains_key(&t) { + // This is a little inefficient, but it avoids drops on collisions, + // which are super-common for a fixed delay. + t += Duration::from_nanos(1); + } + self.queue.insert(t, d); + } +} + +impl Node for Delay +{ + fn init(&mut self, rng: Rng, now: Instant) { + if let DelayState::New(bounds) = self.state { + self.state = RandomDuration::new(bounds); + } else { + unreachable!(); + } + } + + fn process(&mut self, d: Option<Datagram>, now: Instant) -> Output { + if let Some(dgram) = d { + self.insert(dgram, now); + } + if let Some((&k, _)) = self.queue.range(..now).nth(0) { + Output::Datagram(self.queue.remove(&k).unwrap()) + } else if let Some(&t) = self.queue.keys().nth(0) { + Output::Callback(t - now) + } else { + Output::None + } + } +} + +impl<T> Debug for Delay<T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("delay") + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/rng.rs b/third_party/rust/neqo-transport/tests/sim/rng.rs new file mode 100644 index 0000000000..d314e8b36f --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/rng.rs @@ -0,0 +1,81 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use neqo_common::Decoder; +use std::convert::TryFrom; +use std::ops::Range; + +/// An implementation of a xoshiro256** pseudorandom generator. +pub struct Random { + state: [u64; 4], +} + +impl Random { + pub fn new(seed: [u8; 32]) -> Self { + assert!(seed.iter().any(|&x| x != 0)); + let mut dec = Decoder::from(&seed); + Self { + state: [ + dec.decode_uint(8).unwrap(), + dec.decode_uint(8).unwrap(), + dec.decode_uint(8).unwrap(), + dec.decode_uint(8).unwrap(), + ], + } + } + + pub fn random(&mut self) -> u64 { + let result = (self.state[1].overflowing_mul(5).0) + .rotate_right(7) + .overflowing_mul(9) + .0; + let t = self.state[1] << 17; + + self.state[2] ^= self.state[0]; + self.state[3] ^= self.state[1]; + self.state[1] ^= self.state[2]; + self.state[0] ^= self.state[3]; + + self.state[2] ^= t; + self.state[3] = self.state[3].rotate_right(45); + + result + } + + /// Generate a random value from the range. + /// If the range is empty or inverted (`range.start > range.end`), then + /// this returns the value of `range.start` without generating any random values. + pub fn random_from(&mut self, range: Range<u64>) -> u64 { + let max = range.end.saturating_sub(range.start); + if max == 0 { + return range.start; + } + + let shift = (max - 1).leading_zeros(); + assert_ne!(max, 0); + loop { + let r = self.random() >> shift; + if r < max { + return range.start + r; + } + } + } + + /// Get the seed necessary to continue from this point. + pub fn seed_str(&self) -> String { + format!( + "{:8x}{:8x}{:8x}{:8x}", + self.state[0], self.state[1], self.state[2], self.state[3], + ) + } +} + +impl Default for Random { + fn default() -> Self { + let buf = neqo_crypto::random(32); + Random::new(<[u8; 32]>::try_from(&buf[..]).unwrap()) + } +} diff --git a/third_party/rust/neqo-transport/tests/sim/taildrop.rs b/third_party/rust/neqo-transport/tests/sim/taildrop.rs new file mode 100644 index 0000000000..7346b27178 --- /dev/null +++ b/third_party/rust/neqo-transport/tests/sim/taildrop.rs @@ -0,0 +1,184 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +use super::Node; +use neqo_common::{qtrace, Datagram}; +use neqo_transport::Output; +use std::cmp::max; +use std::collections::VecDeque; +use std::convert::TryFrom; +use std::fmt::{self, Debug}; +use std::time::{Duration, Instant}; + +/// One second in nanoseconds. +const ONE_SECOND_NS: u128 = 1_000_000_000; + +/// This models a link with a tail drop router at the front of it. +pub struct TailDrop { + /// An overhead associated with each entry. This accounts for + /// layer 2, IP, and UDP overheads. + overhead: usize, + /// The rate at which bytes egress the link, in bytes per second. + rate: usize, + /// The depth of the queue, in bytes. + capacity: usize, + + /// A counter for how many bytes are enqueued. + used: usize, + /// A queue of unsent bytes. + queue: VecDeque<Datagram>, + /// The time that the next datagram can enter the link. + next_deque: Option<Instant>, + + /// Any sub-ns delay from the last enqueue. + sub_ns_delay: u32, + /// The time it takes a byte to exit the other end of the link. + delay: Duration, + /// The packets that are on the link and when they can be delivered. + on_link: VecDeque<(Instant, Datagram)>, + + /// The number of packets received. + received: usize, + /// The number of packets dropped. + dropped: usize, + /// The number of packets delivered. + delivered: usize, + /// The maximum amount of queue capacity ever used. + /// As packets leave the queue as soon as they start being used, this doesn't + /// count them. + maxq: usize, +} + +impl TailDrop { + /// Make a new taildrop node with the given rate, queue capacity, and link delay. + pub fn new(rate: usize, capacity: usize, delay: Duration) -> Self { + Self { + overhead: 64, + rate, + capacity, + used: 0, + queue: VecDeque::new(), + next_deque: None, + sub_ns_delay: 0, + delay, + on_link: VecDeque::new(), + received: 0, + dropped: 0, + delivered: 0, + maxq: 0, + } + } + + /// A tail drop queue on a 10Mbps link (approximated to 1 million bytes per second) + /// with a fat 32k buffer (about 30ms), and the default forward delay of 50ms. + pub fn dsl_uplink() -> Self { + TailDrop::new(1_000_000, 32_768, Duration::from_millis(50)) + } + + /// Cut downlink to one fifth of the uplink (2Mbps), and reduce the buffer to 1/4. + pub fn dsl_downlink() -> Self { + TailDrop::new(200_000, 8_192, Duration::from_millis(50)) + } + + /// How "big" is this datagram, accounting for overheads. + /// This approximates by using the same overhead for storing in the queue + /// and for sending on the wire. + fn size(&self, d: &Datagram) -> usize { + d.len() + self.overhead + } + + /// Start sending a datagram. + fn send(&mut self, d: Datagram, now: Instant) { + // How many bytes are we "transmitting"? + let sz = u128::try_from(self.size(&d)).unwrap(); + + // Calculate how long it takes to put the packet on the link. + // Perform the calculation based on 2^32 seconds and save any remainder. + // This ensures that high rates and small packets don't result in rounding + // down times too badly. + // Duration consists of a u64 and a u32, so we have 32 high bits to spare. + let t = sz * (ONE_SECOND_NS << 32) / u128::try_from(self.rate).unwrap() + + u128::from(self.sub_ns_delay); + let send_ns = u64::try_from(t >> 32).unwrap(); + assert_ne!(send_ns, 0, "sending a packet takes <1ns"); + self.sub_ns_delay = u32::try_from(t & u128::from(u32::MAX)).unwrap(); + let deque_time = now + Duration::from_nanos(send_ns); + self.next_deque = Some(deque_time); + + // Now work out when the packet is fully received at the other end of + // the link. Setup to deliver the packet then. + let delivery_time = deque_time + self.delay; + self.on_link.push_back((delivery_time, d)); + } + + /// Enqueue for sending. Maybe. If this overflows the queue, drop it instead. + fn maybe_enqueue(&mut self, d: Datagram, now: Instant) { + self.received += 1; + if self.next_deque.is_none() { + // Nothing in the queue and nothing still sending. + debug_assert!(self.queue.is_empty()); + self.send(d, now); + } else if self.used + self.size(&d) <= self.capacity { + self.used += self.size(&d); + self.maxq = max(self.maxq, self.used); + self.queue.push_back(d); + } else { + qtrace!("taildrop dropping {} bytes", d.len()); + self.dropped += 1; + } + } + + /// If the last packet that was sending has been sent, start sending + /// the next one. + fn maybe_send(&mut self, now: Instant) { + if self.next_deque.as_ref().map_or(false, |t| *t <= now) { + if let Some(d) = self.queue.pop_front() { + self.used -= self.size(&d); + self.send(d, now); + } else { + self.next_deque = None; + self.sub_ns_delay = 0; + } + } + } +} + +impl Node for TailDrop { + fn process(&mut self, d: Option<Datagram>, now: Instant) -> Output { + if let Some(dgram) = d { + self.maybe_enqueue(dgram, now); + } + + self.maybe_send(now); + + if let Some((t, _)) = self.on_link.front() { + if *t <= now { + let (_, d) = self.on_link.pop_front().unwrap(); + self.delivered += 1; + Output::Datagram(d) + } else { + Output::Callback(*t - now) + } + } else { + Output::None + } + } + + fn print_summary(&self, test_name: &str) { + println!( + "{}: taildrop: rx {} drop {} tx {} maxq {}", + test_name, self.received, self.dropped, self.delivered, self.maxq, + ); + } +} + +impl Debug for TailDrop { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("taildrop") + } +} |