diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:26:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:26:03 +0000 |
commit | 9918693037dce8aa4bb6f08741b6812923486c18 (patch) | |
tree | 21d2b40bec7e6a7ea664acee056eb3d08e15a1cf /vendor/h2/src/proto/streams/send.rs | |
parent | Releasing progress-linux version 1.75.0+dfsg1-5~progress7.99u1. (diff) | |
download | rustc-9918693037dce8aa4bb6f08741b6812923486c18.tar.xz rustc-9918693037dce8aa4bb6f08741b6812923486c18.zip |
Merging upstream version 1.76.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/h2/src/proto/streams/send.rs')
-rw-r--r-- | vendor/h2/src/proto/streams/send.rs | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/vendor/h2/src/proto/streams/send.rs b/vendor/h2/src/proto/streams/send.rs index 20aba38d4..626e61a33 100644 --- a/vendor/h2/src/proto/streams/send.rs +++ b/vendor/h2/src/proto/streams/send.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::codec::UserError; use crate::frame::{self, Reason}; -use crate::proto::{Error, Initiator}; +use crate::proto::{self, Error, Initiator}; use bytes::Buf; use tokio::io::AsyncWrite; @@ -143,22 +143,27 @@ impl Send { // Update the state stream.state.send_open(end_stream)?; - if counts.peer().is_local_init(frame.stream_id()) { - // If we're waiting on a PushPromise anyway - // handle potentially queueing the stream at that point - if !stream.is_pending_push { - if counts.can_inc_num_send_streams() { - counts.inc_num_send_streams(stream); - } else { - self.prioritize.queue_open(stream); - } - } + let mut pending_open = false; + if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push { + self.prioritize.queue_open(stream); + pending_open = true; } // Queue the frame for sending + // + // This call expects that, since new streams are in the open queue, new + // streams won't be pushed on pending_send. self.prioritize .queue_frame(frame.into(), buffer, stream, task); + // Need to notify the connection when pushing onto pending_open since + // queue_frame only notifies for pending_send. + if pending_open { + if let Some(task) = task.take() { + task.wake(); + } + } + Ok(()) } @@ -458,10 +463,21 @@ impl Send { tracing::trace!("decrementing all windows; dec={}", dec); let mut total_reclaimed = 0; - store.for_each(|mut stream| { + store.try_for_each(|mut stream| { let stream = &mut *stream; - stream.send_flow.dec_send_window(dec); + tracing::trace!( + "decrementing stream window; id={:?}; decr={}; flow={:?}", + stream.id, + dec, + stream.send_flow + ); + + // TODO: this decrement can underflow based on received frames! + stream + .send_flow + .dec_send_window(dec) + .map_err(proto::Error::library_go_away)?; // It's possible that decreasing the window causes // `window_size` (the stream-specific window) to fall below @@ -474,7 +490,10 @@ impl Send { let reclaimed = if available > window_size { // Drop down to `window_size`. let reclaim = available - window_size; - stream.send_flow.claim_capacity(reclaim); + stream + .send_flow + .claim_capacity(reclaim) + .map_err(proto::Error::library_go_away)?; total_reclaimed += reclaim; reclaim } else { @@ -492,7 +511,9 @@ impl Send { // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - }); + + Ok::<_, proto::Error>(()) + })?; self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); |