summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/streams.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-transport/src/streams.rs')
-rw-r--r--third_party/rust/neqo-transport/src/streams.rs49
1 files changed, 34 insertions, 15 deletions
diff --git a/third_party/rust/neqo-transport/src/streams.rs b/third_party/rust/neqo-transport/src/streams.rs
index 7cbb29ce02..d8662afa3b 100644
--- a/third_party/rust/neqo-transport/src/streams.rs
+++ b/third_party/rust/neqo-transport/src/streams.rs
@@ -95,6 +95,7 @@ impl Streams {
}
}
+ #[must_use]
pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool {
self.remote_stream_limits[stream_id.stream_type()].is_allowed(stream_id)
}
@@ -118,7 +119,9 @@ impl Streams {
self.local_stream_limits = LocalStreamLimits::new(self.role);
}
- pub fn input_frame(&mut self, frame: Frame, stats: &mut FrameStats) -> Res<()> {
+ /// # Errors
+ /// When the frame is invalid.
+ pub fn input_frame(&mut self, frame: &Frame, stats: &mut FrameStats) -> Res<()> {
match frame {
Frame::ResetStream {
stream_id,
@@ -126,8 +129,8 @@ impl Streams {
final_size,
} => {
stats.reset_stream += 1;
- if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
- rs.reset(application_error_code, final_size)?;
+ if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
+ rs.reset(*application_error_code, *final_size)?;
}
}
Frame::StopSending {
@@ -136,9 +139,9 @@ impl Streams {
} => {
stats.stop_sending += 1;
self.events
- .send_stream_stop_sending(stream_id, application_error_code);
- if let (Some(ss), _) = self.obtain_stream(stream_id)? {
- ss.reset(application_error_code);
+ .send_stream_stop_sending(*stream_id, *application_error_code);
+ if let (Some(ss), _) = self.obtain_stream(*stream_id)? {
+ ss.reset(*application_error_code);
}
}
Frame::Stream {
@@ -149,13 +152,13 @@ impl Streams {
..
} => {
stats.stream += 1;
- if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
- rs.inbound_stream_frame(fin, offset, data)?;
+ if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
+ rs.inbound_stream_frame(*fin, *offset, data)?;
}
}
Frame::MaxData { maximum_data } => {
stats.max_data += 1;
- self.handle_max_data(maximum_data);
+ self.handle_max_data(*maximum_data);
}
Frame::MaxStreamData {
stream_id,
@@ -163,12 +166,12 @@ impl Streams {
} => {
qtrace!(
"Stream {} Received MaxStreamData {}",
- stream_id,
- maximum_stream_data
+ *stream_id,
+ *maximum_stream_data
);
stats.max_stream_data += 1;
- if let (Some(ss), _) = self.obtain_stream(stream_id)? {
- ss.set_max_stream_data(maximum_stream_data);
+ if let (Some(ss), _) = self.obtain_stream(*stream_id)? {
+ ss.set_max_stream_data(*maximum_stream_data);
}
}
Frame::MaxStreams {
@@ -176,7 +179,7 @@ impl Streams {
maximum_streams,
} => {
stats.max_streams += 1;
- self.handle_max_streams(stream_type, maximum_streams);
+ self.handle_max_streams(*stream_type, *maximum_streams);
}
Frame::DataBlocked { data_limit } => {
// Should never happen since we set data limit to max
@@ -193,7 +196,7 @@ impl Streams {
return Err(Error::StreamStateError);
}
- if let (_, Some(rs)) = self.obtain_stream(stream_id)? {
+ if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
rs.send_flowc_update();
}
}
@@ -401,6 +404,8 @@ impl Streams {
/// Get or make a stream, and implicitly open additional streams as
/// indicated by its stream id.
+ /// # Errors
+ /// When the stream cannot be created due to stream limits.
pub fn obtain_stream(
&mut self,
stream_id: StreamId,
@@ -412,14 +417,20 @@ impl Streams {
))
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn set_sendorder(&mut self, stream_id: StreamId, sendorder: Option<SendOrder>) -> Res<()> {
self.send.set_sendorder(stream_id, sendorder)
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
self.send.set_fairness(stream_id, fairness)
}
+ /// # Errors
+ /// When a stream cannot be created, which might be temporary.
pub fn stream_create(&mut self, st: StreamType) -> Res<StreamId> {
match self.local_stream_limits.take_stream_id(st) {
None => Err(Error::StreamLimitError),
@@ -525,18 +536,26 @@ impl Streams {
}
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream> {
self.send.get_mut(stream_id)
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream> {
self.send.get(stream_id)
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream> {
self.recv.get_mut(stream_id)
}
+ /// # Errors
+ /// When the stream does not exist.
pub fn keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()> {
self.recv.keep_alive(stream_id, keep)
}