summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/connection/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/neqo-transport/src/connection/mod.rs160
1 files changed, 60 insertions, 100 deletions
diff --git a/third_party/rust/neqo-transport/src/connection/mod.rs b/third_party/rust/neqo-transport/src/connection/mod.rs
index c81a3727c6..8522507a69 100644
--- a/third_party/rust/neqo-transport/src/connection/mod.rs
+++ b/third_party/rust/neqo-transport/src/connection/mod.rs
@@ -10,7 +10,7 @@ use std::{
cell::RefCell,
cmp::{max, min},
fmt::{self, Debug},
- mem,
+ iter, mem,
net::{IpAddr, SocketAddr},
ops::RangeInclusive,
rc::{Rc, Weak},
@@ -19,7 +19,7 @@ use std::{
use neqo_common::{
event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
- qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
+ qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, IpTos, Role,
};
use neqo_crypto::{
agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group,
@@ -383,7 +383,6 @@ impl Connection {
agent,
protocols.iter().map(P::as_ref).map(String::from).collect(),
Rc::clone(&tphandler),
- conn_params.is_fuzzing(),
)?;
let stats = StatsCell::default();
@@ -461,7 +460,7 @@ impl Connection {
}
/// # Errors
- /// When the operation fails.
+ /// When the operation fails.
pub fn client_enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
self.crypto.client_enable_ech(ech_config_list)
}
@@ -778,7 +777,7 @@ impl Connection {
});
enc.encode(extra);
let records = s.send_ticket(now, enc.as_ref())?;
- qinfo!([self], "send session ticket {}", hex(&enc));
+ qdebug!([self], "send session ticket {}", hex(&enc));
self.crypto.buffer_records(records)?;
} else {
unreachable!();
@@ -824,7 +823,7 @@ impl Connection {
/// the connection to fail. However, if no packets have been
/// exchanged, it's not OK.
pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
- qinfo!([self], "Authenticated {:?}", status);
+ qdebug!([self], "Authenticated {:?}", status);
self.crypto.tls.authenticated(status);
let res = self.handshake(now, self.version, PacketNumberSpace::Handshake, None);
self.absorb_error(now, res);
@@ -979,19 +978,16 @@ impl Connection {
/// Process new input datagrams on the connection.
pub fn process_input(&mut self, d: &Datagram, now: Instant) {
- self.input(d, now, now);
- self.process_saved(now);
- self.streams.cleanup_closed_streams();
+ self.process_multiple_input(iter::once(d), now);
}
/// Process new input datagrams on the connection.
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
- I::IntoIter: ExactSizeIterator,
{
- let dgrams = dgrams.into_iter();
- if dgrams.len() == 0 {
+ let mut dgrams = dgrams.into_iter().peekable();
+ if dgrams.peek().is_none() {
return;
}
@@ -1154,7 +1150,7 @@ impl Connection {
fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) {
if self.crypto.discard(space) {
- qinfo!([self], "Drop packet number space {}", space);
+ qdebug!([self], "Drop packet number space {}", space);
let primary = self.paths.primary();
self.loss_recovery.discard(&primary, space, now);
self.acks.drop_space(space);
@@ -1492,6 +1488,7 @@ impl Connection {
payload.packet_type(),
payload.pn(),
&payload[..],
+ d.tos(),
);
qlog::packet_received(&mut self.qlog, &packet, &payload);
@@ -1552,6 +1549,10 @@ impl Connection {
packet: &DecryptedPacket,
now: Instant,
) -> Res<bool> {
+ (!packet.is_empty())
+ .then_some(())
+ .ok_or(Error::ProtocolViolation)?;
+
// TODO(ekr@rtfm.com): Have the server blow away the initial
// crypto state if this fails? Otherwise, we will get a panic
// on the assert for doesn't exist.
@@ -1560,24 +1561,8 @@ impl Connection {
let mut ack_eliciting = false;
let mut probing = true;
let mut d = Decoder::from(&packet[..]);
- let mut consecutive_padding = 0;
while d.remaining() > 0 {
- let mut f = Frame::decode(&mut d)?;
-
- // Skip padding
- while f == Frame::Padding && d.remaining() > 0 {
- consecutive_padding += 1;
- f = Frame::decode(&mut d)?;
- }
- if consecutive_padding > 0 {
- qdebug!(
- [self],
- "PADDING frame repeated {} times",
- consecutive_padding
- );
- consecutive_padding = 0;
- }
-
+ let f = Frame::decode(&mut d)?;
ack_eliciting |= f.ack_eliciting();
probing &= f.path_probing();
let t = f.get_type();
@@ -1841,7 +1826,7 @@ impl Connection {
| State::Connected
| State::Confirmed => {
if let Some(path) = self.paths.select_path() {
- let res = self.output_path(&path, now);
+ let res = self.output_path(&path, now, &None);
self.capture_error(Some(path), now, 0, res)
} else {
Ok(SendOption::default())
@@ -1850,7 +1835,16 @@ impl Connection {
State::Closing { .. } | State::Draining { .. } | State::Closed(_) => {
if let Some(details) = self.state_signaling.close_frame() {
let path = Rc::clone(details.path());
- let res = self.output_close(&details);
+ // In some error cases, we will not be able to make a new, permanent path.
+ // For example, if we run out of connection IDs and the error results from
+ // a packet on a new path, we avoid sending (and the privacy risk) rather
+ // than reuse a connection ID.
+ let res = if path.borrow().is_temporary() {
+ assert!(!cfg!(test), "attempting to close with a temporary path");
+ Err(Error::InternalError)
+ } else {
+ self.output_path(&path, now, &Some(details))
+ };
self.capture_error(Some(path), now, 0, res)
} else {
Ok(SendOption::default())
@@ -1927,62 +1921,6 @@ impl Connection {
}
}
- fn output_close(&mut self, close: &ClosingFrame) -> Res<SendOption> {
- let mut encoder = Encoder::with_capacity(256);
- let grease_quic_bit = self.can_grease_quic_bit();
- let version = self.version();
- for space in PacketNumberSpace::iter() {
- let Some((cspace, tx)) = self.crypto.states.select_tx_mut(self.version, *space) else {
- continue;
- };
-
- let path = close.path().borrow();
- // In some error cases, we will not be able to make a new, permanent path.
- // For example, if we run out of connection IDs and the error results from
- // a packet on a new path, we avoid sending (and the privacy risk) rather
- // than reuse a connection ID.
- if path.is_temporary() {
- assert!(!cfg!(test), "attempting to close with a temporary path");
- return Err(Error::InternalError);
- }
- let (_, mut builder) = Self::build_packet_header(
- &path,
- cspace,
- encoder,
- tx,
- &AddressValidationInfo::None,
- version,
- grease_quic_bit,
- );
- _ = Self::add_packet_number(
- &mut builder,
- tx,
- self.loss_recovery.largest_acknowledged_pn(*space),
- );
- // The builder will set the limit to 0 if there isn't enough space for the header.
- if builder.is_full() {
- encoder = builder.abort();
- break;
- }
- builder.set_limit(min(path.amplification_limit(), path.mtu()) - tx.expansion());
- debug_assert!(builder.limit() <= 2048);
-
- // ConnectionError::Application is only allowed at 1RTT.
- let sanitized = if *space == PacketNumberSpace::ApplicationData {
- None
- } else {
- close.sanitize()
- };
- sanitized
- .as_ref()
- .unwrap_or(close)
- .write_frame(&mut builder);
- encoder = builder.build(tx)?;
- }
-
- Ok(SendOption::Yes(close.path().borrow().datagram(encoder)))
- }
-
/// Write the frames that are exchanged in the application data space.
/// The order of calls here determines the relative priority of frames.
fn write_appdata_frames(
@@ -2203,7 +2141,12 @@ impl Connection {
/// Build a datagram, possibly from multiple packets (for different PN
/// spaces) and each containing 1+ frames.
#[allow(clippy::too_many_lines)] // Yeah, that's just the way it is.
- fn output_path(&mut self, path: &PathRef, now: Instant) -> Res<SendOption> {
+ fn output_path(
+ &mut self,
+ path: &PathRef,
+ now: Instant,
+ closing_frame: &Option<ClosingFrame>,
+ ) -> Res<SendOption> {
let mut initial_sent = None;
let mut needs_padding = false;
let grease_quic_bit = self.can_grease_quic_bit();
@@ -2256,8 +2199,23 @@ impl Connection {
// Add frames to the packet.
let payload_start = builder.len();
- let (tokens, ack_eliciting, padded) =
- self.write_frames(path, *space, &profile, &mut builder, now);
+ let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false);
+ if let Some(ref close) = closing_frame {
+ // ConnectionError::Application is only allowed at 1RTT.
+ let sanitized = if *space == PacketNumberSpace::ApplicationData {
+ None
+ } else {
+ close.sanitize()
+ };
+ sanitized
+ .as_ref()
+ .unwrap_or(close)
+ .write_frame(&mut builder);
+ self.stats.borrow_mut().frame_tx.connection_close += 1;
+ } else {
+ (tokens, ack_eliciting, padded) =
+ self.write_frames(path, *space, &profile, &mut builder, now);
+ }
if builder.packet_empty() {
// Nothing to include in this packet.
encoder = builder.abort();
@@ -2271,6 +2229,7 @@ impl Connection {
pt,
pn,
&builder.as_ref()[payload_start..],
+ IpTos::default(), // TODO: set from path
);
qlog::packet_sent(
&mut self.qlog,
@@ -2323,7 +2282,7 @@ impl Connection {
}
if encoder.is_empty() {
- qinfo!("TX blocked, profile={:?} ", profile);
+ qdebug!("TX blocked, profile={:?} ", profile);
Ok(SendOption::No(profile.paced()))
} else {
// Perform additional padding for Initial packets as necessary.
@@ -2337,6 +2296,8 @@ impl Connection {
mtu
);
initial.size += mtu - packets.len();
+ // These zeros aren't padding frames, they are an invalid all-zero coalesced
+ // packet, which is why we don't increase `frame_tx.padding` count here.
packets.resize(mtu, 0);
}
self.loss_recovery.on_packet_sent(path, initial);
@@ -2367,7 +2328,7 @@ impl Connection {
}
fn client_start(&mut self, now: Instant) -> Res<()> {
- qinfo!([self], "client_start");
+ qdebug!([self], "client_start");
debug_assert_eq!(self.role, Role::Client);
qlog::client_connection_started(&mut self.qlog, &self.paths.primary());
qlog::client_version_information_initiated(&mut self.qlog, self.conn_params.get_versions());
@@ -2599,7 +2560,7 @@ impl Connection {
fn confirm_version(&mut self, v: Version) {
if self.version != v {
- qinfo!([self], "Compatible upgrade {:?} ==> {:?}", self.version, v);
+ qdebug!([self], "Compatible upgrade {:?} ==> {:?}", self.version, v);
}
self.crypto.confirm_version(v);
self.version = v;
@@ -2694,9 +2655,8 @@ impl Connection {
.input_frame(&frame, &mut self.stats.borrow_mut().frame_rx);
}
match frame {
- Frame::Padding => {
- // Note: This counts contiguous padding as a single frame.
- self.stats.borrow_mut().frame_rx.padding += 1;
+ Frame::Padding(length) => {
+ self.stats.borrow_mut().frame_rx.padding += usize::from(length);
}
Frame::Ping => {
// If we get a PING and there are outstanding CRYPTO frames,
@@ -2899,7 +2859,7 @@ impl Connection {
R: IntoIterator<Item = RangeInclusive<u64>> + Debug,
R::IntoIter: ExactSizeIterator,
{
- qinfo!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges);
+ qdebug!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges);
let (acked_packets, lost_packets) = self.loss_recovery.on_ack_received(
&self.paths.primary(),
@@ -2953,7 +2913,7 @@ impl Connection {
}
fn set_connected(&mut self, now: Instant) -> Res<()> {
- qinfo!([self], "TLS connection complete");
+ qdebug!([self], "TLS connection complete");
if self.crypto.tls.info().map(SecretAgentInfo::alpn).is_none() {
qwarn!([self], "No ALPN. Closing connection.");
// 120 = no_application_protocol
@@ -2996,7 +2956,7 @@ impl Connection {
fn set_state(&mut self, state: State) {
if state > self.state {
- qinfo!([self], "State change from {:?} -> {:?}", self.state, state);
+ qdebug!([self], "State change from {:?} -> {:?}", self.state, state);
self.state = state.clone();
if self.state.closed() {
self.streams.clear_streams();