diff options
Diffstat (limited to 'third_party/rust/neqo-transport/src/recv_stream.rs')
-rw-r--r-- | third_party/rust/neqo-transport/src/recv_stream.rs | 1110 |
1 files changed, 1110 insertions, 0 deletions
diff --git a/third_party/rust/neqo-transport/src/recv_stream.rs b/third_party/rust/neqo-transport/src/recv_stream.rs new file mode 100644 index 0000000000..e82e08d2ec --- /dev/null +++ b/third_party/rust/neqo-transport/src/recv_stream.rs @@ -0,0 +1,1110 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// Building a stream of ordered bytes to give the application from a series of +// incoming STREAM frames. + +use std::cell::RefCell; +use std::cmp::max; +use std::collections::BTreeMap; +use std::convert::TryFrom; +use std::mem; +use std::ops::Bound::{Included, Unbounded}; +use std::rc::Rc; + +use smallvec::SmallVec; + +use crate::events::ConnectionEvents; +use crate::flow_mgr::FlowMgr; +use crate::stream_id::StreamId; +use crate::{AppError, Error, Res}; +use neqo_common::qtrace; + +const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB + +// Export as usize for consistency with SEND_BUFFER_SIZE +pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize; + +pub(crate) type RecvStreams = BTreeMap<StreamId, RecvStream>; + +/// Holds data not yet read by application. Orders and dedupes data ranges +/// from incoming STREAM frames. +#[derive(Debug, Default)] +pub struct RxStreamOrderer { + data_ranges: BTreeMap<u64, Vec<u8>>, // (start_offset, data) + retired: u64, // Number of bytes the application has read +} + +impl RxStreamOrderer { + pub fn new() -> Self { + Self::default() + } + + /// Process an incoming stream frame off the wire. This may result in data + /// being available to upper layers if frame is not out of order (ooo) or + /// if the frame fills a gap. + pub fn inbound_frame(&mut self, mut new_start: u64, mut new_data: &[u8]) { + qtrace!("Inbound data offset={} len={}", new_start, new_data.len()); + + // Get entry before where new entry would go, so we can see if we already + // have the new bytes. + // Avoid copies and duplicated data. + let new_end = new_start + u64::try_from(new_data.len()).unwrap(); + + if new_end <= self.retired { + // Range already read by application, this frame is very late and unneeded. + return; + } + + if new_start < self.retired { + new_data = &new_data[usize::try_from(self.retired - new_start).unwrap()..]; + new_start = self.retired; + } + + if new_data.is_empty() { + // No data to insert + return; + } + + let extend = if let Some((&prev_start, prev_vec)) = self + .data_ranges + .range_mut((Unbounded, Included(new_start))) + .next_back() + { + let prev_end = prev_start + u64::try_from(prev_vec.len()).unwrap(); + if new_end > prev_end { + // PPPPPP -> PPPPPP + // NNNNNN NN + // NNNNNNNN NN + // Add a range containing only new data + // (In-order frames will take this path, with no overlap) + let overlap = prev_end.saturating_sub(new_start); + qtrace!( + "New frame {}-{} received, overlap: {}", + new_start, + new_end, + overlap + ); + new_start += overlap; + new_data = &new_data[usize::try_from(overlap).unwrap()..]; + // If it is small enough, extend the previous buffer. + // This can't always extend, because otherwise the buffer could end up + // growing indefinitely without being released. + prev_vec.len() < 4096 && prev_end == new_start + } else { + // PPPPPP -> PPPPPP + // NNNN + // NNNN + // Do nothing + qtrace!( + "Dropping frame with already-received range {}-{}", + new_start, + new_end + ); + return; + } + } else { + qtrace!("New frame {}-{} received", new_start, new_end); + false + }; + + // Now handle possible overlap with next entries + let mut to_remove = SmallVec::<[_; 8]>::new(); + let mut to_add = new_data; + + for (&next_start, next_data) in self.data_ranges.range_mut(new_start..) { + let next_end = next_start + u64::try_from(next_data.len()).unwrap(); + let overlap = new_end.saturating_sub(next_start); + if overlap == 0 { + break; + } else if next_end >= new_end { + qtrace!( + "New frame {}-{} overlaps with next frame by {}, truncating", + new_start, + new_end, + overlap + ); + let truncate_to = new_data.len() - usize::try_from(overlap).unwrap(); + to_add = &new_data[..truncate_to]; + break; + } else { + qtrace!( + "New frame {}-{} spans entire next frame {}-{}, replacing", + new_start, + new_end, + next_start, + next_end + ); + to_remove.push(next_start); + } + } + + for start in to_remove { + self.data_ranges.remove(&start); + } + + if !to_add.is_empty() { + if extend { + let (_, buf) = self + .data_ranges + .range_mut((Unbounded, Included(new_start))) + .next_back() + .unwrap(); + buf.extend_from_slice(to_add); + } else { + self.data_ranges.insert(new_start, to_add.to_vec()); + } + } + } + + /// Are any bytes readable? + pub fn data_ready(&self) -> bool { + self.data_ranges + .keys() + .next() + .map_or(false, |&start| start <= self.retired) + } + + /// How many bytes are readable? + fn bytes_ready(&self) -> usize { + let mut prev_end = self.retired; + self.data_ranges + .iter() + .map(|(start_offset, data)| { + // All ranges don't overlap but we could have partially + // retired some of the first entry's data. + let data_len = data.len() as u64 - self.retired.saturating_sub(*start_offset); + (start_offset, data_len) + }) + .take_while(|(start_offset, data_len)| { + if **start_offset <= prev_end { + prev_end += data_len; + true + } else { + false + } + }) + .map(|(_, data_len)| data_len as usize) + .sum() + } + + /// Bytes read by the application. + fn retired(&self) -> u64 { + self.retired + } + + /// Data bytes buffered. Could be more than bytes_readable if there are + /// ranges missing. + fn buffered(&self) -> u64 { + self.data_ranges + .iter() + .map(|(&start, data)| data.len() as u64 - (self.retired.saturating_sub(start))) + .sum() + } + + /// Copy received data (if any) into the buffer. Returns bytes copied. + fn read(&mut self, buf: &mut [u8]) -> usize { + qtrace!("Reading {} bytes, {} available", buf.len(), self.buffered()); + let mut copied = 0; + + for (&range_start, range_data) in &mut self.data_ranges { + let mut keep = false; + if self.retired >= range_start { + // Frame data has new contiguous bytes. + let copy_offset = + usize::try_from(max(range_start, self.retired) - range_start).unwrap(); + assert!(range_data.len() >= copy_offset); + let available = range_data.len() - copy_offset; + let space = buf.len() - copied; + let copy_bytes = if available > space { + keep = true; + space + } else { + available + }; + + if copy_bytes > 0 { + let copy_slc = &range_data[copy_offset..copy_offset + copy_bytes]; + buf[copied..copied + copy_bytes].copy_from_slice(copy_slc); + copied += copy_bytes; + self.retired += u64::try_from(copy_bytes).unwrap(); + } + } else { + // The data in the buffer isn't contiguous. + keep = true; + } + if keep { + let mut keep = self.data_ranges.split_off(&range_start); + mem::swap(&mut self.data_ranges, &mut keep); + return copied; + } + } + + self.data_ranges.clear(); + copied + } + + /// Extend the given Vector with any available data. + pub fn read_to_end(&mut self, buf: &mut Vec<u8>) -> usize { + let orig_len = buf.len(); + buf.resize(orig_len + self.bytes_ready(), 0); + self.read(&mut buf[orig_len..]) + } + + fn highest_seen_offset(&self) -> u64 { + let maybe_ooo_last = self + .data_ranges + .iter() + .next_back() + .map(|(start, data)| *start + data.len() as u64); + maybe_ooo_last.unwrap_or(self.retired) + } +} + +/// QUIC receiving states, based on -transport 3.2. +#[derive(Debug)] +#[allow(dead_code)] +// Because a dead_code warning is easier than clippy::unused_self, see https://github.com/rust-lang/rust/issues/68408 +enum RecvStreamState { + Recv { + recv_buf: RxStreamOrderer, + max_bytes: u64, // Maximum size of recv_buf + max_stream_data: u64, + }, + SizeKnown { + recv_buf: RxStreamOrderer, + final_size: u64, + }, + DataRecvd { + recv_buf: RxStreamOrderer, + }, + DataRead, + ResetRecvd, + // Defined by spec but we don't use it: ResetRead +} + +impl RecvStreamState { + fn new(max_bytes: u64) -> Self { + Self::Recv { + recv_buf: RxStreamOrderer::new(), + max_bytes, + max_stream_data: max_bytes, + } + } + + fn name(&self) -> &str { + match self { + Self::Recv { .. } => "Recv", + Self::SizeKnown { .. } => "SizeKnown", + Self::DataRecvd { .. } => "DataRecvd", + Self::DataRead => "DataRead", + Self::ResetRecvd => "ResetRecvd", + } + } + + fn recv_buf(&self) -> Option<&RxStreamOrderer> { + match self { + Self::Recv { recv_buf, .. } + | Self::SizeKnown { recv_buf, .. } + | Self::DataRecvd { recv_buf } => Some(recv_buf), + Self::DataRead | Self::ResetRecvd => None, + } + } + + fn final_size(&self) -> Option<u64> { + match self { + Self::SizeKnown { final_size, .. } => Some(*final_size), + _ => None, + } + } + + fn max_stream_data(&self) -> Option<u64> { + match self { + Self::Recv { + max_stream_data, .. + } => Some(*max_stream_data), + _ => None, + } + } +} + +/// Implement a QUIC receive stream. +#[derive(Debug)] +pub struct RecvStream { + stream_id: StreamId, + state: RecvStreamState, + flow_mgr: Rc<RefCell<FlowMgr>>, + conn_events: ConnectionEvents, +} + +impl RecvStream { + pub fn new( + stream_id: StreamId, + max_stream_data: u64, + flow_mgr: Rc<RefCell<FlowMgr>>, + conn_events: ConnectionEvents, + ) -> Self { + Self { + stream_id, + state: RecvStreamState::new(max_stream_data), + flow_mgr, + conn_events, + } + } + + fn set_state(&mut self, new_state: RecvStreamState) { + debug_assert_ne!( + mem::discriminant(&self.state), + mem::discriminant(&new_state) + ); + qtrace!( + "RecvStream {} state {} -> {}", + self.stream_id.as_u64(), + self.state.name(), + new_state.name() + ); + + if let RecvStreamState::Recv { .. } = &self.state { + self.flow_mgr + .borrow_mut() + .clear_max_stream_data(self.stream_id) + } + + if let RecvStreamState::DataRead = new_state { + self.conn_events.recv_stream_complete(self.stream_id); + } + + self.state = new_state; + } + + pub fn inbound_stream_frame(&mut self, fin: bool, offset: u64, data: &[u8]) -> Res<()> { + // We should post a DataReadable event only once when we change from no-data-ready to + // data-ready. Therefore remember the state before processing a new frame. + let already_data_ready = self.data_ready(); + let new_end = offset + u64::try_from(data.len()).unwrap(); + + // Send final size errors even if stream is closed + if let Some(final_size) = self.state.final_size() { + if new_end > final_size || (fin && new_end != final_size) { + return Err(Error::FinalSizeError); + } + } + + match &mut self.state { + RecvStreamState::Recv { + recv_buf, + max_stream_data, + .. + } => { + if new_end > *max_stream_data { + qtrace!("Stream RX window {} exceeded: {}", max_stream_data, new_end); + return Err(Error::FlowControlError); + } + + if fin { + let final_size = offset + data.len() as u64; + if final_size < recv_buf.highest_seen_offset() { + return Err(Error::FinalSizeError); + } + recv_buf.inbound_frame(offset, data); + + let buf = mem::replace(recv_buf, RxStreamOrderer::new()); + if final_size == buf.retired() + buf.bytes_ready() as u64 { + self.set_state(RecvStreamState::DataRecvd { recv_buf: buf }); + } else { + self.set_state(RecvStreamState::SizeKnown { + recv_buf: buf, + final_size, + }); + } + } else { + recv_buf.inbound_frame(offset, data); + } + } + RecvStreamState::SizeKnown { + recv_buf, + final_size, + } => { + recv_buf.inbound_frame(offset, data); + if *final_size == recv_buf.retired() + recv_buf.bytes_ready() as u64 { + let buf = mem::replace(recv_buf, RxStreamOrderer::new()); + self.set_state(RecvStreamState::DataRecvd { recv_buf: buf }); + } + } + RecvStreamState::DataRecvd { .. } + | RecvStreamState::DataRead + | RecvStreamState::ResetRecvd => { + qtrace!("data received when we are in state {}", self.state.name()) + } + } + + if !already_data_ready && (self.data_ready() || self.needs_to_inform_app_about_fin()) { + self.conn_events.recv_stream_readable(self.stream_id) + } + + Ok(()) + } + + pub fn reset(&mut self, application_error_code: AppError) { + match self.state { + RecvStreamState::Recv { .. } | RecvStreamState::SizeKnown { .. } => { + self.conn_events + .recv_stream_reset(self.stream_id, application_error_code); + self.set_state(RecvStreamState::ResetRecvd); + } + _ => { + // Ignore reset if in DataRecvd, DataRead, or ResetRecvd + } + } + } + + /// If we should tell the sender they have more credit, return an offset + pub fn maybe_send_flowc_update(&mut self) { + // Only ever needed if actively receiving and not in SizeKnown state + if let RecvStreamState::Recv { + max_bytes, + max_stream_data, + recv_buf, + } = &mut self.state + { + // Algo: send an update if app has consumed more than half + // the data in the current window + // TODO(agrover@mozilla.com): This algo is not great but + // should prevent Silly Window Syndrome. Spec refers to using + // highest seen offset somehow? RTT maybe? + let maybe_new_max = recv_buf.retired() + *max_bytes; + if maybe_new_max > (*max_bytes / 2) + *max_stream_data { + *max_stream_data = maybe_new_max; + self.flow_mgr + .borrow_mut() + .max_stream_data(self.stream_id, maybe_new_max) + } + } + } + + pub fn max_stream_data(&self) -> Option<u64> { + self.state.max_stream_data() + } + + pub fn is_terminal(&self) -> bool { + matches!( + self.state, + RecvStreamState::ResetRecvd | RecvStreamState::DataRead + ) + } + + // App got all data but did not get the fin signal. + fn needs_to_inform_app_about_fin(&self) -> bool { + matches!(self.state, RecvStreamState::DataRecvd { .. }) + } + + fn data_ready(&self) -> bool { + self.state + .recv_buf() + .map_or(false, RxStreamOrderer::data_ready) + } + + /// # Errors + /// `NoMoreData` if data and fin bit were previously read by the application. + pub fn read(&mut self, buf: &mut [u8]) -> Res<(usize, bool)> { + let res = match &mut self.state { + RecvStreamState::Recv { recv_buf, .. } + | RecvStreamState::SizeKnown { recv_buf, .. } => Ok((recv_buf.read(buf), false)), + RecvStreamState::DataRecvd { recv_buf } => { + let bytes_read = recv_buf.read(buf); + let fin_read = recv_buf.buffered() == 0; + if fin_read { + self.set_state(RecvStreamState::DataRead); + } + Ok((bytes_read, fin_read)) + } + RecvStreamState::DataRead | RecvStreamState::ResetRecvd => Err(Error::NoMoreData), + }; + self.maybe_send_flowc_update(); + res + } + + pub fn stop_sending(&mut self, err: AppError) { + qtrace!("stop_sending called when in state {}", self.state.name()); + match &self.state { + RecvStreamState::Recv { .. } | RecvStreamState::SizeKnown { .. } => { + self.set_state(RecvStreamState::ResetRecvd); + self.flow_mgr.borrow_mut().stop_sending(self.stream_id, err) + } + RecvStreamState::DataRecvd { .. } => self.set_state(RecvStreamState::DataRead), + RecvStreamState::DataRead | RecvStreamState::ResetRecvd => { + // Already in terminal state + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::frame::Frame; + use std::ops::Range; + + fn recv_ranges(ranges: &[Range<u64>], available: usize) { + const ZEROES: &[u8] = &[0; 100]; + qtrace!("recv_ranges {:?}", ranges); + + let mut s = RxStreamOrderer::default(); + for r in ranges { + let data = &ZEROES[..usize::try_from(r.end - r.start).unwrap()]; + s.inbound_frame(r.start, data); + } + + let mut buf = [0xff; 100]; + let mut total_recvd = 0; + loop { + let recvd = s.read(&mut buf[..]); + qtrace!("recv_ranges read {}", recvd); + total_recvd += recvd; + if recvd == 0 { + assert_eq!(total_recvd, available); + break; + } + } + } + + #[test] + fn recv_noncontiguous() { + // Non-contiguous with the start, no data available. + recv_ranges(&[10..20], 0); + } + + /// Overlaps with the start of a 10..20 range of bytes. + #[test] + fn recv_overlap_start() { + // Overlap the start, with a larger new value. + // More overlap than not. + recv_ranges(&[10..20, 4..18, 0..4], 20); + // Overlap the start, with a larger new value. + // Less overlap than not. + recv_ranges(&[10..20, 2..15, 0..2], 20); + // Overlap the start, with a smaller new value. + // More overlap than not. + recv_ranges(&[10..20, 8..14, 0..8], 20); + // Overlap the start, with a smaller new value. + // Less overlap than not. + recv_ranges(&[10..20, 6..13, 0..6], 20); + + // Again with some of the first range split in two. + recv_ranges(&[10..11, 11..20, 4..18, 0..4], 20); + recv_ranges(&[10..11, 11..20, 2..15, 0..2], 20); + recv_ranges(&[10..11, 11..20, 8..14, 0..8], 20); + recv_ranges(&[10..11, 11..20, 6..13, 0..6], 20); + + // Again with a gap in the first range. + recv_ranges(&[10..11, 12..20, 4..18, 0..4], 20); + recv_ranges(&[10..11, 12..20, 2..15, 0..2], 20); + recv_ranges(&[10..11, 12..20, 8..14, 0..8], 20); + recv_ranges(&[10..11, 12..20, 6..13, 0..6], 20); + } + + /// Overlaps with the end of a 10..20 range of bytes. + #[test] + fn recv_overlap_end() { + // Overlap the end, with a larger new value. + // More overlap than not. + recv_ranges(&[10..20, 12..25, 0..10], 25); + // Overlap the end, with a larger new value. + // Less overlap than not. + recv_ranges(&[10..20, 17..33, 0..10], 33); + // Overlap the end, with a smaller new value. + // More overlap than not. + recv_ranges(&[10..20, 15..21, 0..10], 21); + // Overlap the end, with a smaller new value. + // Less overlap than not. + recv_ranges(&[10..20, 17..25, 0..10], 25); + + // Again with some of the first range split in two. + recv_ranges(&[10..19, 19..20, 12..25, 0..10], 25); + recv_ranges(&[10..19, 19..20, 17..33, 0..10], 33); + recv_ranges(&[10..19, 19..20, 15..21, 0..10], 21); + recv_ranges(&[10..19, 19..20, 17..25, 0..10], 25); + + // Again with a gap in the first range. + recv_ranges(&[10..18, 19..20, 12..25, 0..10], 25); + recv_ranges(&[10..18, 19..20, 17..33, 0..10], 33); + recv_ranges(&[10..18, 19..20, 15..21, 0..10], 21); + recv_ranges(&[10..18, 19..20, 17..25, 0..10], 25); + } + + /// Complete overlaps with the start of a 10..20 range of bytes. + #[test] + fn recv_overlap_complete() { + // Complete overlap, more at the end. + recv_ranges(&[10..20, 9..23, 0..9], 23); + // Complete overlap, more at the start. + recv_ranges(&[10..20, 3..23, 0..3], 23); + // Complete overlap, to end. + recv_ranges(&[10..20, 5..20, 0..5], 20); + // Complete overlap, from start. + recv_ranges(&[10..20, 10..27, 0..10], 27); + // Complete overlap, from 0 and more. + recv_ranges(&[10..20, 0..23], 23); + + // Again with the first range split in two. + recv_ranges(&[10..14, 14..20, 9..23, 0..9], 23); + recv_ranges(&[10..14, 14..20, 3..23, 0..3], 23); + recv_ranges(&[10..14, 14..20, 5..20, 0..5], 20); + recv_ranges(&[10..14, 14..20, 10..27, 0..10], 27); + recv_ranges(&[10..14, 14..20, 0..23], 23); + + // Again with the a gap in the first range. + recv_ranges(&[10..13, 14..20, 9..23, 0..9], 23); + recv_ranges(&[10..13, 14..20, 3..23, 0..3], 23); + recv_ranges(&[10..13, 14..20, 5..20, 0..5], 20); + recv_ranges(&[10..13, 14..20, 10..27, 0..10], 27); + recv_ranges(&[10..13, 14..20, 0..23], 23); + } + + /// An overlap with no new bytes. + #[test] + fn recv_overlap_duplicate() { + recv_ranges(&[10..20, 11..12, 0..10], 20); + recv_ranges(&[10..20, 10..15, 0..10], 20); + recv_ranges(&[10..20, 14..20, 0..10], 20); + // Now with the first range split. + recv_ranges(&[10..14, 14..20, 10..15, 0..10], 20); + recv_ranges(&[10..15, 16..20, 21..25, 10..25, 0..10], 25); + } + + /// Reading exactly one chunk works, when the next chunk starts immediately. + #[test] + fn stop_reading_at_chunk() { + const CHUNK_SIZE: usize = 10; + const EXTRA_SIZE: usize = 3; + let mut s = RxStreamOrderer::new(); + + // Add three chunks. + s.inbound_frame(0, &[0; CHUNK_SIZE]); + let offset = u64::try_from(CHUNK_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + + // Read, providing only enough space for the first. + let mut buf = vec![0; 100]; + let count = s.read(&mut buf[..CHUNK_SIZE]); + assert_eq!(count, CHUNK_SIZE); + let count = s.read(&mut buf[..]); + assert_eq!(count, EXTRA_SIZE * 2); + } + + #[test] + fn recv_overlap_while_reading() { + let mut s = RxStreamOrderer::new(); + + // Add a chunk + s.inbound_frame(0, &[0; 150]); + assert_eq!(s.data_ranges.get(&0).unwrap().len(), 150); + // Read, providing only enough space for the first 100. + let mut buf = [0; 100]; + let count = s.read(&mut buf[..]); + assert_eq!(count, 100); + assert_eq!(s.retired, 100); + + // Add a second frame that overlaps. + // This shouldn't truncate the first frame, as we're already + // Reading from it. + s.inbound_frame(120, &[0; 60]); + assert_eq!(s.data_ranges.get(&0).unwrap().len(), 180); + // Read second part of first frame and all of the second frame + let count = s.read(&mut buf[..]); + assert_eq!(count, 80); + } + + /// Reading exactly one chunk works, when there is a gap. + #[test] + fn stop_reading_at_gap() { + const CHUNK_SIZE: usize = 10; + const EXTRA_SIZE: usize = 3; + let mut s = RxStreamOrderer::new(); + + // Add three chunks. + s.inbound_frame(0, &[0; CHUNK_SIZE]); + let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + + // Read, providing only enough space for the first chunk. + let mut buf = [0; 100]; + let count = s.read(&mut buf[..CHUNK_SIZE]); + assert_eq!(count, CHUNK_SIZE); + + // Now fill the gap and ensure that everything can be read. + let offset = u64::try_from(CHUNK_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + let count = s.read(&mut buf[..]); + assert_eq!(count, EXTRA_SIZE * 2); + } + + /// Reading exactly one chunk works, when there is a gap. + #[test] + fn stop_reading_in_chunk() { + const CHUNK_SIZE: usize = 10; + const EXTRA_SIZE: usize = 3; + let mut s = RxStreamOrderer::new(); + + // Add two chunks. + s.inbound_frame(0, &[0; CHUNK_SIZE]); + let offset = u64::try_from(CHUNK_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + + // Read, providing only enough space for some of the first chunk. + let mut buf = [0; 100]; + let count = s.read(&mut buf[..CHUNK_SIZE - EXTRA_SIZE]); + assert_eq!(count, CHUNK_SIZE - EXTRA_SIZE); + + let count = s.read(&mut buf[..]); + assert_eq!(count, EXTRA_SIZE * 2); + } + + /// Read one byte at a time. + #[test] + fn read_byte_at_a_time() { + const CHUNK_SIZE: usize = 10; + const EXTRA_SIZE: usize = 3; + let mut s = RxStreamOrderer::new(); + + // Add two chunks. + s.inbound_frame(0, &[0; CHUNK_SIZE]); + let offset = u64::try_from(CHUNK_SIZE).unwrap(); + s.inbound_frame(offset, &[0; EXTRA_SIZE]); + + let mut buf = [0; 1]; + for _ in 0..CHUNK_SIZE + EXTRA_SIZE { + let count = s.read(&mut buf[..]); + assert_eq!(count, 1); + } + assert_eq!(0, s.read(&mut buf[..])); + } + + #[test] + fn stream_rx() { + let flow_mgr = Rc::new(RefCell::new(FlowMgr::default())); + let conn_events = ConnectionEvents::default(); + + let mut s = RecvStream::new(StreamId::from(567), 1024, Rc::clone(&flow_mgr), conn_events); + + // test receiving a contig frame and reading it works + s.inbound_stream_frame(false, 0, &[1; 10]).unwrap(); + assert_eq!(s.data_ready(), true); + let mut buf = vec![0u8; 100]; + assert_eq!(s.read(&mut buf).unwrap(), (10, false)); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 0); + + // test receiving a noncontig frame + s.inbound_stream_frame(false, 12, &[2; 12]).unwrap(); + assert_eq!(s.data_ready(), false); + assert_eq!(s.read(&mut buf).unwrap(), (0, false)); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); + + // another frame that overlaps the first + s.inbound_stream_frame(false, 14, &[3; 8]).unwrap(); + assert_eq!(s.data_ready(), false); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); + + // fill in the gap, but with a FIN + s.inbound_stream_frame(true, 10, &[4; 6]).unwrap_err(); + assert_eq!(s.data_ready(), false); + assert_eq!(s.read(&mut buf).unwrap(), (0, false)); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); + + // fill in the gap + s.inbound_stream_frame(false, 10, &[5; 10]).unwrap(); + assert_eq!(s.data_ready(), true); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 14); + + // a legit FIN + s.inbound_stream_frame(true, 24, &[6; 18]).unwrap(); + assert_eq!(s.state.recv_buf().unwrap().retired(), 10); + assert_eq!(s.state.recv_buf().unwrap().buffered(), 32); + assert_eq!(s.data_ready(), true); + assert_eq!(s.read(&mut buf).unwrap(), (32, true)); + + // Stream now no longer readable (is in DataRead state) + s.read(&mut buf).unwrap_err(); + } + + fn check_chunks(s: &mut RxStreamOrderer, expected: &[(u64, usize)]) { + assert_eq!(s.data_ranges.len(), expected.len()); + for ((start, buf), (expected_start, expected_len)) in s.data_ranges.iter().zip(expected) { + assert_eq!((*start, buf.len()), (*expected_start, *expected_len)); + } + } + + // Test deduplication when the new data is at the end. + #[test] + fn stream_rx_dedupe_tail() { + let mut s = RxStreamOrderer::new(); + + s.inbound_frame(0, &[1; 6]); + check_chunks(&mut s, &[(0, 6)]); + + // New data that overlaps entirely (starting from the head), is ignored. + s.inbound_frame(0, &[2; 3]); + check_chunks(&mut s, &[(0, 6)]); + + // New data that overlaps at the tail has any new data appended. + s.inbound_frame(2, &[3; 6]); + check_chunks(&mut s, &[(0, 8)]); + + // New data that overlaps entirely (up to the tail), is ignored. + s.inbound_frame(4, &[4; 4]); + check_chunks(&mut s, &[(0, 8)]); + + // New data that overlaps, starting from the beginning is appended too. + s.inbound_frame(0, &[5; 10]); + check_chunks(&mut s, &[(0, 10)]); + + // New data that is entirely subsumed is ignored. + s.inbound_frame(2, &[6; 2]); + check_chunks(&mut s, &[(0, 10)]); + + let mut buf = [0; 16]; + assert_eq!(s.read(&mut buf[..]), 10); + assert_eq!(buf[..10], [1, 1, 1, 1, 1, 1, 3, 3, 5, 5]); + } + + /// When chunks are added before existing data, they aren't merged. + #[test] + fn stream_rx_dedupe_head() { + let mut s = RxStreamOrderer::new(); + + s.inbound_frame(1, &[6; 6]); + check_chunks(&mut s, &[(1, 6)]); + + // Insertion before an existing chunk causes truncation of the new chunk. + s.inbound_frame(0, &[7; 6]); + check_chunks(&mut s, &[(0, 1), (1, 6)]); + + // Perfect overlap with existing slices has no effect. + s.inbound_frame(0, &[8; 7]); + check_chunks(&mut s, &[(0, 1), (1, 6)]); + + let mut buf = [0; 16]; + assert_eq!(s.read(&mut buf[..]), 7); + assert_eq!(buf[..7], [7, 6, 6, 6, 6, 6, 6]); + } + + #[test] + fn stream_rx_dedupe_new_tail() { + let mut s = RxStreamOrderer::new(); + + s.inbound_frame(1, &[6; 6]); + check_chunks(&mut s, &[(1, 6)]); + + // Insertion before an existing chunk causes truncation of the new chunk. + s.inbound_frame(0, &[7; 6]); + check_chunks(&mut s, &[(0, 1), (1, 6)]); + + // New data at the end causes the tail to be added to the first chunk, + // replacing later chunks entirely. + s.inbound_frame(0, &[9; 8]); + check_chunks(&mut s, &[(0, 8)]); + + let mut buf = [0; 16]; + assert_eq!(s.read(&mut buf[..]), 8); + assert_eq!(buf[..8], [7, 9, 9, 9, 9, 9, 9, 9]); + } + + #[test] + fn stream_rx_dedupe_replace() { + let mut s = RxStreamOrderer::new(); + + s.inbound_frame(2, &[6; 6]); + check_chunks(&mut s, &[(2, 6)]); + + // Insertion before an existing chunk causes truncation of the new chunk. + s.inbound_frame(1, &[7; 6]); + check_chunks(&mut s, &[(1, 1), (2, 6)]); + + // New data at the start and end replaces all the slices. + s.inbound_frame(0, &[9; 10]); + check_chunks(&mut s, &[(0, 10)]); + + let mut buf = [0; 16]; + assert_eq!(s.read(&mut buf[..]), 10); + assert_eq!(buf[..10], [9; 10]); + } + + #[test] + fn trim_retired() { + let mut s = RxStreamOrderer::new(); + + let mut buf = [0; 18]; + s.inbound_frame(0, &[1; 10]); + + // Partially read slices are retained. + assert_eq!(s.read(&mut buf[..6]), 6); + check_chunks(&mut s, &[(0, 10)]); + + // Partially read slices are kept and so are added to. + s.inbound_frame(3, &buf[..10]); + check_chunks(&mut s, &[(0, 13)]); + + // Wholly read pieces are dropped. + assert_eq!(s.read(&mut buf[..]), 7); + assert!(s.data_ranges.is_empty()); + + // New data that overlaps with retired data is trimmed. + s.inbound_frame(0, &buf[..]); + check_chunks(&mut s, &[(13, 5)]); + } + + #[test] + fn stream_flowc_update() { + let flow_mgr = Rc::default(); + let conn_events = ConnectionEvents::default(); + + let frame1 = vec![0; RECV_BUFFER_SIZE]; + + let mut s = RecvStream::new( + StreamId::from(4), + RX_STREAM_DATA_WINDOW, + Rc::clone(&flow_mgr), + conn_events, + ); + + let mut buf = vec![0u8; RECV_BUFFER_SIZE + 100]; // Make it overlarge + + s.maybe_send_flowc_update(); + assert_eq!(s.flow_mgr.borrow().peek(), None); + s.inbound_stream_frame(false, 0, &frame1).unwrap(); + s.maybe_send_flowc_update(); + assert_eq!(s.flow_mgr.borrow().peek(), None); + assert_eq!(s.read(&mut buf).unwrap(), (RECV_BUFFER_SIZE, false)); + assert_eq!(s.data_ready(), false); + s.maybe_send_flowc_update(); + + // flow msg generated! + assert!(s.flow_mgr.borrow().peek().is_some()); + + // consume it + s.flow_mgr.borrow_mut().next().unwrap(); + + // it should be gone + s.maybe_send_flowc_update(); + assert_eq!(s.flow_mgr.borrow().peek(), None); + } + + #[test] + fn stream_max_stream_data() { + let flow_mgr = Rc::new(RefCell::new(FlowMgr::default())); + let conn_events = ConnectionEvents::default(); + + let frame1 = vec![0; RECV_BUFFER_SIZE]; + let mut s = RecvStream::new( + StreamId::from(67), + RX_STREAM_DATA_WINDOW, + Rc::clone(&flow_mgr), + conn_events, + ); + + s.maybe_send_flowc_update(); + assert_eq!(s.flow_mgr.borrow().peek(), None); + s.inbound_stream_frame(false, 0, &frame1).unwrap(); + s.inbound_stream_frame(false, RX_STREAM_DATA_WINDOW, &[1; 1]) + .unwrap_err(); + } + + #[test] + fn stream_orderer_bytes_ready() { + let mut rx_ord = RxStreamOrderer::new(); + + rx_ord.inbound_frame(0, &[1; 6]); + assert_eq!(rx_ord.bytes_ready(), 6); + assert_eq!(rx_ord.buffered(), 6); + assert_eq!(rx_ord.retired(), 0); + + // read some so there's an offset into the first frame + let mut buf = [0u8; 10]; + rx_ord.read(&mut buf[..2]); + assert_eq!(rx_ord.bytes_ready(), 4); + assert_eq!(rx_ord.buffered(), 4); + assert_eq!(rx_ord.retired(), 2); + + // an overlapping frame + rx_ord.inbound_frame(5, &[2; 6]); + assert_eq!(rx_ord.bytes_ready(), 9); + assert_eq!(rx_ord.buffered(), 9); + assert_eq!(rx_ord.retired(), 2); + + // a noncontig frame + rx_ord.inbound_frame(20, &[3; 6]); + assert_eq!(rx_ord.bytes_ready(), 9); + assert_eq!(rx_ord.buffered(), 15); + assert_eq!(rx_ord.retired(), 2); + + // an old frame + rx_ord.inbound_frame(0, &[4; 2]); + assert_eq!(rx_ord.bytes_ready(), 9); + assert_eq!(rx_ord.buffered(), 15); + assert_eq!(rx_ord.retired(), 2); + } + + #[test] + fn no_stream_flowc_event_after_exiting_recv() { + let flow_mgr = Rc::new(RefCell::new(FlowMgr::default())); + let conn_events = ConnectionEvents::default(); + + let frame1 = vec![0; RECV_BUFFER_SIZE]; + let stream_id = StreamId::from(67); + let mut s = RecvStream::new( + stream_id, + RX_STREAM_DATA_WINDOW, + Rc::clone(&flow_mgr), + conn_events, + ); + + s.inbound_stream_frame(false, 0, &frame1).unwrap(); + flow_mgr.borrow_mut().max_stream_data(stream_id, 100); + assert!(matches!(s.flow_mgr.borrow().peek().unwrap(), Frame::MaxStreamData{..})); + s.inbound_stream_frame(true, RX_STREAM_DATA_WINDOW, &[]) + .unwrap(); + assert!(matches!(s.flow_mgr.borrow().peek(), None)); + } + + #[test] + fn resend_flowc_if_lost() { + let flow_mgr = Rc::new(RefCell::new(FlowMgr::default())); + let conn_events = ConnectionEvents::default(); + + let frame1 = &[0; RECV_BUFFER_SIZE]; + let stream_id = StreamId::from(67); + let mut s = RecvStream::new( + stream_id, + RX_STREAM_DATA_WINDOW, + Rc::clone(&flow_mgr), + conn_events, + ); + + // A flow control update is queued + s.inbound_stream_frame(false, 0, frame1).unwrap(); + flow_mgr.borrow_mut().max_stream_data(stream_id, 100); + // Generates frame + assert!(matches!( + s.flow_mgr.borrow_mut().next().unwrap(), + Frame::MaxStreamData { .. } + )); + // Nothing else queued + assert!(matches!(s.flow_mgr.borrow().peek(), None)); + // Asking for another one won't get you one + s.maybe_send_flowc_update(); + assert!(matches!(s.flow_mgr.borrow().peek(), None)); + // But if lost, another frame is generated + flow_mgr.borrow_mut().max_stream_data(stream_id, 100); + assert!(matches!(s.flow_mgr.borrow_mut().next().unwrap(), Frame::MaxStreamData{..})); + } +} |