summaryrefslogtreecommitdiffstats
path: root/vendor/h2/src/proto/streams/prioritize.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/h2/src/proto/streams/prioritize.rs')
-rw-r--r--vendor/h2/src/proto/streams/prioritize.rs50
1 files changed, 36 insertions, 14 deletions
diff --git a/vendor/h2/src/proto/streams/prioritize.rs b/vendor/h2/src/proto/streams/prioritize.rs
index 88204ddcc..3196049a4 100644
--- a/vendor/h2/src/proto/streams/prioritize.rs
+++ b/vendor/h2/src/proto/streams/prioritize.rs
@@ -87,7 +87,9 @@ impl Prioritize {
flow.inc_window(config.remote_init_window_sz)
.expect("invalid initial window size");
- flow.assign_capacity(config.remote_init_window_sz);
+ // TODO: proper error handling
+ let _res = flow.assign_capacity(config.remote_init_window_sz);
+ debug_assert!(_res.is_ok());
tracing::trace!("Prioritize::new; flow={:?}", flow);
@@ -253,7 +255,9 @@ impl Prioritize {
if available as usize > capacity {
let diff = available - capacity as WindowSize;
- stream.send_flow.claim_capacity(diff);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(diff);
+ debug_assert!(_res.is_ok());
self.assign_connection_capacity(diff, stream, counts);
}
@@ -324,7 +328,9 @@ impl Prioritize {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
if available > 0 {
- stream.send_flow.claim_capacity(available);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(available);
+ debug_assert!(_res.is_ok());
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
@@ -337,7 +343,9 @@ impl Prioritize {
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
- stream.send_flow.claim_capacity(reserved);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(reserved);
+ debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
@@ -363,7 +371,9 @@ impl Prioritize {
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();
- self.flow.assign_capacity(inc);
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(inc);
+ debug_assert!(_res.is_ok());
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
@@ -443,7 +453,9 @@ impl Prioritize {
stream.assign_capacity(assign, self.max_buffer_size);
// Claim the capacity from the connection
- self.flow.claim_capacity(assign);
+ // TODO: proper error handling
+ let _res = self.flow.claim_capacity(assign);
+ debug_assert!(_res.is_ok());
}
tracing::trace!(
@@ -508,7 +520,9 @@ impl Prioritize {
tracing::trace!("poll_complete");
loop {
- self.schedule_pending_open(store, counts);
+ if let Some(mut stream) = self.pop_pending_open(store, counts) {
+ self.pending_send.push_front(&mut stream);
+ }
match self.pop_frame(buffer, store, max_frame_len, counts) {
Some(frame) => {
@@ -763,12 +777,16 @@ impl Prioritize {
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
- self.flow.assign_capacity(len);
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(len);
+ debug_assert!(_res.is_ok());
});
let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
- self.flow.send_data(len);
+ // TODO: proper error handling
+ let _res = self.flow.send_data(len);
+ debug_assert!(_res.is_ok());
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
@@ -858,20 +876,24 @@ impl Prioritize {
}
}
- fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
+ fn pop_pending_open<'s>(
+ &mut self,
+ store: &'s mut Store,
+ counts: &mut Counts,
+ ) -> Option<store::Ptr<'s>> {
tracing::trace!("schedule_pending_open");
// check for any pending open streams
- while counts.can_inc_num_send_streams() {
+ if counts.can_inc_num_send_streams() {
if let Some(mut stream) = self.pending_open.pop(store) {
tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
counts.inc_num_send_streams(&mut stream);
- self.pending_send.push(&mut stream);
stream.notify_send();
- } else {
- return;
+ return Some(stream);
}
}
+
+ None
}
}