Diffstat (limited to 'third_party/rust/neqo-transport/src/send_stream.rs')
-rw-r--r--  third_party/rust/neqo-transport/src/send_stream.rs  904
1 file changed, 680 insertions(+), 224 deletions(-)
diff --git a/third_party/rust/neqo-transport/src/send_stream.rs b/third_party/rust/neqo-transport/src/send_stream.rs
index 5feb785ac6..8771ec7765 100644
--- a/third_party/rust/neqo-transport/src/send_stream.rs
+++ b/third_party/rust/neqo-transport/src/send_stream.rs
@@ -9,8 +9,7 @@
use std::{
cell::RefCell,
cmp::{max, min, Ordering},
- collections::{BTreeMap, VecDeque},
- convert::TryFrom,
+ collections::{btree_map::Entry, BTreeMap, VecDeque},
hash::{Hash, Hasher},
mem,
ops::Add,
@@ -18,7 +17,7 @@ use std::{
};
use indexmap::IndexMap;
-use neqo_common::{qdebug, qerror, qinfo, qtrace, Encoder, Role};
+use neqo_common::{qdebug, qerror, qtrace, Encoder, Role};
use smallvec::SmallVec;
use crate::{
@@ -111,7 +110,7 @@ impl Add<RetransmissionPriority> for TransmissionPriority {
/// If data is lost, this determines the priority that applies to retransmissions
/// of that data.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum RetransmissionPriority {
/// Prioritize retransmission at a fixed priority.
/// With this, it is possible to prioritize retransmissions lower than transmissions.
@@ -123,19 +122,14 @@ pub enum RetransmissionPriority {
Same,
/// Increase the priority of retransmissions (the default).
/// Retransmissions of `Critical` or `Important` aren't elevated at all.
+ #[default]
Higher,
/// Increase the priority of retransmissions a lot.
/// This is useful for streams that are particularly exposed to head-of-line blocking.
MuchHigher,
}
-impl Default for RetransmissionPriority {
- fn default() -> Self {
- Self::Higher
- }
-}
-
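The hunk above swaps a hand-written `impl Default` for the derived form. A minimal standalone sketch of that pattern (stable since Rust 1.62), using a hypothetical `Priority` enum rather than the crate's own:

```rust
// Sketch only: `Priority` is a stand-in, not a type from this crate.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum Priority {
    Low,
    #[default] // the derive uses this variant for Default::default()
    Normal,
    High,
}

fn main() {
    assert_eq!(Priority::default(), Priority::Normal);
}
```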
-#[derive(Debug, PartialEq, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum RangeState {
Sent,
Acked,
@@ -144,169 +138,268 @@ enum RangeState {
/// Track ranges in the stream as sent or acked. Acked implies sent. Not in a
/// range implies needing-to-be-sent, either initially or as a retransmission.
#[derive(Debug, Default, PartialEq)]
-struct RangeTracker {
- // offset, (len, RangeState). Use u64 for len because ranges can exceed 32bits.
+pub struct RangeTracker {
+ /// The number of bytes that have been acknowledged starting from offset 0.
+ acked: u64,
+ /// A map that tracks the state of ranges.
+ /// Keys are the offset of the start of the range.
+ /// Values are tuples of the range length and its state.
used: BTreeMap<u64, (u64, RangeState)>,
+ /// This is a cache for the output of `first_unmarked_range`, which we check a lot.
+ first_unmarked: Option<(u64, Option<u64>)>,
}
impl RangeTracker {
fn highest_offset(&self) -> u64 {
self.used
- .range(..)
- .next_back()
- .map_or(0, |(k, (v, _))| *k + *v)
+ .last_key_value()
+ .map_or(self.acked, |(&k, &(v, _))| k + v)
}
fn acked_from_zero(&self) -> u64 {
- self.used
- .get(&0)
- .filter(|(_, state)| *state == RangeState::Acked)
- .map_or(0, |(v, _)| *v)
+ self.acked
}
/// Find the first unmarked range. If all are contiguous, this will return
- /// (highest_offset(), None).
- fn first_unmarked_range(&self) -> (u64, Option<u64>) {
- let mut prev_end = 0;
+ /// (`highest_offset()`, None).
+ fn first_unmarked_range(&mut self) -> (u64, Option<u64>) {
+ if let Some(first_unmarked) = self.first_unmarked {
+ return first_unmarked;
+ }
+
+ let mut prev_end = self.acked;
- for (cur_off, (cur_len, _)) in &self.used {
- if prev_end == *cur_off {
+ for (&cur_off, &(cur_len, _)) in &self.used {
+ if prev_end == cur_off {
prev_end = cur_off + cur_len;
} else {
- return (prev_end, Some(cur_off - prev_end));
+ let res = (prev_end, Some(cur_off - prev_end));
+ self.first_unmarked = Some(res);
+ return res;
}
}
+ self.first_unmarked = Some((prev_end, None));
(prev_end, None)
}
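Since `first_unmarked_range` is on the hot path, the new `first_unmarked` field memoizes its result. A standalone sketch of that pattern, with a hypothetical cut-down `Tracker` type rather than the real `RangeTracker`:

```rust
use std::collections::BTreeMap;

// Sketch only: a reduced model of the memoization, not the crate's type.
#[derive(Default)]
struct Tracker {
    acked: u64,                                 // bytes acknowledged from zero
    used: BTreeMap<u64, u64>,                   // offset -> length of marked ranges
    first_unmarked: Option<(u64, Option<u64>)>, // cached scan result
}

impl Tracker {
    fn first_unmarked_range(&mut self) -> (u64, Option<u64>) {
        if let Some(cached) = self.first_unmarked {
            return cached; // hot path: skip the scan entirely
        }
        let mut prev_end = self.acked;
        for (&off, &len) in &self.used {
            if off != prev_end {
                let res = (prev_end, Some(off - prev_end));
                self.first_unmarked = Some(res);
                return res;
            }
            prev_end = off + len;
        }
        self.first_unmarked = Some((prev_end, None));
        (prev_end, None)
    }

    fn mark(&mut self, off: u64, len: u64) {
        self.first_unmarked = None; // every mutation must invalidate the cache
        self.used.insert(off, len);
    }
}

fn main() {
    let mut t = Tracker::default();
    t.mark(0, 10);
    assert_eq!(t.first_unmarked_range(), (10, None)); // scans and caches
    assert_eq!(t.first_unmarked_range(), (10, None)); // served from cache
}
```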
- /// Turn one range into a list of subranges that align with existing
- /// ranges.
- /// Check impermissible overlaps in subregions: Sent cannot overwrite Acked.
- //
- // e.g. given N is new and ABC are existing:
- // NNNNNNNNNNNNNNNN
- // AAAAA BBBCCCCC ...then we want 5 chunks:
- // 1122222333444555
- //
- // but also if we have this:
- // NNNNNNNNNNNNNNNN
- // AAAAAAAAAA BBBB ...then break existing A and B ranges up:
- //
- // 1111111122222233
- // aaAAAAAAAA BBbb
- //
- // Doing all this work up front should make handling each chunk much
- // easier.
- fn chunk_range_on_edges(
- &mut self,
- new_off: u64,
- new_len: u64,
- new_state: RangeState,
- ) -> Vec<(u64, u64, RangeState)> {
- let mut tmp_off = new_off;
- let mut tmp_len = new_len;
- let mut v = Vec::new();
-
- // cut previous overlapping range if needed
- let prev = self.used.range_mut(..tmp_off).next_back();
- if let Some((prev_off, (prev_len, prev_state))) = prev {
- let prev_state = *prev_state;
- let overlap = (*prev_off + *prev_len).saturating_sub(new_off);
- *prev_len -= overlap;
- if overlap > 0 {
- self.used.insert(new_off, (overlap, prev_state));
+ /// When the range of acknowledged bytes from zero increases, we need to drop any
+ /// ranges within that span and possibly extend it to include adjacent acknowledged ranges.
+ fn coalesce_acked(&mut self) {
+ while let Some(e) = self.used.first_entry() {
+ match self.acked.cmp(e.key()) {
+ Ordering::Greater => {
+ let (off, (len, state)) = e.remove_entry();
+ let overflow = (off + len).saturating_sub(self.acked);
+ if overflow > 0 {
+ if state == RangeState::Acked {
+ self.acked += overflow;
+ } else {
+ self.used.insert(self.acked, (overflow, state));
+ }
+ break;
+ }
+ }
+ Ordering::Equal => {
+ if e.get().1 == RangeState::Acked {
+ let (len, _) = e.remove();
+ self.acked += len;
+ }
+ break;
+ }
+ Ordering::Less => break,
}
}
+ }
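A worked, in-crate sketch of what `coalesce_acked` does once `self.acked` has grown (hypothetical demo function; it pokes at the module's private fields directly):

```rust
// Sketch only: exercises coalesce_acked from inside this module.
fn coalesce_demo() {
    let mut rt = RangeTracker::default();
    rt.used.insert(3, (5, RangeState::Acked)); // detached acked range [3, 8)
    rt.used.insert(10, (2, RangeState::Sent)); // sent range [10, 12)
    rt.acked = 6; // pretend [0, 6) was just acknowledged

    rt.coalesce_acked();

    // [3, 8) overlapped the acked span, so its acked tail extends `acked`
    // to 8 and the entry is dropped; [10, 12) starts past 8, so the loop stops.
    assert_eq!(rt.acked, 8);
    assert_eq!(rt.used.len(), 1);
}
```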
- let mut last_existing_remaining = None;
- for (off, (len, state)) in self.used.range(tmp_off..tmp_off + tmp_len) {
- // Create chunk for "overhang" before an existing range
- if tmp_off < *off {
- let sub_len = off - tmp_off;
- v.push((tmp_off, sub_len, new_state));
- tmp_off += sub_len;
- tmp_len -= sub_len;
- }
+ /// Mark a range as acknowledged. This is simpler than marking a range as sent
+ /// because an acknowledged range can never turn back into a sent range, so
+ /// this function can just override the entire range.
+ ///
+ /// The only tricky parts are maintaining `self.acked`, which tracks the
+ /// contiguous acknowledged range starting at zero, and making sure that
+ /// adjacent ranges of the same type are merged rather than left separate.
+ #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe
+ pub fn mark_acked(&mut self, new_off: u64, new_len: usize) {
+ let end = new_off + u64::try_from(new_len).unwrap();
+ let new_off = max(self.acked, new_off);
+ let mut new_len = end.saturating_sub(new_off);
+ if new_len == 0 {
+ return;
+ }
- // Create chunk to match existing range
- let sub_len = min(*len, tmp_len);
- let remaining_len = len - sub_len;
- if new_state == RangeState::Sent && *state == RangeState::Acked {
- qinfo!(
- "Attempted to downgrade overlapping range Acked range {}-{} with Sent {}-{}",
- off,
- len,
- new_off,
- new_len
- );
- } else {
- v.push((tmp_off, sub_len, new_state));
- }
- tmp_off += sub_len;
- tmp_len -= sub_len;
+ self.first_unmarked = None;
+ if new_off == self.acked {
+ self.acked += new_len;
+ self.coalesce_acked();
+ return;
+ }
+ let mut new_end = new_off + new_len;
- if remaining_len > 0 {
- last_existing_remaining = Some((*off, sub_len, remaining_len, *state));
+ // Get all existing ranges that start within this new range.
+ let mut covered = self
+ .used
+ .range(new_off..new_end)
+ .map(|(&k, _)| k)
+ .collect::<SmallVec<[_; 8]>>();
+
+ if let Entry::Occupied(next_entry) = self.used.entry(new_end) {
+ // Check if the very next entry is the same type as this.
+ if next_entry.get().1 == RangeState::Acked {
+ // If it is acked, drop it and extend this new range.
+ let (extra_len, _) = next_entry.remove();
+ new_len += extra_len;
+ new_end += extra_len;
+ }
+ } else if let Some(last) = covered.pop() {
+ // Otherwise, the last of the existing ranges might overhang the end of this one.
+ let (old_off, (old_len, old_state)) = self.used.remove_entry(&last).unwrap(); // can't fail
+ let remainder = (old_off + old_len).saturating_sub(new_end);
+ if remainder > 0 {
+ if old_state == RangeState::Acked {
+ // Just extend the current range.
+ new_len += remainder;
+ new_end += remainder;
+ } else {
+ self.used.insert(new_end, (remainder, RangeState::Sent));
+ }
}
}
-
- // Maybe break last existing range in two so that a final chunk will
- // have the same length as an existing range entry
- if let Some((off, sub_len, remaining_len, state)) = last_existing_remaining {
- *self.used.get_mut(&off).expect("must be there") = (sub_len, state);
- self.used.insert(off + sub_len, (remaining_len, state));
+ // All covered ranges can just be trashed.
+ for k in covered {
+ self.used.remove(&k);
}
- // Create final chunk if anything remains of the new range
- if tmp_len > 0 {
- v.push((tmp_off, tmp_len, new_state));
+ // Now either merge with a preceding acked range
+ // or cut a preceding sent range as needed.
+ let prev = self.used.range_mut(..new_off).next_back();
+ if let Some((prev_off, (prev_len, prev_state))) = prev {
+ let prev_end = *prev_off + *prev_len;
+ if prev_end >= new_off {
+ if *prev_state == RangeState::Sent {
+ *prev_len = new_off - *prev_off;
+ if prev_end > new_end {
+ // There is some extra sent range after the new acked range.
+ self.used
+ .insert(new_end, (prev_end - new_end, RangeState::Sent));
+ }
+ } else {
+ *prev_len = max(prev_end, new_end) - *prev_off;
+ return;
+ }
+ }
+ }
+ self.used.insert(new_off, (new_len, RangeState::Acked));
+ }
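An in-crate usage sketch of `mark_acked` (hypothetical demo function): out-of-order acknowledgments sit in `used` until the gap from zero is filled, at which point everything collapses into `acked`:

```rust
// Sketch only: demonstrates the coalescing behaviour of mark_acked.
fn mark_acked_demo() {
    let mut rt = RangeTracker::default();
    rt.mark_acked(5, 5); // [5, 10) is stored in `used`; acked stays 0
    assert_eq!(rt.acked_from_zero(), 0);

    rt.mark_acked(0, 5); // fills [0, 5); [5, 10) coalesces into `acked`
    assert_eq!(rt.acked_from_zero(), 10);
    assert!(rt.used.is_empty());
}
```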
+
+ /// Turn a single sent range into a list of subranges that align with existing
+ /// acknowledged ranges.
+ ///
+ /// This is more complicated than adding acked ranges because any acked ranges
+ /// need to be kept in place, with sent ranges filling the gaps.
+ ///
+ /// This means:
+ /// ```ignore
+ /// AAA S AAAS AAAAA
+ /// + SSSSSSSSSSSSS
+ /// = AAASSSAAASSAAAAA
+ /// ```
+ ///
+ /// But we also have to ensure that:
+ /// ```ignore
+ /// SSSS
+ /// + SS
+ /// = SSSSSS
+ /// ```
+ /// and
+ /// ```ignore
+ /// SSSSS
+ /// + SS
+ /// = SSSSSS
+ /// ```
+ #[allow(clippy::missing_panics_doc)] // not possible
+ pub fn mark_sent(&mut self, mut new_off: u64, new_len: usize) {
+ let new_end = new_off + u64::try_from(new_len).unwrap();
+ new_off = max(self.acked, new_off);
+ let mut new_len = new_end.saturating_sub(new_off);
+ if new_len == 0 {
+ return;
}
- v
- }
+ self.first_unmarked = None;
- /// Merge contiguous Acked ranges into the first entry (0). This range may
- /// be dropped from the send buffer.
- fn coalesce_acked_from_zero(&mut self) {
- let acked_range_from_zero = self
+ // Get all existing ranges that start within this new range.
+ let covered = self
.used
- .get_mut(&0)
- .filter(|(_, state)| *state == RangeState::Acked)
- .map(|(len, _)| *len);
-
- if let Some(len_from_zero) = acked_range_from_zero {
- let mut new_len_from_zero = len_from_zero;
-
- // See if there's another Acked range entry contiguous to this one
- while let Some((next_len, _)) = self
- .used
- .get(&new_len_from_zero)
- .filter(|(_, state)| *state == RangeState::Acked)
- {
- let to_remove = new_len_from_zero;
- new_len_from_zero += *next_len;
- self.used.remove(&to_remove);
- }
-
- if len_from_zero != new_len_from_zero {
- self.used.get_mut(&0).expect("must be there").0 = new_len_from_zero;
+ .range(new_off..(new_off + new_len))
+ .map(|(&k, _)| k)
+ .collect::<SmallVec<[u64; 8]>>();
+
+ if let Entry::Occupied(next_entry) = self.used.entry(new_end) {
+ // Check if the very next entry is the same type as this, so it can be merged.
+ if next_entry.get().1 == RangeState::Sent {
+ let (extra_len, _) = next_entry.remove();
+ new_len += extra_len;
}
}
- }
- fn mark_range(&mut self, off: u64, len: usize, state: RangeState) {
- if len == 0 {
- qinfo!("mark 0-length range at {}", off);
- return;
- }
+ // Merge with any preceding sent range that might overlap,
+ // or cut the head of this if the preceding range is acked.
+ let prev = self.used.range(..new_off).next_back();
+ if let Some((&prev_off, &(prev_len, prev_state))) = prev {
+ if prev_off + prev_len >= new_off {
+ let overlap = prev_off + prev_len - new_off;
+ new_len = new_len.saturating_sub(overlap);
+ if new_len == 0 {
+ // The previous range completely covers this one (no more to do).
+ return;
+ }
- let subranges = self.chunk_range_on_edges(off, len as u64, state);
+ if prev_state == RangeState::Acked {
+ // The previous range is acked, so it cuts this one.
+ new_off += overlap;
+ } else {
+ // Extend the current range backwards.
+ new_off = prev_off;
+ new_len += prev_len;
+ // The previous range will be updated below.
+ // It might need to be cut because of a covered acked range.
+ }
+ }
+ }
- for (sub_off, sub_len, sub_state) in subranges {
- self.used.insert(sub_off, (sub_len, sub_state));
+ // Now interleave new sent chunks with any existing acked chunks.
+ for old_off in covered {
+ let Entry::Occupied(e) = self.used.entry(old_off) else {
+ unreachable!();
+ };
+ let &(old_len, old_state) = e.get();
+ if old_state == RangeState::Acked {
+ // Now we have to insert a chunk ahead of this acked chunk.
+ let chunk_len = old_off - new_off;
+ if chunk_len > 0 {
+ self.used.insert(new_off, (chunk_len, RangeState::Sent));
+ }
+ let included = chunk_len + old_len;
+ new_len = new_len.saturating_sub(included);
+ if new_len == 0 {
+ return;
+ }
+ new_off += included;
+ } else {
+ let overhang = (old_off + old_len).saturating_sub(new_off + new_len);
+ new_len += overhang;
+ if *e.key() != new_off {
+ // Remove this entry; a replacement is inserted after the loop.
+ // An entry that already sits at `new_off` is retained instead, which
+ // avoids removing and re-creating it: its value is simply overwritten
+ // by the next insert, either when this loop hits an acked range
+ // (above) or for any remainder (below).
+ e.remove();
+ }
+ }
}
- self.coalesce_acked_from_zero();
+ self.used.insert(new_off, (new_len, RangeState::Sent));
}
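An in-crate sketch of the interleaving described above (hypothetical function, in the same style as the tests at the bottom of this diff): sending over an acked range leaves the acked range untouched and fills only the gaps:

```rust
// Sketch only: mark_sent must never downgrade an acked range.
fn mark_sent_demo() {
    let mut rt = RangeTracker::default();
    rt.mark_acked(3, 3); // [3, 6) acked
    rt.mark_sent(0, 9); // send straight over the top

    let mut canon = RangeTracker::default();
    canon.used.insert(0, (3, RangeState::Sent));
    canon.used.insert(3, (3, RangeState::Acked));
    canon.used.insert(6, (3, RangeState::Sent));
    assert_eq!(rt, canon);
}
```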
fn unmark_range(&mut self, off: u64, len: usize) {
@@ -315,6 +408,7 @@ impl RangeTracker {
return;
}
+ self.first_unmarked = None;
let len = u64::try_from(len).unwrap();
let end_off = off + len;
@@ -376,6 +470,9 @@ impl RangeTracker {
}
/// Unmark all sent ranges.
+ /// # Panics
+ /// On 32-bit machines, if more than `usize::MAX` bytes were sent before calling this.
+ /// Note that this should not be called for handshakes, which should never exceed that limit.
pub fn unmark_sent(&mut self) {
self.unmark_range(0, usize::try_from(self.highest_offset()).unwrap());
}
@@ -384,36 +481,37 @@ impl RangeTracker {
/// Buffer to contain queued bytes and track their state.
#[derive(Debug, Default, PartialEq)]
pub struct TxBuffer {
- retired: u64, // contig acked bytes, no longer in buffer
send_buf: VecDeque<u8>, // buffer of not-acked bytes
ranges: RangeTracker, // ranges in buffer that have been sent or acked
}
impl TxBuffer {
+ #[must_use]
pub fn new() -> Self {
Self::default()
}
- /// Attempt to add some or all of the passed-in buffer to the TxBuffer.
+ /// Attempt to add some or all of the passed-in buffer to the `TxBuffer`.
pub fn send(&mut self, buf: &[u8]) -> usize {
let can_buffer = min(SEND_BUFFER_SIZE - self.buffered(), buf.len());
if can_buffer > 0 {
self.send_buf.extend(&buf[..can_buffer]);
- assert!(self.send_buf.len() <= SEND_BUFFER_SIZE);
+ debug_assert!(self.send_buf.len() <= SEND_BUFFER_SIZE);
}
can_buffer
}
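A short in-crate sketch of the partial-write contract of `send` (hypothetical demo function): only the free buffer space is consumed, and the caller is told how much was taken:

```rust
// Sketch only: send() reports a short write once the buffer is full.
fn txbuffer_send_demo() {
    let mut txb = TxBuffer::new();
    let data = vec![1u8; SEND_BUFFER_SIZE + 1];
    assert_eq!(txb.send(&data), SEND_BUFFER_SIZE); // one byte did not fit
    assert_eq!(txb.send(&data[SEND_BUFFER_SIZE..]), 0); // now full
}
```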
- pub fn next_bytes(&self) -> Option<(u64, &[u8])> {
+ #[allow(clippy::missing_panics_doc)] // These are not possible.
+ pub fn next_bytes(&mut self) -> Option<(u64, &[u8])> {
let (start, maybe_len) = self.ranges.first_unmarked_range();
- if start == self.retired + u64::try_from(self.buffered()).unwrap() {
+ if start == self.retired() + u64::try_from(self.buffered()).unwrap() {
return None;
}
// Convert from ranges-relative-to-zero to
// ranges-relative-to-buffer-start
- let buff_off = usize::try_from(start - self.retired).unwrap();
+ let buff_off = usize::try_from(start - self.retired()).unwrap();
// Deque returns two slices. Create a subslice from whichever
// one contains the first unmarked data.
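The two-slice comment refers to `VecDeque::as_slices`: the deque's ring buffer can wrap, so its contents are exposed as a front and a back slice. A standalone sketch (hypothetical helper `slice_containing`) of picking the slice that holds a given logical offset:

```rust
use std::collections::VecDeque;

// Sketch only: return the contiguous run starting at `start`, taken
// from whichever of the two backing slices contains that offset.
fn slice_containing(deque: &VecDeque<u8>, start: usize) -> &[u8] {
    let (front, back) = deque.as_slices();
    if start < front.len() {
        &front[start..]
    } else {
        &back[start - front.len()..]
    }
}
```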
@@ -437,23 +535,22 @@ impl TxBuffer {
}
pub fn mark_as_sent(&mut self, offset: u64, len: usize) {
- self.ranges.mark_range(offset, len, RangeState::Sent);
+ self.ranges.mark_sent(offset, len);
}
+ #[allow(clippy::missing_panics_doc)] // Not possible here.
pub fn mark_as_acked(&mut self, offset: u64, len: usize) {
- self.ranges.mark_range(offset, len, RangeState::Acked);
+ let prev_retired = self.retired();
+ self.ranges.mark_acked(offset, len);
- // We can drop contig acked range from the buffer
- let new_retirable = self.ranges.acked_from_zero() - self.retired;
+ // Any newly-retired bytes can be dropped from the buffer.
+ let new_retirable = self.retired() - prev_retired;
debug_assert!(new_retirable <= self.buffered() as u64);
- let keep_len =
- self.buffered() - usize::try_from(new_retirable).expect("should fit in usize");
+ let keep = self.buffered() - usize::try_from(new_retirable).unwrap();
// Truncate front
- self.send_buf.rotate_left(self.buffered() - keep_len);
- self.send_buf.truncate(keep_len);
-
- self.retired += new_retirable;
+ self.send_buf.rotate_left(self.buffered() - keep);
+ self.send_buf.truncate(keep);
}
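`VecDeque` has no `truncate_front`, so the code above rotates the bytes it wants to keep to the front and then truncates. A standalone sketch of that idiom (hypothetical helper `drop_front`):

```rust
use std::collections::VecDeque;

// Sketch only: drop `retire` bytes from the front of the deque.
fn drop_front(buf: &mut VecDeque<u8>, retire: usize) {
    let keep = buf.len() - retire;
    buf.rotate_left(buf.len() - keep); // rotate_left(retire): retired bytes move to the back
    buf.truncate(keep); // ...and are cut off here
}
```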
pub fn mark_as_lost(&mut self, offset: u64, len: usize) {
@@ -465,8 +562,9 @@ impl TxBuffer {
self.ranges.unmark_sent();
}
+ #[must_use]
pub fn retired(&self) -> u64 {
- self.retired
+ self.ranges.acked_from_zero()
}
fn buffered(&self) -> usize {
@@ -478,7 +576,7 @@ impl TxBuffer {
}
fn used(&self) -> u64 {
- self.retired + u64::try_from(self.buffered()).unwrap()
+ self.retired() + u64::try_from(self.buffered()).unwrap()
}
}
@@ -693,6 +791,7 @@ impl SendStream {
self.fair = make_fair;
}
+ #[must_use]
pub fn is_fair(&self) -> bool {
self.fair
}
@@ -706,6 +805,7 @@ impl SendStream {
self.retransmission_priority = retransmission;
}
+ #[must_use]
pub fn sendorder(&self) -> Option<SendOrder> {
self.sendorder
}
@@ -715,6 +815,7 @@ impl SendStream {
}
/// If all data has been buffered or written, how much was sent.
+ #[must_use]
pub fn final_size(&self) -> Option<u64> {
match &self.state {
SendStreamState::DataSent { send_buf, .. } => Some(send_buf.used()),
@@ -723,10 +824,13 @@ impl SendStream {
}
}
+ #[must_use]
pub fn stats(&self) -> SendStreamStats {
SendStreamStats::new(self.bytes_written(), self.bytes_sent, self.bytes_acked())
}
+ #[must_use]
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn bytes_written(&self) -> u64 {
match &self.state {
SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => {
@@ -749,6 +853,7 @@ impl SendStream {
}
}
+ #[must_use]
pub fn bytes_acked(&self) -> u64 {
match &self.state {
SendStreamState::Send { send_buf, .. } | SendStreamState::DataSent { send_buf, .. } => {
@@ -766,11 +871,13 @@ impl SendStream {
/// offset.
fn next_bytes(&mut self, retransmission_only: bool) -> Option<(u64, &[u8])> {
match self.state {
- SendStreamState::Send { ref send_buf, .. } => {
- send_buf.next_bytes().and_then(|(offset, slice)| {
+ SendStreamState::Send {
+ ref mut send_buf, ..
+ } => {
+ let result = send_buf.next_bytes();
+ if let Some((offset, slice)) = result {
if retransmission_only {
qtrace!(
- [self],
"next_bytes apply retransmission limit at {}",
self.retransmission_offset
);
@@ -786,13 +893,16 @@ impl SendStream {
} else {
Some((offset, slice))
}
- })
+ } else {
+ None
+ }
}
SendStreamState::DataSent {
- ref send_buf,
+ ref mut send_buf,
fin_sent,
..
} => {
+ let used = send_buf.used(); // take the immutable borrow before the mutable one below
let bytes = send_buf.next_bytes();
if bytes.is_some() {
bytes
@@ -800,7 +910,7 @@ impl SendStream {
None
} else {
// Send empty stream frame with fin set
- Some((send_buf.used(), &[]))
+ Some((used, &[]))
}
}
SendStreamState::Ready { .. }
@@ -833,6 +943,7 @@ impl SendStream {
}
/// Maybe write a `STREAM` frame.
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn write_stream_frame(
&mut self,
priority: TransmissionPriority,
@@ -995,6 +1106,7 @@ impl SendStream {
}
}
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn mark_as_sent(&mut self, offset: u64, len: usize, fin: bool) {
self.bytes_sent = max(self.bytes_sent, offset + u64::try_from(len).unwrap());
@@ -1010,6 +1122,7 @@ impl SendStream {
}
}
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn mark_as_acked(&mut self, offset: u64, len: usize, fin: bool) {
match self.state {
SendStreamState::Send {
@@ -1047,6 +1160,7 @@ impl SendStream {
}
}
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn mark_as_lost(&mut self, offset: u64, len: usize, fin: bool) {
self.retransmission_offset = max(
self.retransmission_offset,
@@ -1075,6 +1189,7 @@ impl SendStream {
/// Bytes sendable on stream. Constrained by stream credit available,
/// connection credit available, and space in the tx buffer.
+ #[must_use]
pub fn avail(&self) -> usize {
if let SendStreamState::Ready { fc, conn_fc } | SendStreamState::Send { fc, conn_fc, .. } =
&self.state
@@ -1100,6 +1215,7 @@ impl SendStream {
}
}
+ #[must_use]
pub fn is_terminal(&self) -> bool {
matches!(
self.state,
@@ -1107,10 +1223,14 @@ impl SendStream {
)
}
+ /// # Errors
+ /// When `buf` is empty or when the stream is already closed.
pub fn send(&mut self, buf: &[u8]) -> Res<usize> {
self.send_internal(buf, false)
}
+ /// # Errors
+ /// When `buf` is empty or when the stream is already closed.
pub fn send_atomic(&mut self, buf: &[u8]) -> Res<usize> {
self.send_internal(buf, true)
}
@@ -1155,9 +1275,9 @@ impl SendStream {
if atomic {
self.send_blocked_if_space_needed(buf.len());
return Ok(0);
- } else {
- &buf[..self.avail()]
}
+
+ &buf[..self.avail()]
} else {
buf
};
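The restructured branch keeps the existing contract: an atomic send is all-or-nothing, while a plain send takes whatever fits. A reduced, standalone sketch (hypothetical helper `clamp_send`, not the crate's API):

```rust
// Sketch only: how many bytes a send accepts given available credit.
fn clamp_send(buf_len: usize, avail: usize, atomic: bool) -> usize {
    if avail >= buf_len {
        buf_len // everything fits
    } else if atomic {
        0 // all-or-nothing: signal blocked, accept nothing
    } else {
        avail // partial write of the part that fits
    }
}
```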
@@ -1202,6 +1322,7 @@ impl SendStream {
}
}
+ #[allow(clippy::missing_panics_doc)] // not possible
pub fn reset(&mut self, err: AppError) {
match &self.state {
SendStreamState::Ready { fc, .. } => {
@@ -1296,6 +1417,7 @@ impl OrderGroup {
}
}
+ #[must_use]
pub fn stream_ids(&self) -> &Vec<StreamId> {
&self.vec
}
@@ -1319,26 +1441,24 @@ impl OrderGroup {
next
}
+ /// # Panics
+ /// If the stream ID is already present.
pub fn insert(&mut self, stream_id: StreamId) {
- match self.vec.binary_search(&stream_id) {
- Ok(_) => {
- // element already in vector @ `pos`
- panic!("Duplicate stream_id {}", stream_id)
- }
- Err(pos) => self.vec.insert(pos, stream_id),
- }
+ let Err(pos) = self.vec.binary_search(&stream_id) else {
+ // element already in vector @ `pos`
+ panic!("Duplicate stream_id {stream_id}");
+ };
+ self.vec.insert(pos, stream_id);
}
+ /// # Panics
+ /// If the stream ID is not present.
pub fn remove(&mut self, stream_id: StreamId) {
- match self.vec.binary_search(&stream_id) {
- Ok(pos) => {
- self.vec.remove(pos);
- }
- Err(_) => {
- // element already in vector @ `pos`
- panic!("Missing stream_id {}", stream_id)
- }
- }
+ let Ok(pos) = self.vec.binary_search(&stream_id) else {
+ // element not in vector
+ panic!("Missing stream_id {stream_id}");
+ };
+ self.vec.remove(pos);
}
}
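Both methods above lean on `Vec::binary_search` plus let-else: the `Err` position from a failed search is exactly the insertion point that keeps the vector sorted. A standalone sketch (hypothetical `insert_sorted`):

```rust
// Sketch only: keep a Vec sorted by inserting at the probe position.
fn insert_sorted(v: &mut Vec<u64>, item: u64) {
    let Err(pos) = v.binary_search(&item) else {
        panic!("Duplicate item {item}"); // Ok(_) means it is already present
    };
    v.insert(pos, item);
}

fn main() {
    let mut v = vec![1, 3, 5];
    insert_sorted(&mut v, 4);
    assert_eq!(v, [1, 3, 4, 5]);
}
```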
@@ -1579,16 +1699,16 @@ impl SendStreams {
// Iterate the map, but only those without fairness, then iterate
// OrderGroups, then iterate each group
- qdebug!("processing streams... unfair:");
+ qtrace!("processing streams... unfair:");
for stream in self.map.values_mut() {
if !stream.is_fair() {
- qdebug!(" {}", stream);
+ qtrace!(" {}", stream);
if !stream.write_frames_with_early_return(priority, builder, tokens, stats) {
break;
}
}
}
- qdebug!("fair streams:");
+ qtrace!("fair streams:");
let stream_ids = self.regular.iter().chain(
self.sendordered
.values_mut()
@@ -1598,9 +1718,9 @@ impl SendStreams {
for stream_id in stream_ids {
let stream = self.map.get_mut(&stream_id).unwrap();
if let Some(order) = stream.sendorder() {
- qdebug!(" {} ({})", stream_id, order)
+ qtrace!(" {} ({})", stream_id, order);
} else {
- qdebug!(" None")
+ qtrace!(" None");
}
if !stream.write_frames_with_early_return(priority, builder, tokens, stats) {
break;
@@ -1609,7 +1729,7 @@ impl SendStreams {
}
pub fn update_initial_limit(&mut self, remote: &TransportParameters) {
- for (id, ss) in self.map.iter_mut() {
+ for (id, ss) in &mut self.map {
let limit = if id.is_bidi() {
assert!(!id.is_remote_initiated(Role::Client));
remote.get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE)
@@ -1640,55 +1760,391 @@ pub struct SendStreamRecoveryToken {
#[cfg(test)]
mod tests {
- use neqo_common::{event::Provider, hex_with_len, qtrace};
-
- use super::*;
- use crate::events::ConnectionEvent;
+ use std::{cell::RefCell, collections::VecDeque, rc::Rc};
+
+ use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder};
+
+ use super::SendStreamRecoveryToken;
+ use crate::{
+ connection::{RetransmissionPriority, TransmissionPriority},
+ events::ConnectionEvent,
+ fc::SenderFlowControl,
+ packet::PacketBuilder,
+ recovery::{RecoveryToken, StreamRecoveryToken},
+ send_stream::{
+ RangeState, RangeTracker, SendStream, SendStreamState, SendStreams, TxBuffer,
+ },
+ stats::FrameStats,
+ ConnectionEvents, StreamId, SEND_BUFFER_SIZE,
+ };
fn connection_fc(limit: u64) -> Rc<RefCell<SenderFlowControl<()>>> {
Rc::new(RefCell::new(SenderFlowControl::new((), limit)))
}
#[test]
- fn test_mark_range() {
+ fn mark_acked_from_zero() {
let mut rt = RangeTracker::default();
// ranges can go from nothing->Sent if queued for retrans and then
// acks arrive
- rt.mark_range(5, 5, RangeState::Acked);
+ rt.mark_acked(5, 5);
assert_eq!(rt.highest_offset(), 10);
assert_eq!(rt.acked_from_zero(), 0);
- rt.mark_range(10, 4, RangeState::Acked);
+ rt.mark_acked(10, 4);
assert_eq!(rt.highest_offset(), 14);
assert_eq!(rt.acked_from_zero(), 0);
- rt.mark_range(0, 5, RangeState::Sent);
+ rt.mark_sent(0, 5);
assert_eq!(rt.highest_offset(), 14);
assert_eq!(rt.acked_from_zero(), 0);
- rt.mark_range(0, 5, RangeState::Acked);
+ rt.mark_acked(0, 5);
assert_eq!(rt.highest_offset(), 14);
assert_eq!(rt.acked_from_zero(), 14);
- rt.mark_range(12, 20, RangeState::Acked);
+ rt.mark_acked(12, 20);
assert_eq!(rt.highest_offset(), 32);
assert_eq!(rt.acked_from_zero(), 32);
// ack the lot
- rt.mark_range(0, 400, RangeState::Acked);
+ rt.mark_acked(0, 400);
assert_eq!(rt.highest_offset(), 400);
assert_eq!(rt.acked_from_zero(), 400);
// acked trumps sent
- rt.mark_range(0, 200, RangeState::Sent);
+ rt.mark_sent(0, 200);
assert_eq!(rt.highest_offset(), 400);
assert_eq!(rt.acked_from_zero(), 400);
}
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// SSS SSSAAASSS
+ /// + AAAAAAAAA
+ /// = SSSAAAAAAAAASS
+ /// ```
+ #[test]
+ fn mark_acked_1() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(0, 3);
+ rt.mark_sent(6, 3);
+ rt.mark_acked(9, 3);
+ rt.mark_sent(12, 3);
+
+ rt.mark_acked(3, 10);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(0, (3, RangeState::Sent));
+ canon.used.insert(3, (10, RangeState::Acked));
+ canon.used.insert(13, (2, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// SSS SSS AAA
+ /// + AAAAAAAAA
+ /// = SSAAAAAAAAAAAA
+ /// ```
+ #[test]
+ fn mark_acked_2() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(0, 3);
+ rt.mark_sent(6, 3);
+ rt.mark_acked(12, 3);
+
+ rt.mark_acked(2, 10);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(0, (2, RangeState::Sent));
+ canon.used.insert(2, (13, RangeState::Acked));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// AASSS AAAA
+ /// + AAAAAAAAA
+ /// = AAAAAAAAAAAA
+ /// ```
+ #[test]
+ fn mark_acked_3() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(1, 2);
+ rt.mark_sent(3, 3);
+ rt.mark_acked(8, 4);
+
+ rt.mark_acked(0, 9);
+
+ let canon = RangeTracker {
+ acked: 12,
+ ..RangeTracker::default()
+ };
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// SSS
+ /// + AAAA
+ /// = AAAASS
+ /// ```
+ #[test]
+ fn mark_acked_4() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(3, 3);
+
+ rt.mark_acked(0, 4);
+
+ let mut canon = RangeTracker {
+ acked: 4,
+ ..Default::default()
+ };
+ canon.used.insert(4, (2, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// AAAAAASSS
+ /// + AAA
+ /// = AAAAAASSS
+ /// ```
+ #[test]
+ fn mark_acked_5() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(0, 6);
+ rt.mark_sent(6, 3);
+
+ rt.mark_acked(3, 3);
+
+ let mut canon = RangeTracker {
+ acked: 6,
+ ..RangeTracker::default()
+ };
+ canon.used.insert(6, (3, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// AAA AAA AAA
+ /// + AAAAAAA
+ /// = AAAAAAAAAAAAA
+ /// ```
+ #[test]
+ fn mark_acked_6() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(3, 3);
+ rt.mark_acked(8, 3);
+ rt.mark_acked(13, 3);
+
+ rt.mark_acked(6, 7);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(3, (13, RangeState::Acked));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// AAA AAA
+ /// + AAA
+ /// = AAAAAAAA
+ /// ```
+ #[test]
+ fn mark_acked_7() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(3, 3);
+ rt.mark_acked(8, 3);
+
+ rt.mark_acked(6, 3);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(3, (8, RangeState::Acked));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// SSSSSSSS
+ /// + AAAA
+ /// = SSAAAASS
+ /// ```
+ #[test]
+ fn mark_acked_8() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(0, 8);
+
+ rt.mark_acked(2, 4);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(0, (2, RangeState::Sent));
+ canon.used.insert(2, (4, RangeState::Acked));
+ canon.used.insert(6, (2, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_acked` correctly handles all paths.
+ /// ```ignore
+ /// SSS
+ /// + AAA
+ /// = AAA SSS
+ /// ```
+ #[test]
+ fn mark_acked_9() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(5, 3);
+
+ rt.mark_acked(0, 3);
+
+ let mut canon = RangeTracker {
+ acked: 3,
+ ..Default::default()
+ };
+ canon.used.insert(5, (3, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// AAA AAA SSS
+ /// + SSSSSSSSSSSS
+ /// = AAASSSAAASSSSSS
+ /// ```
+ #[test]
+ fn mark_sent_1() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(0, 3);
+ rt.mark_acked(6, 3);
+ rt.mark_sent(12, 3);
+
+ rt.mark_sent(0, 12);
+
+ let mut canon = RangeTracker {
+ acked: 3,
+ ..RangeTracker::default()
+ };
+ canon.used.insert(3, (3, RangeState::Sent));
+ canon.used.insert(6, (3, RangeState::Acked));
+ canon.used.insert(9, (6, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// AAASS AAA S SSSS
+ /// + SSSSSSSSSSSSS
+ /// = AAASSSAAASSSSSSS
+ /// ```
+ #[test]
+ fn mark_sent_2() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(0, 3);
+ rt.mark_sent(3, 2);
+ rt.mark_acked(6, 3);
+ rt.mark_sent(10, 1);
+ rt.mark_sent(12, 4);
+
+ rt.mark_sent(0, 13);
+
+ let mut canon = RangeTracker {
+ acked: 3,
+ ..RangeTracker::default()
+ };
+ canon.used.insert(3, (3, RangeState::Sent));
+ canon.used.insert(6, (3, RangeState::Acked));
+ canon.used.insert(9, (7, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// AAA AAA
+ /// + SSSS
+ /// = AAASSAAA
+ /// ```
+ #[test]
+ fn mark_sent_3() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(0, 3);
+ rt.mark_acked(5, 3);
+
+ rt.mark_sent(2, 4);
+
+ let mut canon = RangeTracker {
+ acked: 3,
+ ..RangeTracker::default()
+ };
+ canon.used.insert(3, (2, RangeState::Sent));
+ canon.used.insert(5, (3, RangeState::Acked));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// SSS AAA SS
+ /// + SSSSSSSS
+ /// = SSSSSAAASSSS
+ /// ```
+ #[test]
+ fn mark_sent_4() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(0, 3);
+ rt.mark_acked(5, 3);
+ rt.mark_sent(10, 2);
+
+ rt.mark_sent(2, 8);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(0, (5, RangeState::Sent));
+ canon.used.insert(5, (3, RangeState::Acked));
+ canon.used.insert(8, (4, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// AAA
+ /// + SSSSSS
+ /// = AAASSS
+ /// ```
+ #[test]
+ fn mark_sent_5() {
+ let mut rt = RangeTracker::default();
+ rt.mark_acked(3, 3);
+
+ rt.mark_sent(3, 6);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(3, (3, RangeState::Acked));
+ canon.used.insert(6, (3, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
+ /// Check that `mark_sent` correctly handles all paths.
+ /// ```ignore
+ /// SSSSS
+ /// + SSS
+ /// = SSSSS
+ /// ```
+ #[test]
+ fn mark_sent_6() {
+ let mut rt = RangeTracker::default();
+ rt.mark_sent(0, 5);
+
+ rt.mark_sent(1, 3);
+
+ let mut canon = RangeTracker::default();
+ canon.used.insert(0, (5, RangeState::Sent));
+ assert_eq!(rt, canon);
+ }
+
#[test]
fn unmark_sent_start() {
let mut rt = RangeTracker::default();
- rt.mark_range(0, 5, RangeState::Sent);
+ rt.mark_sent(0, 5);
assert_eq!(rt.highest_offset(), 5);
assert_eq!(rt.acked_from_zero(), 0);
@@ -1702,13 +2158,13 @@ mod tests {
fn unmark_sent_middle() {
let mut rt = RangeTracker::default();
- rt.mark_range(0, 5, RangeState::Acked);
+ rt.mark_acked(0, 5);
assert_eq!(rt.highest_offset(), 5);
assert_eq!(rt.acked_from_zero(), 5);
- rt.mark_range(5, 5, RangeState::Sent);
+ rt.mark_sent(5, 5);
assert_eq!(rt.highest_offset(), 10);
assert_eq!(rt.acked_from_zero(), 5);
- rt.mark_range(10, 5, RangeState::Acked);
+ rt.mark_acked(10, 5);
assert_eq!(rt.highest_offset(), 15);
assert_eq!(rt.acked_from_zero(), 5);
assert_eq!(rt.first_unmarked_range(), (15, None));
@@ -1723,10 +2179,10 @@ mod tests {
fn unmark_sent_end() {
let mut rt = RangeTracker::default();
- rt.mark_range(0, 5, RangeState::Acked);
+ rt.mark_acked(0, 5);
assert_eq!(rt.highest_offset(), 5);
assert_eq!(rt.acked_from_zero(), 5);
- rt.mark_range(5, 5, RangeState::Sent);
+ rt.mark_sent(5, 5);
assert_eq!(rt.highest_offset(), 10);
assert_eq!(rt.acked_from_zero(), 5);
assert_eq!(rt.first_unmarked_range(), (10, None));
@@ -1752,11 +2208,11 @@ mod tests {
}
#[test]
- fn test_unmark_range() {
+ fn unmark_range() {
let mut rt = RangeTracker::default();
- rt.mark_range(5, 5, RangeState::Acked);
- rt.mark_range(10, 5, RangeState::Sent);
+ rt.mark_acked(5, 5);
+ rt.mark_sent(10, 5);
// Should unmark sent but not acked range
rt.unmark_range(7, 6);
@@ -1772,11 +2228,11 @@ mod tests {
(&13, &(2, RangeState::Sent))
);
assert!(rt.used.iter().nth(2).is_none());
- rt.mark_range(0, 5, RangeState::Sent);
+ rt.mark_sent(0, 5);
let res = rt.first_unmarked_range();
assert_eq!(res, (10, Some(3)));
- rt.mark_range(10, 3, RangeState::Sent);
+ rt.mark_sent(10, 3);
let res = rt.first_unmarked_range();
assert_eq!(res, (15, None));
@@ -1790,14 +2246,15 @@ mod tests {
assert_eq!(txb.avail(), SEND_BUFFER_SIZE);
// Fill the buffer
- assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
+ let big_buf = vec![1; SEND_BUFFER_SIZE * 2];
+ assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE);
assert!(matches!(txb.next_bytes(),
- Some((0, x)) if x.len()==SEND_BUFFER_SIZE
+ Some((0, x)) if x.len() == SEND_BUFFER_SIZE
&& x.iter().all(|ch| *ch == 1)));
// Mark almost all as sent. Get what's left
let one_byte_from_end = SEND_BUFFER_SIZE as u64 - 1;
- txb.mark_as_sent(0, one_byte_from_end as usize);
+ txb.mark_as_sent(0, usize::try_from(one_byte_from_end).unwrap());
assert!(matches!(txb.next_bytes(),
Some((start, x)) if x.len() == 1
&& start == one_byte_from_end
@@ -1826,14 +2283,14 @@ mod tests {
// Contig acked range at start means it can be removed from buffer
// Impl of vecdeque should now result in a split buffer when more data
// is sent
- txb.mark_as_acked(0, five_bytes_from_end as usize);
+ txb.mark_as_acked(0, usize::try_from(five_bytes_from_end).unwrap());
assert_eq!(txb.send(&[2; 30]), 30);
// Just get 5 even though there is more
assert!(matches!(txb.next_bytes(),
Some((start, x)) if x.len() == 5
&& start == five_bytes_from_end
&& x.iter().all(|ch| *ch == 1)));
- assert_eq!(txb.retired, five_bytes_from_end);
+ assert_eq!(txb.retired(), five_bytes_from_end);
assert_eq!(txb.buffered(), 35);
// Marking that bit as sent should let the last contig bit be returned
@@ -1852,7 +2309,8 @@ mod tests {
assert_eq!(txb.avail(), SEND_BUFFER_SIZE);
// Fill the buffer
- assert_eq!(txb.send(&[1; SEND_BUFFER_SIZE * 2]), SEND_BUFFER_SIZE);
+ let big_buf = vec![1; SEND_BUFFER_SIZE * 2];
+ assert_eq!(txb.send(&big_buf), SEND_BUFFER_SIZE);
assert!(matches!(txb.next_bytes(),
Some((0, x)) if x.len()==SEND_BUFFER_SIZE
&& x.iter().all(|ch| *ch == 1)));
@@ -1860,7 +2318,7 @@ mod tests {
// As above
let forty_bytes_from_end = SEND_BUFFER_SIZE as u64 - 40;
- txb.mark_as_acked(0, forty_bytes_from_end as usize);
+ txb.mark_as_acked(0, usize::try_from(forty_bytes_from_end).unwrap());
assert!(matches!(txb.next_bytes(),
Some((start, x)) if x.len() == 40
&& start == forty_bytes_from_end
@@ -1888,7 +2346,7 @@ mod tests {
// Ack entire first slice and into second slice
let ten_bytes_past_end = SEND_BUFFER_SIZE as u64 + 10;
- txb.mark_as_acked(0, ten_bytes_past_end as usize);
+ txb.mark_as_acked(0, usize::try_from(ten_bytes_past_end).unwrap());
// Get up to marked range A
assert!(matches!(txb.next_bytes(),
@@ -1910,7 +2368,7 @@ mod tests {
}
#[test]
- fn test_stream_tx() {
+ fn stream_tx() {
let conn_fc = connection_fc(4096);
let conn_events = ConnectionEvents::default();
@@ -1926,22 +2384,23 @@ mod tests {
}
// Should hit stream flow control limit before filling up send buffer
- let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
+ let big_buf = vec![4; SEND_BUFFER_SIZE + 100];
+ let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap();
assert_eq!(res, 1024 - 100);
// should do nothing, max stream data already 1024
s.set_max_stream_data(1024);
- let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
+ let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap();
assert_eq!(res, 0);
// should now hit the conn flow control (4096)
s.set_max_stream_data(1_048_576);
- let res = s.send(&[4; SEND_BUFFER_SIZE]).unwrap();
+ let res = s.send(&big_buf[..SEND_BUFFER_SIZE]).unwrap();
assert_eq!(res, 3072);
// should now hit the tx buffer size
conn_fc.borrow_mut().update(SEND_BUFFER_SIZE as u64);
- let res = s.send(&[4; SEND_BUFFER_SIZE + 100]).unwrap();
+ let res = s.send(&big_buf).unwrap();
assert_eq!(res, SEND_BUFFER_SIZE - 4096);
// TODO(agrover@mozilla.com): test ooo acks somehow
@@ -2012,10 +2471,8 @@ mod tests {
// tx buffer size.
assert_eq!(s.avail(), SEND_BUFFER_SIZE - 4);
- assert_eq!(
- s.send(&[b'a'; SEND_BUFFER_SIZE]).unwrap(),
- SEND_BUFFER_SIZE - 4
- );
+ let big_buf = vec![b'a'; SEND_BUFFER_SIZE];
+ assert_eq!(s.send(&big_buf).unwrap(), SEND_BUFFER_SIZE - 4);
// No event because still blocked by tx buffer full
s.set_max_stream_data(2_000_000_000);
@@ -2395,8 +2852,7 @@ mod tests {
);
let mut send_buf = TxBuffer::new();
- send_buf.retired = u64::try_from(offset).unwrap();
- send_buf.ranges.mark_range(0, offset, RangeState::Acked);
+ send_buf.ranges.mark_acked(0, offset);
let mut fc = SenderFlowControl::new(StreamId::from(stream), MAX_VARINT);
fc.consume(offset);
let conn_fc = Rc::new(RefCell::new(SenderFlowControl::new((), MAX_VARINT)));