diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/neqo-transport/src/connection/mod.rs | 285 |
1 files changed, 185 insertions, 100 deletions
diff --git a/third_party/rust/neqo-transport/src/connection/mod.rs b/third_party/rust/neqo-transport/src/connection/mod.rs index 8522507a69..f955381414 100644 --- a/third_party/rust/neqo-transport/src/connection/mod.rs +++ b/third_party/rust/neqo-transport/src/connection/mod.rs @@ -19,7 +19,7 @@ use std::{ use neqo_common::{ event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo, - qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, IpTos, Role, + qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role, }; use neqo_crypto::{ agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group, @@ -35,6 +35,7 @@ use crate::{ ConnectionIdRef, ConnectionIdStore, LOCAL_ACTIVE_CID_LIMIT, }, crypto::{Crypto, CryptoDxState, CryptoSpace}, + ecn::EcnCount, events::{ConnectionEvent, ConnectionEvents, OutgoingDatagramOutcome}, frame::{ CloseError, Frame, FrameType, FRAME_TYPE_CONNECTION_CLOSE_APPLICATION, @@ -46,7 +47,7 @@ use crate::{ quic_datagrams::{DatagramTracking, QuicDatagrams}, recovery::{LossRecovery, RecoveryToken, SendProfile}, recv_stream::RecvStreamStats, - rtt::GRANULARITY, + rtt::{RttEstimate, GRANULARITY}, send_stream::SendStream, stats::{Stats, StatsCell}, stream_id::StreamType, @@ -55,9 +56,9 @@ use crate::{ self, TransportParameter, TransportParameterId, TransportParameters, TransportParametersHandler, }, - tracking::{AckTracker, PacketNumberSpace, SentPacket}, + tracking::{AckTracker, PacketNumberSpace, RecvdPackets, SentPacket}, version::{Version, WireVersion}, - AppError, ConnectionError, Error, Res, StreamId, + AppError, CloseReason, Error, Res, StreamId, }; mod dump; @@ -291,7 +292,7 @@ impl Debug for Connection { "{:?} Connection: {:?} {:?}", self.role, self.state, - self.paths.primary_fallible() + self.paths.primary() ) } } @@ -591,7 +592,11 @@ impl Connection { fn make_resumption_token(&mut self) -> ResumptionToken { debug_assert_eq!(self.role, Role::Client); debug_assert!(self.crypto.has_resumption_token()); - let rtt = self.paths.primary().borrow().rtt().estimate(); + let rtt = self.paths.primary().map_or_else( + || RttEstimate::default().estimate(), + |p| p.borrow().rtt().estimate(), + ); + self.crypto .create_resumption_token( self.new_token.take_token(), @@ -610,11 +615,10 @@ impl Connection { /// a value of this approximate order. Don't use this for loss recovery, /// only use it where a more precise value is not important. fn pto(&self) -> Duration { - self.paths - .primary() - .borrow() - .rtt() - .pto(PacketNumberSpace::ApplicationData) + self.paths.primary().map_or_else( + || RttEstimate::default().pto(PacketNumberSpace::ApplicationData), + |p| p.borrow().rtt().pto(PacketNumberSpace::ApplicationData), + ) } fn create_resumption_token(&mut self, now: Instant) { @@ -746,7 +750,12 @@ impl Connection { if !init_token.is_empty() { self.address_validation = AddressValidationInfo::NewToken(init_token.to_vec()); } - self.paths.primary().borrow_mut().rtt_mut().set_initial(rtt); + self.paths + .primary() + .ok_or(Error::InternalError)? + .borrow_mut() + .rtt_mut() + .set_initial(rtt); self.set_initial_limits(); // Start up TLS, which has the effect of setting up all the necessary // state for 0-RTT. This only stages the CRYPTO frames. @@ -786,7 +795,7 @@ impl Connection { // If we are able, also send a NEW_TOKEN frame. // This should be recording all remote addresses that are valid, // but there are just 0 or 1 in the current implementation. - if let Some(path) = self.paths.primary_fallible() { + if let Some(path) = self.paths.primary() { if let Some(token) = self .address_validation .generate_new_token(path.borrow().remote_address(), now) @@ -858,7 +867,7 @@ impl Connection { #[must_use] pub fn stats(&self) -> Stats { let mut v = self.stats.borrow().clone(); - if let Some(p) = self.paths.primary_fallible() { + if let Some(p) = self.paths.primary() { let p = p.borrow(); v.rtt = p.rtt().estimate(); v.rttvar = p.rtt().rttvar(); @@ -880,7 +889,7 @@ impl Connection { let msg = format!("{v:?}"); #[cfg(not(debug_assertions))] let msg = ""; - let error = ConnectionError::Transport(v.clone()); + let error = CloseReason::Transport(v.clone()); match &self.state { State::Closing { error: err, .. } | State::Draining { error: err, .. } @@ -895,14 +904,14 @@ impl Connection { State::WaitInitial => { // We don't have any state yet, so don't bother with // the closing state, just send one CONNECTION_CLOSE. - if let Some(path) = path.or_else(|| self.paths.primary_fallible()) { + if let Some(path) = path.or_else(|| self.paths.primary()) { self.state_signaling .close(path, error.clone(), frame_type, msg); } self.set_state(State::Closed(error)); } _ => { - if let Some(path) = path.or_else(|| self.paths.primary_fallible()) { + if let Some(path) = path.or_else(|| self.paths.primary()) { self.state_signaling .close(path, error.clone(), frame_type, msg); if matches!(v, Error::KeysExhausted) { @@ -951,9 +960,7 @@ impl Connection { let pto = self.pto(); if self.idle_timeout.expired(now, pto) { qinfo!([self], "idle timeout expired"); - self.set_state(State::Closed(ConnectionError::Transport( - Error::IdleTimeout, - ))); + self.set_state(State::Closed(CloseReason::Transport(Error::IdleTimeout))); return; } @@ -962,9 +969,11 @@ impl Connection { let res = self.crypto.states.check_key_update(now); self.absorb_error(now, res); - let lost = self.loss_recovery.timeout(&self.paths.primary(), now); - self.handle_lost_packets(&lost); - qlog::packets_lost(&mut self.qlog, &lost); + if let Some(path) = self.paths.primary() { + let lost = self.loss_recovery.timeout(&path, now); + self.handle_lost_packets(&lost); + qlog::packets_lost(&mut self.qlog, &lost); + } if self.release_resumption_token_timer.is_some() { self.create_resumption_token(now); @@ -1014,7 +1023,7 @@ impl Connection { delays.push(ack_time); } - if let Some(p) = self.paths.primary_fallible() { + if let Some(p) = self.paths.primary() { let path = p.borrow(); let rtt = path.rtt(); let pto = rtt.pto(PacketNumberSpace::ApplicationData); @@ -1102,7 +1111,15 @@ impl Connection { self.input(d, now, now); self.process_saved(now); } - self.process_output(now) + #[allow(clippy::let_and_return)] + let output = self.process_output(now); + #[cfg(all(feature = "build-fuzzing-corpus", test))] + if self.test_frame_writer.is_none() { + if let Some(d) = output.clone().dgram() { + neqo_common::write_item_to_fuzzing_corpus("packet", &d); + } + } + output } fn handle_retry(&mut self, packet: &PublicPacket, now: Instant) { @@ -1123,7 +1140,13 @@ impl Connection { } // At this point, we should only have the connection ID that we generated. // Update to the one that the server prefers. - let path = self.paths.primary(); + let Some(path) = self.paths.primary() else { + self.stats + .borrow_mut() + .pkt_dropped("Retry without an existing path"); + return; + }; + path.borrow_mut().set_remote_cid(packet.scid()); let retry_scid = ConnectionId::from(packet.scid()); @@ -1151,8 +1174,9 @@ impl Connection { fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) { if self.crypto.discard(space) { qdebug!([self], "Drop packet number space {}", space); - let primary = self.paths.primary(); - self.loss_recovery.discard(&primary, space, now); + if let Some(path) = self.paths.primary() { + self.loss_recovery.discard(&path, space, now); + } self.acks.drop_space(space); } } @@ -1180,7 +1204,7 @@ impl Connection { qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..])); self.state_signaling.reset(); self.set_state(State::Draining { - error: ConnectionError::Transport(Error::StatelessReset), + error: CloseReason::Transport(Error::StatelessReset), timeout: self.get_closing_period_time(now), }); Err(Error::StatelessReset) @@ -1227,8 +1251,9 @@ impl Connection { assert_ne!(self.version, version); qinfo!([self], "Version negotiation: trying {:?}", version); - let local_addr = self.paths.primary().borrow().local_address(); - let remote_addr = self.paths.primary().borrow().remote_address(); + let path = self.paths.primary().ok_or(Error::NoAvailablePath)?; + let local_addr = path.borrow().local_address(); + let remote_addr = path.borrow().remote_address(); let conn_params = self .conn_params .clone() @@ -1256,7 +1281,7 @@ impl Connection { } else { qinfo!([self], "Version negotiation: failed with {:?}", supported); // This error goes straight to closed. - self.set_state(State::Closed(ConnectionError::Transport( + self.set_state(State::Closed(CloseReason::Transport( Error::VersionNegotiation, ))); Err(Error::VersionNegotiation) @@ -1417,6 +1442,13 @@ impl Connection { migrate: bool, now: Instant, ) { + let space = PacketNumberSpace::from(packet.packet_type()); + if let Some(space) = self.acks.get_mut(space) { + *space.ecn_marks() += d.tos().into(); + } else { + qtrace!("Not tracking ECN for dropped packet number space"); + } + if self.state == State::WaitInitial { self.start_handshake(path, packet, now); } @@ -1491,6 +1523,16 @@ impl Connection { d.tos(), ); + #[cfg(feature = "build-fuzzing-corpus")] + if packet.packet_type() == PacketType::Initial { + let target = if self.role == Role::Client { + "server_initial" + } else { + "client_initial" + }; + neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]); + } + qlog::packet_received(&mut self.qlog, &packet, &payload); let space = PacketNumberSpace::from(payload.packet_type()); if self.acks.get_mut(space).unwrap().is_duplicate(payload.pn()) { @@ -1562,7 +1604,11 @@ impl Connection { let mut probing = true; let mut d = Decoder::from(&packet[..]); while d.remaining() > 0 { + #[cfg(feature = "build-fuzzing-corpus")] + let pos = d.offset(); let f = Frame::decode(&mut d)?; + #[cfg(feature = "build-fuzzing-corpus")] + neqo_common::write_item_to_fuzzing_corpus("frame", &packet[pos..d.offset()]); ack_eliciting |= f.ack_eliciting(); probing &= f.path_probing(); let t = f.get_type(); @@ -1623,10 +1669,15 @@ impl Connection { if let Some(cid) = self.connection_ids.next() { self.paths.make_permanent(path, None, cid); Ok(()) - } else if self.paths.primary().borrow().remote_cid().is_empty() { - self.paths - .make_permanent(path, None, ConnectionIdEntry::empty_remote()); - Ok(()) + } else if let Some(primary) = self.paths.primary() { + if primary.borrow().remote_cid().is_empty() { + self.paths + .make_permanent(path, None, ConnectionIdEntry::empty_remote()); + Ok(()) + } else { + qtrace!([self], "Unable to make path permanent: {}", path.borrow()); + Err(Error::InvalidMigration) + } } else { qtrace!([self], "Unable to make path permanent: {}", path.borrow()); Err(Error::InvalidMigration) @@ -1719,8 +1770,10 @@ impl Connection { // Pointless migration is pointless. return Err(Error::InvalidMigration); } - let local = local.unwrap_or_else(|| self.paths.primary().borrow().local_address()); - let remote = remote.unwrap_or_else(|| self.paths.primary().borrow().remote_address()); + + let path = self.paths.primary().ok_or(Error::InvalidMigration)?; + let local = local.unwrap_or_else(|| path.borrow().local_address()); + let remote = remote.unwrap_or_else(|| path.borrow().remote_address()); if mem::discriminant(&local.ip()) != mem::discriminant(&remote.ip()) { // Can't mix address families. @@ -1773,7 +1826,12 @@ impl Connection { // has to use the existing address. So only pay attention to a preferred // address from the same family as is currently in use. More thought will // be needed to work out how to get addresses from a different family. - let prev = self.paths.primary().borrow().remote_address(); + let prev = self + .paths + .primary() + .ok_or(Error::NoAvailablePath)? + .borrow() + .remote_address(); let remote = match prev.ip() { IpAddr::V4(_) => addr.ipv4().map(SocketAddr::V4), IpAddr::V6(_) => addr.ipv6().map(SocketAddr::V6), @@ -1937,20 +1995,15 @@ impl Connection { } } - self.streams - .write_frames(TransmissionPriority::Critical, builder, tokens, frame_stats); - if builder.is_full() { - return; - } - - self.streams.write_frames( + for prio in [ + TransmissionPriority::Critical, TransmissionPriority::Important, - builder, - tokens, - frame_stats, - ); - if builder.is_full() { - return; + ] { + self.streams + .write_frames(prio, builder, tokens, frame_stats); + if builder.is_full() { + return; + } } // NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, and ACK_FREQUENCY. @@ -1958,21 +2011,18 @@ impl Connection { if builder.is_full() { return; } - self.paths.write_frames(builder, tokens, frame_stats); - if builder.is_full() { - return; - } - self.streams - .write_frames(TransmissionPriority::High, builder, tokens, frame_stats); + self.paths.write_frames(builder, tokens, frame_stats); if builder.is_full() { return; } - self.streams - .write_frames(TransmissionPriority::Normal, builder, tokens, frame_stats); - if builder.is_full() { - return; + for prio in [TransmissionPriority::High, TransmissionPriority::Normal] { + self.streams + .write_frames(prio, builder, tokens, &mut stats.frame_tx); + if builder.is_full() { + return; + } } // Datagrams are best-effort and unreliable. Let streams starve them for now. @@ -1981,9 +2031,9 @@ impl Connection { return; } - let frame_stats = &mut stats.frame_tx; // CRYPTO here only includes NewSessionTicket, plus NEW_TOKEN. // Both of these are only used for resumption and so can be relatively low priority. + let frame_stats = &mut stats.frame_tx; self.crypto.write_frame( PacketNumberSpace::ApplicationData, builder, @@ -1993,6 +2043,7 @@ impl Connection { if builder.is_full() { return; } + self.new_token.write_frames(builder, tokens, frame_stats); if builder.is_full() { return; @@ -2002,10 +2053,8 @@ impl Connection { .write_frames(TransmissionPriority::Low, builder, tokens, frame_stats); #[cfg(test)] - { - if let Some(w) = &mut self.test_frame_writer { - w.write_frames(builder); - } + if let Some(w) = &mut self.test_frame_writer { + w.write_frames(builder); } } @@ -2138,6 +2187,40 @@ impl Connection { (tokens, ack_eliciting, padded) } + fn write_closing_frames( + &mut self, + close: &ClosingFrame, + builder: &mut PacketBuilder, + space: PacketNumberSpace, + now: Instant, + path: &PathRef, + tokens: &mut Vec<RecoveryToken>, + ) { + if builder.remaining() > ClosingFrame::MIN_LENGTH + RecvdPackets::USEFUL_ACK_LEN { + // Include an ACK frame with the CONNECTION_CLOSE. + let limit = builder.limit(); + builder.set_limit(limit - ClosingFrame::MIN_LENGTH); + self.acks.immediate_ack(now); + self.acks.write_frame( + space, + now, + path.borrow().rtt().estimate(), + builder, + tokens, + &mut self.stats.borrow_mut().frame_tx, + ); + builder.set_limit(limit); + } + // CloseReason::Application is only allowed at 1RTT. + let sanitized = if space == PacketNumberSpace::ApplicationData { + None + } else { + close.sanitize() + }; + sanitized.as_ref().unwrap_or(close).write_frame(builder); + self.stats.borrow_mut().frame_tx.connection_close += 1; + } + /// Build a datagram, possibly from multiple packets (for different PN /// spaces) and each containing 1+ frames. #[allow(clippy::too_many_lines)] // Yeah, that's just the way it is. @@ -2201,17 +2284,7 @@ impl Connection { let payload_start = builder.len(); let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false); if let Some(ref close) = closing_frame { - // ConnectionError::Application is only allowed at 1RTT. - let sanitized = if *space == PacketNumberSpace::ApplicationData { - None - } else { - close.sanitize() - }; - sanitized - .as_ref() - .unwrap_or(close) - .write_frame(&mut builder); - self.stats.borrow_mut().frame_tx.connection_close += 1; + self.write_closing_frames(close, &mut builder, *space, now, path, &mut tokens); } else { (tokens, ack_eliciting, padded) = self.write_frames(path, *space, &profile, &mut builder, now); @@ -2229,7 +2302,7 @@ impl Connection { pt, pn, &builder.as_ref()[payload_start..], - IpTos::default(), // TODO: set from path + path.borrow().tos(), ); qlog::packet_sent( &mut self.qlog, @@ -2251,6 +2324,7 @@ impl Connection { let sent = SentPacket::new( pt, pn, + path.borrow().tos().into(), now, ack_eliciting, tokens, @@ -2303,7 +2377,7 @@ impl Connection { self.loss_recovery.on_packet_sent(path, initial); } path.borrow_mut().add_sent(packets.len()); - Ok(SendOption::Yes(path.borrow().datagram(packets))) + Ok(SendOption::Yes(path.borrow_mut().datagram(packets))) } } @@ -2330,7 +2404,9 @@ impl Connection { fn client_start(&mut self, now: Instant) -> Res<()> { qdebug!([self], "client_start"); debug_assert_eq!(self.role, Role::Client); - qlog::client_connection_started(&mut self.qlog, &self.paths.primary()); + if let Some(path) = self.paths.primary() { + qlog::client_connection_started(&mut self.qlog, &path); + } qlog::client_version_information_initiated(&mut self.qlog, self.conn_params.get_versions()); self.handshake(now, self.version, PacketNumberSpace::Initial, None)?; @@ -2351,9 +2427,9 @@ impl Connection { /// Close the connection. pub fn close(&mut self, now: Instant, app_error: AppError, msg: impl AsRef<str>) { - let error = ConnectionError::Application(app_error); + let error = CloseReason::Application(app_error); let timeout = self.get_closing_period_time(now); - if let Some(path) = self.paths.primary_fallible() { + if let Some(path) = self.paths.primary() { self.state_signaling.close(path, error.clone(), 0, msg); self.set_state(State::Closing { error, timeout }); } else { @@ -2411,10 +2487,8 @@ impl Connection { // That's OK, they can try guessing this. ConnectionIdEntry::random_srt() }; - self.paths - .primary() - .borrow_mut() - .set_reset_token(reset_token); + let path = self.paths.primary().ok_or(Error::NoAvailablePath)?; + path.borrow_mut().set_reset_token(reset_token); let max_ad = Duration::from_millis(remote.get_integer(tparams::MAX_ACK_DELAY)); let min_ad = if remote.has_value(tparams::MIN_ACK_DELAY) { @@ -2426,11 +2500,8 @@ impl Connection { } else { None }; - self.paths.primary().borrow_mut().set_ack_delay( - max_ad, - min_ad, - self.conn_params.get_ack_ratio(), - ); + path.borrow_mut() + .set_ack_delay(max_ad, min_ad, self.conn_params.get_ack_ratio()); let max_active_cids = remote.get_integer(tparams::ACTIVE_CONNECTION_ID_LIMIT); self.cid_manager.set_limit(max_active_cids); @@ -2673,10 +2744,18 @@ impl Connection { ack_delay, first_ack_range, ack_ranges, + ecn_count, } => { let ranges = Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges)?; - self.handle_ack(space, largest_acknowledged, ranges, ack_delay, now); + self.handle_ack( + space, + largest_acknowledged, + ranges, + ecn_count, + ack_delay, + now, + ); } Frame::Crypto { offset, data } => { qtrace!( @@ -2747,7 +2826,6 @@ impl Connection { reason_phrase, } => { self.stats.borrow_mut().frame_rx.connection_close += 1; - let reason_phrase = String::from_utf8_lossy(&reason_phrase); qinfo!( [self], "ConnectionClose received. Error code: {:?} frame type {:x} reason {}", @@ -2768,7 +2846,7 @@ impl Connection { FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT, ) }; - let error = ConnectionError::Transport(detail); + let error = CloseReason::Transport(detail); self.state_signaling .drain(Rc::clone(path), error.clone(), frame_type, ""); self.set_state(State::Draining { @@ -2853,6 +2931,7 @@ impl Connection { space: PacketNumberSpace, largest_acknowledged: u64, ack_ranges: R, + ack_ecn: Option<EcnCount>, ack_delay: u64, now: Instant, ) where @@ -2861,11 +2940,15 @@ impl Connection { { qdebug!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges); + let Some(path) = self.paths.primary() else { + return; + }; let (acked_packets, lost_packets) = self.loss_recovery.on_ack_received( - &self.paths.primary(), + &path, space, largest_acknowledged, ack_ranges, + ack_ecn, self.decode_ack_delay(ack_delay), now, ); @@ -2903,8 +2986,10 @@ impl Connection { qdebug!([self], "0-RTT rejected"); // Tell 0-RTT packets that they were "lost". - let dropped = self.loss_recovery.drop_0rtt(&self.paths.primary(), now); - self.handle_lost_packets(&dropped); + if let Some(path) = self.paths.primary() { + let dropped = self.loss_recovery.drop_0rtt(&path, now); + self.handle_lost_packets(&dropped); + } self.streams.zero_rtt_rejected(); @@ -2923,7 +3008,7 @@ impl Connection { // Remove the randomized client CID from the list of acceptable CIDs. self.cid_manager.remove_odcid(); // Mark the path as validated, if it isn't already. - let path = self.paths.primary(); + let path = self.paths.primary().ok_or(Error::NoAvailablePath)?; path.borrow_mut().set_valid(now); // Generate a qlog event that the server connection started. qlog::server_connection_started(&mut self.qlog, &path); @@ -3191,7 +3276,7 @@ impl Connection { else { return Err(Error::NotAvailable); }; - let path = self.paths.primary_fallible().ok_or(Error::NotAvailable)?; + let path = self.paths.primary().ok_or(Error::NotAvailable)?; let mtu = path.borrow().mtu(); let encoder = Encoder::with_capacity(mtu); |