summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/recv_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-transport/src/recv_stream.rs')
-rw-r--r--third_party/rust/neqo-transport/src/recv_stream.rs82
1 files changed, 55 insertions, 27 deletions
diff --git a/third_party/rust/neqo-transport/src/recv_stream.rs b/third_party/rust/neqo-transport/src/recv_stream.rs
index 06ca59685d..5da80d6004 100644
--- a/third_party/rust/neqo-transport/src/recv_stream.rs
+++ b/third_party/rust/neqo-transport/src/recv_stream.rs
@@ -11,7 +11,6 @@ use std::{
cell::RefCell,
cmp::max,
collections::BTreeMap,
- convert::TryFrom,
mem,
rc::{Rc, Weak},
};
@@ -34,6 +33,7 @@ use crate::{
const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB
// Export as usize for consistency with SEND_BUFFER_SIZE
+#[allow(clippy::cast_possible_truncation)] // Yeah, nope.
pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize;
#[derive(Debug, Default)]
@@ -130,6 +130,7 @@ pub struct RxStreamOrderer {
}
impl RxStreamOrderer {
+ #[must_use]
pub fn new() -> Self {
Self::default()
}
@@ -137,6 +138,9 @@ impl RxStreamOrderer {
/// Process an incoming stream frame off the wire. This may result in data
/// being available to upper layers if frame is not out of order (ooo) or
/// if the frame fills a gap.
+ /// # Panics
+ /// Only when `u64` values cannot be converted to `usize`, which only
+ /// happens on 32-bit machines that hold far too much data at the same time.
pub fn inbound_frame(&mut self, mut new_start: u64, mut new_data: &[u8]) {
qtrace!("Inbound data offset={} len={}", new_start, new_data.len());
@@ -275,6 +279,7 @@ impl RxStreamOrderer {
}
/// Are any bytes readable?
+ #[must_use]
pub fn data_ready(&self) -> bool {
self.data_ranges
.keys()
@@ -301,20 +306,24 @@ impl RxStreamOrderer {
false
}
})
- .map(|(_, data_len)| data_len as usize)
- .sum()
+ // Accumulate, but saturate at usize::MAX.
+ .fold(0, |acc: usize, (_, data_len)| {
+ acc.saturating_add(usize::try_from(data_len).unwrap_or(usize::MAX))
+ })
}
/// Bytes read by the application.
+ #[must_use]
pub fn retired(&self) -> u64 {
self.retired
}
+ #[must_use]
pub fn received(&self) -> u64 {
self.received
}
- /// Data bytes buffered. Could be more than bytes_readable if there are
+ /// Data bytes buffered. Could be more than `bytes_readable` if there are
/// ranges missing.
fn buffered(&self) -> u64 {
self.data_ranges
@@ -588,6 +597,7 @@ impl RecvStream {
self.state = new_state;
}
+ #[must_use]
pub fn stats(&self) -> RecvStreamStats {
match &self.state {
RecvStreamState::Recv { recv_buf, .. }
@@ -622,6 +632,11 @@ impl RecvStream {
}
}
+ /// # Errors
+ /// When the incoming data violates flow control limits.
+ /// # Panics
+ /// Only when `u64` values are so big that they can't fit in a `usize`, which
+ /// only happens on a 32-bit machine that has far too much unread data.
pub fn inbound_stream_frame(&mut self, fin: bool, offset: u64, data: &[u8]) -> Res<()> {
// We should post a DataReadable event only once when we change from no-data-ready to
// data-ready. Therefore remember the state before processing a new frame.
@@ -691,6 +706,8 @@ impl RecvStream {
Ok(())
}
+ /// # Errors
+ /// When the reset occurs at an invalid point.
pub fn reset(&mut self, application_error_code: AppError, final_size: u64) -> Res<()> {
self.state.flow_control_consume_data(final_size, true)?;
match &mut self.state {
@@ -773,6 +790,7 @@ impl RecvStream {
}
}
+ #[must_use]
pub fn is_terminal(&self) -> bool {
matches!(
self.state,
@@ -792,8 +810,8 @@ impl RecvStream {
}
/// # Errors
- ///
/// `NoMoreData` if data and fin bit were previously read by the application.
+ #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe
pub fn read(&mut self, buf: &mut [u8]) -> Res<(usize, bool)> {
let data_recvd_state = matches!(self.state, RecvStreamState::DataRecvd { .. });
match &mut self.state {
@@ -967,6 +985,7 @@ impl RecvStream {
}
#[cfg(test)]
+ #[must_use]
pub fn has_frames_to_write(&self) -> bool {
if let RecvStreamState::Recv { fc, .. } = &self.state {
fc.frame_needed()
@@ -976,6 +995,7 @@ impl RecvStream {
}
#[cfg(test)]
+ #[must_use]
pub fn fc(&self) -> Option<&ReceiverFlowControl<StreamId>> {
match &self.state {
RecvStreamState::Recv { fc, .. }
@@ -990,11 +1010,18 @@ impl RecvStream {
#[cfg(test)]
mod tests {
- use std::ops::Range;
+ use std::{cell::RefCell, ops::Range, rc::Rc};
- use neqo_common::Encoder;
+ use neqo_common::{qtrace, Encoder};
- use super::*;
+ use super::RecvStream;
+ use crate::{
+ fc::ReceiverFlowControl,
+ packet::PacketBuilder,
+ recv_stream::{RxStreamOrderer, RX_STREAM_DATA_WINDOW},
+ stats::FrameStats,
+ ConnectionEvents, Error, StreamId, RECV_BUFFER_SIZE,
+ };
const SESSION_WINDOW: usize = 1024;
@@ -1444,8 +1471,8 @@ mod tests {
let mut buf = vec![0u8; RECV_BUFFER_SIZE + 100]; // Make it overlarge
assert!(!s.has_frames_to_write());
- s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE])
- .unwrap();
+ let big_buf = vec![0; RECV_BUFFER_SIZE];
+ s.inbound_stream_frame(false, 0, &big_buf).unwrap();
assert!(!s.has_frames_to_write());
assert_eq!(s.read(&mut buf).unwrap(), (RECV_BUFFER_SIZE, false));
assert!(!s.data_ready());
@@ -1476,8 +1503,8 @@ mod tests {
fn stream_max_stream_data() {
let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW);
assert!(!s.has_frames_to_write());
- s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE])
- .unwrap();
+ let big_buf = vec![0; RECV_BUFFER_SIZE];
+ s.inbound_stream_frame(false, 0, &big_buf).unwrap();
s.inbound_stream_frame(false, RX_STREAM_DATA_WINDOW, &[1; 1])
.unwrap_err();
}
@@ -1520,9 +1547,10 @@ mod tests {
#[test]
fn no_stream_flowc_event_after_exiting_recv() {
let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW);
- s.inbound_stream_frame(false, 0, &[0; RECV_BUFFER_SIZE])
- .unwrap();
- let mut buf = [0; RECV_BUFFER_SIZE];
+ let mut buf = vec![0; RECV_BUFFER_SIZE];
+ // Write from buf at first.
+ s.inbound_stream_frame(false, 0, &buf).unwrap();
+ // Then read into it.
s.read(&mut buf).unwrap();
assert!(s.has_frames_to_write());
s.inbound_stream_frame(true, RX_STREAM_DATA_WINDOW, &[])
@@ -1634,7 +1662,7 @@ mod tests {
assert_eq!(fc.retired(), retired);
}
- /// Test consuming the flow control in RecvStreamState::Recv
+ /// Test consuming the flow control in `RecvStreamState::Recv`
#[test]
fn fc_state_recv_1() {
const SW: u64 = 1024;
@@ -1651,7 +1679,7 @@ mod tests {
check_fc(s.fc().unwrap(), SW / 4, 0);
}
- /// Test consuming the flow control in RecvStreamState::Recv
+ /// Test consuming the flow control in `RecvStreamState::Recv`
/// with multiple streams
#[test]
fn fc_state_recv_2() {
@@ -1678,7 +1706,7 @@ mod tests {
check_fc(s2.fc().unwrap(), SW / 4, 0);
}
- /// Test retiring the flow control in RecvStreamState::Recv
+ /// Test retiring the flow control in `RecvStreamState::Recv`
/// with multiple streams
#[test]
fn fc_state_recv_3() {
@@ -1730,7 +1758,7 @@ mod tests {
check_fc(s2.fc().unwrap(), SW / 4, SW / 4);
}
- /// Test consuming the flow control in RecvStreamState::Recv - duplicate data
+ /// Test consuming the flow control in `RecvStreamState::Recv` - duplicate data
#[test]
fn fc_state_recv_4() {
const SW: u64 = 1024;
@@ -1753,7 +1781,7 @@ mod tests {
check_fc(s.fc().unwrap(), SW / 4, 0);
}
- /// Test consuming the flow control in RecvStreamState::Recv - filling a gap in the
+ /// Test consuming the flow control in `RecvStreamState::Recv` - filling a gap in the
/// data stream.
#[test]
fn fc_state_recv_5() {
@@ -1774,7 +1802,7 @@ mod tests {
check_fc(s.fc().unwrap(), SW / 4, 0);
}
- /// Test consuming the flow control in RecvStreamState::Recv - receiving frame past
+ /// Test consuming the flow control in `RecvStreamState::Recv` - receiving frame past
/// the flow control will cause an error.
#[test]
fn fc_state_recv_6() {
@@ -1859,7 +1887,7 @@ mod tests {
assert_eq!(stats.max_stream_data, 1);
}
- /// Test flow control in RecvStreamState::SizeKnown
+ /// Test flow control in `RecvStreamState::SizeKnown`
#[test]
fn fc_state_size_known() {
const SW: u64 = 1024;
@@ -1916,7 +1944,7 @@ mod tests {
assert!(s.fc().is_none());
}
- /// Test flow control in RecvStreamState::DataRecvd
+ /// Test flow control in `RecvStreamState::DataRecvd`
#[test]
fn fc_state_data_recv() {
const SW: u64 = 1024;
@@ -1961,7 +1989,7 @@ mod tests {
assert!(s.fc().is_none());
}
- /// Test flow control in RecvStreamState::DataRead
+ /// Test flow control in `RecvStreamState::DataRead`
#[test]
fn fc_state_data_read() {
const SW: u64 = 1024;
@@ -1999,7 +2027,7 @@ mod tests {
assert!(s.fc().is_none());
}
- /// Test flow control in RecvStreamState::AbortReading and final size is known
+ /// Test flow control in `RecvStreamState::AbortReading` and final size is known
#[test]
fn fc_state_abort_reading_1() {
const SW: u64 = 1024;
@@ -2041,7 +2069,7 @@ mod tests {
check_fc(s.fc().unwrap(), SW / 2, SW / 2);
}
- /// Test flow control in RecvStreamState::AbortReading and final size is unknown
+ /// Test flow control in `RecvStreamState::AbortReading` and final size is unknown
#[test]
fn fc_state_abort_reading_2() {
const SW: u64 = 1024;
@@ -2099,7 +2127,7 @@ mod tests {
check_fc(s.fc().unwrap(), SW / 2 + 20, SW / 2 + 20);
}
- /// Test flow control in RecvStreamState::WaitForReset
+ /// Test flow control in `RecvStreamState::WaitForReset`
#[test]
fn fc_state_wait_for_reset() {
const SW: u64 = 1024;