summaryrefslogtreecommitdiffstats
path: root/vendor/h2/src/proto/streams/send.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/h2/src/proto/streams/send.rs')
-rw-r--r--vendor/h2/src/proto/streams/send.rs51
1 files changed, 36 insertions, 15 deletions
diff --git a/vendor/h2/src/proto/streams/send.rs b/vendor/h2/src/proto/streams/send.rs
index 20aba38d4..626e61a33 100644
--- a/vendor/h2/src/proto/streams/send.rs
+++ b/vendor/h2/src/proto/streams/send.rs
@@ -4,7 +4,7 @@ use super::{
};
use crate::codec::UserError;
use crate::frame::{self, Reason};
-use crate::proto::{Error, Initiator};
+use crate::proto::{self, Error, Initiator};
use bytes::Buf;
use tokio::io::AsyncWrite;
@@ -143,22 +143,27 @@ impl Send {
// Update the state
stream.state.send_open(end_stream)?;
- if counts.peer().is_local_init(frame.stream_id()) {
- // If we're waiting on a PushPromise anyway
- // handle potentially queueing the stream at that point
- if !stream.is_pending_push {
- if counts.can_inc_num_send_streams() {
- counts.inc_num_send_streams(stream);
- } else {
- self.prioritize.queue_open(stream);
- }
- }
+ let mut pending_open = false;
+ if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
+ self.prioritize.queue_open(stream);
+ pending_open = true;
}
// Queue the frame for sending
+ //
+ // This call expects that, since new streams are in the open queue, new
+ // streams won't be pushed on pending_send.
self.prioritize
.queue_frame(frame.into(), buffer, stream, task);
+ // Need to notify the connection when pushing onto pending_open since
+ // queue_frame only notifies for pending_send.
+ if pending_open {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+
Ok(())
}
@@ -458,10 +463,21 @@ impl Send {
tracing::trace!("decrementing all windows; dec={}", dec);
let mut total_reclaimed = 0;
- store.for_each(|mut stream| {
+ store.try_for_each(|mut stream| {
let stream = &mut *stream;
- stream.send_flow.dec_send_window(dec);
+ tracing::trace!(
+ "decrementing stream window; id={:?}; decr={}; flow={:?}",
+ stream.id,
+ dec,
+ stream.send_flow
+ );
+
+ // TODO: this decrement can underflow based on received frames!
+ stream
+ .send_flow
+ .dec_send_window(dec)
+ .map_err(proto::Error::library_go_away)?;
// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
@@ -474,7 +490,10 @@ impl Send {
let reclaimed = if available > window_size {
// Drop down to `window_size`.
let reclaim = available - window_size;
- stream.send_flow.claim_capacity(reclaim);
+ stream
+ .send_flow
+ .claim_capacity(reclaim)
+ .map_err(proto::Error::library_go_away)?;
total_reclaimed += reclaim;
reclaim
} else {
@@ -492,7 +511,9 @@ impl Send {
// TODO: Should this notify the producer when the capacity
// of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work.
- });
+
+ Ok::<_, proto::Error>(())
+ })?;
self.prioritize
.assign_connection_capacity(total_reclaimed, store, counts);