diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:14:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:14:29 +0000 |
commit | fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 (patch) | |
tree | 4c1ccaf5486d4f2009f9a338a98a83e886e29c97 /third_party/rust/neqo-transport/src | |
parent | Releasing progress-linux version 124.0.1-1~progress7.99u1. (diff) | |
download | firefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.tar.xz firefox-fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8.zip |
Merging upstream version 125.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/neqo-transport/src')
52 files changed, 1483 insertions, 741 deletions
diff --git a/third_party/rust/neqo-transport/src/ackrate.rs b/third_party/rust/neqo-transport/src/ackrate.rs index cf68f9021f..d5923805d9 100644 --- a/third_party/rust/neqo-transport/src/ackrate.rs +++ b/third_party/rust/neqo-transport/src/ackrate.rs @@ -5,9 +5,8 @@ // except according to those terms. // Management of the peer's ack rate. -#![deny(clippy::pedantic)] -use std::{cmp::max, convert::TryFrom, time::Duration}; +use std::{cmp::max, time::Duration}; use neqo_common::qtrace; diff --git a/third_party/rust/neqo-transport/src/addr_valid.rs b/third_party/rust/neqo-transport/src/addr_valid.rs index b5ed2d07d1..f596cfc3cb 100644 --- a/third_party/rust/neqo-transport/src/addr_valid.rs +++ b/third_party/rust/neqo-transport/src/addr_valid.rs @@ -7,7 +7,6 @@ // This file implements functions necessary for address validation. use std::{ - convert::TryFrom, net::{IpAddr, SocketAddr}, time::{Duration, Instant}, }; @@ -23,15 +22,15 @@ use crate::{ cid::ConnectionId, packet::PacketBuilder, recovery::RecoveryToken, stats::FrameStats, Res, }; -/// A prefix we add to Retry tokens to distinguish them from NEW_TOKEN tokens. +/// A prefix we add to Retry tokens to distinguish them from `NEW_TOKEN` tokens. const TOKEN_IDENTIFIER_RETRY: &[u8] = &[0x52, 0x65, 0x74, 0x72, 0x79]; -/// A prefix on NEW_TOKEN tokens, that is maximally Hamming distant from NEW_TOKEN. +/// A prefix on `NEW_TOKEN` tokens, that is maximally Hamming distant from `NEW_TOKEN`. /// Together, these need to have a low probability of collision, even if there is /// corruption of individual bits in transit. const TOKEN_IDENTIFIER_NEW_TOKEN: &[u8] = &[0xad, 0x9a, 0x8b, 0x8d, 0x86]; -/// The maximum number of tokens we'll save from NEW_TOKEN frames. -/// This should be the same as the value of MAX_TICKETS in neqo-crypto. +/// The maximum number of tokens we'll save from `NEW_TOKEN` frames. +/// This should be the same as the value of `MAX_TICKETS` in neqo-crypto. const MAX_NEW_TOKEN: usize = 4; /// The number of tokens we'll track for the purposes of looking for duplicates. /// This is based on how many might be received over a period where could be @@ -44,9 +43,9 @@ const MAX_SAVED_TOKENS: usize = 8; pub enum ValidateAddress { /// Require address validation never. Never, - /// Require address validation unless a NEW_TOKEN token is provided. + /// Require address validation unless a `NEW_TOKEN` token is provided. NoToken, - /// Require address validation even if a NEW_TOKEN token is provided. + /// Require address validation even if a `NEW_TOKEN` token is provided. Always, } @@ -143,7 +142,7 @@ impl AddressValidation { self.generate_token(Some(dcid), peer_address, now) } - /// This generates a token for use with NEW_TOKEN. + /// This generates a token for use with `NEW_TOKEN`. pub fn generate_new_token(&self, peer_address: SocketAddr, now: Instant) -> Res<Vec<u8>> { self.generate_token(None, peer_address, now) } @@ -184,7 +183,7 @@ impl AddressValidation { /// Less than one difference per byte indicates that it is likely not a Retry. /// This generous interpretation allows for a lot of damage in transit. /// Note that if this check fails, then the token will be treated like it came - /// from NEW_TOKEN instead. If there truly is corruption of packets that causes + /// from `NEW_TOKEN` instead. If there truly is corruption of packets that causes /// validation failure, it will be a failure that we try to recover from. fn is_likely_retry(token: &[u8]) -> bool { let mut difference = 0; @@ -210,10 +209,9 @@ impl AddressValidation { if self.validation == ValidateAddress::Never { qinfo!("AddressValidation: no token; accepting"); return AddressValidationResult::Pass; - } else { - qinfo!("AddressValidation: no token; validating"); - return AddressValidationResult::Validate; } + qinfo!("AddressValidation: no token; validating"); + return AddressValidationResult::Validate; } if token.len() <= TOKEN_IDENTIFIER_RETRY.len() { // Treat bad tokens strictly. @@ -231,7 +229,7 @@ impl AddressValidation { qinfo!("AddressValidation: valid Retry token for {}", cid); AddressValidationResult::ValidRetry(cid) } else { - panic!("AddressValidation: Retry token with small CID {}", cid); + panic!("AddressValidation: Retry token with small CID {cid}"); } } else if cid.is_empty() { // An empty connection ID means NEW_TOKEN. @@ -243,7 +241,7 @@ impl AddressValidation { AddressValidationResult::Pass } } else { - panic!("AddressValidation: NEW_TOKEN token with CID {}", cid); + panic!("AddressValidation: NEW_TOKEN token with CID {cid}"); } } else { // From here on, we have a token that we couldn't decrypt. @@ -351,14 +349,13 @@ impl NewTokenState { builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, - ) -> Res<()> { + ) { if let Self::Server(ref mut sender) = self { - sender.write_frames(builder, tokens, stats)?; + sender.write_frames(builder, tokens, stats); } - Ok(()) } - /// If this a server, buffer a NEW_TOKEN for sending. + /// If this a server, buffer a `NEW_TOKEN` for sending. /// If this is a client, panic. pub fn send_new_token(&mut self, token: Vec<u8>) { if let Self::Server(ref mut sender) = self { @@ -368,7 +365,7 @@ impl NewTokenState { } } - /// If this a server, process a lost signal for a NEW_TOKEN frame. + /// If this a server, process a lost signal for a `NEW_TOKEN` frame. /// If this is a client, panic. pub fn lost(&mut self, seqno: usize) { if let Self::Server(ref mut sender) = self { @@ -378,7 +375,7 @@ impl NewTokenState { } } - /// If this a server, process remove the acknowledged NEW_TOKEN frame. + /// If this a server, process remove the acknowledged `NEW_TOKEN` frame. /// If this is a client, panic. pub fn acked(&mut self, seqno: usize) { if let Self::Server(ref mut sender) = self { @@ -403,7 +400,7 @@ impl NewTokenFrameStatus { #[derive(Default)] pub struct NewTokenSender { - /// The unacknowledged NEW_TOKEN frames we are yet to send. + /// The unacknowledged `NEW_TOKEN` frames we are yet to send. tokens: Vec<NewTokenFrameStatus>, /// A sequence number that is used to track individual tokens /// by reference (so that recovery tokens can be simple). @@ -426,8 +423,8 @@ impl NewTokenSender { builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, - ) -> Res<()> { - for t in self.tokens.iter_mut() { + ) { + for t in &mut self.tokens { if t.needs_sending && t.len() <= builder.remaining() { t.needs_sending = false; @@ -438,11 +435,10 @@ impl NewTokenSender { stats.new_token += 1; } } - Ok(()) } pub fn lost(&mut self, seqno: usize) { - for t in self.tokens.iter_mut() { + for t in &mut self.tokens { if t.seqno == seqno { t.needs_sending = true; break; diff --git a/third_party/rust/neqo-transport/src/cc/classic_cc.rs b/third_party/rust/neqo-transport/src/cc/classic_cc.rs index 6f4a01d795..89be6c4b0f 100644 --- a/third_party/rust/neqo-transport/src/cc/classic_cc.rs +++ b/third_party/rust/neqo-transport/src/cc/classic_cc.rs @@ -5,7 +5,6 @@ // except according to those terms. // Congestion control -#![deny(clippy::pedantic)] use std::{ cmp::{max, min}, @@ -536,10 +535,7 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> { #[cfg(test)] mod tests { - use std::{ - convert::TryFrom, - time::{Duration, Instant}, - }; + use std::time::{Duration, Instant}; use neqo_common::qinfo; use test_fixture::now; diff --git a/third_party/rust/neqo-transport/src/cc/cubic.rs b/third_party/rust/neqo-transport/src/cc/cubic.rs index c04a29b443..058a4c2aa4 100644 --- a/third_party/rust/neqo-transport/src/cc/cubic.rs +++ b/third_party/rust/neqo-transport/src/cc/cubic.rs @@ -4,10 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![deny(clippy::pedantic)] - use std::{ - convert::TryFrom, fmt::{self, Display}, time::{Duration, Instant}, }; diff --git a/third_party/rust/neqo-transport/src/cc/mod.rs b/third_party/rust/neqo-transport/src/cc/mod.rs index a1a43bd157..486d15e67e 100644 --- a/third_party/rust/neqo-transport/src/cc/mod.rs +++ b/third_party/rust/neqo-transport/src/cc/mod.rs @@ -5,7 +5,6 @@ // except according to those terms. // Congestion control -#![deny(clippy::pedantic)] use std::{ fmt::{Debug, Display}, diff --git a/third_party/rust/neqo-transport/src/cc/new_reno.rs b/third_party/rust/neqo-transport/src/cc/new_reno.rs index e51b3d6cc0..47d0d56f37 100644 --- a/third_party/rust/neqo-transport/src/cc/new_reno.rs +++ b/third_party/rust/neqo-transport/src/cc/new_reno.rs @@ -5,7 +5,6 @@ // except according to those terms. // Congestion control -#![deny(clippy::pedantic)] use std::{ fmt::{self, Display}, diff --git a/third_party/rust/neqo-transport/src/cc/tests/cubic.rs b/third_party/rust/neqo-transport/src/cc/tests/cubic.rs index 0c82e47817..2e0200fd6d 100644 --- a/third_party/rust/neqo-transport/src/cc/tests/cubic.rs +++ b/third_party/rust/neqo-transport/src/cc/tests/cubic.rs @@ -8,7 +8,6 @@ #![allow(clippy::cast_sign_loss)] use std::{ - convert::TryFrom, ops::Sub, time::{Duration, Instant}, }; diff --git a/third_party/rust/neqo-transport/src/cc/tests/mod.rs b/third_party/rust/neqo-transport/src/cc/tests/mod.rs index 238a7ad012..879693fb24 100644 --- a/third_party/rust/neqo-transport/src/cc/tests/mod.rs +++ b/third_party/rust/neqo-transport/src/cc/tests/mod.rs @@ -1,3 +1,4 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your // option. This file may not be copied, modified, or distributed diff --git a/third_party/rust/neqo-transport/src/cc/tests/new_reno.rs b/third_party/rust/neqo-transport/src/cc/tests/new_reno.rs index a73844a755..4cc20de5a7 100644 --- a/third_party/rust/neqo-transport/src/cc/tests/new_reno.rs +++ b/third_party/rust/neqo-transport/src/cc/tests/new_reno.rs @@ -5,7 +5,6 @@ // except according to those terms. // Congestion control -#![deny(clippy::pedantic)] use std::time::Duration; diff --git a/third_party/rust/neqo-transport/src/cid.rs b/third_party/rust/neqo-transport/src/cid.rs index be202daf25..6b3a95eaf0 100644 --- a/third_party/rust/neqo-transport/src/cid.rs +++ b/third_party/rust/neqo-transport/src/cid.rs @@ -10,14 +10,13 @@ use std::{ borrow::Borrow, cell::{Ref, RefCell}, cmp::{max, min}, - convert::{AsRef, TryFrom}, ops::Deref, rc::Rc, }; use neqo_common::{hex, hex_with_len, qinfo, Decoder, Encoder}; -use neqo_crypto::random; -use smallvec::SmallVec; +use neqo_crypto::{random, randomize}; +use smallvec::{smallvec, SmallVec}; use crate::{ frame::FRAME_TYPE_NEW_CONNECTION_ID, packet::PacketBuilder, recovery::RecoveryToken, @@ -39,19 +38,26 @@ pub struct ConnectionId { } impl ConnectionId { + /// # Panics + /// When `len` is larger than `MAX_CONNECTION_ID_LEN`. + #[must_use] pub fn generate(len: usize) -> Self { assert!(matches!(len, 0..=MAX_CONNECTION_ID_LEN)); - Self::from(random(len)) + let mut cid = smallvec![0; len]; + randomize(&mut cid); + Self { cid } } // Apply a wee bit of greasing here in picking a length between 8 and 20 bytes long. + #[must_use] pub fn generate_initial() -> Self { - let v = random(1); + let v = random::<1>()[0]; // Bias selection toward picking 8 (>50% of the time). - let len: usize = max(8, 5 + (v[0] & (v[0] >> 4))).into(); + let len: usize = max(8, 5 + (v & (v >> 4))).into(); Self::generate(len) } + #[must_use] pub fn as_cid_ref(&self) -> ConnectionIdRef { ConnectionIdRef::from(&self.cid[..]) } @@ -75,12 +81,6 @@ impl From<SmallVec<[u8; MAX_CONNECTION_ID_LEN]>> for ConnectionId { } } -impl From<Vec<u8>> for ConnectionId { - fn from(cid: Vec<u8>) -> Self { - Self::from(SmallVec::from(cid)) - } -} - impl<T: AsRef<[u8]> + ?Sized> From<&T> for ConnectionId { fn from(buf: &T) -> Self { Self::from(SmallVec::from(buf.as_ref())) @@ -201,7 +201,7 @@ impl ConnectionIdGenerator for EmptyConnectionIdGenerator { } } -/// An RandomConnectionIdGenerator produces connection IDs of +/// An `RandomConnectionIdGenerator` produces connection IDs of /// a fixed length and random content. No effort is made to /// prevent collisions. pub struct RandomConnectionIdGenerator { @@ -209,6 +209,7 @@ pub struct RandomConnectionIdGenerator { } impl RandomConnectionIdGenerator { + #[must_use] pub fn new(len: usize) -> Self { Self { len } } @@ -222,7 +223,9 @@ impl ConnectionIdDecoder for RandomConnectionIdGenerator { impl ConnectionIdGenerator for RandomConnectionIdGenerator { fn generate_cid(&mut self) -> Option<ConnectionId> { - Some(ConnectionId::from(&random(self.len))) + let mut buf = smallvec![0; self.len]; + randomize(&mut buf); + Some(ConnectionId::from(buf)) } fn as_decoder(&self) -> &dyn ConnectionIdDecoder { @@ -234,7 +237,7 @@ impl ConnectionIdGenerator for RandomConnectionIdGenerator { } } -/// A single connection ID, as saved from NEW_CONNECTION_ID. +/// A single connection ID, as saved from `NEW_CONNECTION_ID`. /// This is templated so that the connection ID entries from a peer can be /// saved with a stateless reset token. Local entries don't need that. #[derive(Debug, PartialEq, Eq, Clone)] @@ -250,8 +253,8 @@ pub struct ConnectionIdEntry<SRT: Clone + PartialEq> { impl ConnectionIdEntry<[u8; 16]> { /// Create a random stateless reset token so that it is hard to guess the correct /// value and reset the connection. - fn random_srt() -> [u8; 16] { - <[u8; 16]>::try_from(&random(16)[..]).unwrap() + pub fn random_srt() -> [u8; 16] { + random::<16>() } /// Create the first entry, which won't have a stateless reset token. @@ -294,6 +297,23 @@ impl ConnectionIdEntry<[u8; 16]> { pub fn sequence_number(&self) -> u64 { self.seqno } + + /// Write the entry out in a `NEW_CONNECTION_ID` frame. + /// Returns `true` if the frame was written, `false` if there is insufficient space. + pub fn write(&self, builder: &mut PacketBuilder, stats: &mut FrameStats) -> bool { + let len = 1 + Encoder::varint_len(self.seqno) + 1 + 1 + self.cid.len() + 16; + if builder.remaining() < len { + return false; + } + + builder.encode_varint(FRAME_TYPE_NEW_CONNECTION_ID); + builder.encode_varint(self.seqno); + builder.encode_varint(0u64); + builder.encode_vec(1, &self.cid); + builder.encode(&self.srt); + stats.new_connection_id += 1; + true + } } impl ConnectionIdEntry<()> { @@ -430,7 +450,7 @@ pub struct ConnectionIdManager { limit: usize, /// The next sequence number that will be used for sending `NEW_CONNECTION_ID` frames. next_seqno: u64, - /// Outstanding, but lost NEW_CONNECTION_ID frames will be stored here. + /// Outstanding, but lost `NEW_CONNECTION_ID` frames will be stored here. lost_new_connection_id: Vec<ConnectionIdEntry<[u8; 16]>>, } @@ -476,7 +496,7 @@ impl ConnectionIdManager { .add_local(ConnectionIdEntry::new(self.next_seqno, cid.clone(), ())); self.next_seqno += 1; - let srt = <[u8; 16]>::try_from(&random(16)[..]).unwrap(); + let srt = ConnectionIdEntry::random_srt(); Ok((cid, srt)) } else { Err(Error::ConnectionIdsExhausted) @@ -516,39 +536,19 @@ impl ConnectionIdManager { ); } - fn write_entry( - &mut self, - entry: &ConnectionIdEntry<[u8; 16]>, - builder: &mut PacketBuilder, - stats: &mut FrameStats, - ) -> Res<bool> { - let len = 1 + Encoder::varint_len(entry.seqno) + 1 + 1 + entry.cid.len() + 16; - if builder.remaining() < len { - return Ok(false); - } - - builder.encode_varint(FRAME_TYPE_NEW_CONNECTION_ID); - builder.encode_varint(entry.seqno); - builder.encode_varint(0u64); - builder.encode_vec(1, &entry.cid); - builder.encode(&entry.srt); - stats.new_connection_id += 1; - Ok(true) - } - pub fn write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, - ) -> Res<()> { + ) { if self.generator.deref().borrow().generates_empty_cids() { debug_assert_eq!(self.generator.borrow_mut().generate_cid().unwrap().len(), 0); - return Ok(()); + return; } while let Some(entry) = self.lost_new_connection_id.pop() { - if self.write_entry(&entry, builder, stats)? { + if entry.write(builder, stats) { tokens.push(RecoveryToken::NewConnectionId(entry)); } else { // This shouldn't happen often. @@ -565,7 +565,7 @@ impl ConnectionIdManager { if let Some(cid) = maybe_cid { assert_ne!(cid.len(), 0); // TODO: generate the stateless reset tokens from the connection ID and a key. - let srt = <[u8; 16]>::try_from(&random(16)[..]).unwrap(); + let srt = ConnectionIdEntry::random_srt(); let seqno = self.next_seqno; self.next_seqno += 1; @@ -573,11 +573,10 @@ impl ConnectionIdManager { .add_local(ConnectionIdEntry::new(seqno, cid.clone(), ())); let entry = ConnectionIdEntry::new(seqno, cid, srt); - debug_assert!(self.write_entry(&entry, builder, stats)?); + entry.write(builder, stats); tokens.push(RecoveryToken::NewConnectionId(entry)); } } - Ok(()) } pub fn lost(&mut self, entry: &ConnectionIdEntry<[u8; 16]>) { @@ -594,16 +593,17 @@ impl ConnectionIdManager { mod tests { use test_fixture::fixture_init; - use super::*; + use crate::{cid::MAX_CONNECTION_ID_LEN, ConnectionId}; #[test] fn generate_initial_cid() { fixture_init(); for _ in 0..100 { let cid = ConnectionId::generate_initial(); - if !matches!(cid.len(), 8..=MAX_CONNECTION_ID_LEN) { - panic!("connection ID {:?}", cid); - } + assert!( + matches!(cid.len(), 8..=MAX_CONNECTION_ID_LEN), + "connection ID length {cid:?}", + ); } } } diff --git a/third_party/rust/neqo-transport/src/connection/dump.rs b/third_party/rust/neqo-transport/src/connection/dump.rs index 77d51c605c..8a4f34dbb8 100644 --- a/third_party/rust/neqo-transport/src/connection/dump.rs +++ b/third_party/rust/neqo-transport/src/connection/dump.rs @@ -27,11 +27,11 @@ pub fn dump_packet( pn: PacketNumber, payload: &[u8], ) { - if !log::log_enabled!(log::Level::Debug) { + if log::STATIC_MAX_LEVEL == log::LevelFilter::Off || !log::log_enabled!(log::Level::Debug) { return; } - let mut s = String::from(""); + let mut s = String::new(); let mut d = Decoder::from(payload); while d.remaining() > 0 { let Ok(f) = Frame::decode(&mut d) else { diff --git a/third_party/rust/neqo-transport/src/connection/mod.rs b/third_party/rust/neqo-transport/src/connection/mod.rs index 2de388418a..c81a3727c6 100644 --- a/third_party/rust/neqo-transport/src/connection/mod.rs +++ b/third_party/rust/neqo-transport/src/connection/mod.rs @@ -9,7 +9,6 @@ use std::{ cell::RefCell, cmp::{max, min}, - convert::TryFrom, fmt::{self, Debug}, mem, net::{IpAddr, SocketAddr}, @@ -23,7 +22,7 @@ use neqo_common::{ qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role, }; use neqo_crypto::{ - agent::CertificateInfo, random, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group, + agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group, HandshakeState, PrivateKey, PublicKey, ResumptionToken, SecretAgentInfo, SecretAgentPreInfo, Server, ZeroRttChecker, }; @@ -48,6 +47,7 @@ use crate::{ recovery::{LossRecovery, RecoveryToken, SendProfile}, recv_stream::RecvStreamStats, rtt::GRANULARITY, + send_stream::SendStream, stats::{Stats, StatsCell}, stream_id::StreamType, streams::{SendOrder, Streams}, @@ -59,6 +59,7 @@ use crate::{ version::{Version, WireVersion}, AppError, ConnectionError, Error, Res, StreamId, }; + mod dump; mod idle; pub mod params; @@ -66,6 +67,7 @@ mod saved; mod state; #[cfg(test)] pub mod test_internal; + use dump::dump_packet; use idle::IdleTimeout; pub use params::ConnectionParameters; @@ -78,9 +80,6 @@ pub use state::{ClosingFrame, State}; pub use crate::send_stream::{RetransmissionPriority, SendStreamStats, TransmissionPriority}; -#[derive(Debug, Default)] -struct Packet(Vec<u8>); - /// The number of Initial packets that the client will send in response /// to receiving an undecryptable packet during the early part of the /// handshake. This is a hack, but a useful one. @@ -96,7 +95,7 @@ pub enum ZeroRttState { } #[derive(Clone, Debug, PartialEq, Eq)] -/// Type returned from process() and `process_output()`. Users are required to +/// Type returned from `process()` and `process_output()`. Users are required to /// call these repeatedly until `Callback` or `None` is returned. pub enum Output { /// Connection requires no action. @@ -119,6 +118,7 @@ impl Output { } /// Get a reference to the Datagram, if any. + #[must_use] pub fn as_dgram_ref(&self) -> Option<&Datagram> { match self { Self::Datagram(dg) => Some(dg), @@ -136,7 +136,7 @@ impl Output { } } -/// Used by inner functions like Connection::output. +/// Used by inner functions like `Connection::output`. enum SendOption { /// Yes, please send this datagram. Yes(Datagram), @@ -257,7 +257,7 @@ pub struct Connection { /// Some packets were received, but not tracked. received_untracked: bool, - /// This is responsible for the QuicDatagrams' handling: + /// This is responsible for the `QuicDatagrams`' handling: /// <https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram> quic_datagrams: QuicDatagrams, @@ -271,8 +271,8 @@ pub struct Connection { new_token: NewTokenState, stats: StatsCell, qlog: NeqoQlog, - /// A session ticket was received without NEW_TOKEN, - /// this is when that turns into an event without NEW_TOKEN. + /// A session ticket was received without `NEW_TOKEN`, + /// this is when that turns into an event without `NEW_TOKEN`. release_resumption_token_timer: Option<Instant>, conn_params: ConnectionParameters, hrtime: hrtime::Handle, @@ -302,6 +302,8 @@ impl Connection { const LOOSE_TIMER_RESOLUTION: Duration = Duration::from_millis(50); /// Create a new QUIC connection with Client role. + /// # Errors + /// When NSS fails and an agent cannot be created. pub fn new_client( server_name: impl Into<String>, protocols: &[impl AsRef<str>], @@ -338,6 +340,8 @@ impl Connection { } /// Create a new QUIC connection with Server role. + /// # Errors + /// When NSS fails and an agent cannot be created. pub fn new_server( certs: &[impl AsRef<str>], protocols: &[impl AsRef<str>], @@ -427,6 +431,8 @@ impl Connection { Ok(c) } + /// # Errors + /// When the operation fails. pub fn server_enable_0rtt( &mut self, anti_replay: &AntiReplay, @@ -436,6 +442,8 @@ impl Connection { .server_enable_0rtt(self.tps.clone(), anti_replay, zero_rtt_checker) } + /// # Errors + /// When the operation fails. pub fn server_enable_ech( &mut self, config: u8, @@ -447,10 +455,13 @@ impl Connection { } /// Get the active ECH configuration, which is empty if ECH is disabled. + #[must_use] pub fn ech_config(&self) -> &[u8] { self.crypto.ech_config() } + /// # Errors + /// When the operation fails. pub fn client_enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> { self.crypto.client_enable_ech(ech_config_list) } @@ -468,8 +479,9 @@ impl Connection { } /// Get the original destination connection id for this connection. This - /// will always be present for Role::Client but not if Role::Server is in - /// State::Init. + /// will always be present for `Role::Client` but not if `Role::Server` is in + /// `State::Init`. + #[must_use] pub fn odcid(&self) -> Option<&ConnectionId> { self.original_destination_cid.as_ref() } @@ -478,8 +490,9 @@ impl Connection { /// This only sets transport parameters without dealing with other aspects of /// setting the value. /// + /// # Errors + /// When the transport parameter is invalid. /// # Panics - /// /// This panics if the transport parameter is known to this crate. pub fn set_local_tparam(&self, tp: TransportParameterId, value: TransportParameter) -> Res<()> { #[cfg(not(test))] @@ -502,9 +515,9 @@ impl Connection { /// Retry. pub(crate) fn set_retry_cids( &mut self, - odcid: ConnectionId, + odcid: &ConnectionId, remote_cid: ConnectionId, - retry_cid: ConnectionId, + retry_cid: &ConnectionId, ) { debug_assert_eq!(self.role, Role::Server); qtrace!( @@ -533,12 +546,16 @@ impl Connection { /// Set ALPN preferences. Strings that appear earlier in the list are given /// higher preference. + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn set_alpn(&mut self, protocols: &[impl AsRef<str>]) -> Res<()> { self.crypto.tls.set_alpn(protocols)?; Ok(()) } /// Enable a set of ciphers. + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn set_ciphers(&mut self, ciphers: &[Cipher]) -> Res<()> { if self.state != State::Init { qerror!([self], "Cannot enable ciphers in state {:?}", self.state); @@ -549,6 +566,8 @@ impl Connection { } /// Enable a set of key exchange groups. + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn set_groups(&mut self, groups: &[Group]) -> Res<()> { if self.state != State::Init { qerror!([self], "Cannot enable groups in state {:?}", self.state); @@ -559,6 +578,8 @@ impl Connection { } /// Set the number of additional key shares to send in the client hello. + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn send_additional_key_shares(&mut self, count: usize) -> Res<()> { if self.state != State::Init { qerror!([self], "Cannot enable groups in state {:?}", self.state); @@ -667,6 +688,8 @@ impl Connection { /// This can only be called once and only on the client. /// After calling the function, it should be possible to attempt 0-RTT /// if the token supports that. + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> { if self.state != State::Init { qerror!([self], "set token in state {:?}", self.state); @@ -683,8 +706,9 @@ impl Connection { ); let mut dec = Decoder::from(token.as_ref()); - let version = - Version::try_from(dec.decode_uint(4).ok_or(Error::InvalidResumptionToken)? as u32)?; + let version = Version::try_from(u32::try_from( + dec.decode_uint(4).ok_or(Error::InvalidResumptionToken)?, + )?)?; qtrace!([self], " version {:?}", version); if !self.conn_params.get_versions().all().contains(&version) { return Err(Error::DisabledVersion); @@ -732,13 +756,15 @@ impl Connection { Ok(()) } - pub(crate) fn set_validation(&mut self, validation: Rc<RefCell<AddressValidation>>) { + pub(crate) fn set_validation(&mut self, validation: &Rc<RefCell<AddressValidation>>) { qtrace!([self], "Enabling NEW_TOKEN"); assert_eq!(self.role, Role::Server); - self.address_validation = AddressValidationInfo::Server(Rc::downgrade(&validation)); + self.address_validation = AddressValidationInfo::Server(Rc::downgrade(validation)); } - /// Send a TLS session ticket AND a NEW_TOKEN frame (if possible). + /// Send a TLS session ticket AND a `NEW_TOKEN` frame (if possible). + /// # Errors + /// When the operation fails, which is usually due to bad inputs or bad connection state. pub fn send_ticket(&mut self, now: Instant, extra: &[u8]) -> Res<()> { if self.role == Role::Client { return Err(Error::WrongRole); @@ -774,15 +800,19 @@ impl Connection { } } + #[must_use] pub fn tls_info(&self) -> Option<&SecretAgentInfo> { self.crypto.tls.info() } + /// # Errors + /// When there is no information to obtain. pub fn tls_preinfo(&self) -> Res<SecretAgentPreInfo> { Ok(self.crypto.tls.preinfo()?) } /// Get the peer's certificate chain and other info. + #[must_use] pub fn peer_certificate(&self) -> Option<CertificateInfo> { self.crypto.tls.peer_certificate() } @@ -802,26 +832,31 @@ impl Connection { } /// Get the role of the connection. + #[must_use] pub fn role(&self) -> Role { self.role } /// Get the state of the connection. + #[must_use] pub fn state(&self) -> &State { &self.state } /// The QUIC version in use. + #[must_use] pub fn version(&self) -> Version { self.version } /// Get the 0-RTT state of the connection. + #[must_use] pub fn zero_rtt_state(&self) -> ZeroRttState { self.zero_rtt_state } /// Get a snapshot of collected statistics. + #[must_use] pub fn stats(&self) -> Stats { let mut v = self.stats.borrow().clone(); if let Some(p) = self.paths.primary_fallible() { @@ -888,7 +923,7 @@ impl Connection { res } - /// For use with process_input(). Errors there can be ignored, but this + /// For use with `process_input()`. Errors there can be ignored, but this /// needs to ensure that the state is updated. fn absorb_error<T>(&mut self, now: Instant, res: Res<T>) -> Option<T> { self.capture_error(None, now, 0, res).ok() @@ -1234,6 +1269,7 @@ impl Connection { /// Perform any processing that we might have to do on packets prior to /// attempting to remove protection. + #[allow(clippy::too_many_lines)] // Yeah, it's a work in progress. fn preprocess_packet( &mut self, packet: &PublicPacket, @@ -1346,17 +1382,17 @@ impl Connection { } State::WaitInitial => PreprocessResult::Continue, State::WaitVersion | State::Handshaking | State::Connected | State::Confirmed => { - if !self.cid_manager.is_valid(packet.dcid()) { - self.stats - .borrow_mut() - .pkt_dropped(format!("Invalid DCID {:?}", packet.dcid())); - PreprocessResult::Next - } else { + if self.cid_manager.is_valid(packet.dcid()) { if self.role == Role::Server && packet.packet_type() == PacketType::Handshake { // Server has received a Handshake packet -> discard Initial keys and states self.discard_keys(PacketNumberSpace::Initial, now); } PreprocessResult::Continue + } else { + self.stats + .borrow_mut() + .pkt_dropped(format!("Invalid DCID {:?}", packet.dcid())); + PreprocessResult::Next } } State::Closing { .. } => { @@ -1376,7 +1412,7 @@ impl Connection { Ok(res) } - /// After a Initial, Handshake, ZeroRtt, or Short packet is successfully processed. + /// After a Initial, Handshake, `ZeroRtt`, or Short packet is successfully processed. fn postprocess_packet( &mut self, path: &PathRef, @@ -1576,7 +1612,6 @@ impl Connection { /// During connection setup, the first path needs to be setup. /// This uses the connection IDs that were provided during the handshake /// to setup that path. - #[allow(clippy::or_fun_call)] // Remove when MSRV >= 1.59 fn setup_handshake_path(&mut self, path: &PathRef, now: Instant) { self.paths.make_permanent( path, @@ -1616,7 +1651,7 @@ impl Connection { } } - /// After an error, a permanent path is needed to send the CONNECTION_CLOSE. + /// After an error, a permanent path is needed to send the `CONNECTION_CLOSE`. /// This attempts to ensure that this exists. As the connection is now /// temporary, there is no reason to do anything special here. fn ensure_error_path(&mut self, path: &PathRef, packet: &PublicPacket, now: Instant) { @@ -1815,7 +1850,7 @@ impl Connection { State::Closing { .. } | State::Draining { .. } | State::Closed(_) => { if let Some(details) = self.state_signaling.close_frame() { let path = Rc::clone(details.path()); - let res = self.output_close(details); + let res = self.output_close(&details); self.capture_error(Some(path), now, 0, res) } else { Ok(SendOption::default()) @@ -1892,7 +1927,7 @@ impl Connection { } } - fn output_close(&mut self, close: ClosingFrame) -> Res<SendOption> { + fn output_close(&mut self, close: &ClosingFrame) -> Res<SendOption> { let mut encoder = Encoder::with_capacity(256); let grease_quic_bit = self.can_grease_quic_bit(); let version = self.version(); @@ -1902,6 +1937,14 @@ impl Connection { }; let path = close.path().borrow(); + // In some error cases, we will not be able to make a new, permanent path. + // For example, if we run out of connection IDs and the error results from + // a packet on a new path, we avoid sending (and the privacy risk) rather + // than reuse a connection ID. + if path.is_temporary() { + assert!(!cfg!(test), "attempting to close with a temporary path"); + return Err(Error::InternalError); + } let (_, mut builder) = Self::build_packet_header( &path, cspace, @@ -1932,7 +1975,7 @@ impl Connection { }; sanitized .as_ref() - .unwrap_or(&close) + .unwrap_or(close) .write_frame(&mut builder); encoder = builder.build(tx)?; } @@ -1946,11 +1989,11 @@ impl Connection { &mut self, builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, - ) -> Res<()> { + ) { let stats = &mut self.stats.borrow_mut(); let frame_stats = &mut stats.frame_tx; if self.role == Role::Server { - if let Some(t) = self.state_signaling.write_done(builder)? { + if let Some(t) = self.state_signaling.write_done(builder) { tokens.push(t); frame_stats.handshake_done += 1; } @@ -1959,7 +2002,7 @@ impl Connection { self.streams .write_frames(TransmissionPriority::Critical, builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } self.streams.write_frames( @@ -1969,36 +2012,35 @@ impl Connection { frame_stats, ); if builder.is_full() { - return Ok(()); + return; } // NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, and ACK_FREQUENCY. - self.cid_manager - .write_frames(builder, tokens, frame_stats)?; + self.cid_manager.write_frames(builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } self.paths.write_frames(builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } self.streams .write_frames(TransmissionPriority::High, builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } self.streams .write_frames(TransmissionPriority::Normal, builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } // Datagrams are best-effort and unreliable. Let streams starve them for now. self.quic_datagrams.write_frames(builder, tokens, stats); if builder.is_full() { - return Ok(()); + return; } let frame_stats = &mut stats.frame_tx; @@ -2009,13 +2051,13 @@ impl Connection { builder, tokens, frame_stats, - )?; + ); if builder.is_full() { - return Ok(()); + return; } - self.new_token.write_frames(builder, tokens, frame_stats)?; + self.new_token.write_frames(builder, tokens, frame_stats); if builder.is_full() { - return Ok(()); + return; } self.streams @@ -2027,8 +2069,6 @@ impl Connection { w.write_frames(builder); } } - - Ok(()) } // Maybe send a probe. Return true if the packet was ack-eliciting. @@ -2089,7 +2129,7 @@ impl Connection { profile: &SendProfile, builder: &mut PacketBuilder, now: Instant, - ) -> Res<(Vec<RecoveryToken>, bool, bool)> { + ) -> (Vec<RecoveryToken>, bool, bool) { let mut tokens = Vec::new(); let primary = path.borrow().is_primary(); let mut ack_eliciting = false; @@ -2125,16 +2165,15 @@ impl Connection { if profile.ack_only(space) { // If we are CC limited we can only send acks! - return Ok((tokens, false, false)); + return (tokens, false, false); } if primary { if space == PacketNumberSpace::ApplicationData { - self.write_appdata_frames(builder, &mut tokens)?; + self.write_appdata_frames(builder, &mut tokens); } else { let stats = &mut self.stats.borrow_mut().frame_tx; - self.crypto - .write_frame(space, builder, &mut tokens, stats)?; + self.crypto.write_frame(space, builder, &mut tokens, stats); } } @@ -2158,11 +2197,12 @@ impl Connection { }; stats.all += tokens.len(); - Ok((tokens, ack_eliciting, padded)) + (tokens, ack_eliciting, padded) } /// Build a datagram, possibly from multiple packets (for different PN /// spaces) and each containing 1+ frames. + #[allow(clippy::too_many_lines)] // Yeah, that's just the way it is. fn output_path(&mut self, path: &PathRef, now: Instant) -> Res<SendOption> { let mut initial_sent = None; let mut needs_padding = false; @@ -2217,7 +2257,7 @@ impl Connection { // Add frames to the packet. let payload_start = builder.len(); let (tokens, ack_eliciting, padded) = - self.write_frames(path, *space, &profile, &mut builder, now)?; + self.write_frames(path, *space, &profile, &mut builder, now); if builder.packet_empty() { // Nothing to include in this packet. encoder = builder.abort(); @@ -2306,6 +2346,8 @@ impl Connection { } } + /// # Errors + /// When connection state is not valid. pub fn initiate_key_update(&mut self) -> Res<()> { if self.state == State::Confirmed { let la = self @@ -2319,6 +2361,7 @@ impl Connection { } #[cfg(test)] + #[must_use] pub fn get_epochs(&self) -> (Option<usize>, Option<usize>) { self.crypto.states.get_epochs() } @@ -2377,6 +2420,7 @@ impl Connection { ); } + #[must_use] pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool { self.streams.is_stream_id_allowed(stream_id) } @@ -2404,7 +2448,7 @@ impl Connection { } else { // The other side didn't provide a stateless reset token. // That's OK, they can try guessing this. - <[u8; 16]>::try_from(&random(16)[..]).unwrap() + ConnectionIdEntry::random_srt() }; self.paths .primary() @@ -2585,10 +2629,16 @@ impl Connection { ) -> Res<()> { qtrace!([self], "Handshake space={} data={:0x?}", space, data); + let was_authentication_pending = + *self.crypto.tls.state() == HandshakeState::AuthenticationPending; let try_update = data.is_some(); match self.crypto.handshake(now, space, data)? { HandshakeState::Authenticated(_) | HandshakeState::InProgress => (), - HandshakeState::AuthenticationPending => self.events.authentication_needed(), + HandshakeState::AuthenticationPending => { + if !was_authentication_pending { + self.events.authentication_needed(); + } + } HandshakeState::EchFallbackAuthenticationPending(public_name) => self .events .ech_fallback_authentication_needed(public_name.clone()), @@ -2623,6 +2673,7 @@ impl Connection { Ok(()) } + #[allow(clippy::too_many_lines)] // Yep, but it's a nice big match, which is basically lots of little functions. fn input_frame( &mut self, path: &PathRef, @@ -2640,7 +2691,7 @@ impl Connection { if frame.is_stream() { return self .streams - .input_frame(frame, &mut self.stats.borrow_mut().frame_rx); + .input_frame(&frame, &mut self.stats.borrow_mut().frame_rx); } match frame { Frame::Padding => { @@ -3005,11 +3056,10 @@ impl Connection { Ok(()) } - /// Set the SendOrder of a stream. Re-enqueues to keep the ordering correct + /// Set the `SendOrder` of a stream. Re-enqueues to keep the ordering correct /// /// # Errors - /// - /// Returns InvalidStreamId if the stream id doesn't exist + /// When the stream does not exist. pub fn stream_sendorder( &mut self, stream_id: StreamId, @@ -3021,16 +3071,21 @@ impl Connection { /// Set the Fairness of a stream /// /// # Errors - /// - /// Returns InvalidStreamId if the stream id doesn't exist + /// When the stream does not exist. pub fn stream_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> { self.streams.set_fairness(stream_id, fairness) } + /// # Errors + /// When the stream does not exist. pub fn send_stream_stats(&self, stream_id: StreamId) -> Res<SendStreamStats> { - self.streams.get_send_stream(stream_id).map(|s| s.stats()) + self.streams + .get_send_stream(stream_id) + .map(SendStream::stats) } + /// # Errors + /// When the stream does not exist. pub fn recv_stream_stats(&mut self, stream_id: StreamId) -> Res<RecvStreamStats> { let stream = self.streams.get_recv_stream_mut(stream_id)?; @@ -3050,8 +3105,8 @@ impl Connection { self.streams.get_send_stream_mut(stream_id)?.send(data) } - /// Send all data or nothing on a stream. May cause DATA_BLOCKED or - /// STREAM_DATA_BLOCKED frames to be sent. + /// Send all data or nothing on a stream. May cause `DATA_BLOCKED` or + /// `STREAM_DATA_BLOCKED` frames to be sent. /// Returns true if data was successfully sent, otherwise false. /// /// # Errors @@ -3075,20 +3130,26 @@ impl Connection { val.map(|v| v == data.len()) } - /// Bytes that stream_send() is guaranteed to accept for sending. + /// Bytes that `stream_send()` is guaranteed to accept for sending. /// i.e. that will not be blocked by flow credits or send buffer max /// capacity. + /// # Errors + /// When the stream ID is invalid. pub fn stream_avail_send_space(&self, stream_id: StreamId) -> Res<usize> { Ok(self.streams.get_send_stream(stream_id)?.avail()) } /// Close the stream. Enqueued data will be sent. + /// # Errors + /// When the stream ID is invalid. pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> { self.streams.get_send_stream_mut(stream_id)?.close(); Ok(()) } /// Abandon transmission of in-flight and future stream data. + /// # Errors + /// When the stream ID is invalid. pub fn stream_reset_send(&mut self, stream_id: StreamId, err: AppError) -> Res<()> { self.streams.get_send_stream_mut(stream_id)?.reset(err); Ok(()) @@ -3109,6 +3170,8 @@ impl Connection { } /// Application is no longer interested in this stream. + /// # Errors + /// When the stream ID is invalid. pub fn stream_stop_sending(&mut self, stream_id: StreamId, err: AppError) -> Res<()> { let stream = self.streams.get_recv_stream_mut(stream_id)?; @@ -3142,6 +3205,7 @@ impl Connection { self.streams.keep_alive(stream_id, keep) } + #[must_use] pub fn remote_datagram_size(&self) -> u64 { self.quic_datagrams.remote_datagram_size() } @@ -3150,9 +3214,10 @@ impl Connection { /// The value will change over time depending on the encoded size of the /// packet number, ack frames, etc. /// - /// # Error - /// + /// # Errors /// The function returns `NotAvailable` if datagrams are not enabled. + /// # Panics + /// Basically never, because that unwrap won't fail. pub fn max_datagram_size(&self) -> Res<u64> { let max_dgram_size = self.quic_datagrams.remote_datagram_size(); if max_dgram_size == 0 { @@ -3193,7 +3258,7 @@ impl Connection { /// Queue a datagram for sending. /// - /// # Error + /// # Errors /// /// The function returns `TooMuchData` if the supply buffer is bigger than /// the allowed remote datagram size. The funcion does not check if the @@ -3203,7 +3268,6 @@ impl Connection { /// to check the estimated max datagram size and to use smaller datagrams. /// `max_datagram_size` is just a current estimate and will change over /// time depending on the encoded size of the packet number, ack frames, etc. - pub fn send_datagram(&mut self, buf: &[u8], id: impl Into<DatagramTracking>) -> Res<()> { self.quic_datagrams .add_datagram(buf, id.into(), &mut self.stats.borrow_mut()) diff --git a/third_party/rust/neqo-transport/src/connection/params.rs b/third_party/rust/neqo-transport/src/connection/params.rs index 48aba4303b..72d1efa3ee 100644 --- a/third_party/rust/neqo-transport/src/connection/params.rs +++ b/third_party/rust/neqo-transport/src/connection/params.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cmp::max, convert::TryFrom, time::Duration}; +use std::{cmp::max, time::Duration}; pub use crate::recovery::FAST_PTO_SCALE; use crate::{ @@ -41,7 +41,7 @@ pub enum PreferredAddressConfig { Address(PreferredAddress), } -/// ConnectionParameters use for setting intitial value for QUIC parameters. +/// `ConnectionParameters` use for setting intitial value for QUIC parameters. /// This collects configuration like initial limits, protocol version, and /// congestion control algorithm. #[derive(Debug, Clone)] @@ -108,6 +108,7 @@ impl Default for ConnectionParameters { } impl ConnectionParameters { + #[must_use] pub fn get_versions(&self) -> &VersionConfig { &self.versions } @@ -120,29 +121,35 @@ impl ConnectionParameters { /// versions that should be enabled. This list should contain the initial /// version and be in order of preference, with more preferred versions /// before less preferred. + #[must_use] pub fn versions(mut self, initial: Version, all: Vec<Version>) -> Self { self.versions = VersionConfig::new(initial, all); self } + #[must_use] pub fn get_cc_algorithm(&self) -> CongestionControlAlgorithm { self.cc_algorithm } + #[must_use] pub fn cc_algorithm(mut self, v: CongestionControlAlgorithm) -> Self { self.cc_algorithm = v; self } + #[must_use] pub fn get_max_data(&self) -> u64 { self.max_data } + #[must_use] pub fn max_data(mut self, v: u64) -> Self { self.max_data = v; self } + #[must_use] pub fn get_max_streams(&self, stream_type: StreamType) -> u64 { match stream_type { StreamType::BiDi => self.max_streams_bidi, @@ -153,6 +160,7 @@ impl ConnectionParameters { /// # Panics /// /// If v > 2^60 (the maximum allowed by the protocol). + #[must_use] pub fn max_streams(mut self, stream_type: StreamType, v: u64) -> Self { assert!(v <= (1 << 60), "max_streams is too large"); match stream_type { @@ -171,6 +179,7 @@ impl ConnectionParameters { /// # Panics /// /// If `StreamType::UniDi` and `false` are passed as that is not a valid combination. + #[must_use] pub fn get_max_stream_data(&self, stream_type: StreamType, remote: bool) -> u64 { match (stream_type, remote) { (StreamType::BiDi, false) => self.max_stream_data_bidi_local, @@ -188,6 +197,7 @@ impl ConnectionParameters { /// /// If `StreamType::UniDi` and `false` are passed as that is not a valid combination /// or if v >= 62 (the maximum allowed by the protocol). + #[must_use] pub fn max_stream_data(mut self, stream_type: StreamType, remote: bool, v: u64) -> Self { assert!(v < (1 << 62), "max stream data is too large"); match (stream_type, remote) { @@ -208,26 +218,31 @@ impl ConnectionParameters { } /// Set a preferred address (which only has an effect for a server). + #[must_use] pub fn preferred_address(mut self, preferred: PreferredAddress) -> Self { self.preferred_address = PreferredAddressConfig::Address(preferred); self } /// Disable the use of preferred addresses. + #[must_use] pub fn disable_preferred_address(mut self) -> Self { self.preferred_address = PreferredAddressConfig::Disabled; self } + #[must_use] pub fn get_preferred_address(&self) -> &PreferredAddressConfig { &self.preferred_address } + #[must_use] pub fn ack_ratio(mut self, ack_ratio: u8) -> Self { self.ack_ratio = ack_ratio; self } + #[must_use] pub fn get_ack_ratio(&self) -> u8 { self.ack_ratio } @@ -235,45 +250,54 @@ impl ConnectionParameters { /// # Panics /// /// If `timeout` is 2^62 milliseconds or more. + #[must_use] pub fn idle_timeout(mut self, timeout: Duration) -> Self { assert!(timeout.as_millis() < (1 << 62), "idle timeout is too long"); self.idle_timeout = timeout; self } + #[must_use] pub fn get_idle_timeout(&self) -> Duration { self.idle_timeout } + #[must_use] pub fn get_datagram_size(&self) -> u64 { self.datagram_size } + #[must_use] pub fn datagram_size(mut self, v: u64) -> Self { self.datagram_size = v; self } + #[must_use] pub fn get_outgoing_datagram_queue(&self) -> usize { self.outgoing_datagram_queue } + #[must_use] pub fn outgoing_datagram_queue(mut self, v: usize) -> Self { // The max queue length must be at least 1. self.outgoing_datagram_queue = max(v, 1); self } + #[must_use] pub fn get_incoming_datagram_queue(&self) -> usize { self.incoming_datagram_queue } + #[must_use] pub fn incoming_datagram_queue(mut self, v: usize) -> Self { // The max queue length must be at least 1. self.incoming_datagram_queue = max(v, 1); self } + #[must_use] pub fn get_fast_pto(&self) -> u8 { self.fast_pto } @@ -293,39 +317,50 @@ impl ConnectionParameters { /// # Panics /// /// A value of 0 is invalid and will cause a panic. + #[must_use] pub fn fast_pto(mut self, scale: u8) -> Self { assert_ne!(scale, 0); self.fast_pto = scale; self } + #[must_use] pub fn is_fuzzing(&self) -> bool { self.fuzzing } + #[must_use] pub fn fuzzing(mut self, enable: bool) -> Self { self.fuzzing = enable; self } + #[must_use] pub fn is_greasing(&self) -> bool { self.grease } + #[must_use] pub fn grease(mut self, grease: bool) -> Self { self.grease = grease; self } + #[must_use] pub fn pacing_enabled(&self) -> bool { self.pacing } + #[must_use] pub fn pacing(mut self, pacing: bool) -> Self { self.pacing = pacing; self } + /// # Errors + /// When a connection ID cannot be obtained. + /// # Panics + /// Only when this code includes a transport parameter that is invalid. pub fn create_transport_parameter( &self, role: Role, diff --git a/third_party/rust/neqo-transport/src/connection/state.rs b/third_party/rust/neqo-transport/src/connection/state.rs index 9afb42174f..9789151d3f 100644 --- a/third_party/rust/neqo-transport/src/connection/state.rs +++ b/third_party/rust/neqo-transport/src/connection/state.rs @@ -21,7 +21,7 @@ use crate::{ packet::PacketBuilder, path::PathRef, recovery::RecoveryToken, - ConnectionError, Error, Res, + ConnectionError, Error, }; #[derive(Clone, Debug, PartialEq, Eq)] @@ -66,6 +66,7 @@ impl State { ) } + #[must_use] pub fn error(&self) -> Option<&ConnectionError> { if let Self::Closing { error, .. } | Self::Draining { error, .. } | Self::Closed(error) = self @@ -184,13 +185,13 @@ impl ClosingFrame { } } -/// `StateSignaling` manages whether we need to send HANDSHAKE_DONE and CONNECTION_CLOSE. +/// `StateSignaling` manages whether we need to send `HANDSHAKE_DONE` and `CONNECTION_CLOSE`. /// Valid state transitions are: -/// * Idle -> HandshakeDone: at the server when the handshake completes -/// * HandshakeDone -> Idle: when a HANDSHAKE_DONE frame is sent +/// * Idle -> `HandshakeDone`: at the server when the handshake completes +/// * `HandshakeDone` -> Idle: when a `HANDSHAKE_DONE` frame is sent /// * Idle/HandshakeDone -> Closing/Draining: when closing or draining -/// * Closing/Draining -> CloseSent: after sending CONNECTION_CLOSE -/// * CloseSent -> Closing: any time a new CONNECTION_CLOSE is needed +/// * Closing/Draining -> `CloseSent`: after sending `CONNECTION_CLOSE` +/// * `CloseSent` -> Closing: any time a new `CONNECTION_CLOSE` is needed /// * -> Reset: from any state in case of a stateless reset #[derive(Debug, Clone)] pub enum StateSignaling { @@ -214,13 +215,13 @@ impl StateSignaling { *self = Self::HandshakeDone; } - pub fn write_done(&mut self, builder: &mut PacketBuilder) -> Res<Option<RecoveryToken>> { + pub fn write_done(&mut self, builder: &mut PacketBuilder) -> Option<RecoveryToken> { if matches!(self, Self::HandshakeDone) && builder.remaining() >= 1 { *self = Self::Idle; builder.encode_varint(FRAME_TYPE_HANDSHAKE_DONE); - Ok(Some(RecoveryToken::HandshakeDone)) + Some(RecoveryToken::HandshakeDone) } else { - Ok(None) + None } } diff --git a/third_party/rust/neqo-transport/src/connection/tests/ackrate.rs b/third_party/rust/neqo-transport/src/connection/tests/ackrate.rs index 1b83d42acd..f0a1d17cd9 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/ackrate.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/ackrate.rs @@ -6,7 +6,7 @@ use std::{mem, time::Duration}; -use test_fixture::{addr_v4, assertions}; +use test_fixture::{assertions, DEFAULT_ADDR_V4}; use super::{ super::{ConnectionParameters, ACK_RATIO_SCALE}, @@ -164,7 +164,7 @@ fn migrate_ack_delay() { let mut now = connect_rtt_idle(&mut client, &mut server, DEFAULT_RTT); client - .migrate(Some(addr_v4()), Some(addr_v4()), true, now) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), true, now) .unwrap(); let client1 = send_something(&mut client, now); diff --git a/third_party/rust/neqo-transport/src/connection/tests/cc.rs b/third_party/rust/neqo-transport/src/connection/tests/cc.rs index b3467ea67c..b708bc421d 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/cc.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/cc.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{convert::TryFrom, mem, time::Duration}; +use std::{mem, time::Duration}; use neqo_common::{qdebug, qinfo, Datagram}; @@ -71,6 +71,7 @@ fn cc_slow_start_to_cong_avoidance_recovery_period() { client.stats().frame_rx.largest_acknowledged, flight1_largest ); + let cwnd_before_cong = cwnd(&client); // Client: send more let (mut c_tx_dgrams, mut now) = fill_cwnd(&mut client, stream_id, now); @@ -93,6 +94,7 @@ fn cc_slow_start_to_cong_avoidance_recovery_period() { client.stats().frame_rx.largest_acknowledged, flight2_largest ); + assert!(cwnd(&client) < cwnd_before_cong); } #[test] diff --git a/third_party/rust/neqo-transport/src/connection/tests/close.rs b/third_party/rust/neqo-transport/src/connection/tests/close.rs index f45e77e549..5351dd0d5c 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/close.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/close.rs @@ -6,7 +6,7 @@ use std::time::Duration; -use test_fixture::{self, datagram, now}; +use test_fixture::{datagram, now}; use super::{ super::{Connection, Output, State}, diff --git a/third_party/rust/neqo-transport/src/connection/tests/datagram.rs b/third_party/rust/neqo-transport/src/connection/tests/datagram.rs index 5b7b8dc0b4..ade8c753be 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/datagram.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/datagram.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cell::RefCell, convert::TryFrom, rc::Rc}; +use std::{cell::RefCell, rc::Rc}; use neqo_common::event::Provider; use test_fixture::now; diff --git a/third_party/rust/neqo-transport/src/connection/tests/fuzzing.rs b/third_party/rust/neqo-transport/src/connection/tests/fuzzing.rs index 5425e1a16e..9924c06fa4 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/fuzzing.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/fuzzing.rs @@ -4,8 +4,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![cfg_attr(feature = "deny-warnings", deny(warnings))] -#![warn(clippy::pedantic)] #![cfg(feature = "fuzzing")] use neqo_crypto::FIXED_TAG_FUZZING; diff --git a/third_party/rust/neqo-transport/src/connection/tests/handshake.rs b/third_party/rust/neqo-transport/src/connection/tests/handshake.rs index 93385ac1bc..af0352ce90 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/handshake.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/handshake.rs @@ -6,7 +6,6 @@ use std::{ cell::RefCell, - convert::TryFrom, mem, net::{IpAddr, Ipv6Addr, SocketAddr}, rc::Rc, @@ -18,8 +17,8 @@ use neqo_crypto::{ constants::TLS_CHACHA20_POLY1305_SHA256, generate_ech_keys, AuthenticationStatus, }; use test_fixture::{ - self, addr, assertions, assertions::assert_coalesced_0rtt, datagram, fixture_init, now, - split_datagram, + assertions, assertions::assert_coalesced_0rtt, datagram, fixture_init, now, split_datagram, + DEFAULT_ADDR, }; use super::{ @@ -122,8 +121,8 @@ fn no_alpn() { "example.com", &["bad-alpn"], Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), - addr(), - addr(), + DEFAULT_ADDR, + DEFAULT_ADDR, ConnectionParameters::default(), now(), ) @@ -251,8 +250,8 @@ fn chacha20poly1305() { test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), - addr(), - addr(), + DEFAULT_ADDR, + DEFAULT_ADDR, ConnectionParameters::default(), now(), ) @@ -347,7 +346,7 @@ fn reorder_05rtt_with_0rtt() { let mut server = default_server(); let validation = AddressValidation::new(now(), ValidateAddress::NoToken).unwrap(); let validation = Rc::new(RefCell::new(validation)); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); let mut now = connect_with_rtt(&mut client, &mut server, now(), RTT); // Include RTT in sending the ticket or the ticket age reported by the @@ -730,8 +729,8 @@ fn connect_one_version() { test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), - addr(), - addr(), + DEFAULT_ADDR, + DEFAULT_ADDR, ConnectionParameters::default().versions(version, vec![version]), now(), ) @@ -1135,3 +1134,54 @@ fn implicit_rtt_server() { // an RTT estimate from having discarded the Initial packet number space. assert_eq!(server.stats().rtt, RTT); } + +#[test] +fn emit_authentication_needed_once() { + let mut client = default_client(); + + let mut server = Connection::new_server( + test_fixture::LONG_CERT_KEYS, + test_fixture::DEFAULT_ALPN, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + ConnectionParameters::default(), + ) + .expect("create a server"); + + let client1 = client.process(None, now()); + assert!(client1.as_dgram_ref().is_some()); + + // The entire server flight doesn't fit in a single packet because the + // certificate is large, therefore the server will produce 2 packets. + let server1 = server.process(client1.as_dgram_ref(), now()); + assert!(server1.as_dgram_ref().is_some()); + let server2 = server.process(None, now()); + assert!(server2.as_dgram_ref().is_some()); + + let authentication_needed_count = |client: &mut Connection| { + client + .events() + .filter(|e| matches!(e, ConnectionEvent::AuthenticationNeeded)) + .count() + }; + + // Upon receiving the first packet, the client has the server certificate, + // but not yet all required handshake data. It moves to + // `HandshakeState::AuthenticationPending` and emits a + // `ConnectionEvent::AuthenticationNeeded` event. + // + // Note that this is a tiny bit fragile in that it depends on having a certificate + // that is within a fairly narrow range of sizes. It has to fit in a single + // packet, but be large enough that the CertificateVerify message does not + // also fit in the same packet. Our default test setup achieves this, but + // changes to the setup might invalidate this test. + let _ = client.process(server1.as_dgram_ref(), now()); + assert_eq!(1, authentication_needed_count(&mut client)); + assert!(client.peer_certificate().is_some()); + + // The `AuthenticationNeeded` event is still pending a call to + // `Connection::authenticated`. On receiving the second packet from the + // server, the client must not emit a another + // `ConnectionEvent::AuthenticationNeeded`. + let _ = client.process(server2.as_dgram_ref(), now()); + assert_eq!(0, authentication_needed_count(&mut client)); +} diff --git a/third_party/rust/neqo-transport/src/connection/tests/idle.rs b/third_party/rust/neqo-transport/src/connection/tests/idle.rs index c33726917a..5d01131541 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/idle.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/idle.rs @@ -10,7 +10,7 @@ use std::{ }; use neqo_common::{qtrace, Encoder}; -use test_fixture::{self, now, split_datagram}; +use test_fixture::{now, split_datagram}; use super::{ super::{Connection, ConnectionParameters, IdleTimeout, Output, State}, @@ -310,28 +310,20 @@ fn idle_caching() { server.process_input(&dgram.unwrap(), middle); assert_eq!(server.stats().frame_rx.ping, ping_before_s + 1); let mut tokens = Vec::new(); - server - .crypto - .streams - .write_frame( - PacketNumberSpace::Initial, - &mut builder, - &mut tokens, - &mut FrameStats::default(), - ) - .unwrap(); + server.crypto.streams.write_frame( + PacketNumberSpace::Initial, + &mut builder, + &mut tokens, + &mut FrameStats::default(), + ); assert_eq!(tokens.len(), 1); tokens.clear(); - server - .crypto - .streams - .write_frame( - PacketNumberSpace::Initial, - &mut builder, - &mut tokens, - &mut FrameStats::default(), - ) - .unwrap(); + server.crypto.streams.write_frame( + PacketNumberSpace::Initial, + &mut builder, + &mut tokens, + &mut FrameStats::default(), + ); assert!(tokens.is_empty()); let dgram = server.process_output(middle).dgram(); diff --git a/third_party/rust/neqo-transport/src/connection/tests/keys.rs b/third_party/rust/neqo-transport/src/connection/tests/keys.rs index c247bba670..847b253284 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/keys.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/keys.rs @@ -7,7 +7,7 @@ use std::mem; use neqo_common::{qdebug, Datagram}; -use test_fixture::{self, now}; +use test_fixture::now; use super::{ super::{ diff --git a/third_party/rust/neqo-transport/src/connection/tests/migration.rs b/third_party/rust/neqo-transport/src/connection/tests/migration.rs index 8307a7dd84..405ae161a4 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/migration.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/migration.rs @@ -6,6 +6,7 @@ use std::{ cell::RefCell, + mem, net::{IpAddr, Ipv6Addr, SocketAddr}, rc::Rc, time::{Duration, Instant}, @@ -13,9 +14,8 @@ use std::{ use neqo_common::{Datagram, Decoder}; use test_fixture::{ - self, addr, addr_v4, assertions::{assert_v4_path, assert_v6_path}, - fixture_init, new_neqo_qlog, now, + fixture_init, new_neqo_qlog, now, DEFAULT_ADDR, DEFAULT_ADDR_V4, }; use super::{ @@ -94,8 +94,8 @@ fn rebinding_port() { server.stream_close_send(stream_id).unwrap(); let dgram = server.process_output(now()).dgram(); let dgram = dgram.unwrap(); - assert_eq!(dgram.source(), addr()); - assert_eq!(dgram.destination(), new_port(addr())); + assert_eq!(dgram.source(), DEFAULT_ADDR); + assert_eq!(dgram.destination(), new_port(DEFAULT_ADDR)); } /// This simulates an attack where a valid packet is forwarded on @@ -109,7 +109,7 @@ fn path_forwarding_attack() { let mut now = now(); let dgram = send_something(&mut client, now); - let dgram = change_path(&dgram, addr_v4()); + let dgram = change_path(&dgram, DEFAULT_ADDR_V4); server.process_input(&dgram, now); // The server now probes the new (primary) path. @@ -188,7 +188,7 @@ fn migrate_immediate() { let now = now(); client - .migrate(Some(addr_v4()), Some(addr_v4()), true, now) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), true, now) .unwrap(); let client1 = send_something(&mut client, now); @@ -229,7 +229,7 @@ fn migrate_rtt() { let now = connect_rtt_idle(&mut client, &mut server, RTT); client - .migrate(Some(addr_v4()), Some(addr_v4()), true, now) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), true, now) .unwrap(); // The RTT might be increased for the new path, so allow a little flexibility. let rtt = client.paths.rtt(); @@ -245,7 +245,7 @@ fn migrate_immediate_fail() { let mut now = now(); client - .migrate(Some(addr_v4()), Some(addr_v4()), true, now) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), true, now) .unwrap(); let probe = client.process_output(now).dgram().unwrap(); @@ -293,7 +293,7 @@ fn migrate_same() { let now = now(); client - .migrate(Some(addr()), Some(addr()), true, now) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), true, now) .unwrap(); let probe = client.process_output(now).dgram().unwrap(); @@ -320,7 +320,7 @@ fn migrate_same_fail() { let mut now = now(); client - .migrate(Some(addr()), Some(addr()), true, now) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), true, now) .unwrap(); let probe = client.process_output(now).dgram().unwrap(); @@ -375,7 +375,7 @@ fn migration(mut client: Connection) { let now = now(); client - .migrate(Some(addr_v4()), Some(addr_v4()), false, now) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), false, now) .unwrap(); let probe = client.process_output(now).dgram().unwrap(); @@ -449,8 +449,8 @@ fn migration_client_empty_cid() { test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())), - addr(), - addr(), + DEFAULT_ADDR, + DEFAULT_ADDR, ConnectionParameters::default(), now(), ) @@ -568,22 +568,22 @@ fn preferred_address(hs_client: SocketAddr, hs_server: SocketAddr, preferred: So /// Migration works for a new port number. #[test] fn preferred_address_new_port() { - let a = addr(); + let a = DEFAULT_ADDR; preferred_address(a, a, new_port(a)); } /// Migration works for a new address too. #[test] fn preferred_address_new_address() { - let mut preferred = addr(); + let mut preferred = DEFAULT_ADDR; preferred.set_ip(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 2))); - preferred_address(addr(), addr(), preferred); + preferred_address(DEFAULT_ADDR, DEFAULT_ADDR, preferred); } /// Migration works for IPv4 addresses. #[test] fn preferred_address_new_port_v4() { - let a = addr_v4(); + let a = DEFAULT_ADDR_V4; preferred_address(a, a, new_port(a)); } @@ -623,7 +623,7 @@ fn preferred_address_ignore_loopback() { /// A preferred address in the wrong address family is ignored. #[test] fn preferred_address_ignore_different_family() { - preferred_address_ignored(PreferredAddress::new_any(Some(addr_v4()), None)); + preferred_address_ignored(PreferredAddress::new_any(Some(DEFAULT_ADDR_V4), None)); } /// Disabling preferred addresses at the client means that it ignores a perfectly @@ -631,7 +631,7 @@ fn preferred_address_ignore_different_family() { #[test] fn preferred_address_disabled_client() { let mut client = new_client(ConnectionParameters::default().disable_preferred_address()); - let mut preferred = addr(); + let mut preferred = DEFAULT_ADDR; preferred.set_ip(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 2))); let spa = PreferredAddress::new_any(None, Some(preferred)); let mut server = new_server(ConnectionParameters::default().preferred_address(spa)); @@ -643,7 +643,7 @@ fn preferred_address_disabled_client() { fn preferred_address_empty_cid() { fixture_init(); - let spa = PreferredAddress::new_any(None, Some(new_port(addr()))); + let spa = PreferredAddress::new_any(None, Some(new_port(DEFAULT_ADDR))); let res = Connection::new_server( test_fixture::DEFAULT_KEYS, test_fixture::DEFAULT_ALPN, @@ -706,33 +706,33 @@ fn preferred_address_client() { fn migration_invalid_state() { let mut client = default_client(); assert!(client - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); let mut server = default_server(); assert!(server - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); connect_force_idle(&mut client, &mut server); assert!(server - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); client.close(now(), 0, "closing"); assert!(client - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); let close = client.process(None, now()).dgram(); let dgram = server.process(close.as_ref(), now()).dgram(); assert!(server - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); client.process_input(&dgram.unwrap(), now()); assert!(client - .migrate(Some(addr()), Some(addr()), false, now()) + .migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR), false, now()) .is_err()); } @@ -753,32 +753,32 @@ fn migration_invalid_address() { cant_migrate(None, None); // Providing a zero port number isn't valid. - let mut zero_port = addr(); + let mut zero_port = DEFAULT_ADDR; zero_port.set_port(0); cant_migrate(None, Some(zero_port)); cant_migrate(Some(zero_port), None); // An unspecified remote address is bad. - let mut remote_unspecified = addr(); + let mut remote_unspecified = DEFAULT_ADDR; remote_unspecified.set_ip(IpAddr::V6(Ipv6Addr::from(0))); cant_migrate(None, Some(remote_unspecified)); // Mixed address families is bad. - cant_migrate(Some(addr()), Some(addr_v4())); - cant_migrate(Some(addr_v4()), Some(addr())); + cant_migrate(Some(DEFAULT_ADDR), Some(DEFAULT_ADDR_V4)); + cant_migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR)); // Loopback to non-loopback is bad. - cant_migrate(Some(addr()), Some(loopback())); - cant_migrate(Some(loopback()), Some(addr())); + cant_migrate(Some(DEFAULT_ADDR), Some(loopback())); + cant_migrate(Some(loopback()), Some(DEFAULT_ADDR)); assert_eq!( client - .migrate(Some(addr()), Some(loopback()), true, now()) + .migrate(Some(DEFAULT_ADDR), Some(loopback()), true, now()) .unwrap_err(), Error::InvalidMigration ); assert_eq!( client - .migrate(Some(loopback()), Some(addr()), true, now()) + .migrate(Some(loopback()), Some(DEFAULT_ADDR), true, now()) .unwrap_err(), Error::InvalidMigration ); @@ -864,7 +864,7 @@ fn retire_prior_to_migration_failure() { let original_cid = ConnectionId::from(get_cid(&send_something(&mut client, now()))); client - .migrate(Some(addr_v4()), Some(addr_v4()), false, now()) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), false, now()) .unwrap(); // The client now probes the new path. @@ -919,7 +919,7 @@ fn retire_prior_to_migration_success() { let original_cid = ConnectionId::from(get_cid(&send_something(&mut client, now()))); client - .migrate(Some(addr_v4()), Some(addr_v4()), false, now()) + .migrate(Some(DEFAULT_ADDR_V4), Some(DEFAULT_ADDR_V4), false, now()) .unwrap(); // The client now probes the new path. @@ -951,3 +951,39 @@ fn retire_prior_to_migration_success() { assert_ne!(get_cid(&dgram), original_cid); assert_ne!(get_cid(&dgram), probe_cid); } + +struct GarbageWriter {} + +impl crate::connection::test_internal::FrameWriter for GarbageWriter { + fn write_frames(&mut self, builder: &mut PacketBuilder) { + // Not a valid frame type. + builder.encode_varint(u32::MAX); + } +} + +/// Test the case that we run out of connection ID and receive an invalid frame +/// from a new path. +#[test] +#[should_panic(expected = "attempting to close with a temporary path")] +fn error_on_new_path_with_no_connection_id() { + let mut client = default_client(); + let mut server = default_server(); + connect_force_idle(&mut client, &mut server); + + let cid_gen: Rc<RefCell<dyn ConnectionIdGenerator>> = + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())); + server.test_frame_writer = Some(Box::new(RetireAll { cid_gen })); + let retire_all = send_something(&mut server, now()); + + client.process_input(&retire_all, now()); + + server.test_frame_writer = Some(Box::new(GarbageWriter {})); + let garbage = send_something(&mut server, now()); + + let dgram = change_path(&garbage, DEFAULT_ADDR_V4); + client.process_input(&dgram, now()); + + // See issue #1697. We had a crash when the client had a temporary path and + // process_output is called. + mem::drop(client.process_output(now())); +} diff --git a/third_party/rust/neqo-transport/src/connection/tests/mod.rs b/third_party/rust/neqo-transport/src/connection/tests/mod.rs index 8a999f4048..b6ce08f8d1 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/mod.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/mod.rs @@ -4,12 +4,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![deny(clippy::pedantic)] - use std::{ cell::RefCell, cmp::min, - convert::TryFrom, mem, rc::Rc, time::{Duration, Instant}, @@ -18,7 +15,7 @@ use std::{ use enum_map::enum_map; use neqo_common::{event::Provider, qdebug, qtrace, Datagram, Decoder, Role}; use neqo_crypto::{random, AllowZeroRtt, AuthenticationStatus, ResumptionToken}; -use test_fixture::{self, addr, fixture_init, new_neqo_qlog, now}; +use test_fixture::{fixture_init, new_neqo_qlog, now, DEFAULT_ADDR}; use super::{Connection, ConnectionError, ConnectionId, Output, State}; use crate::{ @@ -79,7 +76,7 @@ impl ConnectionIdDecoder for CountingConnectionIdGenerator { impl ConnectionIdGenerator for CountingConnectionIdGenerator { fn generate_cid(&mut self) -> Option<ConnectionId> { - let mut r = random(20); + let mut r = random::<20>(); r[0] = 8; r[1] = u8::try_from(self.counter >> 24).unwrap(); r[2] = u8::try_from((self.counter >> 16) & 0xff).unwrap(); @@ -107,8 +104,8 @@ pub fn new_client(params: ConnectionParameters) -> Connection { test_fixture::DEFAULT_SERVER_NAME, test_fixture::DEFAULT_ALPN, Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), - addr(), - addr(), + DEFAULT_ADDR, + DEFAULT_ADDR, params, now(), ) @@ -278,7 +275,7 @@ fn exchange_ticket( ) -> ResumptionToken { let validation = AddressValidation::new(now, ValidateAddress::NoToken).unwrap(); let validation = Rc::new(RefCell::new(validation)); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); server.send_ticket(now, &[]).expect("can send ticket"); let ticket = server.process_output(now).dgram(); assert!(ticket.is_some()); diff --git a/third_party/rust/neqo-transport/src/connection/tests/priority.rs b/third_party/rust/neqo-transport/src/connection/tests/priority.rs index 1f86aa22e5..079ba93b9f 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/priority.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/priority.rs @@ -7,7 +7,7 @@ use std::{cell::RefCell, mem, rc::Rc}; use neqo_common::event::Provider; -use test_fixture::{self, now}; +use test_fixture::now; use super::{ super::{Connection, Error, Output}, @@ -370,7 +370,7 @@ fn low() { let validation = Rc::new(RefCell::new( AddressValidation::new(now, ValidateAddress::Never).unwrap(), )); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); connect(&mut client, &mut server); let id = server.stream_create(StreamType::UniDi).unwrap(); diff --git a/third_party/rust/neqo-transport/src/connection/tests/resumption.rs b/third_party/rust/neqo-transport/src/connection/tests/resumption.rs index a8c45a9f06..7410e76ef8 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/resumption.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/resumption.rs @@ -6,7 +6,7 @@ use std::{cell::RefCell, mem, rc::Rc, time::Duration}; -use test_fixture::{self, assertions, now}; +use test_fixture::{assertions, now}; use super::{ connect, connect_with_rtt, default_client, default_server, exchange_ticket, get_tokens, @@ -50,7 +50,7 @@ fn remember_smoothed_rtt() { // wants to acknowledge; so the ticket will include an ACK frame too. let validation = AddressValidation::new(now, ValidateAddress::NoToken).unwrap(); let validation = Rc::new(RefCell::new(validation)); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); server.send_ticket(now, &[]).expect("can send ticket"); let ticket = server.process_output(now).dgram(); assert!(ticket.is_some()); @@ -84,7 +84,7 @@ fn address_validation_token_resume() { let mut server = default_server(); let validation = AddressValidation::new(now(), ValidateAddress::Always).unwrap(); let validation = Rc::new(RefCell::new(validation)); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); let mut now = connect_with_rtt(&mut client, &mut server, now(), RTT); let token = exchange_ticket(&mut client, &mut server, now); @@ -155,7 +155,7 @@ fn two_tickets_with_new_token() { let mut server = default_server(); let validation = AddressValidation::new(now(), ValidateAddress::Always).unwrap(); let validation = Rc::new(RefCell::new(validation)); - server.set_validation(Rc::clone(&validation)); + server.set_validation(&validation); connect(&mut client, &mut server); // Send two tickets with tokens and then bundle those into a packet. diff --git a/third_party/rust/neqo-transport/src/connection/tests/stream.rs b/third_party/rust/neqo-transport/src/connection/tests/stream.rs index 586a537b9d..f469866d50 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/stream.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/stream.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{cmp::max, collections::HashMap, convert::TryFrom, mem}; +use std::{cmp::max, collections::HashMap, mem}; use neqo_common::{event::Provider, qdebug}; use test_fixture::now; diff --git a/third_party/rust/neqo-transport/src/connection/tests/vn.rs b/third_party/rust/neqo-transport/src/connection/tests/vn.rs index 22f15c991c..93872a94f4 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/vn.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/vn.rs @@ -7,7 +7,7 @@ use std::{mem, time::Duration}; use neqo_common::{event::Provider, Decoder, Encoder}; -use test_fixture::{self, assertions, datagram, now}; +use test_fixture::{assertions, datagram, now}; use super::{ super::{ConnectionError, ConnectionEvent, Output, State, ZeroRttState}, diff --git a/third_party/rust/neqo-transport/src/connection/tests/zerortt.rs b/third_party/rust/neqo-transport/src/connection/tests/zerortt.rs index 0aa5573c98..b5e5f0d758 100644 --- a/third_party/rust/neqo-transport/src/connection/tests/zerortt.rs +++ b/third_party/rust/neqo-transport/src/connection/tests/zerortt.rs @@ -8,7 +8,7 @@ use std::{cell::RefCell, rc::Rc}; use neqo_common::event::Provider; use neqo_crypto::{AllowZeroRtt, AntiReplay}; -use test_fixture::{self, assertions, now}; +use test_fixture::{assertions, now}; use super::{ super::Connection, connect, default_client, default_server, exchange_ticket, new_server, diff --git a/third_party/rust/neqo-transport/src/crypto.rs b/third_party/rust/neqo-transport/src/crypto.rs index f6cc7c0e2f..9840eaa1e1 100644 --- a/third_party/rust/neqo-transport/src/crypto.rs +++ b/third_party/rust/neqo-transport/src/crypto.rs @@ -8,7 +8,6 @@ use std::{ cell::RefCell, cmp::{max, min}, collections::HashMap, - convert::TryFrom, mem, ops::{Index, IndexMut, Range}, rc::Rc, @@ -101,10 +100,10 @@ impl Crypto { version, protocols, tls: agent, - streams: Default::default(), + streams: CryptoStreams::default(), states: CryptoStates { fuzzing, - ..Default::default() + ..CryptoStates::default() }, }) } @@ -239,14 +238,14 @@ impl Crypto { /// Returns true if new handshake keys were installed. pub fn install_keys(&mut self, role: Role) -> Res<bool> { - if !self.tls.state().is_final() { + if self.tls.state().is_final() { + Ok(false) + } else { let installed_hs = self.install_handshake_keys()?; if role == Role::Server { self.maybe_install_application_write_key(self.version)?; } Ok(installed_hs) - } else { - Ok(false) } } @@ -274,7 +273,7 @@ impl Crypto { fn maybe_install_application_write_key(&mut self, version: Version) -> Res<()> { qtrace!([self], "Attempt to install application write key"); if let Some(secret) = self.tls.write_secret(TLS_EPOCH_APPLICATION_DATA) { - self.states.set_application_write_key(version, secret)?; + self.states.set_application_write_key(version, &secret)?; qdebug!([self], "Application write key installed"); } Ok(()) @@ -290,7 +289,7 @@ impl Crypto { .read_secret(TLS_EPOCH_APPLICATION_DATA) .ok_or(Error::InternalError)?; self.states - .set_application_read_key(version, read_secret, expire_0rtt)?; + .set_application_read_key(version, &read_secret, expire_0rtt)?; qdebug!([self], "application read keys installed"); Ok(()) } @@ -313,8 +312,8 @@ impl Crypto { builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, - ) -> Res<()> { - self.streams.write_frame(space, builder, tokens, stats) + ) { + self.streams.write_frame(space, builder, tokens, stats); } pub fn acked(&mut self, token: &CryptoRecoveryToken) { @@ -767,7 +766,7 @@ impl CryptoDxAppData { pub fn new( version: Version, dir: CryptoDxDirection, - secret: SymKey, + secret: &SymKey, cipher: Cipher, fuzzing: bool, ) -> Res<Self> { @@ -776,12 +775,12 @@ impl CryptoDxAppData { version, dir, TLS_EPOCH_APPLICATION_DATA, - &secret, + secret, cipher, fuzzing, ), cipher, - next_secret: Self::update_secret(cipher, &secret)?, + next_secret: Self::update_secret(cipher, secret)?, fuzzing, }) } @@ -1111,7 +1110,7 @@ impl CryptoStates { }); } - pub fn set_application_write_key(&mut self, version: Version, secret: SymKey) -> Res<()> { + pub fn set_application_write_key(&mut self, version: Version, secret: &SymKey) -> Res<()> { debug_assert!(self.app_write.is_none()); debug_assert_ne!(self.cipher, 0); let mut app = CryptoDxAppData::new( @@ -1134,7 +1133,7 @@ impl CryptoStates { pub fn set_application_read_key( &mut self, version: Version, - secret: SymKey, + secret: &SymKey, expire_0rtt: Instant, ) -> Res<()> { debug_assert!(self.app_write.is_some(), "should have write keys installed"); @@ -1530,14 +1529,14 @@ impl CryptoStreams { builder: &mut PacketBuilder, tokens: &mut Vec<RecoveryToken>, stats: &mut FrameStats, - ) -> Res<()> { + ) { let cs = self.get_mut(space).unwrap(); if let Some((offset, data)) = cs.tx.next_bytes() { let mut header_len = 1 + Encoder::varint_len(offset) + 1; // Don't bother if there isn't room for the header and some data. if builder.remaining() < header_len + 1 { - return Ok(()); + return; } // Calculate length of data based on the minimum of: // - available data @@ -1561,7 +1560,6 @@ impl CryptoStreams { })); stats.crypto += 1; } - Ok(()) } } diff --git a/third_party/rust/neqo-transport/src/events.rs b/third_party/rust/neqo-transport/src/events.rs index 88a85250ee..a892e384b9 100644 --- a/third_party/rust/neqo-transport/src/events.rs +++ b/third_party/rust/neqo-transport/src/events.rs @@ -52,7 +52,7 @@ pub enum ConnectionEvent { stream_id: StreamId, app_error: AppError, }, - /// Peer has sent STOP_SENDING + /// Peer has sent `STOP_SENDING` SendStreamStopSending { stream_id: StreamId, app_error: AppError, @@ -61,7 +61,7 @@ pub enum ConnectionEvent { SendStreamComplete { stream_id: StreamId, }, - /// Peer increased MAX_STREAMS + /// Peer increased `MAX_STREAMS` SendStreamCreatable { stream_type: StreamType, }, @@ -254,8 +254,9 @@ impl EventProvider for ConnectionEvents { #[cfg(test)] mod tests { - use super::*; - use crate::{ConnectionError, Error}; + use neqo_common::event::Provider; + + use crate::{ConnectionError, ConnectionEvent, ConnectionEvents, Error, State, StreamId}; #[test] fn event_culling() { diff --git a/third_party/rust/neqo-transport/src/fc.rs b/third_party/rust/neqo-transport/src/fc.rs index a219ca7e8d..5ddfce6463 100644 --- a/third_party/rust/neqo-transport/src/fc.rs +++ b/third_party/rust/neqo-transport/src/fc.rs @@ -8,7 +8,6 @@ // into flow control frames needing to be sent to the remote. use std::{ - convert::TryFrom, fmt::Debug, ops::{Deref, DerefMut, Index, IndexMut}, }; @@ -249,7 +248,7 @@ where } } - /// This function is called when STREAM_DATA_BLOCKED frame is received. + /// This function is called when `STREAM_DATA_BLOCKED` frame is received. /// The flow control will try to send an update if possible. pub fn send_flowc_update(&mut self) { if self.retired + self.max_active > self.max_allowed { diff --git a/third_party/rust/neqo-transport/src/frame.rs b/third_party/rust/neqo-transport/src/frame.rs index f3d567ac7c..b3bb024a2c 100644 --- a/third_party/rust/neqo-transport/src/frame.rs +++ b/third_party/rust/neqo-transport/src/frame.rs @@ -6,7 +6,7 @@ // Directly relating to QUIC frames. -use std::{convert::TryFrom, ops::RangeInclusive}; +use std::ops::RangeInclusive; use neqo_common::{qtrace, Decoder}; @@ -78,6 +78,7 @@ impl CloseError { } } + #[must_use] pub fn code(&self) -> u64 { match self { Self::Transport(c) | Self::Application(c) => *c, @@ -303,7 +304,7 @@ impl<'a> Frame<'a> { ) } - /// Converts AckRanges as encoded in a ACK frame (see -transport + /// Converts `AckRanges` as encoded in a ACK frame (see -transport /// 19.3.1) into ranges of acked packets (end, start), inclusive of /// start and end values. pub fn decode_ack_frame( @@ -387,6 +388,7 @@ impl<'a> Frame<'a> { } } + #[allow(clippy::too_many_lines)] // Yeah, but it's a nice match statement. pub fn decode(dec: &mut Decoder<'a>) -> Res<Self> { /// Maximum ACK Range Count in ACK Frame /// @@ -430,7 +432,7 @@ impl<'a> Frame<'a> { } })?; let fa = dv(dec)?; - let mut arr: Vec<AckRange> = Vec::with_capacity(nr as usize); + let mut arr: Vec<AckRange> = Vec::with_capacity(usize::try_from(nr)?); for _ in 0..nr { let ar = AckRange { gap: dv(dec)?, @@ -615,7 +617,11 @@ impl<'a> Frame<'a> { mod tests { use neqo_common::{Decoder, Encoder}; - use super::*; + use crate::{ + cid::MAX_CONNECTION_ID_LEN, + frame::{AckRange, Frame, FRAME_TYPE_ACK}, + CloseError, Error, StreamId, StreamType, + }; fn just_dec(f: &Frame, s: &str) { let encoded = Encoder::from_hex(s); diff --git a/third_party/rust/neqo-transport/src/lib.rs b/third_party/rust/neqo-transport/src/lib.rs index ecf7ee2f73..be482c466f 100644 --- a/third_party/rust/neqo-transport/src/lib.rs +++ b/third_party/rust/neqo-transport/src/lib.rs @@ -4,8 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![cfg_attr(feature = "deny-warnings", deny(warnings))] -#![warn(clippy::use_self)] +#![allow(clippy::module_name_repetitions)] // This lint doesn't work here. use neqo_common::qinfo; use neqo_crypto::Error as CryptoError; @@ -30,6 +29,9 @@ pub mod recv_stream; #[cfg(not(feature = "bench"))] mod recv_stream; mod rtt; +#[cfg(feature = "bench")] +pub mod send_stream; +#[cfg(not(feature = "bench"))] mod send_stream; mod sender; pub mod server; @@ -130,6 +132,7 @@ pub enum Error { } impl Error { + #[must_use] pub fn code(&self) -> TransportError { match self { Self::NoError @@ -206,6 +209,7 @@ pub enum ConnectionError { } impl ConnectionError { + #[must_use] pub fn app_code(&self) -> Option<AppError> { match self { Self::Application(e) => Some(*e), diff --git a/third_party/rust/neqo-transport/src/pace.rs b/third_party/rust/neqo-transport/src/pace.rs index e5214c1bc8..5b88e5c0c4 100644 --- a/third_party/rust/neqo-transport/src/pace.rs +++ b/third_party/rust/neqo-transport/src/pace.rs @@ -5,11 +5,9 @@ // except according to those terms. // Pacer -#![deny(clippy::pedantic)] use std::{ cmp::min, - convert::TryFrom, fmt::{Debug, Display}, time::{Duration, Instant}, }; diff --git a/third_party/rust/neqo-transport/src/packet/mod.rs b/third_party/rust/neqo-transport/src/packet/mod.rs index ccfd212d5f..8458f69779 100644 --- a/third_party/rust/neqo-transport/src/packet/mod.rs +++ b/third_party/rust/neqo-transport/src/packet/mod.rs @@ -7,9 +7,7 @@ // Encoding and decoding packets off the wire. use std::{ cmp::min, - convert::TryFrom, fmt, - iter::ExactSizeIterator, ops::{Deref, DerefMut, Range}, time::Instant, }; @@ -172,11 +170,12 @@ impl PacketBuilder { } /// Start building a long header packet. - /// For an Initial packet you will need to call initial_token(), + /// For an Initial packet you will need to call `initial_token()`, /// even if the token is empty. /// /// See `short()` for more on how to handle this in cases where there is no space. #[allow(clippy::reversed_empty_ranges)] // For initializing an empty range. + #[allow(clippy::similar_names)] // For dcid and scid, which are fine here. pub fn long( mut encoder: Encoder, pt: PacketType, @@ -271,7 +270,7 @@ impl PacketBuilder { let mask = if quic_bit { PACKET_BIT_FIXED_QUIC } else { 0 } | if self.is_long() { 0 } else { PACKET_BIT_SPIN }; let first = self.header.start; - self.encoder.as_mut()[first] ^= random(1)[0] & mask; + self.encoder.as_mut()[first] ^= random::<1>()[0] & mask; } /// For an Initial packet, encode the token. @@ -315,6 +314,7 @@ impl PacketBuilder { self.pn = pn; } + #[allow(clippy::cast_possible_truncation)] // Nope. fn write_len(&mut self, expansion: usize) { let len = self.encoder.len() - (self.offsets.len + 2) + expansion; self.encoder.as_mut()[self.offsets.len] = 0x40 | ((len >> 8) & 0x3f) as u8; @@ -410,6 +410,7 @@ impl PacketBuilder { /// As this is a simple packet, this is just an associated function. /// As Retry is odd (it has to be constructed with leading bytes), /// this returns a [`Vec<u8>`] rather than building on an encoder. + #[allow(clippy::similar_names)] // scid and dcid are fine here. pub fn retry( version: Version, dcid: &[u8], @@ -424,7 +425,7 @@ impl PacketBuilder { PACKET_BIT_LONG | PACKET_BIT_FIXED_QUIC | (PacketType::Retry.to_byte(version) << 4) - | (random(1)[0] & 0xf), + | (random::<1>()[0] & 0xf), ); encoder.encode_uint(4, version.wire_version()); encoder.encode_vec(1, dcid); @@ -441,6 +442,7 @@ impl PacketBuilder { } /// Make a Version Negotiation packet. + #[allow(clippy::similar_names)] // scid and dcid are fine here. pub fn version_negotiation( dcid: &[u8], scid: &[u8], @@ -448,7 +450,7 @@ impl PacketBuilder { versions: &[Version], ) -> Vec<u8> { let mut encoder = Encoder::default(); - let mut grease = random(4); + let mut grease = random::<4>(); // This will not include the "QUIC bit" sometimes. Intentionally. encoder.encode_byte(PACKET_BIT_LONG | (grease[3] & 0x7f)); encoder.encode(&[0; 4]); // Zero version == VN. @@ -492,7 +494,7 @@ impl From<PacketBuilder> for Encoder { } } -/// PublicPacket holds information from packets that is public only. This allows for +/// `PublicPacket` holds information from packets that is public only. This allows for /// processing of packets prior to decryption. pub struct PublicPacket<'a> { /// The packet type. @@ -552,6 +554,7 @@ impl<'a> PublicPacket<'a> { /// Decode the common parts of a packet. This provides minimal parsing and validation. /// Returns a tuple of a `PublicPacket` and a slice with any remainder from the datagram. + #[allow(clippy::similar_names)] // For dcid and scid, which are fine. pub fn decode(data: &'a [u8], dcid_decoder: &dyn ConnectionIdDecoder) -> Res<(Self, &'a [u8])> { let mut decoder = Decoder::new(data); let first = Self::opt(decoder.decode_byte())?; @@ -868,10 +871,14 @@ mod tests { use neqo_common::Encoder; use test_fixture::{fixture_init, now}; - use super::*; use crate::{ + cid::MAX_CONNECTION_ID_LEN, crypto::{CryptoDxState, CryptoStates}, - EmptyConnectionIdGenerator, RandomConnectionIdGenerator, Version, + packet::{ + PacketBuilder, PacketType, PublicPacket, PACKET_BIT_FIXED_QUIC, PACKET_BIT_LONG, + PACKET_BIT_SPIN, + }, + ConnectionId, EmptyConnectionIdGenerator, RandomConnectionIdGenerator, Version, }; const CLIENT_CID: &[u8] = &[0x83, 0x94, 0xc8, 0xf0, 0x3e, 0x51, 0x57, 0x08]; @@ -1366,8 +1373,12 @@ mod tests { #[test] fn build_vn() { fixture_init(); - let mut vn = - PacketBuilder::version_negotiation(SERVER_CID, CLIENT_CID, 0x0a0a0a0a, &Version::all()); + let mut vn = PacketBuilder::version_negotiation( + SERVER_CID, + CLIENT_CID, + 0x0a0a_0a0a, + &Version::all(), + ); // Erase randomness from greasing... assert_eq!(vn.len(), SAMPLE_VN.len()); vn[0] &= 0x80; @@ -1380,8 +1391,12 @@ mod tests { #[test] fn vn_do_not_repeat_client_grease() { fixture_init(); - let vn = - PacketBuilder::version_negotiation(SERVER_CID, CLIENT_CID, 0x0a0a0a0a, &Version::all()); + let vn = PacketBuilder::version_negotiation( + SERVER_CID, + CLIENT_CID, + 0x0a0a_0a0a, + &Version::all(), + ); assert_ne!(&vn[SAMPLE_VN.len() - 4..], &[0x0a, 0x0a, 0x0a, 0x0a]); } diff --git a/third_party/rust/neqo-transport/src/packet/retry.rs b/third_party/rust/neqo-transport/src/packet/retry.rs index 004e9de6e7..72036d3b49 100644 --- a/third_party/rust/neqo-transport/src/packet/retry.rs +++ b/third_party/rust/neqo-transport/src/packet/retry.rs @@ -4,8 +4,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![deny(clippy::pedantic)] - use std::cell::RefCell; use neqo_common::qerror; diff --git a/third_party/rust/neqo-transport/src/path.rs b/third_party/rust/neqo-transport/src/path.rs index d6920c8d94..4e8d9958ab 100644 --- a/third_party/rust/neqo-transport/src/path.rs +++ b/third_party/rust/neqo-transport/src/path.rs @@ -4,12 +4,10 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![deny(clippy::pedantic)] #![allow(clippy::module_name_repetitions)] use std::{ cell::RefCell, - convert::TryFrom, fmt::{self, Display}, mem, net::{IpAddr, SocketAddr}, @@ -72,7 +70,7 @@ pub struct Paths { /// Connection IDs that need to be retired. to_retire: Vec<u64>, - /// QLog handler. + /// `QLog` handler. qlog: NeqoQlog, } @@ -156,7 +154,7 @@ impl Paths { /// Get a reference to the primary path. Use this prior to handshake completion. pub fn primary_fallible(&self) -> Option<PathRef> { - self.primary.as_ref().map(Rc::clone) + self.primary.clone() } /// Returns true if the path is not permanent. @@ -341,7 +339,7 @@ impl Paths { None } }) - .or_else(|| self.primary.as_ref().map(Rc::clone)) + .or_else(|| self.primary.clone()) } /// A `PATH_RESPONSE` was received. @@ -527,7 +525,7 @@ pub struct Path { /// For a path that is not validated, this is `None`. For a validated /// path, the time that the path was last valid. validated: Option<Instant>, - /// A path challenge was received and PATH_RESPONSE has not been sent. + /// A path challenge was received and `PATH_RESPONSE` has not been sent. challenge: Option<[u8; 8]>, /// The round trip time estimate for this path. @@ -796,7 +794,7 @@ impl Path { // Send PATH_CHALLENGE. if let ProbeState::ProbeNeeded { probe_count } = self.state { qtrace!([self], "Initiating path challenge {}", probe_count); - let data = <[u8; 8]>::try_from(&random(8)[..]).unwrap(); + let data = random::<8>(); builder.encode_varint(FRAME_TYPE_PATH_CHALLENGE); builder.encode(&data); diff --git a/third_party/rust/neqo-transport/src/qlog.rs b/third_party/rust/neqo-transport/src/qlog.rs index 434395fd23..2572966104 100644 --- a/third_party/rust/neqo-transport/src/qlog.rs +++ b/third_party/rust/neqo-transport/src/qlog.rs @@ -7,9 +7,7 @@ // Functions that handle capturing QLOG traces. use std::{ - convert::TryFrom, ops::{Deref, RangeInclusive}, - string::String, time::Duration, }; @@ -38,6 +36,7 @@ use crate::{ pub fn connection_tparams_set(qlog: &mut NeqoQlog, tph: &TransportParametersHandler) { qlog.add_event_data(|| { let remote = tph.remote(); + #[allow(clippy::cast_possible_truncation)] // Nope. let ev_data = EventData::TransportParametersSet( qlog::events::quic::TransportParametersSet { owner: None, @@ -206,7 +205,7 @@ pub fn packet_sent( let mut frames = SmallVec::new(); while d.remaining() > 0 { if let Ok(f) = Frame::decode(&mut d) { - frames.push(frame_to_qlogframe(&f)) + frames.push(frame_to_qlogframe(&f)); } else { qinfo!("qlog: invalid frame"); break; @@ -300,7 +299,7 @@ pub fn packet_received( while d.remaining() > 0 { if let Ok(f) = Frame::decode(&mut d) { - frames.push(frame_to_qlogframe(&f)) + frames.push(frame_to_qlogframe(&f)); } else { qinfo!("qlog: invalid frame"); break; @@ -355,6 +354,7 @@ pub fn metrics_updated(qlog: &mut NeqoQlog, updated_metrics: &[QlogMetric]) { let mut pacing_rate: Option<u64> = None; for metric in updated_metrics { + #[allow(clippy::cast_precision_loss)] // Nought to do here. match metric { QlogMetric::MinRtt(v) => min_rtt = Some(v.as_secs_f32() * 1000.0), QlogMetric::SmoothedRtt(v) => smoothed_rtt = Some(v.as_secs_f32() * 1000.0), @@ -391,6 +391,8 @@ pub fn metrics_updated(qlog: &mut NeqoQlog, updated_metrics: &[QlogMetric]) { // Helper functions +#[allow(clippy::too_many_lines)] // Yeah, but it's a nice match. +#[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)] // No choice here. fn frame_to_qlogframe(frame: &Frame) -> QuicFrame { match frame { Frame::Padding => QuicFrame::Padding, diff --git a/third_party/rust/neqo-transport/src/quic_datagrams.rs b/third_party/rust/neqo-transport/src/quic_datagrams.rs index 07f3594768..d7c4769e31 100644 --- a/third_party/rust/neqo-transport/src/quic_datagrams.rs +++ b/third_party/rust/neqo-transport/src/quic_datagrams.rs @@ -6,7 +6,7 @@ // https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram -use std::{cmp::min, collections::VecDeque, convert::TryFrom}; +use std::{cmp::min, collections::VecDeque}; use neqo_common::Encoder; @@ -103,7 +103,7 @@ impl QuicDatagrams { /// This function tries to write a datagram frame into a packet. /// If the frame does not fit into the packet, the datagram will - /// be dropped and a DatagramLost event will be posted. + /// be dropped and a `DatagramLost` event will be posted. pub fn write_frames( &mut self, builder: &mut PacketBuilder, diff --git a/third_party/rust/neqo-transport/src/recovery.rs b/third_party/rust/neqo-transport/src/recovery.rs index d90989b486..dbea3aaf57 100644 --- a/third_party/rust/neqo-transport/src/recovery.rs +++ b/third_party/rust/neqo-transport/src/recovery.rs @@ -6,12 +6,9 @@ // Tracking of sent packets and detecting their loss. -#![deny(clippy::pedantic)] - use std::{ cmp::{max, min}, collections::BTreeMap, - convert::TryFrom, mem, ops::RangeInclusive, time::{Duration, Instant}, @@ -1020,14 +1017,13 @@ impl ::std::fmt::Display for LossRecovery { mod tests { use std::{ cell::RefCell, - convert::TryInto, ops::{Deref, DerefMut, RangeInclusive}, rc::Rc, time::{Duration, Instant}, }; use neqo_common::qlog::NeqoQlog; - use test_fixture::{addr, now}; + use test_fixture::{now, DEFAULT_ADDR}; use super::{ LossRecovery, LossRecoverySpace, PacketNumberSpace, SendProfile, SentPacket, FAST_PTO_SCALE, @@ -1105,7 +1101,14 @@ mod tests { impl Default for Fixture { fn default() -> Self { const CC: CongestionControlAlgorithm = CongestionControlAlgorithm::NewReno; - let mut path = Path::temporary(addr(), addr(), CC, true, NeqoQlog::default(), now()); + let mut path = Path::temporary( + DEFAULT_ADDR, + DEFAULT_ADDR, + CC, + true, + NeqoQlog::default(), + now(), + ); path.make_permanent( None, ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]), diff --git a/third_party/rust/neqo-transport/src/recv_stream.rs b/third_party/rust/neqo-transport/src/recv_stream.rs index 06ca59685d..5da80d6004 100644 --- a/third_party/rust/neqo-transport/src/recv_stream.rs +++ b/third_party/rust/neqo-transport/src/recv_stream.rs @@ -11,7 +11,6 @@ use std::{ cell::RefCell, cmp::max, collections::BTreeMap, - convert::TryFrom, mem, rc::{Rc, Weak}, }; @@ -34,6 +33,7 @@ use crate::{ const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB // Export as usize for consistency with SEND_BUFFER_SIZE +#[allow(clippy::cast_possible_truncation)] // Yeah, nope. pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize; #[derive(Debug, Default)] @@ -130,6 +130,7 @@ pub struct RxStreamOrderer { } impl RxStreamOrderer { + #[must_use] pub fn new() -> Self { Self::default() } @@ -137,6 +138,9 @@ impl RxStreamOrderer { /// Process an incoming stream frame off the wire. This may result in data /// being available to upper layers if frame is not out of order (ooo) or /// if the frame fills a gap. + /// # Panics + /// Only when `u64` values cannot be converted to `usize`, which only + /// happens on 32-bit machines that hold far too much data at the same time. pub fn inbound_frame(&mut self, mut new_start: u64, mut new_data: &[u8]) { qtrace!("Inbound data offset={} len={}", new_start, new_data.len()); @@ -275,6 +279,7 @@ impl RxStreamOrderer { } /// Are any bytes readable? + #[must_use] pub fn data_ready(&self) -> bool { self.data_ranges .keys() @@ -301,20 +306,24 @@ impl RxStreamOrderer { false } }) - .map(|(_, data_len)| data_len as usize) - .sum() + // Accumulate, but saturate at usize::MAX. + .fold(0, |acc: usize, (_, data_len)| { + acc.saturating_add(usize::try_from(data_len).unwrap_or(usize::MAX)) + }) } /// Bytes read by the application. + #[must_use] pub fn retired(&self) -> u64 { self.retired } + #[must_use] pub fn received(&self) -> u64 { self.received } - /// Data bytes buffered. Could be more than bytes_readable if there are + /// Data bytes buffered. Could be more than `bytes_readable` if there are /// ranges missing. fn buffered(&self) -> u64 { self.data_ranges @@ -588,6 +597,7 @@ impl RecvStream { self.state = new_state; } + #[must_use] pub fn stats(&self) -> RecvStreamStats { match &self.state { RecvStreamState::Recv { recv_buf, .. } @@ -622,6 +632,11 @@ impl RecvStream { } } + /// # Errors + /// When the incoming data violates flow control limits. + /// # Panics + /// Only when `u64` values are so big that they can't fit in a `usize`, which + /// only happens on a 32-bit machine that has far too much unread data. pub fn inbound_stream_frame(&mut self, fin: bool, offset: u64, data: &[u8]) -> Res<()> { // We should post a DataReadable event only once when we change from no-data-ready to // data-ready. Therefore remember the state before processing a new frame. @@ -691,6 +706,8 @@ impl RecvStream { Ok(()) } + /// # Errors + /// When the reset occurs at an invalid point. pub fn reset(&mut self, application_error_code: AppError, final_size: u64) -> Res<()> { self.state.flow_control_consume_data(final_size, true)?; match &mut self.state { @@ -773,6 +790,7 @@ impl RecvStream { } } + #[must_use] pub fn is_terminal(&self) -> bool { matches!( self.state, @@ -792,8 +810,8 @@ impl RecvStream { } /// # Errors - /// /// `NoMoreData` if data and fin bit were previously read by the application. + #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe pub fn read(&mut self, buf: &mut [u8]) -> Res<(usize, bool)> { let data_recvd_state = matches!(self.state, RecvStreamState::DataRecvd { .. }); match &mut self.state { @@ -967,6 +985,7 @@ impl RecvStream { } #[cfg(test)] + #[must_use] pub fn has_frames_to_write(&self) -> bool { if let RecvStreamState::Recv { fc, .. } = &self.state { fc.frame_needed() @@ -976,6 +995,7 @@ impl RecvStream { } #[cfg(test)] + #[must_use] pub fn fc(&self) -> Option<&ReceiverFlowControl<StreamId>> { match &self.state { RecvStreamState::Recv { fc, .. } @@ -990,11 +1010,18 @@ impl RecvStream { #[cfg(test)] mod tests { - use std::ops::Range; + use std::{cell::RefCell, ops::Range, rc::Rc}; - use neqo_common::Encoder; + use neqo_common::{qtrace, Encoder}; - use super::*; + use super::RecvStream; + use crate::{ + fc::ReceiverFlowControl, + packet::PacketBuilder, + recv_stream::{RxStreamOrderer, RX_STREAM_DATA_WINDOW}, + stats::FrameStats, + ConnectionEvents, Error, StreamId, RECV_BUFFER_SIZE, + }; const SESSION_WINDOW: usize = 1024; @@ -1444,8 +1471,8 @@ mod tests { let mut buf = vec![0u8; RECV_BUFFER_SIZE + 100]; // Make it overlarge assert!(!s.has_frames_to_write()); - s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE]) - .unwrap(); + let big_buf = vec![0; RECV_BUFFER_SIZE]; + s.inbound_stream_frame(false, 0, &big_buf).unwrap(); assert!(!s.has_frames_to_write()); assert_eq!(s.read(&mut buf).unwrap(), (RECV_BUFFER_SIZE, false)); assert!(!s.data_ready()); @@ -1476,8 +1503,8 @@ mod tests { fn stream_max_stream_data() { let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW); assert!(!s.has_frames_to_write()); - s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE]) - .unwrap(); + let big_buf = vec![0; RECV_BUFFER_SIZE]; + s.inbound_stream_frame(false, 0, &big_buf).unwrap(); s.inbound_stream_frame(false, RX_STREAM_DATA_WINDOW, &[1; 1]) .unwrap_err(); } @@ -1520,9 +1547,10 @@ mod tests { #[test] fn no_stream_flowc_event_after_exiting_recv() { let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW); - s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE]) - .unwrap(); - let mut buf = [0; RECV_BUFFER_SIZE]; + let mut buf = vec![0; RECV_BUFFER_SIZE]; + // Write from buf at first. + s.inbound_stream_frame(false, 0, &buf).unwrap(); + // Then read into it. s.read(&mut buf).unwrap(); assert!(s.has_frames_to_write()); s.inbound_stream_frame(true, RX_STREAM_DATA_WINDOW, &[]) @@ -1634,7 +1662,7 @@ mod tests { assert_eq!(fc.retired(), retired); } - /// Test consuming the flow control in RecvStreamState::Recv + /// Test consuming the flow control in `RecvStreamState::Recv` #[test] fn fc_state_recv_1() { const SW: u64 = 1024; @@ -1651,7 +1679,7 @@ mod tests { check_fc(s.fc().unwrap(), SW / 4, 0); } - /// Test consuming the flow control in RecvStreamState::Recv + /// Test consuming the flow control in `RecvStreamState::Recv` /// with multiple streams #[test] fn fc_state_recv_2() { @@ -1678,7 +1706,7 @@ mod tests { check_fc(s2.fc().unwrap(), SW / 4, 0); } - /// Test retiring the flow control in RecvStreamState::Recv + /// Test retiring the flow control in `RecvStreamState::Recv` /// with multiple streams #[test] fn fc_state_recv_3() { @@ -1730,7 +1758,7 @@ mod tests { check_fc(s2.fc().unwrap(), SW / 4, SW / 4); } - /// Test consuming the flow control in RecvStreamState::Recv - duplicate data + /// Test consuming the flow control in `RecvStreamState::Recv` - duplicate data #[test] fn fc_state_recv_4() { const SW: u64 = 1024; @@ -1753,7 +1781,7 @@ mod tests { check_fc(s.fc().unwrap(), SW / 4, 0); } - /// Test consuming the flow control in RecvStreamState::Recv - filling a gap in the + /// Test consuming the flow control in `RecvStreamState::Recv` - filling a gap in the /// data stream. #[test] fn fc_state_recv_5() { @@ -1774,7 +1802,7 @@ mod tests { check_fc(s.fc().unwrap(), SW / 4, 0); } - /// Test consuming the flow control in RecvStreamState::Recv - receiving frame past + /// Test consuming the flow control in `RecvStreamState::Recv` - receiving frame past /// the flow control will cause an error. #[test] fn fc_state_recv_6() { @@ -1859,7 +1887,7 @@ mod tests { assert_eq!(stats.max_stream_data, 1); } - /// Test flow control in RecvStreamState::SizeKnown + /// Test flow control in `RecvStreamState::SizeKnown` #[test] fn fc_state_size_known() { const SW: u64 = 1024; @@ -1916,7 +1944,7 @@ mod tests { assert!(s.fc().is_none()); } - /// Test flow control in RecvStreamState::DataRecvd + /// Test flow control in `RecvStreamState::DataRecvd` #[test] fn fc_state_data_recv() { const SW: u64 = 1024; @@ -1961,7 +1989,7 @@ mod tests { assert!(s.fc().is_none()); } - /// Test flow control in RecvStreamState::DataRead + /// Test flow control in `RecvStreamState::DataRead` #[test] fn fc_state_data_read() { const SW: u64 = 1024; @@ -1999,7 +2027,7 @@ mod tests { assert!(s.fc().is_none()); } - /// Test flow control in RecvStreamState::AbortReading and final size is known + /// Test flow control in `RecvStreamState::AbortReading` and final size is known #[test] fn fc_state_abort_reading_1() { const SW: u64 = 1024; @@ -2041,7 +2069,7 @@ mod tests { check_fc(s.fc().unwrap(), SW / 2, SW / 2); } - /// Test flow control in RecvStreamState::AbortReading and final size is unknown + /// Test flow control in `RecvStreamState::AbortReading` and final size is unknown #[test] fn fc_state_abort_reading_2() { const SW: u64 = 1024; @@ -2099,7 +2127,7 @@ mod tests { check_fc(s.fc().unwrap(), SW / 2 + 20, SW / 2 + 20); } - /// Test flow control in RecvStreamState::WaitForReset + /// Test flow control in `RecvStreamState::WaitForReset` #[test] fn fc_state_wait_for_reset() { const SW: u64 = 1024; diff --git a/third_party/rust/neqo-transport/src/rtt.rs b/third_party/rust/neqo-transport/src/rtt.rs index 4b05198bc9..3b2969f689 100644 --- a/third_party/rust/neqo-transport/src/rtt.rs +++ b/third_party/rust/neqo-transport/src/rtt.rs @@ -6,8 +6,6 @@ // Tracking of sent packets and detecting their loss. -#![deny(clippy::pedantic)] - use std::{ cmp::{max, min}, time::{Duration, Instant}, diff --git a/third_party/rust/neqo-transport/src/send_stream.rs b/third_party/rust/neqo-transport/src/send_stream.rs index 5feb785ac6..8771ec7765 100644 --- a/third_party/rust/neqo-transport/src/send_stream.rs +++ b/third_party/rust/neqo-transport/src/send_stream.rs @@ -9,8 +9,7 @@ use std::{ cell::RefCell, cmp::{max, min, Ordering}, - collections::{BTreeMap, VecDeque}, - convert::TryFrom, + collections::{btree_map::Entry, BTreeMap, VecDeque}, hash::{Hash, Hasher}, mem, ops::Add, @@ -18,7 +17,7 @@ use std::{ }; use indexmap::IndexMap; -use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role}; +use neqo_common::{qdebug, qerror, qtrace, Encoder, Role}; use smallvec::SmallVec; use crate::{ @@ -111,7 +110,7 @@ impl Add<RetransmissionPriority> for TransmissionPriority { /// If data is lost, this determines the priority that applies to retransmissions /// of that data. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub enum RetransmissionPriority { /// Prioritize retransmission at a fixed priority. /// With this, it is possible to prioritize retransmissions lower than transmissions. @@ -123,19 +122,14 @@ pub enum RetransmissionPriority { Same, /// Increase the priority of retransmissions (the default). /// Retransmissions of `Critical` or `Important` aren't elevated at all. + #[default] Higher, /// Increase the priority of retransmissions a lot. /// This is useful for streams that are particularly exposed to head-of-line blocking. MuchHigher, } -impl Default for RetransmissionPriority { - fn default() -> Self { - Self::Higher - } -} - -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] enum RangeState { Sent, Acked, @@ -144,169 +138,268 @@ enum RangeState { /// Track ranges in the stream as sent or acked. Acked implies sent. Not in a /// range implies needing-to-be-sent, either initially or as a retransmission. #[derive(Debug, Default, PartialEq)] -struct RangeTracker { - // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits. +pub struct RangeTracker { + /// The number of bytes that have been acknowledged starting from offset 0. + acked: u64, + /// A map that tracks the state of ranges. + /// Keys are the offset of the start of the range. + /// Values is a tuple of the range length and its state. used: BTreeMap<u64, (u64, RangeState)>, + /// This is a cache for the output of `first_unmarked_range`, which we check a lot. + first_unmarked: Option<(u64, Option<u64>)>, } impl RangeTracker { fn highest_offset(&self) -> u64 { self.used - .range(..) - .next_back() - .map_or(0, |(k, (v, _))| *k + *v) + .last_key_value() + .map_or(self.acked, |(&k, &(v, _))| k + v) } fn acked_from_zero(&self) -> u64 { - self.used - .get(&0) - .filter(|(_, state)| *state == RangeState::Acked) - .map_or(0, |(v, _)| *v) + self.acked } /// Find the first unmarked range. If all are contiguous, this will return - /// (highest_offset(), None). - fn first_unmarked_range(&self) -> (u64, Option<u64>) { - let mut prev_end = 0; + /// (`highest_offset()`, None). + fn first_unmarked_range(&mut self) -> (u64, Option<u64>) { + if let Some(first_unmarked) = self.first_unmarked { + return first_unmarked; + } + + let mut prev_end = self.acked; - for (cur_off, (cur_len, _)) in &self.used { - if prev_end == *cur_off { + for (&cur_off, &(cur_len, _)) in &self.used { + if prev_end == cur_off { prev_end = cur_off + cur_len; } else { - return (prev_end, Some(cur_off - prev_end)); + let res = (prev_end, Some(cur_off - prev_end)); + self.first_unmarked = Some(res); + return res; } } + self.first_unmarked = Some((prev_end, None)); (prev_end, None) } - /// Turn one range into a list of subranges that align with existing - /// ranges. - /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked. - // - // e.g. given N is new and ABC are existing: - // NNNNNNNNNNNNNNNN - // AAAAA BBBCCCCC ...then we want 5 chunks: - // 1122222333444555 - // - // but also if we have this: - // NNNNNNNNNNNNNNNN - // AAAAAAAAAA BBBB ...then break existing A and B ranges up: - // - // 1111111122222233 - // aaAAAAAAAA BBbb - // - // Doing all this work up front should make handling each chunk much - // easier. - fn chunk_range_on_edges( - &mut self, - new_off: u64, - new_len: u64, - new_state: RangeState, - ) -> Vec<(u64, u64, RangeState)> { - let mut tmp_off = new_off; - let mut tmp_len = new_len; - let mut v = Vec::new(); - - // cut previous overlapping range if needed - let prev = self.used.range_mut(..tmp_off).next_back(); - if let Some((prev_off, (prev_len, prev_state))) = prev { - let prev_state = *prev_state; - let overlap = (*prev_off + *prev_len).saturating_sub(new_off); - *prev_len -= overlap; - if overlap > 0 { - self.used.insert(new_off, (overlap, prev_state)); + /// When the range of acknowledged bytes from zero increases, we need to drop any + /// ranges within that span AND maybe extend it to include any adjacent acknowledged ranges. + fn coalesce_acked(&mut self) { + while let Some(e) = self.used.first_entry() { + match self.acked.cmp(e.key()) { + Ordering::Greater => { + let (off, (len, state)) = e.remove_entry(); + let overflow = (off + len).saturating_sub(self.acked); + if overflow > 0 { + if state == RangeState::Acked { + self.acked += overflow; + } else { + self.used.insert(self.acked, (overflow, state)); + } + break; + } + } + Ordering::Equal => { + if e.get().1 == RangeState::Acked { + let (len, _) = e.remove(); + self.acked += len; + } + break; + } + Ordering::Less => break, } } + } - let mut last_existing_remaining = None; - for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) { - // Create chunk for "overhang" before an existing range - if tmp_off < *off { - let sub_len = off - tmp_off; - v.push((tmp_off, sub_len, new_state)); - tmp_off += sub_len; - tmp_len -= sub_len; - } + /// Mark a range as acknowledged. This is simpler than marking a range as sent + /// because an acknowledged range can never turn back into a sent range, so + /// this function can just override the entire range. + /// + /// The only tricky parts are making sure that we maintain `self.acked`, + /// which is the first acknowledged range. And making sure that we don't create + /// ranges of the same type that are adjacent; these need to be merged. + #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe + pub fn mark_acked(&mut self, new_off: u64, new_len: usize) { + let end = new_off + u64::try_from(new_len).unwrap(); + let new_off = max(self.acked, new_off); + let mut new_len = end.saturating_sub(new_off); + if new_len == 0 { + return; + } - // Create chunk to match existing range - let sub_len = min(*len, tmp_len); - let remaining_len = len - sub_len; - if new_state == RangeState::Sent && *state == RangeState::Acked { - qinfo!( - "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}", - off, - len, - new_off, - new_len - ); - } else { - v.push((tmp_off, sub_len, new_state)); - } - tmp_off += sub_len; - tmp_len -= sub_len; + self.first_unmarked = None; + if new_off == self.acked { + self.acked += new_len; + self.coalesce_acked(); + return; + } + let mut new_end = new_off + new_len; - if remaining_len > 0 { - last_existing_remaining = Some((*off, sub_len, remaining_len, *state)); + // Get all existing ranges that start within this new range. + let mut covered = self + .used + .range(new_off..new_end) + .map(|(&k, _)| k) + .collect::<SmallVec<[_; 8]>>(); + + if let Entry::Occupied(next_entry) = self.used.entry(new_end) { + // Check if the very next entry is the same type as this. + if next_entry.get().1 == RangeState::Acked { + // If is is acked, drop it and extend this new range. + let (extra_len, _) = next_entry.remove(); + new_len += extra_len; + new_end += extra_len; + } + } else if let Some(last) = covered.pop() { + // Otherwise, the last of the existing ranges might overhang this one by some. + let (old_off, (old_len, old_state)) = self.used.remove_entry(&last).unwrap(); // can't fail + let remainder = (old_off + old_len).saturating_sub(new_end); + if remainder > 0 { + if old_state == RangeState::Acked { + // Just extend the current range. + new_len += remainder; + new_end += remainder; + } else { + self.used.insert(new_end, (remainder, RangeState::Sent)); + } } } - - // Maybe break last existing range in two so that a final chunk will - // have the same length as an existing range entry - if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining { - *self.used.get_mut(&off).expect("must be there") = (sub_len, state); - self.used.insert(off + sub_len, (remaining_len, state)); + // All covered ranges can just be trashed. + for k in covered { + self.used.remove(&k); } - // Create final chunk if anything remains of the new range - if tmp_len > 0 { - v.push((tmp_off, tmp_len, new_state)); + // Now either merge with a preceding acked range + // or cut a preceding sent range as needed. + let prev = self.used.range_mut(..new_off).next_back(); + if let Some((prev_off, (prev_len, prev_state))) = prev { + let prev_end = *prev_off + *prev_len; + if prev_end >= new_off { + if *prev_state == RangeState::Sent { + *prev_len = new_off - *prev_off; + if prev_end > new_end { + // There is some extra sent range after the new acked range. + self.used + .insert(new_end, (prev_end - new_end, RangeState::Sent)); + } + } else { + *prev_len = max(prev_end, new_end) - *prev_off; + return; + } + } + } + self.used.insert(new_off, (new_len, RangeState::Acked)); + } + + /// Turn a single sent range into a list of subranges that align with existing + /// acknowledged ranges. + /// + /// This is more complicated than adding acked ranges because any acked ranges + /// need to be kept in place, with sent ranges filling the gaps. + /// + /// This means: + /// ```ignore + /// AAA S AAAS AAAAA + /// + SSSSSSSSSSSSS + /// = AAASSSAAASSAAAAA + /// ``` + /// + /// But we also have to ensure that: + /// ```ignore + /// SSSS + /// + SS + /// = SSSSSS + /// ``` + /// and + /// ```ignore + /// SSSSS + /// + SS + /// = SSSSSS + /// ``` + #[allow(clippy::missing_panics_doc)] // not possible + pub fn mark_sent(&mut self, mut new_off: u64, new_len: usize) { + let new_end = new_off + u64::try_from(new_len).unwrap(); + new_off = max(self.acked, new_off); + let mut new_len = new_end.saturating_sub(new_off); + if new_len == 0 { + return; } - v - } + self.first_unmarked = None; - /// Merge contiguous Acked ranges into the first entry (0). This range may - /// be dropped from the send buffer. - fn coalesce_acked_from_zero(&mut self) { - let acked_range_from_zero = self + // Get all existing ranges that start within this new range. + let covered = self .used - .get_mut(&0) - .filter(|(_, state)| *state == RangeState::Acked) - .map(|(len, _)| *len); - - if let Some(len_from_zero) = acked_range_from_zero { - let mut new_len_from_zero = len_from_zero; - - // See if there's another Acked range entry contiguous to this one - while let Some((next_len, _)) = self - .used - .get(&new_len_from_zero) - .filter(|(_, state)| *state == RangeState::Acked) - { - let to_remove = new_len_from_zero; - new_len_from_zero += *next_len; - self.used.remove(&to_remove); - } - - if len_from_zero != new_len_from_zero { - self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero; + .range(new_off..(new_off + new_len)) + .map(|(&k, _)| k) + .collect::<SmallVec<[u64; 8]>>(); + + if let Entry::Occupied(next_entry) = self.used.entry(new_end) { + if next_entry.get().1 == RangeState::Sent { + // Check if the very next entry is the same type as this, so it can be merged. + let (extra_len, _) = next_entry.remove(); + new_len += extra_len; } } - } - fn mark_range(&mut self, off: u64, len: usize, state: RangeState) { - if len == 0 { - qinfo!("mark 0-length range at {}", off); - return; - } + // Merge with any preceding sent range that might overlap, + // or cut the head of this if the preceding range is acked. + let prev = self.used.range(..new_off).next_back(); + if let Some((&prev_off, &(prev_len, prev_state))) = prev { + if prev_off + prev_len >= new_off { + let overlap = prev_off + prev_len - new_off; + new_len = new_len.saturating_sub(overlap); + if new_len == 0 { + // The previous range completely covers this one (no more to do). + return; + } - let subranges = self.chunk_range_on_edges(off, len as u64, state); + if prev_state == RangeState::Acked { + // The previous range is acked, so it cuts this one. + new_off += overlap; + } else { + // Extend the current range backwards. + new_off = prev_off; + new_len += prev_len; + // The previous range will be updated below. + // It might need to be cut because of a covered acked range. + } + } + } - for (sub_off, sub_len, sub_state) in subranges { - self.used.insert(sub_off, (sub_len, sub_state)); + // Now interleave new sent chunks with any existing acked chunks. + for old_off in covered { + let Entry::Occupied(e) = self.used.entry(old_off) else { + unreachable!(); + }; + let &(old_len, old_state) = e.get(); + if old_state == RangeState::Acked { + // Now we have to insert a chunk ahead of this acked chunk. + let chunk_len = old_off - new_off; + if chunk_len > 0 { + self.used.insert(new_off, (chunk_len, RangeState::Sent)); + } + let included = chunk_len + old_len; + new_len = new_len.saturating_sub(included); + if new_len == 0 { + return; + } + new_off += included; + } else { + let overhang = (old_off + old_len).saturating_sub(new_off + new_len); + new_len += overhang; + if *e.key() != new_off { + // Retain a sent entry at `new_off`. + // This avoids the work of removing and re-creating an entry. + // The value will be overwritten when the next insert occurs, + // either when this loop hits an acked range (above) + // or for any remainder (below). + e.remove(); + } + } } - self.coalesce_acked_from_zero(); + self.used.insert(new_off, (new_len, RangeState::Sent)); } fn unmark_range(&mut self, off: u64, len: usize) { @@ -315,6 +408,7 @@ impl RangeTracker { return; } + self.first_unmarked = None; let len = u64::try_from(len).unwrap(); let end_off = off + len; @@ -376,6 +470,9 @@ impl RangeTracker { } /// Unmark all sent ranges. + /// # Panics + /// On 32-bit machines where far too much is sent before calling this. + /// Note that this should not be called for handshakes, which should never exceed that limit. pub fn unmark_sent(&mut self) { self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap()); } @@ -384,36 +481,37 @@ impl RangeTracker { /// Buffer to contain queued bytes and track their state. #[derive(Debug, Default, PartialEq)] pub struct TxBuffer { - retired: u64, // contig acked bytes, no longer in buffer send_buf: VecDeque<u8>, // buffer of not-acked bytes ranges: RangeTracker, // ranges in buffer that have been sent or acked } impl TxBuffer { + #[must_use] pub fn new() -> Self { Self::default() } - /// Attempt to add some or all of the passed-in buffer to the TxBuffer. + /// Attempt to add some or all of the passed-in buffer to the `TxBuffer`. pub fn send(&mut self, buf: &[u8]) -> usize { let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); if can_buffer > 0 { self.send_buf.extend(&buf[..can_buffer]); - assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); + debug_assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); } can_buffer } - pub fn next_bytes(&self) -> Option<(u64, &[u8])> { + #[allow(clippy::missing_panics_doc)] // These are not possible. + pub fn next_bytes(&mut self) -> Option<(u64, &[u8])> { let (start, maybe_len) = self.ranges.first_unmarked_range(); - if start == self.retired + u64::try_from(self.buffered()).unwrap() { + if start == self.retired() + u64::try_from(self.buffered()).unwrap() { return None; } // Convert from ranges-relative-to-zero to // ranges-relative-to-buffer-start - let buff_off = usize::try_from(start - self.retired).unwrap(); + let buff_off = usize::try_from(start - self.retired()).unwrap(); // Deque returns two slices. Create a subslice from whichever // one contains the first unmarked data. @@ -437,23 +535,22 @@ impl TxBuffer { } pub fn mark_as_sent(&mut self, offset: u64, len: usize) { - self.ranges.mark_range(offset, len, RangeState::Sent); + self.ranges.mark_sent(offset, len); } + #[allow(clippy::missing_panics_doc)] // Not possible here. pub fn mark_as_acked(&mut self, offset: u64, len: usize) { - self.ranges.mark_range(offset, len, RangeState::Acked); + let prev_retired = self.retired(); + self.ranges.mark_acked(offset, len); - // We can drop contig acked range from the buffer - let new_retirable = self.ranges.acked_from_zero() - self.retired; + // Any newly-retired bytes can be dropped from the buffer. + let new_retirable = self.retired() - prev_retired; debug_assert!(new_retirable <= self.buffered() as u64); - let keep_len = - self.buffered() - usize::try_from(new_retirable).expect("should fit in usize"); + let keep = self.buffered() - usize::try_from(new_retirable).unwrap(); // Truncate front - self.send_buf.rotate_left(self.buffered() - keep_len); - self.send_buf.truncate(keep_len); - - self.retired += new_retirable; + self.send_buf.rotate_left(self.buffered() - keep); + self.send_buf.truncate(keep); } pub fn mark_as_lost(&mut self, offset: u64, len: usize) { @@ -465,8 +562,9 @@ impl TxBuffer { self.ranges.unmark_sent(); } + #[must_use] pub fn retired(&self) -> u64 { - self.retired + self.ranges.acked_from_zero() } fn buffered(&self) -> usize { @@ -478,7 +576,7 @@ impl TxBuffer { } fn used(&self) -> u64 { - self.retired + u64::try_from(self.buffered()).unwrap() + self.retired() + u64::try_from(self.buffered()).unwrap() } } @@ -693,6 +791,7 @@ impl SendStream { self.fair = make_fair; } + #[must_use] pub fn is_fair(&self) -> bool { self.fair } @@ -706,6 +805,7 @@ impl SendStream { self.retransmission_priority = retransmission; } + #[must_use] pub fn sendorder(&self) -> Option<SendOrder> { self.sendorder } @@ -715,6 +815,7 @@ impl SendStream { } /// If all data has been buffered or written, how much was sent. + #[must_use] pub fn final_size(&self) -> Option<u64> { match &self.state { SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()), @@ -723,10 +824,13 @@ impl SendStream { } } + #[must_use] pub fn stats(&self) -> SendStreamStats { SendStreamStats::new(self.bytes_written(), self.bytes_sent, self.bytes_acked()) } + #[must_use] + #[allow(clippy::missing_panics_doc)] // not possible pub fn bytes_written(&self) -> u64 { match &self.state { SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => { @@ -749,6 +853,7 @@ impl SendStream { } } + #[must_use] pub fn bytes_acked(&self) -> u64 { match &self.state { SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => { @@ -766,11 +871,13 @@ impl SendStream { /// offset. fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> { match self.state { - SendStreamState::Send { ref send_buf, .. } => { - send_buf.next_bytes().and_then(|(offset, slice)| { + SendStreamState::Send { + ref mut send_buf, .. + } => { + let result = send_buf.next_bytes(); + if let Some((offset, slice)) = result { if retransmission_only { qtrace!( - [self], "next_bytes apply retransmission limit at {}", self.retransmission_offset ); @@ -786,13 +893,16 @@ impl SendStream { } else { Some((offset, slice)) } - }) + } else { + None + } } SendStreamState::DataSent { - ref send_buf, + ref mut send_buf, fin_sent, .. } => { + let used = send_buf.used(); // immutable first let bytes = send_buf.next_bytes(); if bytes.is_some() { bytes @@ -800,7 +910,7 @@ impl SendStream { None } else { // Send empty stream frame with fin set - Some((send_buf.used(), &[])) + Some((used, &[])) } } SendStreamState::Ready { .. } @@ -833,6 +943,7 @@ impl SendStream { } /// Maybe write a `STREAM` frame. + #[allow(clippy::missing_panics_doc)] // not possible pub fn write_stream_frame( &mut self, priority: TransmissionPriority, @@ -995,6 +1106,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) { self.bytes_sent = max(self.bytes_sent, offset + u64::try_from(len).unwrap()); @@ -1010,6 +1122,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) { match self.state { SendStreamState::Send { @@ -1047,6 +1160,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) { self.retransmission_offset = max( self.retransmission_offset, @@ -1075,6 +1189,7 @@ impl SendStream { /// Bytes sendable on stream. Constrained by stream credit available, /// connection credit available, and space in the tx buffer. + #[must_use] pub fn avail(&self) -> usize { if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } = &self.state @@ -1100,6 +1215,7 @@ impl SendStream { } } + #[must_use] pub fn is_terminal(&self) -> bool { matches!( self.state, @@ -1107,10 +1223,14 @@ impl SendStream { ) } + /// # Errors + /// When `buf` is empty or when the stream is already closed. pub fn send(&mut self, buf: &[u8]) -> Res<usize> { self.send_internal(buf, false) } + /// # Errors + /// When `buf` is empty or when the stream is already closed. pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> { self.send_internal(buf, true) } @@ -1155,9 +1275,9 @@ impl SendStream { if atomic { self.send_blocked_if_space_needed(buf.len()); return Ok(0); - } else { - &buf[..self.avail()] } + + &buf[..self.avail()] } else { buf }; @@ -1202,6 +1322,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn reset(&mut self, err: AppError) { match &self.state { SendStreamState::Ready { fc, .. } => { @@ -1296,6 +1417,7 @@ impl OrderGroup { } } + #[must_use] pub fn stream_ids(&self) -> &Vec<StreamId> { &self.vec } @@ -1319,26 +1441,24 @@ impl OrderGroup { next } + /// # Panics + /// If the stream ID is already present. pub fn insert(&mut self, stream_id: StreamId) { - match self.vec.binary_search(&stream_id) { - Ok(_) => { - // element already in vector @ `pos` - panic!("Duplicate stream_id {}", stream_id) - } - Err(pos) => self.vec.insert(pos, stream_id), - } + let Err(pos) = self.vec.binary_search(&stream_id) else { + // element already in vector @ `pos` + panic!("Duplicate stream_id {stream_id}"); + }; + self.vec.insert(pos, stream_id); } + /// # Panics + /// If the stream ID is not present. pub fn remove(&mut self, stream_id: StreamId) { - match self.vec.binary_search(&stream_id) { - Ok(pos) => { - self.vec.remove(pos); - } - Err(_) => { - // element already in vector @ `pos` - panic!("Missing stream_id {}", stream_id) - } - } + let Ok(pos) = self.vec.binary_search(&stream_id) else { + // element already in vector @ `pos` + panic!("Missing stream_id {stream_id}"); + }; + self.vec.remove(pos); } } @@ -1579,16 +1699,16 @@ impl SendStreams { // Iterate the map, but only those without fairness, then iterate // OrderGroups, then iterate each group - qdebug!("processing streams... unfair:"); + qtrace!("processing streams... unfair:"); for stream in self.map.values_mut() { if !stream.is_fair() { - qdebug!(" {}", stream); + qtrace!(" {}", stream); if !stream.write_frames_with_early_return(priority, builder, tokens, stats) { break; } } } - qdebug!("fair streams:"); + qtrace!("fair streams:"); let stream_ids = self.regular.iter().chain( self.sendordered .values_mut() @@ -1598,9 +1718,9 @@ impl SendStreams { for stream_id in stream_ids { let stream = self.map.get_mut(&stream_id).unwrap(); if let Some(order) = stream.sendorder() { - qdebug!(" {} ({})", stream_id, order) + qtrace!(" {} ({})", stream_id, order); } else { - qdebug!(" None") + qtrace!(" None"); } if !stream.write_frames_with_early_return(priority, builder, tokens, stats) { break; @@ -1609,7 +1729,7 @@ impl SendStreams { } pub fn update_initial_limit(&mut self, remote: &TransportParameters) { - for (id, ss) in self.map.iter_mut() { + for (id, ss) in &mut self.map { let limit = if id.is_bidi() { assert!(!id.is_remote_initiated(Role::Client)); remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE) @@ -1640,55 +1760,391 @@ pub struct SendStreamRecoveryToken { #[cfg(test)] mod tests { - use neqo_common::{event::Provider, hex_with_len, qtrace}; - - use super::*; - use crate::events::ConnectionEvent; + use std::{cell::RefCell, collections::VecDeque, rc::Rc}; + + use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder}; + + use super::SendStreamRecoveryToken; + use crate::{ + connection::{RetransmissionPriority, TransmissionPriority}, + events::ConnectionEvent, + fc::SenderFlowControl, + packet::PacketBuilder, + recovery::{RecoveryToken, StreamRecoveryToken}, + send_stream::{ + RangeState, RangeTracker, SendStream, SendStreamState, SendStreams, TxBuffer, + }, + stats::FrameStats, + ConnectionEvents, StreamId, SEND_BUFFER_SIZE, + }; fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> { Rc::new(RefCell::new(SenderFlowControl::new((), limit))) } #[test] - fn test_mark_range() { + fn mark_acked_from_zero() { let mut rt = RangeTracker::default(); // ranges can go from nothing->Sent if queued for retrans and then // acks arrive - rt.mark_range(5, 5, RangeState::Acked); + rt.mark_acked(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(10, 4, RangeState::Acked); + rt.mark_acked(10, 4); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 14); - rt.mark_range(12, 20, RangeState::Acked); + rt.mark_acked(12, 20); assert_eq!(rt.highest_offset(), 32); assert_eq!(rt.acked_from_zero(), 32); // ack the lot - rt.mark_range(0, 400, RangeState::Acked); + rt.mark_acked(0, 400); assert_eq!(rt.highest_offset(), 400); assert_eq!(rt.acked_from_zero(), 400); // acked trumps sent - rt.mark_range(0, 200, RangeState::Sent); + rt.mark_sent(0, 200); assert_eq!(rt.highest_offset(), 400); assert_eq!(rt.acked_from_zero(), 400); } + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS SSSAAASSS + /// + AAAAAAAAA + /// = SSSAAAAAAAAASS + /// ``` + #[test] + fn mark_acked_1() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_sent(6, 3); + rt.mark_acked(9, 3); + rt.mark_sent(12, 3); + + rt.mark_acked(3, 10); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (3, RangeState::Sent)); + canon.used.insert(3, (10, RangeState::Acked)); + canon.used.insert(13, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS SSS AAA + /// + AAAAAAAAA + /// = SSAAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_2() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_sent(6, 3); + rt.mark_acked(12, 3); + + rt.mark_acked(2, 10); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (2, RangeState::Sent)); + canon.used.insert(2, (13, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AASSS AAAA + /// + AAAAAAAAA + /// = AAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_3() { + let mut rt = RangeTracker::default(); + rt.mark_acked(1, 2); + rt.mark_sent(3, 3); + rt.mark_acked(8, 4); + + rt.mark_acked(0, 9); + + let canon = RangeTracker { + acked: 12, + ..RangeTracker::default() + }; + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS + /// + AAAA + /// = AAAASS + /// ``` + #[test] + fn mark_acked_4() { + let mut rt = RangeTracker::default(); + rt.mark_sent(3, 3); + + rt.mark_acked(0, 4); + + let mut canon = RangeTracker { + acked: 4, + ..Default::default() + }; + canon.used.insert(4, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAAAAASSS + /// + AAA + /// = AAAAAASSS + /// ``` + #[test] + fn mark_acked_5() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 6); + rt.mark_sent(6, 3); + + rt.mark_acked(3, 3); + + let mut canon = RangeTracker { + acked: 6, + ..RangeTracker::default() + }; + canon.used.insert(6, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAA AAA AAA + /// + AAAAAAA + /// = AAAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_6() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + rt.mark_acked(8, 3); + rt.mark_acked(13, 3); + + rt.mark_acked(6, 7); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (13, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAA AAA + /// + AAA + /// = AAAAAAAA + /// ``` + #[test] + fn mark_acked_7() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + rt.mark_acked(8, 3); + + rt.mark_acked(6, 3); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (8, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSSSSSSS + /// + AAAA + /// = SSAAAASS + /// ``` + #[test] + fn mark_acked_8() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 8); + + rt.mark_acked(2, 4); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (2, RangeState::Sent)); + canon.used.insert(2, (4, RangeState::Acked)); + canon.used.insert(6, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS + /// + AAA + /// = AAA SSS + /// ``` + #[test] + fn mark_acked_9() { + let mut rt = RangeTracker::default(); + rt.mark_sent(5, 3); + + rt.mark_acked(0, 3); + + let mut canon = RangeTracker { + acked: 3, + ..Default::default() + }; + canon.used.insert(5, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA AAA SSS + /// + SSSSSSSSSSSS + /// = AAASSSAAASSSSSS + /// ``` + #[test] + fn mark_sent_1() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_acked(6, 3); + rt.mark_sent(12, 3); + + rt.mark_sent(0, 12); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (3, RangeState::Sent)); + canon.used.insert(6, (3, RangeState::Acked)); + canon.used.insert(9, (6, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAASS AAA S SSSS + /// + SSSSSSSSSSSSS + /// = AAASSSAAASSSSSSS + /// ``` + #[test] + fn mark_sent_2() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_sent(3, 2); + rt.mark_acked(6, 3); + rt.mark_sent(10, 1); + rt.mark_sent(12, 4); + + rt.mark_sent(0, 13); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (3, RangeState::Sent)); + canon.used.insert(6, (3, RangeState::Acked)); + canon.used.insert(9, (7, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA AAA + /// + SSSS + /// = AAASSAAA + /// ``` + #[test] + fn mark_sent_3() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_acked(5, 3); + + rt.mark_sent(2, 4); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (2, RangeState::Sent)); + canon.used.insert(5, (3, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// SSS AAA SS + /// + SSSSSSSS + /// = SSSSSAAASSSS + /// ``` + #[test] + fn mark_sent_4() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_acked(5, 3); + rt.mark_sent(10, 2); + + rt.mark_sent(2, 8); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (5, RangeState::Sent)); + canon.used.insert(5, (3, RangeState::Acked)); + canon.used.insert(8, (4, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA + /// + SSSSSS + /// = AAASSS + /// ``` + #[test] + fn mark_sent_5() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + + rt.mark_sent(3, 6); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (3, RangeState::Acked)); + canon.used.insert(6, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// SSSSS + /// + SSS + /// = SSSSS + /// ``` + #[test] + fn mark_sent_6() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 5); + + rt.mark_sent(1, 3); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (5, RangeState::Sent)); + assert_eq!(rt, canon); + } + #[test] fn unmark_sent_start() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 0); @@ -1702,13 +2158,13 @@ mod tests { fn unmark_sent_middle() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(5, 5, RangeState::Sent); + rt.mark_sent(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(10, 5, RangeState::Acked); + rt.mark_acked(10, 5); assert_eq!(rt.highest_offset(), 15); assert_eq!(rt.acked_from_zero(), 5); assert_eq!(rt.first_unmarked_range(), (15, None)); @@ -1723,10 +2179,10 @@ mod tests { fn unmark_sent_end() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(5, 5, RangeState::Sent); + rt.mark_sent(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 5); assert_eq!(rt.first_unmarked_range(), (10, None)); @@ -1752,11 +2208,11 @@ mod tests { } #[test] - fn test_unmark_range() { + fn unmark_range() { let mut rt = RangeTracker::default(); - rt.mark_range(5, 5, RangeState::Acked); - rt.mark_range(10, 5, RangeState::Sent); + rt.mark_acked(5, 5); + rt.mark_sent(10, 5); // Should unmark sent but not acked range rt.unmark_range(7, 6); @@ -1772,11 +2228,11 @@ mod tests { (&13, &(2, RangeState::Sent)) ); assert!(rt.used.iter().nth(2).is_none()); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); let res = rt.first_unmarked_range(); assert_eq!(res, (10, Some(3))); - rt.mark_range(10, 3, RangeState::Sent); + rt.mark_sent(10, 3); let res = rt.first_unmarked_range(); assert_eq!(res, (15, None)); @@ -1790,14 +2246,15 @@ mod tests { assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); + let big_buf = vec![1; SEND_BUFFER_SIZE * 2]; + assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==SEND_BUFFER_SIZE + Some((0, x)) if x.len() == SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // Mark almost all as sent. Get what's left let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1; - txb.mark_as_sent(0, one_byte_from_end as usize); + txb.mark_as_sent(0, usize::try_from(one_byte_from_end).unwrap()); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 1 && start == one_byte_from_end @@ -1826,14 +2283,14 @@ mod tests { // Contig acked range at start means it can be removed from buffer // Impl of vecdeque should now result in a split buffer when more data // is sent - txb.mark_as_acked(0, five_bytes_from_end as usize); + txb.mark_as_acked(0, usize::try_from(five_bytes_from_end).unwrap()); assert_eq!(txb.send(&[2; 30]), 30); // Just get 5 even though there is more assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 5 && start == five_bytes_from_end && x.iter().all(|ch| *ch == 1))); - assert_eq!(txb.retired, five_bytes_from_end); + assert_eq!(txb.retired(), five_bytes_from_end); assert_eq!(txb.buffered(), 35); // Marking that bit as sent should let the last contig bit be returned @@ -1852,7 +2309,8 @@ mod tests { assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); + let big_buf = vec![1; SEND_BUFFER_SIZE * 2]; + assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); @@ -1860,7 +2318,7 @@ mod tests { // As above let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40; - txb.mark_as_acked(0, forty_bytes_from_end as usize); + txb.mark_as_acked(0, usize::try_from(forty_bytes_from_end).unwrap()); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 40 && start == forty_bytes_from_end @@ -1888,7 +2346,7 @@ mod tests { // Ack entire first slice and into second slice let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10; - txb.mark_as_acked(0, ten_bytes_past_end as usize); + txb.mark_as_acked(0, usize::try_from(ten_bytes_past_end).unwrap()); // Get up to marked range A assert!(matches!(txb.next_bytes(), @@ -1910,7 +2368,7 @@ mod tests { } #[test] - fn test_stream_tx() { + fn stream_tx() { let conn_fc = connection_fc(4096); let conn_events = ConnectionEvents::default(); @@ -1926,22 +2384,23 @@ mod tests { } // Should hit stream flow control limit before filling up send buffer - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let big_buf = vec![4; SEND_BUFFER_SIZE + 100]; + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 1024 - 100); // should do nothing, max stream data already 1024 s.set_max_stream_data(1024); - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 0); // should now hit the conn flow control (4096) s.set_max_stream_data(1_048_576); - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 3072); // should now hit the tx buffer size conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64); - let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap(); + let res = s.send(&big_buf).unwrap(); assert_eq!(res, SEND_BUFFER_SIZE - 4096); // TODO(agrover@mozilla.com): test ooo acks somehow @@ -2012,10 +2471,8 @@ mod tests { // tx buffer size. assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4); - assert_eq!( - s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(), - SEND_BUFFER_SIZE - 4 - ); + let big_buf = vec![b'a'; SEND_BUFFER_SIZE]; + assert_eq!(s.send(&big_buf).unwrap(), SEND_BUFFER_SIZE - 4); // No event because still blocked by tx buffer full s.set_max_stream_data(2_000_000_000); @@ -2395,8 +2852,7 @@ mod tests { ); let mut send_buf = TxBuffer::new(); - send_buf.retired = u64::try_from(offset).unwrap(); - send_buf.ranges.mark_range(0, offset, RangeState::Acked); + send_buf.ranges.mark_acked(0, offset); let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT); fc.consume(offset); let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT))); diff --git a/third_party/rust/neqo-transport/src/sender.rs b/third_party/rust/neqo-transport/src/sender.rs index 9a00dfc7a7..3a54851533 100644 --- a/third_party/rust/neqo-transport/src/sender.rs +++ b/third_party/rust/neqo-transport/src/sender.rs @@ -5,7 +5,7 @@ // except according to those terms. // Congestion control -#![deny(clippy::pedantic)] + #![allow(clippy::module_name_repetitions)] use std::{ diff --git a/third_party/rust/neqo-transport/src/server.rs b/third_party/rust/neqo-transport/src/server.rs index 12a7d2f9e0..96a6244ef1 100644 --- a/third_party/rust/neqo-transport/src/server.rs +++ b/third_party/rust/neqo-transport/src/server.rs @@ -43,7 +43,7 @@ pub enum InitialResult { Retry(Vec<u8>), } -/// MIN_INITIAL_PACKET_SIZE is the smallest packet that can be used to establish +/// `MIN_INITIAL_PACKET_SIZE` is the smallest packet that can be used to establish /// a new connection across all QUIC versions this server supports. const MIN_INITIAL_PACKET_SIZE: usize = 1200; /// The size of timer buckets. This is higher than the actual timer granularity @@ -168,7 +168,7 @@ pub struct Server { /// the same key are routed to the connection that was first accepted. /// This is cleared out when the connection is closed or established. active_attempts: HashMap<AttemptKey, StateRef>, - /// All connections, keyed by ConnectionId. + /// All connections, keyed by `ConnectionId`. connections: ConnectionTableRef, /// The connections that have new events. active: HashSet<ActiveConnectionRef>, @@ -195,6 +195,8 @@ impl Server { /// OK. /// * `cid_generator` is responsible for generating connection IDs and parsing them; connection /// IDs produced by the manager cannot be zero-length. + /// # Errors + /// When address validation state cannot be created. pub fn new( now: Instant, certs: &[impl AsRef<str>], @@ -240,6 +242,8 @@ impl Server { self.ciphers = Vec::from(ciphers.as_ref()); } + /// # Errors + /// When the configuration is invalid. pub fn enable_ech( &mut self, config: u8, @@ -251,6 +255,7 @@ impl Server { Ok(()) } + #[must_use] pub fn ech_config(&self) -> &[u8] { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } @@ -262,7 +267,7 @@ impl Server { fn process_connection( &mut self, - c: StateRef, + c: &StateRef, dgram: Option<&Datagram>, now: Instant, ) -> Option<Datagram> { @@ -271,24 +276,24 @@ impl Server { match out { Output::Datagram(_) => { qtrace!([self], "Sending packet, added to waiting connections"); - self.waiting.push_back(Rc::clone(&c)); + self.waiting.push_back(Rc::clone(c)); } Output::Callback(delay) => { let next = now + delay; if next != c.borrow().last_timer { qtrace!([self], "Change timer to {:?}", next); - self.remove_timer(&c); + self.remove_timer(c); c.borrow_mut().last_timer = next; - self.timers.add(next, Rc::clone(&c)); + self.timers.add(next, Rc::clone(c)); } } Output::None => { - self.remove_timer(&c); + self.remove_timer(c); } } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); - self.active.insert(ActiveConnectionRef { c: Rc::clone(&c) }); + self.active.insert(ActiveConnectionRef { c: Rc::clone(c) }); } if *c.borrow().state() > State::Handshaking { @@ -302,13 +307,13 @@ impl Server { c.borrow_mut().set_qlog(NeqoQlog::disabled()); self.connections .borrow_mut() - .retain(|_, v| !Rc::ptr_eq(v, &c)); + .retain(|_, v| !Rc::ptr_eq(v, c)); } out.dgram() } fn connection(&self, cid: ConnectionIdRef) -> Option<StateRef> { - self.connections.borrow().get(&cid[..]).map(Rc::clone) + self.connections.borrow().get(&cid[..]).cloned() } fn handle_initial( @@ -387,7 +392,7 @@ impl Server { attempt_key ); let c = Rc::clone(c); - self.process_connection(c, Some(dgram), now) + self.process_connection(&c, Some(dgram), now) } else { self.accept_connection(attempt_key, initial, dgram, orig_dcid, now) } @@ -395,9 +400,9 @@ impl Server { fn create_qlog_trace(&self, odcid: ConnectionIdRef<'_>) -> NeqoQlog { if let Some(qlog_dir) = &self.qlog_dir { - let mut qlog_path = qlog_dir.to_path_buf(); + let mut qlog_path = qlog_dir.clone(); - qlog_path.push(format!("{}.qlog", odcid)); + qlog_path.push(format!("{odcid}.qlog")); // The original DCID is chosen by the client. Using create_new() // prevents attackers from overwriting existing logs. @@ -456,9 +461,9 @@ impl Server { } if let Some(odcid) = orig_dcid { // There was a retry, so set the connection IDs for. - c.set_retry_cids(odcid, initial.src_cid, initial.dst_cid); + c.set_retry_cids(&odcid, initial.src_cid, &initial.dst_cid); } - c.set_validation(Rc::clone(&self.address_validation)); + c.set_validation(&self.address_validation); c.set_qlog(self.create_qlog_trace(attempt_key.odcid.as_cid_ref())); if let Some(cfg) = &self.ech_config { if c.server_enable_ech(cfg.config, &cfg.public_name, &cfg.sk, &cfg.pk) @@ -505,10 +510,10 @@ impl Server { last_timer: now, active_attempt: Some(attempt_key.clone()), })); - cid_mgr.borrow_mut().set_connection(Rc::clone(&c)); + cid_mgr.borrow_mut().set_connection(&c); let previous_attempt = self.active_attempts.insert(attempt_key, Rc::clone(&c)); debug_assert!(previous_attempt.is_none()); - self.process_connection(c, Some(dgram), now) + self.process_connection(&c, Some(dgram), now) } Err(e) => { qwarn!([self], "Unable to create connection"); @@ -517,7 +522,7 @@ impl Server { &mut self.create_qlog_trace(attempt_key.odcid.as_cid_ref()), self.conn_params.get_versions().all(), initial.version.wire_version(), - ) + ); } None } @@ -544,7 +549,7 @@ impl Server { attempt_key ); let c = Rc::clone(c); - self.process_connection(c, Some(dgram), now) + self.process_connection(&c, Some(dgram), now) } else { qdebug!([self], "Dropping 0-RTT for unknown connection"); None @@ -564,7 +569,7 @@ impl Server { // Finding an existing connection. Should be the most common case. if let Some(c) = self.connection(packet.dcid()) { - return self.process_connection(c, Some(dgram), now); + return self.process_connection(&c, Some(dgram), now); } if packet.packet_type() == PacketType::Short { @@ -637,13 +642,13 @@ impl Server { fn process_next_output(&mut self, now: Instant) -> Option<Datagram> { qtrace!([self], "No packet to send, look at waiting connections"); while let Some(c) = self.waiting.pop_front() { - if let Some(d) = self.process_connection(c, None, now) { + if let Some(d) = self.process_connection(&c, None, now) { return Some(d); } } qtrace!([self], "No packet to send still, run timers"); while let Some(c) = self.timers.take_next(now) { - if let Some(d) = self.process_connection(c, None, now) { + if let Some(d) = self.process_connection(&c, None, now) { return Some(d); } } @@ -684,7 +689,7 @@ impl Server { mem::take(&mut self.active).into_iter().collect() } - pub fn add_to_waiting(&mut self, c: ActiveConnectionRef) { + pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) { self.waiting.push_back(c.connection()); } } @@ -695,6 +700,7 @@ pub struct ActiveConnectionRef { } impl ActiveConnectionRef { + #[must_use] pub fn borrow(&self) -> impl Deref<Target = Connection> + '_ { std::cell::Ref::map(self.c.borrow(), |c| &c.c) } @@ -703,6 +709,7 @@ impl ActiveConnectionRef { std::cell::RefMut::map(self.c.borrow_mut(), |c| &mut c.c) } + #[must_use] pub fn connection(&self) -> StateRef { Rc::clone(&self.c) } @@ -731,13 +738,13 @@ struct ServerConnectionIdGenerator { } impl ServerConnectionIdGenerator { - pub fn set_connection(&mut self, c: StateRef) { + pub fn set_connection(&mut self, c: &StateRef) { let saved = std::mem::replace(&mut self.saved_cids, Vec::with_capacity(0)); for cid in saved { qtrace!("ServerConnectionIdGenerator inserting saved cid {}", cid); - self.insert_cid(cid, Rc::clone(&c)); + self.insert_cid(cid, Rc::clone(c)); } - self.c = Rc::downgrade(&c); + self.c = Rc::downgrade(c); } fn insert_cid(&mut self, cid: ConnectionId, rc: StateRef) { diff --git a/third_party/rust/neqo-transport/src/stats.rs b/third_party/rust/neqo-transport/src/stats.rs index d6c7a911f9..9eff503dcf 100644 --- a/third_party/rust/neqo-transport/src/stats.rs +++ b/third_party/rust/neqo-transport/src/stats.rs @@ -5,7 +5,6 @@ // except according to those terms. // Tracking of some useful statistics. -#![deny(clippy::pedantic)] use std::{ cell::RefCell, diff --git a/third_party/rust/neqo-transport/src/stream_id.rs b/third_party/rust/neqo-transport/src/stream_id.rs index f3b07b86a8..8dbe2dcfbc 100644 --- a/third_party/rust/neqo-transport/src/stream_id.rs +++ b/third_party/rust/neqo-transport/src/stream_id.rs @@ -20,10 +20,12 @@ pub enum StreamType { pub struct StreamId(u64); impl StreamId { + #[must_use] pub const fn new(id: u64) -> Self { Self(id) } + #[must_use] pub fn init(stream_type: StreamType, role: Role) -> Self { let type_val = match stream_type { StreamType::BiDi => 0, @@ -32,18 +34,22 @@ impl StreamId { Self(type_val + Self::role_bit(role)) } + #[must_use] pub fn as_u64(self) -> u64 { self.0 } + #[must_use] pub fn is_bidi(self) -> bool { self.as_u64() & 0x02 == 0 } + #[must_use] pub fn is_uni(self) -> bool { !self.is_bidi() } + #[must_use] pub fn stream_type(self) -> StreamType { if self.is_bidi() { StreamType::BiDi @@ -52,14 +58,17 @@ impl StreamId { } } + #[must_use] pub fn is_client_initiated(self) -> bool { self.as_u64() & 0x01 == 0 } + #[must_use] pub fn is_server_initiated(self) -> bool { !self.is_client_initiated() } + #[must_use] pub fn role(self) -> Role { if self.is_client_initiated() { Role::Client @@ -68,6 +77,7 @@ impl StreamId { } } + #[must_use] pub fn is_self_initiated(self, my_role: Role) -> bool { match my_role { Role::Client if self.is_client_initiated() => true, @@ -76,14 +86,17 @@ impl StreamId { } } + #[must_use] pub fn is_remote_initiated(self, my_role: Role) -> bool { !self.is_self_initiated(my_role) } + #[must_use] pub fn is_send_only(self, my_role: Role) -> bool { self.is_uni() && self.is_self_initiated(my_role) } + #[must_use] pub fn is_recv_only(self, my_role: Role) -> bool { self.is_uni() && self.is_remote_initiated(my_role) } @@ -93,6 +106,7 @@ impl StreamId { } /// This returns a bit that is shared by all streams created by this role. + #[must_use] pub fn role_bit(role: Role) -> u64 { match role { Role::Server => 1, diff --git a/third_party/rust/neqo-transport/src/streams.rs b/third_party/rust/neqo-transport/src/streams.rs index 7cbb29ce02..d8662afa3b 100644 --- a/third_party/rust/neqo-transport/src/streams.rs +++ b/third_party/rust/neqo-transport/src/streams.rs @@ -95,6 +95,7 @@ impl Streams { } } + #[must_use] pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool { self.remote_stream_limits[stream_id.stream_type()].is_allowed(stream_id) } @@ -118,7 +119,9 @@ impl Streams { self.local_stream_limits = LocalStreamLimits::new(self.role); } - pub fn input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()> { + /// # Errors + /// When the frame is invalid. + pub fn input_frame(&mut self, frame: &Frame, stats: &mut FrameStats) -> Res<()> { match frame { Frame::ResetStream { stream_id, @@ -126,8 +129,8 @@ impl Streams { final_size, } => { stats.reset_stream += 1; - if let (_, Some(rs)) = self.obtain_stream(stream_id)? { - rs.reset(application_error_code, final_size)?; + if let (_, Some(rs)) = self.obtain_stream(*stream_id)? { + rs.reset(*application_error_code, *final_size)?; } } Frame::StopSending { @@ -136,9 +139,9 @@ impl Streams { } => { stats.stop_sending += 1; self.events - .send_stream_stop_sending(stream_id, application_error_code); - if let (Some(ss), _) = self.obtain_stream(stream_id)? { - ss.reset(application_error_code); + .send_stream_stop_sending(*stream_id, *application_error_code); + if let (Some(ss), _) = self.obtain_stream(*stream_id)? { + ss.reset(*application_error_code); } } Frame::Stream { @@ -149,13 +152,13 @@ impl Streams { .. } => { stats.stream += 1; - if let (_, Some(rs)) = self.obtain_stream(stream_id)? { - rs.inbound_stream_frame(fin, offset, data)?; + if let (_, Some(rs)) = self.obtain_stream(*stream_id)? { + rs.inbound_stream_frame(*fin, *offset, data)?; } } Frame::MaxData { maximum_data } => { stats.max_data += 1; - self.handle_max_data(maximum_data); + self.handle_max_data(*maximum_data); } Frame::MaxStreamData { stream_id, @@ -163,12 +166,12 @@ impl Streams { } => { qtrace!( "Stream {} Received MaxStreamData {}", - stream_id, - maximum_stream_data + *stream_id, + *maximum_stream_data ); stats.max_stream_data += 1; - if let (Some(ss), _) = self.obtain_stream(stream_id)? { - ss.set_max_stream_data(maximum_stream_data); + if let (Some(ss), _) = self.obtain_stream(*stream_id)? { + ss.set_max_stream_data(*maximum_stream_data); } } Frame::MaxStreams { @@ -176,7 +179,7 @@ impl Streams { maximum_streams, } => { stats.max_streams += 1; - self.handle_max_streams(stream_type, maximum_streams); + self.handle_max_streams(*stream_type, *maximum_streams); } Frame::DataBlocked { data_limit } => { // Should never happen since we set data limit to max @@ -193,7 +196,7 @@ impl Streams { return Err(Error::StreamStateError); } - if let (_, Some(rs)) = self.obtain_stream(stream_id)? { + if let (_, Some(rs)) = self.obtain_stream(*stream_id)? { rs.send_flowc_update(); } } @@ -401,6 +404,8 @@ impl Streams { /// Get or make a stream, and implicitly open additional streams as /// indicated by its stream id. + /// # Errors + /// When the stream cannot be created due to stream limits. pub fn obtain_stream( &mut self, stream_id: StreamId, @@ -412,14 +417,20 @@ impl Streams { )) } + /// # Errors + /// When the stream does not exist. pub fn set_sendorder(&mut self, stream_id: StreamId, sendorder: Option<SendOrder>) -> Res<()> { self.send.set_sendorder(stream_id, sendorder) } + /// # Errors + /// When the stream does not exist. pub fn set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> { self.send.set_fairness(stream_id, fairness) } + /// # Errors + /// When a stream cannot be created, which might be temporary. pub fn stream_create(&mut self, st: StreamType) -> Res<StreamId> { match self.local_stream_limits.take_stream_id(st) { None => Err(Error::StreamLimitError), @@ -525,18 +536,26 @@ impl Streams { } } + /// # Errors + /// When the stream does not exist. pub fn get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream> { self.send.get_mut(stream_id) } + /// # Errors + /// When the stream does not exist. pub fn get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream> { self.send.get(stream_id) } + /// # Errors + /// When the stream does not exist. pub fn get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream> { self.recv.get_mut(stream_id) } + /// # Errors + /// When the stream does not exist. pub fn keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()> { self.recv.keep_alive(stream_id, keep) } diff --git a/third_party/rust/neqo-transport/src/tparams.rs b/third_party/rust/neqo-transport/src/tparams.rs index 1297829094..eada56cc4c 100644 --- a/third_party/rust/neqo-transport/src/tparams.rs +++ b/third_party/rust/neqo-transport/src/tparams.rs @@ -9,7 +9,6 @@ use std::{ cell::RefCell, collections::HashMap, - convert::TryFrom, net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, rc::Rc, }; @@ -88,6 +87,8 @@ impl PreferredAddress { } /// A generic version of `new()` for testing. + /// # Panics + /// When the addresses are the wrong type. #[must_use] #[cfg(test)] pub fn new_any(v4: Option<std::net::SocketAddr>, v6: Option<std::net::SocketAddr>) -> Self { @@ -231,7 +232,7 @@ impl TransportParameter { if v == 0 { Err(Error::TransportParameterError) } else { - Ok(v as WireVersion) + Ok(WireVersion::try_from(v)?) } } @@ -353,6 +354,9 @@ impl TransportParameters { } // Get an integer type or a default. + /// # Panics + /// When the transport parameter isn't recognized as being an integer. + #[must_use] pub fn get_integer(&self, tp: TransportParameterId) -> u64 { let default = match tp { IDLE_TIMEOUT @@ -378,6 +382,8 @@ impl TransportParameters { } // Set an integer type or a default. + /// # Panics + /// When the transport parameter isn't recognized as being an integer. pub fn set_integer(&mut self, tp: TransportParameterId, value: u64) { match tp { IDLE_TIMEOUT @@ -399,6 +405,9 @@ impl TransportParameters { } } + /// # Panics + /// When the transport parameter isn't recognized as containing bytes. + #[must_use] pub fn get_bytes(&self, tp: TransportParameterId) -> Option<&[u8]> { match tp { ORIGINAL_DESTINATION_CONNECTION_ID @@ -415,6 +424,8 @@ impl TransportParameters { } } + /// # Panics + /// When the transport parameter isn't recognized as containing bytes. pub fn set_bytes(&mut self, tp: TransportParameterId, value: Vec<u8>) { match tp { ORIGINAL_DESTINATION_CONNECTION_ID @@ -427,6 +438,8 @@ impl TransportParameters { } } + /// # Panics + /// When the transport parameter isn't recognized as being empty. pub fn set_empty(&mut self, tp: TransportParameterId) { match tp { DISABLE_MIGRATION | GREASE_QUIC_BIT => { @@ -437,11 +450,14 @@ impl TransportParameters { } /// Set version information. + /// # Panics + /// Never. But rust doesn't know that. pub fn set_versions(&mut self, role: Role, versions: &VersionConfig) { - let rbuf = random(4); + let rbuf = random::<4>(); let mut other = Vec::with_capacity(versions.all().len() + 1); let mut dec = Decoder::new(&rbuf); - let grease = (dec.decode_uint(4).unwrap() as u32) & 0xf0f0_f0f0 | 0x0a0a_0a0a; + let grease = + (u32::try_from(dec.decode_uint(4).unwrap()).unwrap()) & 0xf0f0_f0f0 | 0x0a0a_0a0a; other.push(grease); for &v in versions.all() { if role == Role::Client && !versions.initial().is_compatible(v) { @@ -467,6 +483,10 @@ impl TransportParameters { } } + /// # Panics + /// When the indicated transport parameter is present but NOT empty. + /// This should not happen if the parsing code in `TransportParameter::decode` is correct. + #[must_use] pub fn get_empty(&self, tipe: TransportParameterId) -> bool { match self.params.get(&tipe) { None => false, @@ -568,6 +588,7 @@ pub struct TransportParametersHandler { } impl TransportParametersHandler { + #[must_use] pub fn new(role: Role, versions: VersionConfig) -> Self { let mut local = TransportParameters::default(); local.set_versions(role, &versions); @@ -588,6 +609,10 @@ impl TransportParametersHandler { self.local.set_versions(self.role, &self.versions); } + /// # Panics + /// When this function is called before the peer has provided transport parameters. + /// Do not call this function if you are not also able to send data. + #[must_use] pub fn remote(&self) -> &TransportParameters { match (self.remote.as_ref(), self.remote_0rtt.as_ref()) { (Some(tp), _) | (_, Some(tp)) => tp, @@ -596,6 +621,7 @@ impl TransportParametersHandler { } /// Get the version as set (or as determined by a compatible upgrade). + #[must_use] pub fn version(&self) -> Version { self.versions.initial() } @@ -749,7 +775,24 @@ where #[cfg(test)] #[allow(unused_variables)] mod tests { - use super::*; + use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}; + + use neqo_common::{Decoder, Encoder}; + + use super::PreferredAddress; + use crate::{ + tparams::{ + TransportParameter, TransportParameterId, TransportParameters, + ACTIVE_CONNECTION_ID_LIMIT, IDLE_TIMEOUT, INITIAL_MAX_DATA, INITIAL_MAX_STREAMS_BIDI, + INITIAL_MAX_STREAMS_UNI, INITIAL_MAX_STREAM_DATA_BIDI_LOCAL, + INITIAL_MAX_STREAM_DATA_BIDI_REMOTE, INITIAL_MAX_STREAM_DATA_UNI, + INITIAL_SOURCE_CONNECTION_ID, MAX_ACK_DELAY, MAX_DATAGRAM_FRAME_SIZE, + MAX_UDP_PAYLOAD_SIZE, MIN_ACK_DELAY, ORIGINAL_DESTINATION_CONNECTION_ID, + PREFERRED_ADDRESS, RETRY_SOURCE_CONNECTION_ID, STATELESS_RESET_TOKEN, + VERSION_INFORMATION, + }, + ConnectionId, Error, Version, + }; #[test] fn basic_tps() { @@ -843,7 +886,7 @@ mod tests { /// This takes a `TransportParameter::PreferredAddress` that has been mutilated. /// It then encodes it, working from the knowledge that the `encode` function /// doesn't care about validity, and decodes it. The result should be failure. - fn assert_invalid_spa(spa: TransportParameter) { + fn assert_invalid_spa(spa: &TransportParameter) { let mut enc = Encoder::new(); spa.encode(&mut enc, PREFERRED_ADDRESS); assert_eq!( @@ -853,40 +896,40 @@ mod tests { } /// This is for those rare mutations that are acceptable. - fn assert_valid_spa(spa: TransportParameter) { + fn assert_valid_spa(spa: &TransportParameter) { let mut enc = Encoder::new(); spa.encode(&mut enc, PREFERRED_ADDRESS); let mut dec = enc.as_decoder(); let (id, decoded) = TransportParameter::decode(&mut dec).unwrap().unwrap(); assert_eq!(id, PREFERRED_ADDRESS); - assert_eq!(decoded, spa); + assert_eq!(&decoded, spa); } #[test] fn preferred_address_zero_address() { // Either port being zero is bad. - assert_invalid_spa(mutate_spa(|v4, _, _| { + assert_invalid_spa(&mutate_spa(|v4, _, _| { v4.as_mut().unwrap().set_port(0); })); - assert_invalid_spa(mutate_spa(|_, v6, _| { + assert_invalid_spa(&mutate_spa(|_, v6, _| { v6.as_mut().unwrap().set_port(0); })); // Either IP being zero is bad. - assert_invalid_spa(mutate_spa(|v4, _, _| { + assert_invalid_spa(&mutate_spa(|v4, _, _| { v4.as_mut().unwrap().set_ip(Ipv4Addr::from(0)); })); - assert_invalid_spa(mutate_spa(|_, v6, _| { + assert_invalid_spa(&mutate_spa(|_, v6, _| { v6.as_mut().unwrap().set_ip(Ipv6Addr::from(0)); })); // Either address being absent is OK. - assert_valid_spa(mutate_spa(|v4, _, _| { + assert_valid_spa(&mutate_spa(|v4, _, _| { *v4 = None; })); - assert_valid_spa(mutate_spa(|_, v6, _| { + assert_valid_spa(&mutate_spa(|_, v6, _| { *v6 = None; })); // Both addresses being absent is bad. - assert_invalid_spa(mutate_spa(|v4, v6, _| { + assert_invalid_spa(&mutate_spa(|v4, v6, _| { *v4 = None; *v6 = None; })); @@ -894,10 +937,10 @@ mod tests { #[test] fn preferred_address_bad_cid() { - assert_invalid_spa(mutate_spa(|_, _, cid| { + assert_invalid_spa(&mutate_spa(|_, _, cid| { *cid = ConnectionId::from(&[]); })); - assert_invalid_spa(mutate_spa(|_, _, cid| { + assert_invalid_spa(&mutate_spa(|_, _, cid| { *cid = ConnectionId::from(&[0x0c; 21]); })); } @@ -975,7 +1018,6 @@ mod tests { #[test] fn compatible_0rtt_integers() { - let mut tps_a = TransportParameters::default(); const INTEGER_KEYS: &[TransportParameterId] = &[ INITIAL_MAX_DATA, INITIAL_MAX_STREAM_DATA_BIDI_LOCAL, @@ -987,6 +1029,8 @@ mod tests { MIN_ACK_DELAY, MAX_DATAGRAM_FRAME_SIZE, ]; + + let mut tps_a = TransportParameters::default(); for i in INTEGER_KEYS { tps_a.set(*i, TransportParameter::Integer(12)); } diff --git a/third_party/rust/neqo-transport/src/tracking.rs b/third_party/rust/neqo-transport/src/tracking.rs index 64d00257d3..bdd0f250c7 100644 --- a/third_party/rust/neqo-transport/src/tracking.rs +++ b/third_party/rust/neqo-transport/src/tracking.rs @@ -6,12 +6,9 @@ // Tracking of received packets and generating acks thereof. -#![deny(clippy::pedantic)] - use std::{ cmp::min, collections::VecDeque, - convert::TryFrom, ops::{Index, IndexMut}, time::{Duration, Instant}, }; @@ -746,8 +743,8 @@ impl Default for AckTracker { mod tests { use std::collections::HashSet; - use lazy_static::lazy_static; use neqo_common::Encoder; + use test_fixture::now; use super::{ AckTracker, Duration, Instant, PacketNumberSpace, PacketNumberSpaceSet, RecoveryToken, @@ -760,16 +757,13 @@ mod tests { }; const RTT: Duration = Duration::from_millis(100); - lazy_static! { - static ref NOW: Instant = Instant::now(); - } fn test_ack_range(pns: &[PacketNumber], nranges: usize) { let mut rp = RecvdPackets::new(PacketNumberSpace::Initial); // Any space will do. let mut packets = HashSet::new(); for pn in pns { - rp.set_received(*NOW, *pn, true); + rp.set_received(now(), *pn, true); packets.insert(*pn); } @@ -824,7 +818,7 @@ mod tests { // This will add one too many disjoint ranges. for i in 0..=MAX_TRACKED_RANGES { - rp.set_received(*NOW, (i * 2) as u64, true); + rp.set_received(now(), (i * 2) as u64, true); } assert_eq!(rp.ranges.len(), MAX_TRACKED_RANGES); @@ -843,22 +837,22 @@ mod tests { // Only application data packets are delayed. let mut rp = RecvdPackets::new(PacketNumberSpace::ApplicationData); assert!(rp.ack_time().is_none()); - assert!(!rp.ack_now(*NOW, RTT)); + assert!(!rp.ack_now(now(), RTT)); rp.ack_freq(0, COUNT, DELAY, false); // Some packets won't cause an ACK to be needed. for i in 0..COUNT { - rp.set_received(*NOW, i, true); - assert_eq!(Some(*NOW + DELAY), rp.ack_time()); - assert!(!rp.ack_now(*NOW, RTT)); - assert!(rp.ack_now(*NOW + DELAY, RTT)); + rp.set_received(now(), i, true); + assert_eq!(Some(now() + DELAY), rp.ack_time()); + assert!(!rp.ack_now(now(), RTT)); + assert!(rp.ack_now(now() + DELAY, RTT)); } // Exceeding COUNT will move the ACK time to now. - rp.set_received(*NOW, COUNT, true); - assert_eq!(Some(*NOW), rp.ack_time()); - assert!(rp.ack_now(*NOW, RTT)); + rp.set_received(now(), COUNT, true); + assert_eq!(Some(now()), rp.ack_time()); + assert!(rp.ack_now(now(), RTT)); } #[test] @@ -866,12 +860,12 @@ mod tests { for space in &[PacketNumberSpace::Initial, PacketNumberSpace::Handshake] { let mut rp = RecvdPackets::new(*space); assert!(rp.ack_time().is_none()); - assert!(!rp.ack_now(*NOW, RTT)); + assert!(!rp.ack_now(now(), RTT)); // Any packet in these spaces is acknowledged straight away. - rp.set_received(*NOW, 0, true); - assert_eq!(Some(*NOW), rp.ack_time()); - assert!(rp.ack_now(*NOW, RTT)); + rp.set_received(now(), 0, true); + assert_eq!(Some(now()), rp.ack_time()); + assert!(rp.ack_now(now(), RTT)); } } @@ -879,12 +873,12 @@ mod tests { fn ooo_no_ack_delay_new() { let mut rp = RecvdPackets::new(PacketNumberSpace::ApplicationData); assert!(rp.ack_time().is_none()); - assert!(!rp.ack_now(*NOW, RTT)); + assert!(!rp.ack_now(now(), RTT)); // Anything other than packet 0 is acknowledged immediately. - rp.set_received(*NOW, 1, true); - assert_eq!(Some(*NOW), rp.ack_time()); - assert!(rp.ack_now(*NOW, RTT)); + rp.set_received(now(), 1, true); + assert_eq!(Some(now()), rp.ack_time()); + assert!(rp.ack_now(now(), RTT)); } fn write_frame_at(rp: &mut RecvdPackets, now: Instant) { @@ -897,37 +891,37 @@ mod tests { } fn write_frame(rp: &mut RecvdPackets) { - write_frame_at(rp, *NOW); + write_frame_at(rp, now()); } #[test] fn ooo_no_ack_delay_fill() { let mut rp = RecvdPackets::new(PacketNumberSpace::ApplicationData); - rp.set_received(*NOW, 1, true); + rp.set_received(now(), 1, true); write_frame(&mut rp); // Filling in behind the largest acknowledged causes immediate ACK. - rp.set_received(*NOW, 0, true); + rp.set_received(now(), 0, true); write_frame(&mut rp); // Receiving the next packet won't elicit an ACK. - rp.set_received(*NOW, 2, true); - assert!(!rp.ack_now(*NOW, RTT)); + rp.set_received(now(), 2, true); + assert!(!rp.ack_now(now(), RTT)); } #[test] fn immediate_ack_after_rtt() { let mut rp = RecvdPackets::new(PacketNumberSpace::ApplicationData); - rp.set_received(*NOW, 1, true); + rp.set_received(now(), 1, true); write_frame(&mut rp); // Filling in behind the largest acknowledged causes immediate ACK. - rp.set_received(*NOW, 0, true); + rp.set_received(now(), 0, true); write_frame(&mut rp); // A new packet ordinarily doesn't result in an ACK, but this time it does. - rp.set_received(*NOW + RTT, 2, true); - write_frame_at(&mut rp, *NOW + RTT); + rp.set_received(now() + RTT, 2, true); + write_frame_at(&mut rp, now() + RTT); } #[test] @@ -937,29 +931,29 @@ mod tests { // Set tolerance to 2 and then it takes three packets. rp.ack_freq(0, 2, Duration::from_millis(10), true); - rp.set_received(*NOW, 1, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 2, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 3, true); - assert_eq!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 1, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 2, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 3, true); + assert_eq!(Some(now()), rp.ack_time()); } #[test] fn ooo_no_ack_delay_threshold_gap() { let mut rp = RecvdPackets::new(PacketNumberSpace::ApplicationData); - rp.set_received(*NOW, 1, true); + rp.set_received(now(), 1, true); write_frame(&mut rp); // Set tolerance to 2 and then it takes three packets. rp.ack_freq(0, 2, Duration::from_millis(10), true); - rp.set_received(*NOW, 3, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 4, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 5, true); - assert_eq!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 3, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 4, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 5, true); + assert_eq!(Some(now()), rp.ack_time()); } /// Test that an in-order packet that is not ack-eliciting doesn't @@ -970,13 +964,13 @@ mod tests { rp.ack_freq(0, 1, Duration::from_millis(10), true); // This should be ignored. - rp.set_received(*NOW, 0, false); - assert_ne!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 0, false); + assert_ne!(Some(now()), rp.ack_time()); // Skip 1 (it has no effect). - rp.set_received(*NOW, 2, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 3, true); - assert_eq!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 2, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 3, true); + assert_eq!(Some(now()), rp.ack_time()); } /// If a packet that is not ack-eliciting is reordered, that's fine too. @@ -986,16 +980,16 @@ mod tests { rp.ack_freq(0, 1, Duration::from_millis(10), false); // These are out of order, but they are not ack-eliciting. - rp.set_received(*NOW, 1, false); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 0, false); - assert_ne!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 1, false); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 0, false); + assert_ne!(Some(now()), rp.ack_time()); // These are in order. - rp.set_received(*NOW, 2, true); - assert_ne!(Some(*NOW), rp.ack_time()); - rp.set_received(*NOW, 3, true); - assert_eq!(Some(*NOW), rp.ack_time()); + rp.set_received(now(), 2, true); + assert_ne!(Some(now()), rp.ack_time()); + rp.set_received(now(), 3, true); + assert_eq!(Some(now()), rp.ack_time()); } #[test] @@ -1007,23 +1001,23 @@ mod tests { tracker .get_mut(PacketNumberSpace::Handshake) .unwrap() - .set_received(*NOW, 0, false); - assert_eq!(None, tracker.ack_time(*NOW)); + .set_received(now(), 0, false); + assert_eq!(None, tracker.ack_time(now())); // This should be delayed. tracker .get_mut(PacketNumberSpace::ApplicationData) .unwrap() - .set_received(*NOW, 0, true); - assert_eq!(Some(*NOW + DELAY), tracker.ack_time(*NOW)); + .set_received(now(), 0, true); + assert_eq!(Some(now() + DELAY), tracker.ack_time(now())); // This should move the time forward. - let later = *NOW + (DELAY / 2); + let later = now() + (DELAY / 2); tracker .get_mut(PacketNumberSpace::Initial) .unwrap() .set_received(later, 0, true); - assert_eq!(Some(later), tracker.ack_time(*NOW)); + assert_eq!(Some(later), tracker.ack_time(now())); } #[test] @@ -1047,17 +1041,17 @@ mod tests { tracker .get_mut(PacketNumberSpace::Initial) .unwrap() - .set_received(*NOW, 0, true); + .set_received(now(), 0, true); // The reference time for `ack_time` has to be in the past or we filter out the timer. assert!(tracker - .ack_time(NOW.checked_sub(Duration::from_millis(1)).unwrap()) + .ack_time(now().checked_sub(Duration::from_millis(1)).unwrap()) .is_some()); let mut tokens = Vec::new(); let mut stats = FrameStats::default(); tracker.write_frame( PacketNumberSpace::Initial, - *NOW, + now(), RTT, &mut builder, &mut tokens, @@ -1069,9 +1063,9 @@ mod tests { tracker .get_mut(PacketNumberSpace::Initial) .unwrap() - .set_received(*NOW, 1, true); + .set_received(now(), 1, true); assert!(tracker - .ack_time(NOW.checked_sub(Duration::from_millis(1)).unwrap()) + .ack_time(now().checked_sub(Duration::from_millis(1)).unwrap()) .is_some()); // Now drop that space. @@ -1079,11 +1073,11 @@ mod tests { assert!(tracker.get_mut(PacketNumberSpace::Initial).is_none()); assert!(tracker - .ack_time(NOW.checked_sub(Duration::from_millis(1)).unwrap()) + .ack_time(now().checked_sub(Duration::from_millis(1)).unwrap()) .is_none()); tracker.write_frame( PacketNumberSpace::Initial, - *NOW, + now(), RTT, &mut builder, &mut tokens, @@ -1103,9 +1097,9 @@ mod tests { tracker .get_mut(PacketNumberSpace::Initial) .unwrap() - .set_received(*NOW, 0, true); + .set_received(now(), 0, true); assert!(tracker - .ack_time(NOW.checked_sub(Duration::from_millis(1)).unwrap()) + .ack_time(now().checked_sub(Duration::from_millis(1)).unwrap()) .is_some()); let mut builder = PacketBuilder::short(Encoder::new(), false, []); @@ -1114,7 +1108,7 @@ mod tests { let mut stats = FrameStats::default(); tracker.write_frame( PacketNumberSpace::Initial, - *NOW, + now(), RTT, &mut builder, &mut Vec::new(), @@ -1130,13 +1124,13 @@ mod tests { tracker .get_mut(PacketNumberSpace::Initial) .unwrap() - .set_received(*NOW, 0, true); + .set_received(now(), 0, true); tracker .get_mut(PacketNumberSpace::Initial) .unwrap() - .set_received(*NOW, 2, true); + .set_received(now(), 2, true); assert!(tracker - .ack_time(NOW.checked_sub(Duration::from_millis(1)).unwrap()) + .ack_time(now().checked_sub(Duration::from_millis(1)).unwrap()) .is_some()); let mut builder = PacketBuilder::short(Encoder::new(), false, []); @@ -1145,7 +1139,7 @@ mod tests { let mut stats = FrameStats::default(); tracker.write_frame( PacketNumberSpace::Initial, - *NOW, + now(), RTT, &mut builder, &mut Vec::new(), @@ -1168,19 +1162,19 @@ mod tests { let mut tracker = AckTracker::default(); // While we have multiple PN spaces, we ignore ACK timers from the past. - // Send out of order to cause the delayed ack timer to be set to `*NOW`. + // Send out of order to cause the delayed ack timer to be set to `now()`. tracker .get_mut(PacketNumberSpace::ApplicationData) .unwrap() - .set_received(*NOW, 3, true); - assert!(tracker.ack_time(*NOW + Duration::from_millis(1)).is_none()); + .set_received(now(), 3, true); + assert!(tracker.ack_time(now() + Duration::from_millis(1)).is_none()); // When we are reduced to one space, that filter is off. tracker.drop_space(PacketNumberSpace::Initial); tracker.drop_space(PacketNumberSpace::Handshake); assert_eq!( - tracker.ack_time(*NOW + Duration::from_millis(1)), - Some(*NOW) + tracker.ack_time(now() + Duration::from_millis(1)), + Some(now()) ); } diff --git a/third_party/rust/neqo-transport/src/version.rs b/third_party/rust/neqo-transport/src/version.rs index 13db0bf024..eee598fdd0 100644 --- a/third_party/rust/neqo-transport/src/version.rs +++ b/third_party/rust/neqo-transport/src/version.rs @@ -4,17 +4,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::convert::TryFrom; - use neqo_common::qdebug; use crate::{Error, Res}; pub type WireVersion = u32; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] pub enum Version { Version2, + #[default] Version1, Draft29, Draft30, @@ -23,6 +22,7 @@ pub enum Version { } impl Version { + #[must_use] pub const fn wire_version(self) -> WireVersion { match self { Self::Version2 => 0x6b33_43cf, @@ -94,6 +94,7 @@ impl Version { } /// Determine if `self` can be upgraded to `other` compatibly. + #[must_use] pub fn is_compatible(self, other: Self) -> bool { self == other || matches!( @@ -102,6 +103,7 @@ impl Version { ) } + #[must_use] pub fn all() -> Vec<Self> { vec![ Self::Version2, @@ -121,12 +123,6 @@ impl Version { } } -impl Default for Version { - fn default() -> Self { - Self::Version1 - } -} - impl TryFrom<WireVersion> for Version { type Error = Error; @@ -176,15 +172,20 @@ pub struct VersionConfig { } impl VersionConfig { + /// # Panics + /// When `all` does not include `initial`. + #[must_use] pub fn new(initial: Version, all: Vec<Version>) -> Self { assert!(all.contains(&initial)); Self { initial, all } } + #[must_use] pub fn initial(&self) -> Version { self.initial } + #[must_use] pub fn all(&self) -> &[Version] { &self.all } |