// Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. // Building a stream of ordered bytes to give the application from a series of // incoming STREAM frames. use std::{ cell::RefCell, cmp::max, collections::BTreeMap, mem, rc::{Rc, Weak}, }; use neqo_common::{qtrace, Role}; use smallvec::SmallVec; use crate::{ events::ConnectionEvents, fc::ReceiverFlowControl, frame::FRAME_TYPE_STOP_SENDING, packet::PacketBuilder, recovery::{RecoveryToken, StreamRecoveryToken}, send_stream::SendStreams, stats::FrameStats, stream_id::StreamId, AppError, Error, Res, }; const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB // Export as usize for consistency with SEND_BUFFER_SIZE #[allow(clippy::cast_possible_truncation)] // Yeah, nope. pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize; #[derive(Debug, Default)] pub(crate) struct RecvStreams { streams: BTreeMap, keep_alive: Weak<()>, } impl RecvStreams { pub fn write_frames( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec, stats: &mut FrameStats, ) { for stream in self.streams.values_mut() { stream.write_frame(builder, tokens, stats); if builder.is_full() { return; } } } pub fn insert(&mut self, id: StreamId, stream: RecvStream) { self.streams.insert(id, stream); } pub fn get_mut(&mut self, id: StreamId) -> Res<&mut RecvStream> { self.streams.get_mut(&id).ok_or(Error::InvalidStreamId) } pub fn keep_alive(&mut self, id: StreamId, k: bool) -> Res<()> { let self_ka = &mut self.keep_alive; let s = self.streams.get_mut(&id).ok_or(Error::InvalidStreamId)?; s.keep_alive = if k { Some(self_ka.upgrade().unwrap_or_else(|| { let r = Rc::new(()); *self_ka = Rc::downgrade(&r); r })) } else { None }; Ok(()) } pub fn need_keep_alive(&mut self) -> bool { self.keep_alive.strong_count() > 0 } pub fn clear(&mut self) { self.streams.clear(); } pub fn clear_terminal(&mut self, send_streams: &SendStreams, role: Role) -> (u64, u64) { let recv_to_remove = self .streams .iter() .filter_map(|(id, stream)| { // Remove all streams for which the receiving is done (or aborted). // But only if they are unidirectional, or we have finished sending. if stream.is_terminal() && (id.is_uni() || !send_streams.exists(*id)) { Some(*id) } else { None } }) .collect::>(); let mut removed_bidi = 0; let mut removed_uni = 0; for id in &recv_to_remove { self.streams.remove(id); if id.is_remote_initiated(role) { if id.is_bidi() { removed_bidi += 1; } else { removed_uni += 1; } } } (removed_bidi, removed_uni) } } /// Holds data not yet read by application. Orders and dedupes data ranges /// from incoming STREAM frames. #[derive(Debug, Default)] pub struct RxStreamOrderer { data_ranges: BTreeMap>, // (start_offset, data) retired: u64, // Number of bytes the application has read received: u64, // The number of bytes has stored in `data_ranges` } impl RxStreamOrderer { #[must_use] pub fn new() -> Self { Self::default() } /// Process an incoming stream frame off the wire. This may result in data /// being available to upper layers if frame is not out of order (ooo) or /// if the frame fills a gap. /// # Panics /// Only when `u64` values cannot be converted to `usize`, which only /// happens on 32-bit machines that hold far too much data at the same time. pub fn inbound_frame(&mut self, mut new_start: u64, mut new_data: &[u8]) { qtrace!("Inbound data offset={} len={}", new_start, new_data.len()); // Get entry before where new entry would go, so we can see if we already // have the new bytes. // Avoid copies and duplicated data. let new_end = new_start + u64::try_from(new_data.len()).unwrap(); if new_end <= self.retired { // Range already read by application, this frame is very late and unneeded. return; } if new_start < self.retired { new_data = &new_data[usize::try_from(self.retired - new_start).unwrap()..]; new_start = self.retired; } if new_data.is_empty() { // No data to insert return; } let extend = if let Some((&prev_start, prev_vec)) = self.data_ranges.range_mut(..=new_start).next_back() { let prev_end = prev_start + u64::try_from(prev_vec.len()).unwrap(); if new_end > prev_end { // PPPPPP -> PPPPPP // NNNNNN NN // NNNNNNNN NN // Add a range containing only new data // (In-order frames will take this path, with no overlap) let overlap = prev_end.saturating_sub(new_start); qtrace!( "New frame {}-{} received, overlap: {}", new_start, new_end, overlap ); new_start += overlap; new_data = &new_data[usize::try_from(overlap).unwrap()..]; // If it is small enough, extend the previous buffer. // This can't always extend, because otherwise the buffer could end up // growing indefinitely without being released. prev_vec.len() < 4096 && prev_end == new_start } else { // PPPPPP -> PPPPPP // NNNN // NNNN // Do nothing qtrace!( "Dropping frame with already-received range {}-{}", new_start, new_end ); return; } } else { qtrace!("New frame {}-{} received", new_start, new_end); false }; let mut to_add = new_data; if self .data_ranges .last_entry() .map_or(false, |e| *e.key() >= new_start) { // Is this at the end (common case)? If so, nothing to do in this block // Common case: // PPPPPP -> PPPPPP // NNNNNNN NNNNNNN // or // PPPPPP -> PPPPPP // NNNNNNN NNNNNNN // // Not the common case, handle possible overlap with next entries // PPPPPP AAA -> PPPPPP // NNNNNNN NNNNNNN // or // PPPPPP AAAA -> PPPPPP AAAA // NNNNNNN NNNNN // or (this is where to_remove is used) // PPPPPP AA -> PPPPPP // NNNNNNN NNNNNNN let mut to_remove = SmallVec::<[_; 8]>::new(); for (&next_start, next_data) in self.data_ranges.range_mut(new_start..) { let next_end = next_start + u64::try_from(next_data.len()).unwrap(); let overlap = new_end.saturating_sub(next_start); if overlap == 0 { // Fills in the hole, exactly (probably common) break; } else if next_end >= new_end { qtrace!( "New frame {}-{} overlaps with next frame by {}, truncating", new_start, new_end, overlap ); let truncate_to = new_data.len() - usize::try_from(overlap).unwrap(); to_add = &new_data[..truncate_to]; break; } qtrace!( "New frame {}-{} spans entire next frame {}-{}, replacing", new_start, new_end, next_start, next_end ); to_remove.push(next_start); // Continue, since we may have more overlaps } for start in to_remove { self.data_ranges.remove(&start); } } if !to_add.is_empty() { self.received += u64::try_from(to_add.len()).unwrap(); if extend { let (_, buf) = self .data_ranges .range_mut(..=new_start) .next_back() .unwrap(); buf.extend_from_slice(to_add); } else { self.data_ranges.insert(new_start, to_add.to_vec()); } } } /// Are any bytes readable? #[must_use] pub fn data_ready(&self) -> bool { self.data_ranges .keys() .next() .map_or(false, |&start| start <= self.retired) } /// How many bytes are readable? fn bytes_ready(&self) -> usize { let mut prev_end = self.retired; self.data_ranges .iter() .map(|(start_offset, data)| { // All ranges don't overlap but we could have partially // retired some of the first entry's data. let data_len = data.len() as u64 - self.retired.saturating_sub(*start_offset); (start_offset, data_len) }) .take_while(|(start_offset, data_len)| { if **start_offset <= prev_end { prev_end += data_len; true } else { false } }) // Accumulate, but saturate at usize::MAX. .fold(0, |acc: usize, (_, data_len)| { acc.saturating_add(usize::try_from(data_len).unwrap_or(usize::MAX)) }) } /// Bytes read by the application. #[must_use] pub fn retired(&self) -> u64 { self.retired } #[must_use] pub fn received(&self) -> u64 { self.received } /// Data bytes buffered. Could be more than `bytes_readable` if there are /// ranges missing. fn buffered(&self) -> u64 { self.data_ranges .iter() .map(|(&start, data)| data.len() as u64 - (self.retired.saturating_sub(start))) .sum() } /// Copy received data (if any) into the buffer. Returns bytes copied. fn read(&mut self, buf: &mut [u8]) -> usize { qtrace!("Reading {} bytes, {} available", buf.len(), self.buffered()); let mut copied = 0; for (&range_start, range_data) in &mut self.data_ranges { let mut keep = false; if self.retired >= range_start { // Frame data has new contiguous bytes. let copy_offset = usize::try_from(max(range_start, self.retired) - range_start).unwrap(); assert!(range_data.len() >= copy_offset); let available = range_data.len() - copy_offset; let space = buf.len() - copied; let copy_bytes = if available > space { keep = true; space } else { available }; if copy_bytes > 0 { let copy_slc = &range_data[copy_offset..copy_offset + copy_bytes]; buf[copied..copied + copy_bytes].copy_from_slice(copy_slc); copied += copy_bytes; self.retired += u64::try_from(copy_bytes).unwrap(); } } else { // The data in the buffer isn't contiguous. keep = true; } if keep { let mut keep = self.data_ranges.split_off(&range_start); mem::swap(&mut self.data_ranges, &mut keep); return copied; } } self.data_ranges.clear(); copied } /// Extend the given Vector with any available data. pub fn read_to_end(&mut self, buf: &mut Vec) -> usize { let orig_len = buf.len(); buf.resize(orig_len + self.bytes_ready(), 0); self.read(&mut buf[orig_len..]) } } /// QUIC receiving states, based on -transport 3.2. #[derive(Debug)] #[allow(dead_code)] // Because a dead_code warning is easier than clippy::unused_self, see https://github.com/rust-lang/rust/issues/68408 enum RecvStreamState { Recv { fc: ReceiverFlowControl, session_fc: Rc>>, recv_buf: RxStreamOrderer, }, SizeKnown { fc: ReceiverFlowControl, session_fc: Rc>>, recv_buf: RxStreamOrderer, }, DataRecvd { fc: ReceiverFlowControl, session_fc: Rc>>, recv_buf: RxStreamOrderer, }, DataRead { final_received: u64, final_read: u64, }, AbortReading { fc: ReceiverFlowControl, session_fc: Rc>>, final_size_reached: bool, frame_needed: bool, err: AppError, final_received: u64, final_read: u64, }, WaitForReset { fc: ReceiverFlowControl, session_fc: Rc>>, final_received: u64, final_read: u64, }, ResetRecvd { final_received: u64, final_read: u64, }, // Defined by spec but we don't use it: ResetRead } impl RecvStreamState { fn new( max_bytes: u64, stream_id: StreamId, session_fc: Rc>>, ) -> Self { Self::Recv { fc: ReceiverFlowControl::new(stream_id, max_bytes), recv_buf: RxStreamOrderer::new(), session_fc, } } fn name(&self) -> &str { match self { Self::Recv { .. } => "Recv", Self::SizeKnown { .. } => "SizeKnown", Self::DataRecvd { .. } => "DataRecvd", Self::DataRead { .. } => "DataRead", Self::AbortReading { .. } => "AbortReading", Self::WaitForReset { .. } => "WaitForReset", Self::ResetRecvd { .. } => "ResetRecvd", } } fn recv_buf(&self) -> Option<&RxStreamOrderer> { match self { Self::Recv { recv_buf, .. } | Self::SizeKnown { recv_buf, .. } | Self::DataRecvd { recv_buf, .. } => Some(recv_buf), Self::DataRead { .. } | Self::AbortReading { .. } | Self::WaitForReset { .. } | Self::ResetRecvd { .. } => None, } } fn flow_control_consume_data(&mut self, consumed: u64, fin: bool) -> Res<()> { let (fc, session_fc, final_size_reached, retire_data) = match self { Self::Recv { fc, session_fc, .. } => (fc, session_fc, false, false), Self::WaitForReset { fc, session_fc, .. } => (fc, session_fc, false, true), Self::SizeKnown { fc, session_fc, .. } | Self::DataRecvd { fc, session_fc, .. } => { (fc, session_fc, true, false) } Self::AbortReading { fc, session_fc, final_size_reached, .. } => { let old_final_size_reached = *final_size_reached; *final_size_reached |= fin; (fc, session_fc, old_final_size_reached, true) } Self::DataRead { .. } | Self::ResetRecvd { .. } => { return Ok(()); } }; // Check final size: let final_size_ok = match (fin, final_size_reached) { (true, true) => consumed == fc.consumed(), (false, true) => consumed <= fc.consumed(), (true, false) => consumed >= fc.consumed(), (false, false) => true, }; if !final_size_ok { return Err(Error::FinalSizeError); } let new_bytes_consumed = fc.set_consumed(consumed)?; session_fc.borrow_mut().consume(new_bytes_consumed)?; if retire_data { // Let's also retire this data since the stream has been aborted RecvStream::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc); } Ok(()) } } // See https://www.w3.org/TR/webtransport/#receive-stream-stats #[derive(Debug, Clone, Copy)] pub struct RecvStreamStats { // An indicator of progress on how many of the server application’s bytes // intended for this stream have been received so far. // Only sequential bytes up to, but not including, the first missing byte, // are counted. This number can only increase. pub bytes_received: u64, // The total number of bytes the application has successfully read from this // stream. This number can only increase, and is always less than or equal // to bytes_received. pub bytes_read: u64, } impl RecvStreamStats { #[must_use] pub fn new(bytes_received: u64, bytes_read: u64) -> Self { Self { bytes_received, bytes_read, } } #[must_use] pub fn bytes_received(&self) -> u64 { self.bytes_received } #[must_use] pub fn bytes_read(&self) -> u64 { self.bytes_read } } /// Implement a QUIC receive stream. #[derive(Debug)] pub struct RecvStream { stream_id: StreamId, state: RecvStreamState, conn_events: ConnectionEvents, keep_alive: Option>, } impl RecvStream { pub fn new( stream_id: StreamId, max_stream_data: u64, session_fc: Rc>>, conn_events: ConnectionEvents, ) -> Self { Self { stream_id, state: RecvStreamState::new(max_stream_data, stream_id, session_fc), conn_events, keep_alive: None, } } fn set_state(&mut self, new_state: RecvStreamState) { debug_assert_ne!( mem::discriminant(&self.state), mem::discriminant(&new_state) ); qtrace!( "RecvStream {} state {} -> {}", self.stream_id.as_u64(), self.state.name(), new_state.name() ); match new_state { // Receiving all data, or receiving or requesting RESET_STREAM // is cause to stop keep-alives. RecvStreamState::DataRecvd { .. } | RecvStreamState::AbortReading { .. } | RecvStreamState::ResetRecvd { .. } => { self.keep_alive = None; } // Once all the data is read, generate an event. RecvStreamState::DataRead { .. } => { self.conn_events.recv_stream_complete(self.stream_id); } _ => {} } self.state = new_state; } #[must_use] pub fn stats(&self) -> RecvStreamStats { match &self.state { RecvStreamState::Recv { recv_buf, .. } | RecvStreamState::SizeKnown { recv_buf, .. } | RecvStreamState::DataRecvd { recv_buf, .. } => { let received = recv_buf.received(); let read = recv_buf.retired(); RecvStreamStats::new(received, read) } RecvStreamState::AbortReading { final_received, final_read, .. } | RecvStreamState::WaitForReset { final_received, final_read, .. } | RecvStreamState::DataRead { final_received, final_read, } | RecvStreamState::ResetRecvd { final_received, final_read, } => { let received = *final_received; let read = *final_read; RecvStreamStats::new(received, read) } } } /// # Errors /// When the incoming data violates flow control limits. /// # Panics /// Only when `u64` values are so big that they can't fit in a `usize`, which /// only happens on a 32-bit machine that has far too much unread data. pub fn inbound_stream_frame(&mut self, fin: bool, offset: u64, data: &[u8]) -> Res<()> { // We should post a DataReadable event only once when we change from no-data-ready to // data-ready. Therefore remember the state before processing a new frame. let already_data_ready = self.data_ready(); let new_end = offset + u64::try_from(data.len()).unwrap(); self.state.flow_control_consume_data(new_end, fin)?; match &mut self.state { RecvStreamState::Recv { recv_buf, fc, session_fc, } => { recv_buf.inbound_frame(offset, data); if fin { let all_recv = fc.consumed() == recv_buf.retired() + recv_buf.bytes_ready() as u64; let buf = mem::replace(recv_buf, RxStreamOrderer::new()); let fc_copy = mem::take(fc); let session_fc_copy = mem::take(session_fc); if all_recv { self.set_state(RecvStreamState::DataRecvd { fc: fc_copy, session_fc: session_fc_copy, recv_buf: buf, }); } else { self.set_state(RecvStreamState::SizeKnown { fc: fc_copy, session_fc: session_fc_copy, recv_buf: buf, }); } } } RecvStreamState::SizeKnown { recv_buf, fc, session_fc, } => { recv_buf.inbound_frame(offset, data); if fc.consumed() == recv_buf.retired() + recv_buf.bytes_ready() as u64 { let buf = mem::replace(recv_buf, RxStreamOrderer::new()); let fc_copy = mem::take(fc); let session_fc_copy = mem::take(session_fc); self.set_state(RecvStreamState::DataRecvd { fc: fc_copy, session_fc: session_fc_copy, recv_buf: buf, }); } } RecvStreamState::DataRecvd { .. } | RecvStreamState::DataRead { .. } | RecvStreamState::AbortReading { .. } | RecvStreamState::WaitForReset { .. } | RecvStreamState::ResetRecvd { .. } => { qtrace!("data received when we are in state {}", self.state.name()); } } if !already_data_ready && (self.data_ready() || self.needs_to_inform_app_about_fin()) { self.conn_events.recv_stream_readable(self.stream_id); } Ok(()) } /// # Errors /// When the reset occurs at an invalid point. pub fn reset(&mut self, application_error_code: AppError, final_size: u64) -> Res<()> { self.state.flow_control_consume_data(final_size, true)?; match &mut self.state { RecvStreamState::Recv { fc, session_fc, recv_buf, } | RecvStreamState::SizeKnown { fc, session_fc, recv_buf, } => { // make flow control consumes new data that not really exist. Self::flow_control_retire_data(final_size - fc.retired(), fc, session_fc); self.conn_events .recv_stream_reset(self.stream_id, application_error_code); let received = recv_buf.received(); let read = recv_buf.retired(); self.set_state(RecvStreamState::ResetRecvd { final_received: received, final_read: read, }); } RecvStreamState::AbortReading { fc, session_fc, final_received, final_read, .. } | RecvStreamState::WaitForReset { fc, session_fc, final_received, final_read, } => { // make flow control consumes new data that not really exist. Self::flow_control_retire_data(final_size - fc.retired(), fc, session_fc); self.conn_events .recv_stream_reset(self.stream_id, application_error_code); let received = *final_received; let read = *final_read; self.set_state(RecvStreamState::ResetRecvd { final_received: received, final_read: read, }); } _ => { // Ignore reset if in DataRecvd, DataRead, or ResetRecvd } } Ok(()) } /// If we should tell the sender they have more credit, return an offset fn flow_control_retire_data( new_read: u64, fc: &mut ReceiverFlowControl, session_fc: &mut Rc>>, ) { if new_read > 0 { fc.add_retired(new_read); session_fc.borrow_mut().add_retired(new_read); } } /// Send a flow control update. /// This is used when a peer declares that they are blocked. /// This sends `MAX_STREAM_DATA` if there is any increase possible. pub fn send_flowc_update(&mut self) { if let RecvStreamState::Recv { fc, .. } = &mut self.state { fc.send_flowc_update(); } } pub fn set_stream_max_data(&mut self, max_data: u64) { if let RecvStreamState::Recv { fc, .. } = &mut self.state { fc.set_max_active(max_data); } } #[must_use] pub fn is_terminal(&self) -> bool { matches!( self.state, RecvStreamState::ResetRecvd { .. } | RecvStreamState::DataRead { .. } ) } // App got all data but did not get the fin signal. fn needs_to_inform_app_about_fin(&self) -> bool { matches!(self.state, RecvStreamState::DataRecvd { .. }) } fn data_ready(&self) -> bool { self.state .recv_buf() .map_or(false, RxStreamOrderer::data_ready) } /// # Errors /// `NoMoreData` if data and fin bit were previously read by the application. #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe pub fn read(&mut self, buf: &mut [u8]) -> Res<(usize, bool)> { let data_recvd_state = matches!(self.state, RecvStreamState::DataRecvd { .. }); match &mut self.state { RecvStreamState::Recv { recv_buf, fc, session_fc, } | RecvStreamState::SizeKnown { recv_buf, fc, session_fc, .. } | RecvStreamState::DataRecvd { recv_buf, fc, session_fc, } => { let bytes_read = recv_buf.read(buf); Self::flow_control_retire_data(u64::try_from(bytes_read).unwrap(), fc, session_fc); let fin_read = if data_recvd_state { if recv_buf.buffered() == 0 { let received = recv_buf.received(); let read = recv_buf.retired(); self.set_state(RecvStreamState::DataRead { final_received: received, final_read: read, }); true } else { false } } else { false }; Ok((bytes_read, fin_read)) } RecvStreamState::DataRead { .. } | RecvStreamState::AbortReading { .. } | RecvStreamState::WaitForReset { .. } | RecvStreamState::ResetRecvd { .. } => Err(Error::NoMoreData), } } pub fn stop_sending(&mut self, err: AppError) { qtrace!("stop_sending called when in state {}", self.state.name()); match &mut self.state { RecvStreamState::Recv { fc, session_fc, recv_buf, } | RecvStreamState::SizeKnown { fc, session_fc, recv_buf, } => { // Retire data Self::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc); let fc_copy = mem::take(fc); let session_fc_copy = mem::take(session_fc); let received = recv_buf.received(); let read = recv_buf.retired(); self.set_state(RecvStreamState::AbortReading { fc: fc_copy, session_fc: session_fc_copy, final_size_reached: matches!(self.state, RecvStreamState::SizeKnown { .. }), frame_needed: true, err, final_received: received, final_read: read, }); } RecvStreamState::DataRecvd { fc, session_fc, recv_buf, } => { Self::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc); let received = recv_buf.received(); let read = recv_buf.retired(); self.set_state(RecvStreamState::DataRead { final_received: received, final_read: read, }); } RecvStreamState::DataRead { .. } | RecvStreamState::AbortReading { .. } | RecvStreamState::WaitForReset { .. } | RecvStreamState::ResetRecvd { .. } => { // Already in terminal state } } } /// Maybe write a `MAX_STREAM_DATA` frame. pub fn write_frame( &mut self, builder: &mut PacketBuilder, tokens: &mut Vec, stats: &mut FrameStats, ) { match &mut self.state { // Maybe send MAX_STREAM_DATA RecvStreamState::Recv { fc, .. } => fc.write_frames(builder, tokens, stats), // Maybe send STOP_SENDING RecvStreamState::AbortReading { frame_needed, err, .. } => { if *frame_needed && builder.write_varint_frame(&[ FRAME_TYPE_STOP_SENDING, self.stream_id.as_u64(), *err, ]) { tokens.push(RecoveryToken::Stream(StreamRecoveryToken::StopSending { stream_id: self.stream_id, })); stats.stop_sending += 1; *frame_needed = false; } } _ => {} } } pub fn max_stream_data_lost(&mut self, maximum_data: u64) { if let RecvStreamState::Recv { fc, .. } = &mut self.state { fc.frame_lost(maximum_data); } } pub fn stop_sending_lost(&mut self) { if let RecvStreamState::AbortReading { frame_needed, .. } = &mut self.state { *frame_needed = true; } } pub fn stop_sending_acked(&mut self) { if let RecvStreamState::AbortReading { fc, session_fc, final_size_reached, final_received, final_read, .. } = &mut self.state { let received = *final_received; let read = *final_read; if *final_size_reached { // We already know the final_size of the stream therefore we // do not need to wait for RESET. self.set_state(RecvStreamState::ResetRecvd { final_received: received, final_read: read, }); } else { let fc_copy = mem::take(fc); let session_fc_copy = mem::take(session_fc); self.set_state(RecvStreamState::WaitForReset { fc: fc_copy, session_fc: session_fc_copy, final_received: received, final_read: read, }); } } } #[cfg(test)] #[must_use] pub fn has_frames_to_write(&self) -> bool { if let RecvStreamState::Recv { fc, .. } = &self.state { fc.frame_needed() } else { false } } #[cfg(test)] #[must_use] pub fn fc(&self) -> Option<&ReceiverFlowControl> { match &self.state { RecvStreamState::Recv { fc, .. } | RecvStreamState::SizeKnown { fc, .. } | RecvStreamState::DataRecvd { fc, .. } | RecvStreamState::AbortReading { fc, .. } | RecvStreamState::WaitForReset { fc, .. } => Some(fc), _ => None, } } } #[cfg(test)] mod tests { use std::{cell::RefCell, ops::Range, rc::Rc}; use neqo_common::{qtrace, Encoder}; use super::RecvStream; use crate::{ fc::ReceiverFlowControl, packet::PacketBuilder, recv_stream::{RxStreamOrderer, RX_STREAM_DATA_WINDOW}, stats::FrameStats, ConnectionEvents, Error, StreamId, RECV_BUFFER_SIZE, }; const SESSION_WINDOW: usize = 1024; fn recv_ranges(ranges: &[Range], available: usize) { const ZEROES: &[u8] = &[0; 100]; qtrace!("recv_ranges {:?}", ranges); let mut s = RxStreamOrderer::default(); for r in ranges { let data = &ZEROES[..usize::try_from(r.end - r.start).unwrap()]; s.inbound_frame(r.start, data); } let mut buf = [0xff; 100]; let mut total_recvd = 0; loop { let recvd = s.read(&mut buf[..]); qtrace!("recv_ranges read {}", recvd); total_recvd += recvd; if recvd == 0 { assert_eq!(total_recvd, available); break; } } } #[test] #[allow(unknown_lints, clippy::single_range_in_vec_init)] // Because that lint makes no sense here. fn recv_noncontiguous() { // Non-contiguous with the start, no data available. recv_ranges(&[10..20], 0); } /// Overlaps with the start of a 10..20 range of bytes. #[test] fn recv_overlap_start() { // Overlap the start, with a larger new value. // More overlap than not. recv_ranges(&[10..20, 4..18, 0..4], 20); // Overlap the start, with a larger new value. // Less overlap than not. recv_ranges(&[10..20, 2..15, 0..2], 20); // Overlap the start, with a smaller new value. // More overlap than not. recv_ranges(&[10..20, 8..14, 0..8], 20); // Overlap the start, with a smaller new value. // Less overlap than not. recv_ranges(&[10..20, 6..13, 0..6], 20); // Again with some of the first range split in two. recv_ranges(&[10..11, 11..20, 4..18, 0..4], 20); recv_ranges(&[10..11, 11..20, 2..15, 0..2], 20); recv_ranges(&[10..11, 11..20, 8..14, 0..8], 20); recv_ranges(&[10..11, 11..20, 6..13, 0..6], 20); // Again with a gap in the first range. recv_ranges(&[10..11, 12..20, 4..18, 0..4], 20); recv_ranges(&[10..11, 12..20, 2..15, 0..2], 20); recv_ranges(&[10..11, 12..20, 8..14, 0..8], 20); recv_ranges(&[10..11, 12..20, 6..13, 0..6], 20); } /// Overlaps with the end of a 10..20 range of bytes. #[test] fn recv_overlap_end() { // Overlap the end, with a larger new value. // More overlap than not. recv_ranges(&[10..20, 12..25, 0..10], 25); // Overlap the end, with a larger new value. // Less overlap than not. recv_ranges(&[10..20, 17..33, 0..10], 33); // Overlap the end, with a smaller new value. // More overlap than not. recv_ranges(&[10..20, 15..21, 0..10], 21); // Overlap the end, with a smaller new value. // Less overlap than not. recv_ranges(&[10..20, 17..25, 0..10], 25); // Again with some of the first range split in two. recv_ranges(&[10..19, 19..20, 12..25, 0..10], 25); recv_ranges(&[10..19, 19..20, 17..33, 0..10], 33); recv_ranges(&[10..19, 19..20, 15..21, 0..10], 21); recv_ranges(&[10..19, 19..20, 17..25, 0..10], 25); // Again with a gap in the first range. recv_ranges(&[10..18, 19..20, 12..25, 0..10], 25); recv_ranges(&[10..18, 19..20, 17..33, 0..10], 33); recv_ranges(&[10..18, 19..20, 15..21, 0..10], 21); recv_ranges(&[10..18, 19..20, 17..25, 0..10], 25); } /// Complete overlaps with the start of a 10..20 range of bytes. #[test] fn recv_overlap_complete() { // Complete overlap, more at the end. recv_ranges(&[10..20, 9..23, 0..9], 23); // Complete overlap, more at the start. recv_ranges(&[10..20, 3..23, 0..3], 23); // Complete overlap, to end. recv_ranges(&[10..20, 5..20, 0..5], 20); // Complete overlap, from start. recv_ranges(&[10..20, 10..27, 0..10], 27); // Complete overlap, from 0 and more. recv_ranges(&[10..20, 0..23], 23); // Again with the first range split in two. recv_ranges(&[10..14, 14..20, 9..23, 0..9], 23); recv_ranges(&[10..14, 14..20, 3..23, 0..3], 23); recv_ranges(&[10..14, 14..20, 5..20, 0..5], 20); recv_ranges(&[10..14, 14..20, 10..27, 0..10], 27); recv_ranges(&[10..14, 14..20, 0..23], 23); // Again with the a gap in the first range. recv_ranges(&[10..13, 14..20, 9..23, 0..9], 23); recv_ranges(&[10..13, 14..20, 3..23, 0..3], 23); recv_ranges(&[10..13, 14..20, 5..20, 0..5], 20); recv_ranges(&[10..13, 14..20, 10..27, 0..10], 27); recv_ranges(&[10..13, 14..20, 0..23], 23); } /// An overlap with no new bytes. #[test] fn recv_overlap_duplicate() { recv_ranges(&[10..20, 11..12, 0..10], 20); recv_ranges(&[10..20, 10..15, 0..10], 20); recv_ranges(&[10..20, 14..20, 0..10], 20); // Now with the first range split. recv_ranges(&[10..14, 14..20, 10..15, 0..10], 20); recv_ranges(&[10..15, 16..20, 21..25, 10..25, 0..10], 25); } /// Reading exactly one chunk works, when the next chunk starts immediately. #[test] fn stop_reading_at_chunk() { const CHUNK_SIZE: usize = 10; const EXTRA_SIZE: usize = 3; let mut s = RxStreamOrderer::new(); // Add three chunks. s.inbound_frame(0, &[0; CHUNK_SIZE]); let offset = u64::try_from(CHUNK_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); // Read, providing only enough space for the first. let mut buf = [0; 100]; let count = s.read(&mut buf[..CHUNK_SIZE]); assert_eq!(count, CHUNK_SIZE); let count = s.read(&mut buf[..]); assert_eq!(count, EXTRA_SIZE * 2); } #[test] fn recv_overlap_while_reading() { let mut s = RxStreamOrderer::new(); // Add a chunk s.inbound_frame(0, &[0; 150]); assert_eq!(s.data_ranges.get(&0).unwrap().len(), 150); // Read, providing only enough space for the first 100. let mut buf = [0; 100]; let count = s.read(&mut buf[..]); assert_eq!(count, 100); assert_eq!(s.retired, 100); // Add a second frame that overlaps. // This shouldn't truncate the first frame, as we're already // Reading from it. s.inbound_frame(120, &[0; 60]); assert_eq!(s.data_ranges.get(&0).unwrap().len(), 180); // Read second part of first frame and all of the second frame let count = s.read(&mut buf[..]); assert_eq!(count, 80); } /// Reading exactly one chunk works, when there is a gap. #[test] fn stop_reading_at_gap() { const CHUNK_SIZE: usize = 10; const EXTRA_SIZE: usize = 3; let mut s = RxStreamOrderer::new(); // Add three chunks. s.inbound_frame(0, &[0; CHUNK_SIZE]); let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); // Read, providing only enough space for the first chunk. let mut buf = [0; 100]; let count = s.read(&mut buf[..CHUNK_SIZE]); assert_eq!(count, CHUNK_SIZE); // Now fill the gap and ensure that everything can be read. let offset = u64::try_from(CHUNK_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); let count = s.read(&mut buf[..]); assert_eq!(count, EXTRA_SIZE * 2); } /// Reading exactly one chunk works, when there is a gap. #[test] fn stop_reading_in_chunk() { const CHUNK_SIZE: usize = 10; const EXTRA_SIZE: usize = 3; let mut s = RxStreamOrderer::new(); // Add two chunks. s.inbound_frame(0, &[0; CHUNK_SIZE]); let offset = u64::try_from(CHUNK_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); // Read, providing only enough space for some of the first chunk. let mut buf = [0; 100]; let count = s.read(&mut buf[..CHUNK_SIZE - EXTRA_SIZE]); assert_eq!(count, CHUNK_SIZE - EXTRA_SIZE); let count = s.read(&mut buf[..]); assert_eq!(count, EXTRA_SIZE * 2); } /// Read one byte at a time. #[test] fn read_byte_at_a_time() { const CHUNK_SIZE: usize = 10; const EXTRA_SIZE: usize = 3; let mut s = RxStreamOrderer::new(); // Add two chunks. s.inbound_frame(0, &[0; CHUNK_SIZE]); let offset = u64::try_from(CHUNK_SIZE).unwrap(); s.inbound_frame(offset, &[0; EXTRA_SIZE]); let mut buf = [0; 1]; for _ in 0..CHUNK_SIZE + EXTRA_SIZE { let count = s.read(&mut buf[..]); assert_eq!(count, 1); } assert_eq!(0, s.read(&mut buf[..])); } fn check_stats(stream: &RecvStream, expected_received: u64, expected_read: u64) { let stream_stats = stream.stats(); assert_eq!(expected_received, stream_stats.bytes_received()); assert_eq!(expected_read, stream_stats.bytes_read()); } #[test] fn stream_rx() { let conn_events = ConnectionEvents::default(); let mut s = RecvStream::new( StreamId::from(567), 1024, Rc::new(RefCell::new(ReceiverFlowControl::new((), 1024 * 1024))), conn_events, ); // test receiving a contig frame and reading it works s.inbound_stream_frame(false, 0, &[1; 10]).unwrap(); assert!(s.data_ready()); check_stats(&s, 10, 0); let mut buf = vec![0u8; 100]; assert_eq!(s.read(&mut buf).unwrap(), (10, false)); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 0); check_stats(&s, 10, 10); // test receiving a noncontig frame s.inbound_stream_frame(false, 12, &[2; 12]).unwrap(); assert!(!s.data_ready()); assert_eq!(s.read(&mut buf).unwrap(), (0, false)); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); check_stats(&s, 22, 10); // another frame that overlaps the first s.inbound_stream_frame(false, 14, &[3; 8]).unwrap(); assert!(!s.data_ready()); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); check_stats(&s, 22, 10); // fill in the gap, but with a FIN s.inbound_stream_frame(true, 10, &[4; 6]).unwrap_err(); assert!(!s.data_ready()); assert_eq!(s.read(&mut buf).unwrap(), (0, false)); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 12); check_stats(&s, 22, 10); // fill in the gap s.inbound_stream_frame(false, 10, &[5; 10]).unwrap(); assert!(s.data_ready()); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 14); check_stats(&s, 24, 10); // a legit FIN s.inbound_stream_frame(true, 24, &[6; 18]).unwrap(); assert_eq!(s.state.recv_buf().unwrap().retired(), 10); assert_eq!(s.state.recv_buf().unwrap().buffered(), 32); assert!(s.data_ready()); assert_eq!(s.read(&mut buf).unwrap(), (32, true)); check_stats(&s, 42, 42); // Stream now no longer readable (is in DataRead state) s.read(&mut buf).unwrap_err(); } fn check_chunks(s: &mut RxStreamOrderer, expected: &[(u64, usize)]) { assert_eq!(s.data_ranges.len(), expected.len()); for ((start, buf), (expected_start, expected_len)) in s.data_ranges.iter().zip(expected) { assert_eq!((*start, buf.len()), (*expected_start, *expected_len)); } } // Test deduplication when the new data is at the end. #[test] fn stream_rx_dedupe_tail() { let mut s = RxStreamOrderer::new(); s.inbound_frame(0, &[1; 6]); check_chunks(&mut s, &[(0, 6)]); // New data that overlaps entirely (starting from the head), is ignored. s.inbound_frame(0, &[2; 3]); check_chunks(&mut s, &[(0, 6)]); // New data that overlaps at the tail has any new data appended. s.inbound_frame(2, &[3; 6]); check_chunks(&mut s, &[(0, 8)]); // New data that overlaps entirely (up to the tail), is ignored. s.inbound_frame(4, &[4; 4]); check_chunks(&mut s, &[(0, 8)]); // New data that overlaps, starting from the beginning is appended too. s.inbound_frame(0, &[5; 10]); check_chunks(&mut s, &[(0, 10)]); // New data that is entirely subsumed is ignored. s.inbound_frame(2, &[6; 2]); check_chunks(&mut s, &[(0, 10)]); let mut buf = [0; 16]; assert_eq!(s.read(&mut buf[..]), 10); assert_eq!(buf[..10], [1, 1, 1, 1, 1, 1, 3, 3, 5, 5]); } /// When chunks are added before existing data, they aren't merged. #[test] fn stream_rx_dedupe_head() { let mut s = RxStreamOrderer::new(); s.inbound_frame(1, &[6; 6]); check_chunks(&mut s, &[(1, 6)]); // Insertion before an existing chunk causes truncation of the new chunk. s.inbound_frame(0, &[7; 6]); check_chunks(&mut s, &[(0, 1), (1, 6)]); // Perfect overlap with existing slices has no effect. s.inbound_frame(0, &[8; 7]); check_chunks(&mut s, &[(0, 1), (1, 6)]); let mut buf = [0; 16]; assert_eq!(s.read(&mut buf[..]), 7); assert_eq!(buf[..7], [7, 6, 6, 6, 6, 6, 6]); } #[test] fn stream_rx_dedupe_new_tail() { let mut s = RxStreamOrderer::new(); s.inbound_frame(1, &[6; 6]); check_chunks(&mut s, &[(1, 6)]); // Insertion before an existing chunk causes truncation of the new chunk. s.inbound_frame(0, &[7; 6]); check_chunks(&mut s, &[(0, 1), (1, 6)]); // New data at the end causes the tail to be added to the first chunk, // replacing later chunks entirely. s.inbound_frame(0, &[9; 8]); check_chunks(&mut s, &[(0, 8)]); let mut buf = [0; 16]; assert_eq!(s.read(&mut buf[..]), 8); assert_eq!(buf[..8], [7, 9, 9, 9, 9, 9, 9, 9]); } #[test] fn stream_rx_dedupe_replace() { let mut s = RxStreamOrderer::new(); s.inbound_frame(2, &[6; 6]); check_chunks(&mut s, &[(2, 6)]); // Insertion before an existing chunk causes truncation of the new chunk. s.inbound_frame(1, &[7; 6]); check_chunks(&mut s, &[(1, 1), (2, 6)]); // New data at the start and end replaces all the slices. s.inbound_frame(0, &[9; 10]); check_chunks(&mut s, &[(0, 10)]); let mut buf = [0; 16]; assert_eq!(s.read(&mut buf[..]), 10); assert_eq!(buf[..10], [9; 10]); } #[test] fn trim_retired() { let mut s = RxStreamOrderer::new(); let mut buf = [0; 18]; s.inbound_frame(0, &[1; 10]); // Partially read slices are retained. assert_eq!(s.read(&mut buf[..6]), 6); check_chunks(&mut s, &[(0, 10)]); // Partially read slices are kept and so are added to. s.inbound_frame(3, &buf[..10]); check_chunks(&mut s, &[(0, 13)]); // Wholly read pieces are dropped. assert_eq!(s.read(&mut buf[..]), 7); assert!(s.data_ranges.is_empty()); // New data that overlaps with retired data is trimmed. s.inbound_frame(0, &buf[..]); check_chunks(&mut s, &[(13, 5)]); } #[test] fn stream_flowc_update() { let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW); let mut buf = vec![0u8; RECV_BUFFER_SIZE + 100]; // Make it overlarge assert!(!s.has_frames_to_write()); let big_buf = vec![0; RECV_BUFFER_SIZE]; s.inbound_stream_frame(false, 0, &big_buf).unwrap(); assert!(!s.has_frames_to_write()); assert_eq!(s.read(&mut buf).unwrap(), (RECV_BUFFER_SIZE, false)); assert!(!s.data_ready()); // flow msg generated! assert!(s.has_frames_to_write()); // consume it let mut builder = PacketBuilder::short(Encoder::new(), false, []); let mut token = Vec::new(); s.write_frame(&mut builder, &mut token, &mut FrameStats::default()); // it should be gone assert!(!s.has_frames_to_write()); } fn create_stream(session_fc: u64) -> RecvStream { let conn_events = ConnectionEvents::default(); RecvStream::new( StreamId::from(67), RX_STREAM_DATA_WINDOW, Rc::new(RefCell::new(ReceiverFlowControl::new((), session_fc))), conn_events, ) } #[test] fn stream_max_stream_data() { let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW); assert!(!s.has_frames_to_write()); let big_buf = vec![0; RECV_BUFFER_SIZE]; s.inbound_stream_frame(false, 0, &big_buf).unwrap(); s.inbound_stream_frame(false, RX_STREAM_DATA_WINDOW, &[1; 1]) .unwrap_err(); } #[test] fn stream_orderer_bytes_ready() { let mut rx_ord = RxStreamOrderer::new(); rx_ord.inbound_frame(0, &[1; 6]); assert_eq!(rx_ord.bytes_ready(), 6); assert_eq!(rx_ord.buffered(), 6); assert_eq!(rx_ord.retired(), 0); // read some so there's an offset into the first frame let mut buf = [0u8; 10]; rx_ord.read(&mut buf[..2]); assert_eq!(rx_ord.bytes_ready(), 4); assert_eq!(rx_ord.buffered(), 4); assert_eq!(rx_ord.retired(), 2); // an overlapping frame rx_ord.inbound_frame(5, &[2; 6]); assert_eq!(rx_ord.bytes_ready(), 9); assert_eq!(rx_ord.buffered(), 9); assert_eq!(rx_ord.retired(), 2); // a noncontig frame rx_ord.inbound_frame(20, &[3; 6]); assert_eq!(rx_ord.bytes_ready(), 9); assert_eq!(rx_ord.buffered(), 15); assert_eq!(rx_ord.retired(), 2); // an old frame rx_ord.inbound_frame(0, &[4; 2]); assert_eq!(rx_ord.bytes_ready(), 9); assert_eq!(rx_ord.buffered(), 15); assert_eq!(rx_ord.retired(), 2); } #[test] fn no_stream_flowc_event_after_exiting_recv() { let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW); let mut buf = vec![0; RECV_BUFFER_SIZE]; // Write from buf at first. s.inbound_stream_frame(false, 0, &buf).unwrap(); // Then read into it. s.read(&mut buf).unwrap(); assert!(s.has_frames_to_write()); s.inbound_stream_frame(true, RX_STREAM_DATA_WINDOW, &[]) .unwrap(); assert!(!s.has_frames_to_write()); } fn create_stream_with_fc( session_fc: Rc>>, fc_limit: u64, ) -> RecvStream { RecvStream::new( StreamId::from(567), fc_limit, session_fc, ConnectionEvents::default(), ) } fn create_stream_session_flow_control() -> (RecvStream, Rc>>) { assert!(RX_STREAM_DATA_WINDOW > u64::try_from(SESSION_WINDOW).unwrap()); let session_fc = Rc::new(RefCell::new(ReceiverFlowControl::new( (), u64::try_from(SESSION_WINDOW).unwrap(), ))); ( create_stream_with_fc(Rc::clone(&session_fc), RX_STREAM_DATA_WINDOW), session_fc, ) } #[test] fn session_flow_control() { let (mut s, session_fc) = create_stream_session_flow_control(); s.inbound_stream_frame(false, 0, &[0; SESSION_WINDOW]) .unwrap(); assert!(!session_fc.borrow().frame_needed()); // The buffer is big enough to hold SESSION_WINDOW, this will make sure that we always // read everything from he stream. let mut buf = [0; 2 * SESSION_WINDOW]; s.read(&mut buf).unwrap(); assert!(session_fc.borrow().frame_needed()); // consume it let mut builder = PacketBuilder::short(Encoder::new(), false, []); let mut token = Vec::new(); session_fc .borrow_mut() .write_frames(&mut builder, &mut token, &mut FrameStats::default()); // Switch to SizeKnown state s.inbound_stream_frame(true, 2 * u64::try_from(SESSION_WINDOW).unwrap() - 1, &[0]) .unwrap(); assert!(!session_fc.borrow().frame_needed()); // Receive new data that can be read. s.inbound_stream_frame( false, u64::try_from(SESSION_WINDOW).unwrap(), &[0; SESSION_WINDOW / 2 + 1], ) .unwrap(); assert!(!session_fc.borrow().frame_needed()); s.read(&mut buf).unwrap(); assert!(session_fc.borrow().frame_needed()); // consume it let mut builder = PacketBuilder::short(Encoder::new(), false, []); let mut token = Vec::new(); session_fc .borrow_mut() .write_frames(&mut builder, &mut token, &mut FrameStats::default()); // Test DataRecvd state let session_fc = Rc::new(RefCell::new(ReceiverFlowControl::new( (), u64::try_from(SESSION_WINDOW).unwrap(), ))); let mut s = RecvStream::new( StreamId::from(567), RX_STREAM_DATA_WINDOW, Rc::clone(&session_fc), ConnectionEvents::default(), ); s.inbound_stream_frame(true, 0, &[0; SESSION_WINDOW]) .unwrap(); assert!(!session_fc.borrow().frame_needed()); s.read(&mut buf).unwrap(); assert!(session_fc.borrow().frame_needed()); } #[test] fn session_flow_control_reset() { let (mut s, session_fc) = create_stream_session_flow_control(); s.inbound_stream_frame(false, 0, &[0; SESSION_WINDOW / 2]) .unwrap(); assert!(!session_fc.borrow().frame_needed()); s.reset( Error::NoError.code(), u64::try_from(SESSION_WINDOW).unwrap(), ) .unwrap(); assert!(session_fc.borrow().frame_needed()); } fn check_fc(fc: &ReceiverFlowControl, consumed: u64, retired: u64) { assert_eq!(fc.consumed(), consumed); assert_eq!(fc.retired(), retired); } /// Test consuming the flow control in `RecvStreamState::Recv` #[test] fn fc_state_recv_1() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s.fc().unwrap(), SW / 4, 0); } /// Test consuming the flow control in `RecvStreamState::Recv` /// with multiple streams #[test] fn fc_state_recv_2() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s1 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); let mut s2 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s1.fc().unwrap(), 0, 0); check_fc(s2.fc().unwrap(), 0, 0); s1.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s1.fc().unwrap(), SW / 4, 0); check_fc(s2.fc().unwrap(), 0, 0); s2.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s1.fc().unwrap(), SW / 4, 0); check_fc(s2.fc().unwrap(), SW / 4, 0); } /// Test retiring the flow control in `RecvStreamState::Recv` /// with multiple streams #[test] fn fc_state_recv_3() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s1 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); let mut s2 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s1.fc().unwrap(), 0, 0); check_fc(s2.fc().unwrap(), 0, 0); s1.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); s2.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s1.fc().unwrap(), SW / 4, 0); check_fc(s2.fc().unwrap(), SW / 4, 0); // Read data let mut buf = [1; SW_US]; assert_eq!(s1.read(&mut buf).unwrap(), (SW_US / 4, false)); check_fc(&fc.borrow(), SW / 2, SW / 4); check_fc(s1.fc().unwrap(), SW / 4, SW / 4); check_fc(s2.fc().unwrap(), SW / 4, 0); assert_eq!(s2.read(&mut buf).unwrap(), (SW_US / 4, false)); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s1.fc().unwrap(), SW / 4, SW / 4); check_fc(s2.fc().unwrap(), SW / 4, SW / 4); // Read when there is no more date to be read will not change fc. assert_eq!(s1.read(&mut buf).unwrap(), (0, false)); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s1.fc().unwrap(), SW / 4, SW / 4); check_fc(s2.fc().unwrap(), SW / 4, SW / 4); // Receiving more data on a stream. s1.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW * 3 / 4, SW / 2); check_fc(s1.fc().unwrap(), SW / 2, SW / 4); check_fc(s2.fc().unwrap(), SW / 4, SW / 4); // Read data assert_eq!(s1.read(&mut buf).unwrap(), (SW_US / 4, false)); check_fc(&fc.borrow(), SW * 3 / 4, SW * 3 / 4); check_fc(s1.fc().unwrap(), SW / 2, SW / 2); check_fc(s2.fc().unwrap(), SW / 4, SW / 4); } /// Test consuming the flow control in `RecvStreamState::Recv` - duplicate data #[test] fn fc_state_recv_4() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s.fc().unwrap(), SW / 4, 0); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(false, 0, &[0; SW_US / 8]).unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s.fc().unwrap(), SW / 4, 0); } /// Test consuming the flow control in `RecvStreamState::Recv` - filling a gap in the /// data stream. #[test] fn fc_state_recv_5() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); // Receive out of order data. s.inbound_stream_frame(false, SW / 8, &[0; SW_US / 8]) .unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s.fc().unwrap(), SW / 4, 0); // Filling in the gap will not change fc. s.inbound_stream_frame(false, 0, &[0; SW_US / 8]).unwrap(); check_fc(&fc.borrow(), SW / 4, 0); check_fc(s.fc().unwrap(), SW / 4, 0); } /// Test consuming the flow control in `RecvStreamState::Recv` - receiving frame past /// the flow control will cause an error. #[test] fn fc_state_recv_6() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); // Receiving frame past the flow control will cause an error. assert_eq!( s.inbound_stream_frame(false, 0, &[0; SW_US * 3 / 4 + 1]), Err(Error::FlowControlError) ); } /// Test that the flow controls will send updates. #[test] fn fc_state_recv_7() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW / 2); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap(); let mut buf = [1; SW_US]; assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 4, false)); check_fc(&fc.borrow(), SW / 4, SW / 4); check_fc(s.fc().unwrap(), SW / 4, SW / 4); // Still no fc update needed. assert!(!fc.borrow().frame_needed()); assert!(!s.fc().unwrap().frame_needed()); // Receive one more byte that will cause a fc update after it is read. s.inbound_stream_frame(false, SW / 4, &[0]).unwrap(); check_fc(&fc.borrow(), SW / 4 + 1, SW / 4); check_fc(s.fc().unwrap(), SW / 4 + 1, SW / 4); // Only consuming data does not cause a fc update to be sent. assert!(!fc.borrow().frame_needed()); assert!(!s.fc().unwrap().frame_needed()); assert_eq!(s.read(&mut buf).unwrap(), (1, false)); check_fc(&fc.borrow(), SW / 4 + 1, SW / 4 + 1); check_fc(s.fc().unwrap(), SW / 4 + 1, SW / 4 + 1); // Data are retired and the sttream fc will send an update. assert!(!fc.borrow().frame_needed()); assert!(s.fc().unwrap().frame_needed()); // Receive more data to increase fc further. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 4 - 1, false)); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); assert!(!fc.borrow().frame_needed()); assert!(s.fc().unwrap().frame_needed()); // Write the fc update frame let mut builder = PacketBuilder::short(Encoder::new(), false, []); let mut token = Vec::new(); let mut stats = FrameStats::default(); fc.borrow_mut() .write_frames(&mut builder, &mut token, &mut stats); assert_eq!(stats.max_data, 0); s.write_frame(&mut builder, &mut token, &mut stats); assert_eq!(stats.max_stream_data, 1); // Receive 1 byte that will case a session fc update after it is read. s.inbound_stream_frame(false, SW / 2, &[0]).unwrap(); assert_eq!(s.read(&mut buf).unwrap(), (1, false)); check_fc(&fc.borrow(), SW / 2 + 1, SW / 2 + 1); check_fc(s.fc().unwrap(), SW / 2 + 1, SW / 2 + 1); assert!(fc.borrow().frame_needed()); assert!(!s.fc().unwrap().frame_needed()); fc.borrow_mut() .write_frames(&mut builder, &mut token, &mut stats); assert_eq!(stats.max_data, 1); s.write_frame(&mut builder, &mut token, &mut stats); assert_eq!(stats.max_stream_data, 1); } /// Test flow control in `RecvStreamState::SizeKnown` #[test] fn fc_state_size_known() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Receiving frame past the final size of a stream will return an error. assert_eq!( s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4 + 1]), Err(Error::FinalSizeError) ); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Add new data to the gap will not change fc. s.inbound_stream_frame(false, SW / 8, &[0; SW_US / 8]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Fill the gap s.inbound_stream_frame(false, 0, &[0; SW_US / 8]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Read all data let mut buf = [1; SW_US]; assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 2, true)); // the stream does not have fc any more. We can only check the session fc. check_fc(&fc.borrow(), SW / 2, SW / 2); assert!(s.fc().is_none()); } /// Test flow control in `RecvStreamState::DataRecvd` #[test] fn fc_state_data_recv() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(true, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Receiving frame past the final size of a stream will return an error. assert_eq!( s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4 + 1]), Err(Error::FinalSizeError) ); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); // Read all data let mut buf = [1; SW_US]; assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 2, true)); // the stream does not have fc any more. We can only check the session fc. check_fc(&fc.borrow(), SW / 2, SW / 2); assert!(s.fc().is_none()); } /// Test flow control in `RecvStreamState::DataRead` #[test] fn fc_state_data_read() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(true, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); let mut buf = [1; SW_US]; assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 2, true)); // the stream does not have fc any more. We can only check the session fc. check_fc(&fc.borrow(), SW / 2, SW / 2); assert!(s.fc().is_none()); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(true, 0, &[0; SW_US / 2]).unwrap(); // the stream does not have fc any more. We can only check the session fc. check_fc(&fc.borrow(), SW / 2, SW / 2); assert!(s.fc().is_none()); // Receiving frame past the final size of a stream or the stream's fc limit // will NOT return an error. s.inbound_stream_frame(true, 0, &[0; SW_US / 2 + 1]) .unwrap(); s.inbound_stream_frame(true, 0, &[0; SW_US * 3 / 4 + 1]) .unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); assert!(s.fc().is_none()); } /// Test flow control in `RecvStreamState::AbortReading` and final size is known #[test] fn fc_state_abort_reading_1() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); s.stop_sending(Error::NoError.code()); // All data will de retired check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(true, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving frame past the final size of a stream will return an error. assert_eq!( s.inbound_stream_frame(true, SW / 4, &[0; SW_US / 4 + 1]), Err(Error::FinalSizeError) ); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); } /// Test flow control in `RecvStreamState::AbortReading` and final size is unknown #[test] fn fc_state_abort_reading_2() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(false, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); s.stop_sending(Error::NoError.code()); // All data will de retired check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(false, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving data past the flow control limit will cause an error. assert_eq!( s.inbound_stream_frame(false, 0, &[0; SW_US * 3 / 4 + 1]), Err(Error::FlowControlError) ); // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving more data will case the data to be retired. // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 2, &[0; 10]).unwrap(); check_fc(&fc.borrow(), SW / 2 + 10, SW / 2 + 10); check_fc(s.fc().unwrap(), SW / 2 + 10, SW / 2 + 10); // We can still receive the final size. s.inbound_stream_frame(true, SW / 2, &[0; 20]).unwrap(); check_fc(&fc.borrow(), SW / 2 + 20, SW / 2 + 20); check_fc(s.fc().unwrap(), SW / 2 + 20, SW / 2 + 20); // Receiving frame past the final size of a stream will return an error. assert_eq!( s.inbound_stream_frame(true, SW / 2, &[0; 21]), Err(Error::FinalSizeError) ); check_fc(&fc.borrow(), SW / 2 + 20, SW / 2 + 20); check_fc(s.fc().unwrap(), SW / 2 + 20, SW / 2 + 20); } /// Test flow control in `RecvStreamState::WaitForReset` #[test] fn fc_state_wait_for_reset() { const SW: u64 = 1024; const SW_US: usize = 1024; let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW))); let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4); check_fc(&fc.borrow(), 0, 0); check_fc(s.fc().unwrap(), 0, 0); s.inbound_stream_frame(false, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, 0); check_fc(s.fc().unwrap(), SW / 2, 0); s.stop_sending(Error::NoError.code()); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); s.stop_sending_acked(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving duplicate frames (already consumed data) will not cause an error or // change fc. s.inbound_stream_frame(false, 0, &[0; SW_US / 2]).unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving data past the flow control limit will cause an error. assert_eq!( s.inbound_stream_frame(false, 0, &[0; SW_US * 3 / 4 + 1]), Err(Error::FlowControlError) ); // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4]) .unwrap(); check_fc(&fc.borrow(), SW / 2, SW / 2); check_fc(s.fc().unwrap(), SW / 2, SW / 2); // Receiving more data will case the data to be retired. // The stream can still receive duplicate data without a fin bit. s.inbound_stream_frame(false, SW / 2, &[0; 10]).unwrap(); check_fc(&fc.borrow(), SW / 2 + 10, SW / 2 + 10); check_fc(s.fc().unwrap(), SW / 2 + 10, SW / 2 + 10); } }