diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:27 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:27 +0000 |
commit | 40a355a42d4a9444dc753c04c6608dade2f06a23 (patch) | |
tree | 871fc667d2de662f171103ce5ec067014ef85e61 /third_party/rust/neqo-transport/src/send_stream.rs | |
parent | Adding upstream version 124.0.1. (diff) | |
download | firefox-40a355a42d4a9444dc753c04c6608dade2f06a23.tar.xz firefox-40a355a42d4a9444dc753c04c6608dade2f06a23.zip |
Adding upstream version 125.0.1.upstream/125.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/neqo-transport/src/send_stream.rs')
-rw-r--r-- | third_party/rust/neqo-transport/src/send_stream.rs | 904 |
1 files changed, 680 insertions, 224 deletions
diff --git a/third_party/rust/neqo-transport/src/send_stream.rs b/third_party/rust/neqo-transport/src/send_stream.rs index 5feb785ac6..8771ec7765 100644 --- a/third_party/rust/neqo-transport/src/send_stream.rs +++ b/third_party/rust/neqo-transport/src/send_stream.rs @@ -9,8 +9,7 @@ use std::{ cell::RefCell, cmp::{max, min, Ordering}, - collections::{BTreeMap, VecDeque}, - convert::TryFrom, + collections::{btree_map::Entry, BTreeMap, VecDeque}, hash::{Hash, Hasher}, mem, ops::Add, @@ -18,7 +17,7 @@ use std::{ }; use indexmap::IndexMap; -use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role}; +use neqo_common::{qdebug, qerror, qtrace, Encoder, Role}; use smallvec::SmallVec; use crate::{ @@ -111,7 +110,7 @@ impl Add<RetransmissionPriority> for TransmissionPriority { /// If data is lost, this determines the priority that applies to retransmissions /// of that data. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] pub enum RetransmissionPriority { /// Prioritize retransmission at a fixed priority. /// With this, it is possible to prioritize retransmissions lower than transmissions. @@ -123,19 +122,14 @@ pub enum RetransmissionPriority { Same, /// Increase the priority of retransmissions (the default). /// Retransmissions of `Critical` or `Important` aren't elevated at all. + #[default] Higher, /// Increase the priority of retransmissions a lot. /// This is useful for streams that are particularly exposed to head-of-line blocking. MuchHigher, } -impl Default for RetransmissionPriority { - fn default() -> Self { - Self::Higher - } -} - -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] enum RangeState { Sent, Acked, @@ -144,169 +138,268 @@ enum RangeState { /// Track ranges in the stream as sent or acked. Acked implies sent. Not in a /// range implies needing-to-be-sent, either initially or as a retransmission. #[derive(Debug, Default, PartialEq)] -struct RangeTracker { - // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits. +pub struct RangeTracker { + /// The number of bytes that have been acknowledged starting from offset 0. + acked: u64, + /// A map that tracks the state of ranges. + /// Keys are the offset of the start of the range. + /// Values is a tuple of the range length and its state. used: BTreeMap<u64, (u64, RangeState)>, + /// This is a cache for the output of `first_unmarked_range`, which we check a lot. + first_unmarked: Option<(u64, Option<u64>)>, } impl RangeTracker { fn highest_offset(&self) -> u64 { self.used - .range(..) - .next_back() - .map_or(0, |(k, (v, _))| *k + *v) + .last_key_value() + .map_or(self.acked, |(&k, &(v, _))| k + v) } fn acked_from_zero(&self) -> u64 { - self.used - .get(&0) - .filter(|(_, state)| *state == RangeState::Acked) - .map_or(0, |(v, _)| *v) + self.acked } /// Find the first unmarked range. If all are contiguous, this will return - /// (highest_offset(), None). - fn first_unmarked_range(&self) -> (u64, Option<u64>) { - let mut prev_end = 0; + /// (`highest_offset()`, None). + fn first_unmarked_range(&mut self) -> (u64, Option<u64>) { + if let Some(first_unmarked) = self.first_unmarked { + return first_unmarked; + } + + let mut prev_end = self.acked; - for (cur_off, (cur_len, _)) in &self.used { - if prev_end == *cur_off { + for (&cur_off, &(cur_len, _)) in &self.used { + if prev_end == cur_off { prev_end = cur_off + cur_len; } else { - return (prev_end, Some(cur_off - prev_end)); + let res = (prev_end, Some(cur_off - prev_end)); + self.first_unmarked = Some(res); + return res; } } + self.first_unmarked = Some((prev_end, None)); (prev_end, None) } - /// Turn one range into a list of subranges that align with existing - /// ranges. - /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked. - // - // e.g. given N is new and ABC are existing: - // NNNNNNNNNNNNNNNN - // AAAAA BBBCCCCC ...then we want 5 chunks: - // 1122222333444555 - // - // but also if we have this: - // NNNNNNNNNNNNNNNN - // AAAAAAAAAA BBBB ...then break existing A and B ranges up: - // - // 1111111122222233 - // aaAAAAAAAA BBbb - // - // Doing all this work up front should make handling each chunk much - // easier. - fn chunk_range_on_edges( - &mut self, - new_off: u64, - new_len: u64, - new_state: RangeState, - ) -> Vec<(u64, u64, RangeState)> { - let mut tmp_off = new_off; - let mut tmp_len = new_len; - let mut v = Vec::new(); - - // cut previous overlapping range if needed - let prev = self.used.range_mut(..tmp_off).next_back(); - if let Some((prev_off, (prev_len, prev_state))) = prev { - let prev_state = *prev_state; - let overlap = (*prev_off + *prev_len).saturating_sub(new_off); - *prev_len -= overlap; - if overlap > 0 { - self.used.insert(new_off, (overlap, prev_state)); + /// When the range of acknowledged bytes from zero increases, we need to drop any + /// ranges within that span AND maybe extend it to include any adjacent acknowledged ranges. + fn coalesce_acked(&mut self) { + while let Some(e) = self.used.first_entry() { + match self.acked.cmp(e.key()) { + Ordering::Greater => { + let (off, (len, state)) = e.remove_entry(); + let overflow = (off + len).saturating_sub(self.acked); + if overflow > 0 { + if state == RangeState::Acked { + self.acked += overflow; + } else { + self.used.insert(self.acked, (overflow, state)); + } + break; + } + } + Ordering::Equal => { + if e.get().1 == RangeState::Acked { + let (len, _) = e.remove(); + self.acked += len; + } + break; + } + Ordering::Less => break, } } + } - let mut last_existing_remaining = None; - for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) { - // Create chunk for "overhang" before an existing range - if tmp_off < *off { - let sub_len = off - tmp_off; - v.push((tmp_off, sub_len, new_state)); - tmp_off += sub_len; - tmp_len -= sub_len; - } + /// Mark a range as acknowledged. This is simpler than marking a range as sent + /// because an acknowledged range can never turn back into a sent range, so + /// this function can just override the entire range. + /// + /// The only tricky parts are making sure that we maintain `self.acked`, + /// which is the first acknowledged range. And making sure that we don't create + /// ranges of the same type that are adjacent; these need to be merged. + #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe + pub fn mark_acked(&mut self, new_off: u64, new_len: usize) { + let end = new_off + u64::try_from(new_len).unwrap(); + let new_off = max(self.acked, new_off); + let mut new_len = end.saturating_sub(new_off); + if new_len == 0 { + return; + } - // Create chunk to match existing range - let sub_len = min(*len, tmp_len); - let remaining_len = len - sub_len; - if new_state == RangeState::Sent && *state == RangeState::Acked { - qinfo!( - "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}", - off, - len, - new_off, - new_len - ); - } else { - v.push((tmp_off, sub_len, new_state)); - } - tmp_off += sub_len; - tmp_len -= sub_len; + self.first_unmarked = None; + if new_off == self.acked { + self.acked += new_len; + self.coalesce_acked(); + return; + } + let mut new_end = new_off + new_len; - if remaining_len > 0 { - last_existing_remaining = Some((*off, sub_len, remaining_len, *state)); + // Get all existing ranges that start within this new range. + let mut covered = self + .used + .range(new_off..new_end) + .map(|(&k, _)| k) + .collect::<SmallVec<[_; 8]>>(); + + if let Entry::Occupied(next_entry) = self.used.entry(new_end) { + // Check if the very next entry is the same type as this. + if next_entry.get().1 == RangeState::Acked { + // If is is acked, drop it and extend this new range. + let (extra_len, _) = next_entry.remove(); + new_len += extra_len; + new_end += extra_len; + } + } else if let Some(last) = covered.pop() { + // Otherwise, the last of the existing ranges might overhang this one by some. + let (old_off, (old_len, old_state)) = self.used.remove_entry(&last).unwrap(); // can't fail + let remainder = (old_off + old_len).saturating_sub(new_end); + if remainder > 0 { + if old_state == RangeState::Acked { + // Just extend the current range. + new_len += remainder; + new_end += remainder; + } else { + self.used.insert(new_end, (remainder, RangeState::Sent)); + } } } - - // Maybe break last existing range in two so that a final chunk will - // have the same length as an existing range entry - if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining { - *self.used.get_mut(&off).expect("must be there") = (sub_len, state); - self.used.insert(off + sub_len, (remaining_len, state)); + // All covered ranges can just be trashed. + for k in covered { + self.used.remove(&k); } - // Create final chunk if anything remains of the new range - if tmp_len > 0 { - v.push((tmp_off, tmp_len, new_state)); + // Now either merge with a preceding acked range + // or cut a preceding sent range as needed. + let prev = self.used.range_mut(..new_off).next_back(); + if let Some((prev_off, (prev_len, prev_state))) = prev { + let prev_end = *prev_off + *prev_len; + if prev_end >= new_off { + if *prev_state == RangeState::Sent { + *prev_len = new_off - *prev_off; + if prev_end > new_end { + // There is some extra sent range after the new acked range. + self.used + .insert(new_end, (prev_end - new_end, RangeState::Sent)); + } + } else { + *prev_len = max(prev_end, new_end) - *prev_off; + return; + } + } + } + self.used.insert(new_off, (new_len, RangeState::Acked)); + } + + /// Turn a single sent range into a list of subranges that align with existing + /// acknowledged ranges. + /// + /// This is more complicated than adding acked ranges because any acked ranges + /// need to be kept in place, with sent ranges filling the gaps. + /// + /// This means: + /// ```ignore + /// AAA S AAAS AAAAA + /// + SSSSSSSSSSSSS + /// = AAASSSAAASSAAAAA + /// ``` + /// + /// But we also have to ensure that: + /// ```ignore + /// SSSS + /// + SS + /// = SSSSSS + /// ``` + /// and + /// ```ignore + /// SSSSS + /// + SS + /// = SSSSSS + /// ``` + #[allow(clippy::missing_panics_doc)] // not possible + pub fn mark_sent(&mut self, mut new_off: u64, new_len: usize) { + let new_end = new_off + u64::try_from(new_len).unwrap(); + new_off = max(self.acked, new_off); + let mut new_len = new_end.saturating_sub(new_off); + if new_len == 0 { + return; } - v - } + self.first_unmarked = None; - /// Merge contiguous Acked ranges into the first entry (0). This range may - /// be dropped from the send buffer. - fn coalesce_acked_from_zero(&mut self) { - let acked_range_from_zero = self + // Get all existing ranges that start within this new range. + let covered = self .used - .get_mut(&0) - .filter(|(_, state)| *state == RangeState::Acked) - .map(|(len, _)| *len); - - if let Some(len_from_zero) = acked_range_from_zero { - let mut new_len_from_zero = len_from_zero; - - // See if there's another Acked range entry contiguous to this one - while let Some((next_len, _)) = self - .used - .get(&new_len_from_zero) - .filter(|(_, state)| *state == RangeState::Acked) - { - let to_remove = new_len_from_zero; - new_len_from_zero += *next_len; - self.used.remove(&to_remove); - } - - if len_from_zero != new_len_from_zero { - self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero; + .range(new_off..(new_off + new_len)) + .map(|(&k, _)| k) + .collect::<SmallVec<[u64; 8]>>(); + + if let Entry::Occupied(next_entry) = self.used.entry(new_end) { + if next_entry.get().1 == RangeState::Sent { + // Check if the very next entry is the same type as this, so it can be merged. + let (extra_len, _) = next_entry.remove(); + new_len += extra_len; } } - } - fn mark_range(&mut self, off: u64, len: usize, state: RangeState) { - if len == 0 { - qinfo!("mark 0-length range at {}", off); - return; - } + // Merge with any preceding sent range that might overlap, + // or cut the head of this if the preceding range is acked. + let prev = self.used.range(..new_off).next_back(); + if let Some((&prev_off, &(prev_len, prev_state))) = prev { + if prev_off + prev_len >= new_off { + let overlap = prev_off + prev_len - new_off; + new_len = new_len.saturating_sub(overlap); + if new_len == 0 { + // The previous range completely covers this one (no more to do). + return; + } - let subranges = self.chunk_range_on_edges(off, len as u64, state); + if prev_state == RangeState::Acked { + // The previous range is acked, so it cuts this one. + new_off += overlap; + } else { + // Extend the current range backwards. + new_off = prev_off; + new_len += prev_len; + // The previous range will be updated below. + // It might need to be cut because of a covered acked range. + } + } + } - for (sub_off, sub_len, sub_state) in subranges { - self.used.insert(sub_off, (sub_len, sub_state)); + // Now interleave new sent chunks with any existing acked chunks. + for old_off in covered { + let Entry::Occupied(e) = self.used.entry(old_off) else { + unreachable!(); + }; + let &(old_len, old_state) = e.get(); + if old_state == RangeState::Acked { + // Now we have to insert a chunk ahead of this acked chunk. + let chunk_len = old_off - new_off; + if chunk_len > 0 { + self.used.insert(new_off, (chunk_len, RangeState::Sent)); + } + let included = chunk_len + old_len; + new_len = new_len.saturating_sub(included); + if new_len == 0 { + return; + } + new_off += included; + } else { + let overhang = (old_off + old_len).saturating_sub(new_off + new_len); + new_len += overhang; + if *e.key() != new_off { + // Retain a sent entry at `new_off`. + // This avoids the work of removing and re-creating an entry. + // The value will be overwritten when the next insert occurs, + // either when this loop hits an acked range (above) + // or for any remainder (below). + e.remove(); + } + } } - self.coalesce_acked_from_zero(); + self.used.insert(new_off, (new_len, RangeState::Sent)); } fn unmark_range(&mut self, off: u64, len: usize) { @@ -315,6 +408,7 @@ impl RangeTracker { return; } + self.first_unmarked = None; let len = u64::try_from(len).unwrap(); let end_off = off + len; @@ -376,6 +470,9 @@ impl RangeTracker { } /// Unmark all sent ranges. + /// # Panics + /// On 32-bit machines where far too much is sent before calling this. + /// Note that this should not be called for handshakes, which should never exceed that limit. pub fn unmark_sent(&mut self) { self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap()); } @@ -384,36 +481,37 @@ impl RangeTracker { /// Buffer to contain queued bytes and track their state. #[derive(Debug, Default, PartialEq)] pub struct TxBuffer { - retired: u64, // contig acked bytes, no longer in buffer send_buf: VecDeque<u8>, // buffer of not-acked bytes ranges: RangeTracker, // ranges in buffer that have been sent or acked } impl TxBuffer { + #[must_use] pub fn new() -> Self { Self::default() } - /// Attempt to add some or all of the passed-in buffer to the TxBuffer. + /// Attempt to add some or all of the passed-in buffer to the `TxBuffer`. pub fn send(&mut self, buf: &[u8]) -> usize { let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len()); if can_buffer > 0 { self.send_buf.extend(&buf[..can_buffer]); - assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); + debug_assert!(self.send_buf.len() <= SEND_BUFFER_SIZE); } can_buffer } - pub fn next_bytes(&self) -> Option<(u64, &[u8])> { + #[allow(clippy::missing_panics_doc)] // These are not possible. + pub fn next_bytes(&mut self) -> Option<(u64, &[u8])> { let (start, maybe_len) = self.ranges.first_unmarked_range(); - if start == self.retired + u64::try_from(self.buffered()).unwrap() { + if start == self.retired() + u64::try_from(self.buffered()).unwrap() { return None; } // Convert from ranges-relative-to-zero to // ranges-relative-to-buffer-start - let buff_off = usize::try_from(start - self.retired).unwrap(); + let buff_off = usize::try_from(start - self.retired()).unwrap(); // Deque returns two slices. Create a subslice from whichever // one contains the first unmarked data. @@ -437,23 +535,22 @@ impl TxBuffer { } pub fn mark_as_sent(&mut self, offset: u64, len: usize) { - self.ranges.mark_range(offset, len, RangeState::Sent); + self.ranges.mark_sent(offset, len); } + #[allow(clippy::missing_panics_doc)] // Not possible here. pub fn mark_as_acked(&mut self, offset: u64, len: usize) { - self.ranges.mark_range(offset, len, RangeState::Acked); + let prev_retired = self.retired(); + self.ranges.mark_acked(offset, len); - // We can drop contig acked range from the buffer - let new_retirable = self.ranges.acked_from_zero() - self.retired; + // Any newly-retired bytes can be dropped from the buffer. + let new_retirable = self.retired() - prev_retired; debug_assert!(new_retirable <= self.buffered() as u64); - let keep_len = - self.buffered() - usize::try_from(new_retirable).expect("should fit in usize"); + let keep = self.buffered() - usize::try_from(new_retirable).unwrap(); // Truncate front - self.send_buf.rotate_left(self.buffered() - keep_len); - self.send_buf.truncate(keep_len); - - self.retired += new_retirable; + self.send_buf.rotate_left(self.buffered() - keep); + self.send_buf.truncate(keep); } pub fn mark_as_lost(&mut self, offset: u64, len: usize) { @@ -465,8 +562,9 @@ impl TxBuffer { self.ranges.unmark_sent(); } + #[must_use] pub fn retired(&self) -> u64 { - self.retired + self.ranges.acked_from_zero() } fn buffered(&self) -> usize { @@ -478,7 +576,7 @@ impl TxBuffer { } fn used(&self) -> u64 { - self.retired + u64::try_from(self.buffered()).unwrap() + self.retired() + u64::try_from(self.buffered()).unwrap() } } @@ -693,6 +791,7 @@ impl SendStream { self.fair = make_fair; } + #[must_use] pub fn is_fair(&self) -> bool { self.fair } @@ -706,6 +805,7 @@ impl SendStream { self.retransmission_priority = retransmission; } + #[must_use] pub fn sendorder(&self) -> Option<SendOrder> { self.sendorder } @@ -715,6 +815,7 @@ impl SendStream { } /// If all data has been buffered or written, how much was sent. + #[must_use] pub fn final_size(&self) -> Option<u64> { match &self.state { SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()), @@ -723,10 +824,13 @@ impl SendStream { } } + #[must_use] pub fn stats(&self) -> SendStreamStats { SendStreamStats::new(self.bytes_written(), self.bytes_sent, self.bytes_acked()) } + #[must_use] + #[allow(clippy::missing_panics_doc)] // not possible pub fn bytes_written(&self) -> u64 { match &self.state { SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => { @@ -749,6 +853,7 @@ impl SendStream { } } + #[must_use] pub fn bytes_acked(&self) -> u64 { match &self.state { SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => { @@ -766,11 +871,13 @@ impl SendStream { /// offset. fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> { match self.state { - SendStreamState::Send { ref send_buf, .. } => { - send_buf.next_bytes().and_then(|(offset, slice)| { + SendStreamState::Send { + ref mut send_buf, .. + } => { + let result = send_buf.next_bytes(); + if let Some((offset, slice)) = result { if retransmission_only { qtrace!( - [self], "next_bytes apply retransmission limit at {}", self.retransmission_offset ); @@ -786,13 +893,16 @@ impl SendStream { } else { Some((offset, slice)) } - }) + } else { + None + } } SendStreamState::DataSent { - ref send_buf, + ref mut send_buf, fin_sent, .. } => { + let used = send_buf.used(); // immutable first let bytes = send_buf.next_bytes(); if bytes.is_some() { bytes @@ -800,7 +910,7 @@ impl SendStream { None } else { // Send empty stream frame with fin set - Some((send_buf.used(), &[])) + Some((used, &[])) } } SendStreamState::Ready { .. } @@ -833,6 +943,7 @@ impl SendStream { } /// Maybe write a `STREAM` frame. + #[allow(clippy::missing_panics_doc)] // not possible pub fn write_stream_frame( &mut self, priority: TransmissionPriority, @@ -995,6 +1106,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) { self.bytes_sent = max(self.bytes_sent, offset + u64::try_from(len).unwrap()); @@ -1010,6 +1122,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) { match self.state { SendStreamState::Send { @@ -1047,6 +1160,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) { self.retransmission_offset = max( self.retransmission_offset, @@ -1075,6 +1189,7 @@ impl SendStream { /// Bytes sendable on stream. Constrained by stream credit available, /// connection credit available, and space in the tx buffer. + #[must_use] pub fn avail(&self) -> usize { if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } = &self.state @@ -1100,6 +1215,7 @@ impl SendStream { } } + #[must_use] pub fn is_terminal(&self) -> bool { matches!( self.state, @@ -1107,10 +1223,14 @@ impl SendStream { ) } + /// # Errors + /// When `buf` is empty or when the stream is already closed. pub fn send(&mut self, buf: &[u8]) -> Res<usize> { self.send_internal(buf, false) } + /// # Errors + /// When `buf` is empty or when the stream is already closed. pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> { self.send_internal(buf, true) } @@ -1155,9 +1275,9 @@ impl SendStream { if atomic { self.send_blocked_if_space_needed(buf.len()); return Ok(0); - } else { - &buf[..self.avail()] } + + &buf[..self.avail()] } else { buf }; @@ -1202,6 +1322,7 @@ impl SendStream { } } + #[allow(clippy::missing_panics_doc)] // not possible pub fn reset(&mut self, err: AppError) { match &self.state { SendStreamState::Ready { fc, .. } => { @@ -1296,6 +1417,7 @@ impl OrderGroup { } } + #[must_use] pub fn stream_ids(&self) -> &Vec<StreamId> { &self.vec } @@ -1319,26 +1441,24 @@ impl OrderGroup { next } + /// # Panics + /// If the stream ID is already present. pub fn insert(&mut self, stream_id: StreamId) { - match self.vec.binary_search(&stream_id) { - Ok(_) => { - // element already in vector @ `pos` - panic!("Duplicate stream_id {}", stream_id) - } - Err(pos) => self.vec.insert(pos, stream_id), - } + let Err(pos) = self.vec.binary_search(&stream_id) else { + // element already in vector @ `pos` + panic!("Duplicate stream_id {stream_id}"); + }; + self.vec.insert(pos, stream_id); } + /// # Panics + /// If the stream ID is not present. pub fn remove(&mut self, stream_id: StreamId) { - match self.vec.binary_search(&stream_id) { - Ok(pos) => { - self.vec.remove(pos); - } - Err(_) => { - // element already in vector @ `pos` - panic!("Missing stream_id {}", stream_id) - } - } + let Ok(pos) = self.vec.binary_search(&stream_id) else { + // element already in vector @ `pos` + panic!("Missing stream_id {stream_id}"); + }; + self.vec.remove(pos); } } @@ -1579,16 +1699,16 @@ impl SendStreams { // Iterate the map, but only those without fairness, then iterate // OrderGroups, then iterate each group - qdebug!("processing streams... unfair:"); + qtrace!("processing streams... unfair:"); for stream in self.map.values_mut() { if !stream.is_fair() { - qdebug!(" {}", stream); + qtrace!(" {}", stream); if !stream.write_frames_with_early_return(priority, builder, tokens, stats) { break; } } } - qdebug!("fair streams:"); + qtrace!("fair streams:"); let stream_ids = self.regular.iter().chain( self.sendordered .values_mut() @@ -1598,9 +1718,9 @@ impl SendStreams { for stream_id in stream_ids { let stream = self.map.get_mut(&stream_id).unwrap(); if let Some(order) = stream.sendorder() { - qdebug!(" {} ({})", stream_id, order) + qtrace!(" {} ({})", stream_id, order); } else { - qdebug!(" None") + qtrace!(" None"); } if !stream.write_frames_with_early_return(priority, builder, tokens, stats) { break; @@ -1609,7 +1729,7 @@ impl SendStreams { } pub fn update_initial_limit(&mut self, remote: &TransportParameters) { - for (id, ss) in self.map.iter_mut() { + for (id, ss) in &mut self.map { let limit = if id.is_bidi() { assert!(!id.is_remote_initiated(Role::Client)); remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE) @@ -1640,55 +1760,391 @@ pub struct SendStreamRecoveryToken { #[cfg(test)] mod tests { - use neqo_common::{event::Provider, hex_with_len, qtrace}; - - use super::*; - use crate::events::ConnectionEvent; + use std::{cell::RefCell, collections::VecDeque, rc::Rc}; + + use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder}; + + use super::SendStreamRecoveryToken; + use crate::{ + connection::{RetransmissionPriority, TransmissionPriority}, + events::ConnectionEvent, + fc::SenderFlowControl, + packet::PacketBuilder, + recovery::{RecoveryToken, StreamRecoveryToken}, + send_stream::{ + RangeState, RangeTracker, SendStream, SendStreamState, SendStreams, TxBuffer, + }, + stats::FrameStats, + ConnectionEvents, StreamId, SEND_BUFFER_SIZE, + }; fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> { Rc::new(RefCell::new(SenderFlowControl::new((), limit))) } #[test] - fn test_mark_range() { + fn mark_acked_from_zero() { let mut rt = RangeTracker::default(); // ranges can go from nothing->Sent if queued for retrans and then // acks arrive - rt.mark_range(5, 5, RangeState::Acked); + rt.mark_acked(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(10, 4, RangeState::Acked); + rt.mark_acked(10, 4); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 0); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 14); assert_eq!(rt.acked_from_zero(), 14); - rt.mark_range(12, 20, RangeState::Acked); + rt.mark_acked(12, 20); assert_eq!(rt.highest_offset(), 32); assert_eq!(rt.acked_from_zero(), 32); // ack the lot - rt.mark_range(0, 400, RangeState::Acked); + rt.mark_acked(0, 400); assert_eq!(rt.highest_offset(), 400); assert_eq!(rt.acked_from_zero(), 400); // acked trumps sent - rt.mark_range(0, 200, RangeState::Sent); + rt.mark_sent(0, 200); assert_eq!(rt.highest_offset(), 400); assert_eq!(rt.acked_from_zero(), 400); } + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS SSSAAASSS + /// + AAAAAAAAA + /// = SSSAAAAAAAAASS + /// ``` + #[test] + fn mark_acked_1() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_sent(6, 3); + rt.mark_acked(9, 3); + rt.mark_sent(12, 3); + + rt.mark_acked(3, 10); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (3, RangeState::Sent)); + canon.used.insert(3, (10, RangeState::Acked)); + canon.used.insert(13, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS SSS AAA + /// + AAAAAAAAA + /// = SSAAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_2() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_sent(6, 3); + rt.mark_acked(12, 3); + + rt.mark_acked(2, 10); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (2, RangeState::Sent)); + canon.used.insert(2, (13, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AASSS AAAA + /// + AAAAAAAAA + /// = AAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_3() { + let mut rt = RangeTracker::default(); + rt.mark_acked(1, 2); + rt.mark_sent(3, 3); + rt.mark_acked(8, 4); + + rt.mark_acked(0, 9); + + let canon = RangeTracker { + acked: 12, + ..RangeTracker::default() + }; + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS + /// + AAAA + /// = AAAASS + /// ``` + #[test] + fn mark_acked_4() { + let mut rt = RangeTracker::default(); + rt.mark_sent(3, 3); + + rt.mark_acked(0, 4); + + let mut canon = RangeTracker { + acked: 4, + ..Default::default() + }; + canon.used.insert(4, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAAAAASSS + /// + AAA + /// = AAAAAASSS + /// ``` + #[test] + fn mark_acked_5() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 6); + rt.mark_sent(6, 3); + + rt.mark_acked(3, 3); + + let mut canon = RangeTracker { + acked: 6, + ..RangeTracker::default() + }; + canon.used.insert(6, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAA AAA AAA + /// + AAAAAAA + /// = AAAAAAAAAAAAA + /// ``` + #[test] + fn mark_acked_6() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + rt.mark_acked(8, 3); + rt.mark_acked(13, 3); + + rt.mark_acked(6, 7); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (13, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// AAA AAA + /// + AAA + /// = AAAAAAAA + /// ``` + #[test] + fn mark_acked_7() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + rt.mark_acked(8, 3); + + rt.mark_acked(6, 3); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (8, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSSSSSSS + /// + AAAA + /// = SSAAAASS + /// ``` + #[test] + fn mark_acked_8() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 8); + + rt.mark_acked(2, 4); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (2, RangeState::Sent)); + canon.used.insert(2, (4, RangeState::Acked)); + canon.used.insert(6, (2, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_acked` correctly handles all paths. + /// ```ignore + /// SSS + /// + AAA + /// = AAA SSS + /// ``` + #[test] + fn mark_acked_9() { + let mut rt = RangeTracker::default(); + rt.mark_sent(5, 3); + + rt.mark_acked(0, 3); + + let mut canon = RangeTracker { + acked: 3, + ..Default::default() + }; + canon.used.insert(5, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA AAA SSS + /// + SSSSSSSSSSSS + /// = AAASSSAAASSSSSS + /// ``` + #[test] + fn mark_sent_1() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_acked(6, 3); + rt.mark_sent(12, 3); + + rt.mark_sent(0, 12); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (3, RangeState::Sent)); + canon.used.insert(6, (3, RangeState::Acked)); + canon.used.insert(9, (6, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAASS AAA S SSSS + /// + SSSSSSSSSSSSS + /// = AAASSSAAASSSSSSS + /// ``` + #[test] + fn mark_sent_2() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_sent(3, 2); + rt.mark_acked(6, 3); + rt.mark_sent(10, 1); + rt.mark_sent(12, 4); + + rt.mark_sent(0, 13); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (3, RangeState::Sent)); + canon.used.insert(6, (3, RangeState::Acked)); + canon.used.insert(9, (7, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA AAA + /// + SSSS + /// = AAASSAAA + /// ``` + #[test] + fn mark_sent_3() { + let mut rt = RangeTracker::default(); + rt.mark_acked(0, 3); + rt.mark_acked(5, 3); + + rt.mark_sent(2, 4); + + let mut canon = RangeTracker { + acked: 3, + ..RangeTracker::default() + }; + canon.used.insert(3, (2, RangeState::Sent)); + canon.used.insert(5, (3, RangeState::Acked)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// SSS AAA SS + /// + SSSSSSSS + /// = SSSSSAAASSSS + /// ``` + #[test] + fn mark_sent_4() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 3); + rt.mark_acked(5, 3); + rt.mark_sent(10, 2); + + rt.mark_sent(2, 8); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (5, RangeState::Sent)); + canon.used.insert(5, (3, RangeState::Acked)); + canon.used.insert(8, (4, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// AAA + /// + SSSSSS + /// = AAASSS + /// ``` + #[test] + fn mark_sent_5() { + let mut rt = RangeTracker::default(); + rt.mark_acked(3, 3); + + rt.mark_sent(3, 6); + + let mut canon = RangeTracker::default(); + canon.used.insert(3, (3, RangeState::Acked)); + canon.used.insert(6, (3, RangeState::Sent)); + assert_eq!(rt, canon); + } + + /// Check that `marked_sent` correctly handles all paths. + /// ```ignore + /// SSSSS + /// + SSS + /// = SSSSS + /// ``` + #[test] + fn mark_sent_6() { + let mut rt = RangeTracker::default(); + rt.mark_sent(0, 5); + + rt.mark_sent(1, 3); + + let mut canon = RangeTracker::default(); + canon.used.insert(0, (5, RangeState::Sent)); + assert_eq!(rt, canon); + } + #[test] fn unmark_sent_start() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 0); @@ -1702,13 +2158,13 @@ mod tests { fn unmark_sent_middle() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(5, 5, RangeState::Sent); + rt.mark_sent(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(10, 5, RangeState::Acked); + rt.mark_acked(10, 5); assert_eq!(rt.highest_offset(), 15); assert_eq!(rt.acked_from_zero(), 5); assert_eq!(rt.first_unmarked_range(), (15, None)); @@ -1723,10 +2179,10 @@ mod tests { fn unmark_sent_end() { let mut rt = RangeTracker::default(); - rt.mark_range(0, 5, RangeState::Acked); + rt.mark_acked(0, 5); assert_eq!(rt.highest_offset(), 5); assert_eq!(rt.acked_from_zero(), 5); - rt.mark_range(5, 5, RangeState::Sent); + rt.mark_sent(5, 5); assert_eq!(rt.highest_offset(), 10); assert_eq!(rt.acked_from_zero(), 5); assert_eq!(rt.first_unmarked_range(), (10, None)); @@ -1752,11 +2208,11 @@ mod tests { } #[test] - fn test_unmark_range() { + fn unmark_range() { let mut rt = RangeTracker::default(); - rt.mark_range(5, 5, RangeState::Acked); - rt.mark_range(10, 5, RangeState::Sent); + rt.mark_acked(5, 5); + rt.mark_sent(10, 5); // Should unmark sent but not acked range rt.unmark_range(7, 6); @@ -1772,11 +2228,11 @@ mod tests { (&13, &(2, RangeState::Sent)) ); assert!(rt.used.iter().nth(2).is_none()); - rt.mark_range(0, 5, RangeState::Sent); + rt.mark_sent(0, 5); let res = rt.first_unmarked_range(); assert_eq!(res, (10, Some(3))); - rt.mark_range(10, 3, RangeState::Sent); + rt.mark_sent(10, 3); let res = rt.first_unmarked_range(); assert_eq!(res, (15, None)); @@ -1790,14 +2246,15 @@ mod tests { assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); + let big_buf = vec![1; SEND_BUFFER_SIZE * 2]; + assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), - Some((0, x)) if x.len()==SEND_BUFFER_SIZE + Some((0, x)) if x.len() == SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); // Mark almost all as sent. Get what's left let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1; - txb.mark_as_sent(0, one_byte_from_end as usize); + txb.mark_as_sent(0, usize::try_from(one_byte_from_end).unwrap()); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 1 && start == one_byte_from_end @@ -1826,14 +2283,14 @@ mod tests { // Contig acked range at start means it can be removed from buffer // Impl of vecdeque should now result in a split buffer when more data // is sent - txb.mark_as_acked(0, five_bytes_from_end as usize); + txb.mark_as_acked(0, usize::try_from(five_bytes_from_end).unwrap()); assert_eq!(txb.send(&[2; 30]), 30); // Just get 5 even though there is more assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 5 && start == five_bytes_from_end && x.iter().all(|ch| *ch == 1))); - assert_eq!(txb.retired, five_bytes_from_end); + assert_eq!(txb.retired(), five_bytes_from_end); assert_eq!(txb.buffered(), 35); // Marking that bit as sent should let the last contig bit be returned @@ -1852,7 +2309,8 @@ mod tests { assert_eq!(txb.avail(), SEND_BUFFER_SIZE); // Fill the buffer - assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE); + let big_buf = vec![1; SEND_BUFFER_SIZE * 2]; + assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE); assert!(matches!(txb.next_bytes(), Some((0, x)) if x.len()==SEND_BUFFER_SIZE && x.iter().all(|ch| *ch == 1))); @@ -1860,7 +2318,7 @@ mod tests { // As above let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40; - txb.mark_as_acked(0, forty_bytes_from_end as usize); + txb.mark_as_acked(0, usize::try_from(forty_bytes_from_end).unwrap()); assert!(matches!(txb.next_bytes(), Some((start, x)) if x.len() == 40 && start == forty_bytes_from_end @@ -1888,7 +2346,7 @@ mod tests { // Ack entire first slice and into second slice let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10; - txb.mark_as_acked(0, ten_bytes_past_end as usize); + txb.mark_as_acked(0, usize::try_from(ten_bytes_past_end).unwrap()); // Get up to marked range A assert!(matches!(txb.next_bytes(), @@ -1910,7 +2368,7 @@ mod tests { } #[test] - fn test_stream_tx() { + fn stream_tx() { let conn_fc = connection_fc(4096); let conn_events = ConnectionEvents::default(); @@ -1926,22 +2384,23 @@ mod tests { } // Should hit stream flow control limit before filling up send buffer - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let big_buf = vec![4; SEND_BUFFER_SIZE + 100]; + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 1024 - 100); // should do nothing, max stream data already 1024 s.set_max_stream_data(1024); - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 0); // should now hit the conn flow control (4096) s.set_max_stream_data(1_048_576); - let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap(); + let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap(); assert_eq!(res, 3072); // should now hit the tx buffer size conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64); - let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap(); + let res = s.send(&big_buf).unwrap(); assert_eq!(res, SEND_BUFFER_SIZE - 4096); // TODO(agrover@mozilla.com): test ooo acks somehow @@ -2012,10 +2471,8 @@ mod tests { // tx buffer size. assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4); - assert_eq!( - s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(), - SEND_BUFFER_SIZE - 4 - ); + let big_buf = vec![b'a'; SEND_BUFFER_SIZE]; + assert_eq!(s.send(&big_buf).unwrap(), SEND_BUFFER_SIZE - 4); // No event because still blocked by tx buffer full s.set_max_stream_data(2_000_000_000); @@ -2395,8 +2852,7 @@ mod tests { ); let mut send_buf = TxBuffer::new(); - send_buf.retired = u64::try_from(offset).unwrap(); - send_buf.ranges.mark_range(0, offset, RangeState::Acked); + send_buf.ranges.mark_acked(0, offset); let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT); fc.consume(offset); let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT))); |