diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:25:56 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-06-19 09:25:56 +0000 |
commit | 018c4950b9406055dec02ef0fb52f132e2bb1e2c (patch) | |
tree | a835ebdf2088ef88fa681f8fad45f09922c1ae9a /vendor/h2/src | |
parent | Adding debian version 1.75.0+dfsg1-5. (diff) | |
download | rustc-018c4950b9406055dec02ef0fb52f132e2bb1e2c.tar.xz rustc-018c4950b9406055dec02ef0fb52f132e2bb1e2c.zip |
Merging upstream version 1.76.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/h2/src')
24 files changed, 362 insertions, 182 deletions
diff --git a/vendor/h2/src/client.rs b/vendor/h2/src/client.rs index 4147e8a46..35cfc1414 100644 --- a/vendor/h2/src/client.rs +++ b/vendor/h2/src/client.rs @@ -510,8 +510,10 @@ where self.inner .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) - .map(|stream| { - if stream.is_pending_open() { + .map(|(stream, is_full)| { + if stream.is_pending_open() && is_full { + // Only prevent sending another request when the request queue + // is not full. self.pending = Some(stream.clone_to_opaque()); } @@ -1021,7 +1023,7 @@ impl Builder { /// stream have been written to the connection, the send buffer capacity /// will be freed up again. /// - /// The default is currently ~400MB, but may change. + /// The default is currently ~400KB, but may change. /// /// # Panics /// @@ -1070,6 +1072,39 @@ impl Builder { self } + /// Sets the header table size. + /// + /// This setting informs the peer of the maximum size of the header compression + /// table used to encode header blocks, in octets. The encoder may select any value + /// equal to or less than the header table size specified by the sender. + /// + /// The default value is 4,096. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::client::*; + /// # use bytes::Bytes; + /// # + /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) + /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error> + /// # { + /// // `client_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let client_fut = Builder::new() + /// .header_table_size(1_000_000) + /// .handshake(my_io); + /// # client_fut.await + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn header_table_size(&mut self, size: u32) -> &mut Self { + self.settings.set_header_table_size(Some(size)); + self + } + /// Sets the first stream ID to something other than 1. #[cfg(feature = "unstable")] pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self { diff --git a/vendor/h2/src/codec/framed_read.rs b/vendor/h2/src/codec/framed_read.rs index a874d7732..3b0030d93 100644 --- a/vendor/h2/src/codec/framed_read.rs +++ b/vendor/h2/src/codec/framed_read.rs @@ -88,6 +88,12 @@ impl<T> FramedRead<T> { pub fn set_max_header_list_size(&mut self, val: usize) { self.max_header_list_size = val; } + + /// Update the header table size setting. + #[inline] + pub fn set_header_table_size(&mut self, val: usize) { + self.hpack.queue_size_update(val); + } } /// Decodes a frame. diff --git a/vendor/h2/src/codec/framed_write.rs b/vendor/h2/src/codec/framed_write.rs index 4b1b4accc..c88af02da 100644 --- a/vendor/h2/src/codec/framed_write.rs +++ b/vendor/h2/src/codec/framed_write.rs @@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio_util::io::poll_write_buf; -use std::io::{self, Cursor, IoSlice}; +use std::io::{self, Cursor}; // A macro to get around a method needing to borrow &mut self macro_rules! limited_write_buf { @@ -45,8 +46,11 @@ struct Encoder<B> { /// Max frame size, this is specified by the peer max_frame_size: FrameSize, - /// Whether or not the wrapped `AsyncWrite` supports vectored IO. - is_write_vectored: bool, + /// Chain payloads bigger than this. + chain_threshold: usize, + + /// Min buffer required to attempt to write a frame + min_buffer_capacity: usize, } #[derive(Debug)] @@ -61,14 +65,16 @@ enum Next<B> { /// frame that big. const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024; -/// Min buffer required to attempt to write a frame -const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD; - -/// Chain payloads bigger than this. The remote will never advertise a max frame -/// size less than this (well, the spec says the max frame size can't be less -/// than 16kb, so not even close). +/// Chain payloads bigger than this when vectored I/O is enabled. The remote +/// will never advertise a max frame size less than this (well, the spec says +/// the max frame size can't be less than 16kb, so not even close). const CHAIN_THRESHOLD: usize = 256; +/// Chain payloads bigger than this when vectored I/O is **not** enabled. +/// A larger value in this scenario will reduce the number of small and +/// fragmented data being sent, and hereby improve the throughput. +const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024; + // TODO: Make generic impl<T, B> FramedWrite<T, B> where @@ -76,7 +82,11 @@ where B: Buf, { pub fn new(inner: T) -> FramedWrite<T, B> { - let is_write_vectored = inner.is_write_vectored(); + let chain_threshold = if inner.is_write_vectored() { + CHAIN_THRESHOLD + } else { + CHAIN_THRESHOLD_WITHOUT_VECTORED_IO + }; FramedWrite { inner, encoder: Encoder { @@ -85,7 +95,8 @@ where next: None, last_data_frame: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, - is_write_vectored, + chain_threshold, + min_buffer_capacity: chain_threshold + frame::HEADER_LEN, }, } } @@ -126,23 +137,17 @@ where Some(Next::Data(ref mut frame)) => { tracing::trace!(queued_data_frame = true); let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut()); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut buf, - cx, - ))? + ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))? } _ => { tracing::trace!(queued_data_frame = false); - ready!(write( - &mut self.inner, - self.encoder.is_write_vectored, - &mut self.encoder.buf, + ready!(poll_write_buf( + Pin::new(&mut self.inner), cx, + &mut self.encoder.buf ))? } - } + }; } match self.encoder.unset_frame() { @@ -165,30 +170,6 @@ where } } -fn write<T, B>( - writer: &mut T, - is_write_vectored: bool, - buf: &mut B, - cx: &mut Context<'_>, -) -> Poll<io::Result<()>> -where - T: AsyncWrite + Unpin, - B: Buf, -{ - // TODO(eliza): when tokio-util 0.5.1 is released, this - // could just use `poll_write_buf`... - const MAX_IOVS: usize = 64; - let n = if is_write_vectored { - let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; - let cnt = buf.chunks_vectored(&mut bufs); - ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))? - } else { - ready!(Pin::new(writer).poll_write(cx, buf.chunk()))? - }; - buf.advance(n); - Ok(()).into() -} - #[must_use] enum ControlFlow { Continue, @@ -240,12 +221,17 @@ where return Err(PayloadTooBig); } - if len >= CHAIN_THRESHOLD { + if len >= self.chain_threshold { let head = v.head(); // Encode the frame head to the buffer head.encode(len, self.buf.get_mut()); + if self.buf.get_ref().remaining() < self.chain_threshold { + let extra_bytes = self.chain_threshold - self.buf.remaining(); + self.buf.get_mut().put(v.payload_mut().take(extra_bytes)); + } + // Save the data frame self.next = Some(Next::Data(v)); } else { @@ -305,7 +291,9 @@ where } fn has_capacity(&self) -> bool { - self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY + self.next.is_none() + && (self.buf.get_ref().capacity() - self.buf.get_ref().len() + >= self.min_buffer_capacity) } fn is_empty(&self) -> bool { diff --git a/vendor/h2/src/codec/mod.rs b/vendor/h2/src/codec/mod.rs index 359adf6e4..6cbdc1e18 100644 --- a/vendor/h2/src/codec/mod.rs +++ b/vendor/h2/src/codec/mod.rs @@ -95,6 +95,11 @@ impl<T, B> Codec<T, B> { self.framed_write().set_header_table_size(val) } + /// Set the decoder header table size size. + pub fn set_recv_header_table_size(&mut self, val: usize) { + self.inner.set_header_table_size(val) + } + /// Set the max header list size that can be received. pub fn set_max_recv_header_list_size(&mut self, val: usize) { self.inner.set_max_header_list_size(val); diff --git a/vendor/h2/src/frame/data.rs b/vendor/h2/src/frame/data.rs index d0cdf5f69..5ed3c31b5 100644 --- a/vendor/h2/src/frame/data.rs +++ b/vendor/h2/src/frame/data.rs @@ -148,7 +148,7 @@ impl<T: Buf> Data<T> { /// /// Panics if `dst` cannot contain the data frame. pub(crate) fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) { - let len = self.data.remaining() as usize; + let len = self.data.remaining(); assert!(dst.remaining_mut() >= len); diff --git a/vendor/h2/src/frame/mod.rs b/vendor/h2/src/frame/mod.rs index 570a162a8..0e8e7035c 100644 --- a/vendor/h2/src/frame/mod.rs +++ b/vendor/h2/src/frame/mod.rs @@ -69,7 +69,7 @@ pub use crate::hpack::BytesStr; pub use self::settings::{ DEFAULT_INITIAL_WINDOW_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, - MAX_INITIAL_WINDOW_SIZE, MAX_MAX_FRAME_SIZE, + MAX_MAX_FRAME_SIZE, }; pub type FrameSize = u32; diff --git a/vendor/h2/src/frame/settings.rs b/vendor/h2/src/frame/settings.rs index 0c913f059..484498a9d 100644 --- a/vendor/h2/src/frame/settings.rs +++ b/vendor/h2/src/frame/settings.rs @@ -121,11 +121,9 @@ impl Settings { self.header_table_size } - /* pub fn set_header_table_size(&mut self, size: Option<u32>) { self.header_table_size = size; } - */ pub fn load(head: Head, payload: &[u8]) -> Result<Settings, Error> { use self::Setting::*; diff --git a/vendor/h2/src/hpack/decoder.rs b/vendor/h2/src/hpack/decoder.rs index b45c37927..960cbb143 100644 --- a/vendor/h2/src/hpack/decoder.rs +++ b/vendor/h2/src/hpack/decoder.rs @@ -447,7 +447,7 @@ fn decode_int<B: Buf>(buf: &mut B, prefix_size: u8) -> Result<usize, DecoderErro Err(DecoderError::NeedMore(NeedMore::IntegerUnderflow)) } -fn peek_u8<B: Buf>(buf: &mut B) -> Option<u8> { +fn peek_u8<B: Buf>(buf: &B) -> Option<u8> { if buf.has_remaining() { Some(buf.chunk()[0]) } else { @@ -835,9 +835,9 @@ mod test { fn test_peek_u8() { let b = 0xff; let mut buf = Cursor::new(vec![b]); - assert_eq!(peek_u8(&mut buf), Some(b)); + assert_eq!(peek_u8(&buf), Some(b)); assert_eq!(buf.get_u8(), b); - assert_eq!(peek_u8(&mut buf), None); + assert_eq!(peek_u8(&buf), None); } #[test] diff --git a/vendor/h2/src/hpack/table.rs b/vendor/h2/src/hpack/table.rs index a1a780451..3e45f413b 100644 --- a/vendor/h2/src/hpack/table.rs +++ b/vendor/h2/src/hpack/table.rs @@ -319,7 +319,7 @@ impl Table { let mut probe = probe + 1; probe_loop!(probe < self.indices.len(), { - let pos = &mut self.indices[probe as usize]; + let pos = &mut self.indices[probe]; prev = match mem::replace(pos, Some(prev)) { Some(p) => p, @@ -656,12 +656,12 @@ fn to_raw_capacity(n: usize) -> usize { #[inline] fn desired_pos(mask: usize, hash: HashValue) -> usize { - (hash.0 & mask) as usize + hash.0 & mask } #[inline] fn probe_distance(mask: usize, hash: HashValue, current: usize) -> usize { - current.wrapping_sub(desired_pos(mask, hash)) & mask as usize + current.wrapping_sub(desired_pos(mask, hash)) & mask } fn hash_header(header: &Header) -> HashValue { diff --git a/vendor/h2/src/hpack/test/fixture.rs b/vendor/h2/src/hpack/test/fixture.rs index 0d33ca2de..d3f76e3bf 100644 --- a/vendor/h2/src/hpack/test/fixture.rs +++ b/vendor/h2/src/hpack/test/fixture.rs @@ -100,7 +100,7 @@ fn test_story(story: Value) { let mut input: Vec<_> = case .expect .iter() - .map(|&(ref name, ref value)| { + .map(|(name, value)| { Header::new(name.clone().into(), value.clone().into()) .unwrap() .into() diff --git a/vendor/h2/src/lib.rs b/vendor/h2/src/lib.rs index 830147113..a1fde6eb4 100644 --- a/vendor/h2/src/lib.rs +++ b/vendor/h2/src/lib.rs @@ -78,10 +78,15 @@ //! [`server::handshake`]: server/fn.handshake.html //! [`client::handshake`]: client/fn.handshake.html -#![doc(html_root_url = "https://docs.rs/h2/0.3.19")] -#![deny(missing_debug_implementations, missing_docs)] -#![cfg_attr(test, deny(warnings))] +#![doc(html_root_url = "https://docs.rs/h2/0.3.22")] +#![deny( + missing_debug_implementations, + missing_docs, + clippy::missing_safety_doc, + clippy::undocumented_unsafe_blocks +)] #![allow(clippy::type_complexity, clippy::manual_range_contains)] +#![cfg_attr(test, deny(warnings))] macro_rules! proto_err { (conn: $($msg:tt)+) => { diff --git a/vendor/h2/src/proto/connection.rs b/vendor/h2/src/proto/connection.rs index 727643a65..637fac358 100644 --- a/vendor/h2/src/proto/connection.rs +++ b/vendor/h2/src/proto/connection.rs @@ -145,7 +145,9 @@ where /// connection flow control pub(crate) fn set_target_window_size(&mut self, size: WindowSize) { - self.inner.streams.set_target_connection_window_size(size); + let _res = self.inner.streams.set_target_connection_window_size(size); + // TODO: proper error handling + debug_assert!(_res.is_ok()); } /// Send a new SETTINGS frame with an updated initial window size. diff --git a/vendor/h2/src/proto/mod.rs b/vendor/h2/src/proto/mod.rs index d71ee9c42..567d03060 100644 --- a/vendor/h2/src/proto/mod.rs +++ b/vendor/h2/src/proto/mod.rs @@ -30,7 +30,7 @@ pub type PingPayload = [u8; 8]; pub type WindowSize = u32; // Constants -pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; +pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32 pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; diff --git a/vendor/h2/src/proto/settings.rs b/vendor/h2/src/proto/settings.rs index 6cc617209..28065cc68 100644 --- a/vendor/h2/src/proto/settings.rs +++ b/vendor/h2/src/proto/settings.rs @@ -60,6 +60,10 @@ impl Settings { codec.set_max_recv_header_list_size(max as usize); } + if let Some(val) = local.header_table_size() { + codec.set_recv_header_table_size(val as usize); + } + streams.apply_local_settings(local)?; self.local = Local::Synced; Ok(()) diff --git a/vendor/h2/src/proto/streams/counts.rs b/vendor/h2/src/proto/streams/counts.rs index 6a5aa9ccd..add1312e5 100644 --- a/vendor/h2/src/proto/streams/counts.rs +++ b/vendor/h2/src/proto/streams/counts.rs @@ -49,6 +49,14 @@ impl Counts { } } + /// Returns true when the next opened stream will reach capacity of outbound streams + /// + /// The number of client send streams is incremented in prioritize; send_request has to guess if + /// it should wait before allowing another request to be sent. + pub fn next_send_stream_will_reach_capacity(&self) -> bool { + self.max_send_streams <= (self.num_send_streams + 1) + } + /// Returns the current peer pub fn peer(&self) -> peer::Dyn { self.peer diff --git a/vendor/h2/src/proto/streams/flow_control.rs b/vendor/h2/src/proto/streams/flow_control.rs index 73a7754db..57a935825 100644 --- a/vendor/h2/src/proto/streams/flow_control.rs +++ b/vendor/h2/src/proto/streams/flow_control.rs @@ -75,12 +75,12 @@ impl FlowControl { self.window_size > self.available } - pub fn claim_capacity(&mut self, capacity: WindowSize) { - self.available -= capacity; + pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { + self.available.decrease_by(capacity) } - pub fn assign_capacity(&mut self, capacity: WindowSize) { - self.available += capacity; + pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> { + self.available.increase_by(capacity) } /// If a WINDOW_UPDATE frame should be sent, returns a positive number @@ -136,22 +136,23 @@ impl FlowControl { /// /// This is called after receiving a SETTINGS frame with a lower /// INITIAL_WINDOW_SIZE value. - pub fn dec_send_window(&mut self, sz: WindowSize) { + pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "dec_window; sz={}; window={}, available={}", sz, self.window_size, self.available ); - // This should not be able to overflow `window_size` from the bottom. - self.window_size -= sz; + // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can. + self.window_size.decrease_by(sz)?; + Ok(()) } /// Decrement the recv-side window size. /// /// This is called after receiving a SETTINGS ACK frame with a lower /// INITIAL_WINDOW_SIZE value. - pub fn dec_recv_window(&mut self, sz: WindowSize) { + pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "dec_recv_window; sz={}; window={}, available={}", sz, @@ -159,13 +160,14 @@ impl FlowControl { self.available ); // This should not be able to overflow `window_size` from the bottom. - self.window_size -= sz; - self.available -= sz; + self.window_size.decrease_by(sz)?; + self.available.decrease_by(sz)?; + Ok(()) } /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. - pub fn send_data(&mut self, sz: WindowSize) { + pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> { tracing::trace!( "send_data; sz={}; window={}; available={}", sz, @@ -176,12 +178,13 @@ impl FlowControl { // If send size is zero it's meaningless to update flow control window if sz > 0 { // Ensure that the argument is correct - assert!(self.window_size >= sz as usize); + assert!(self.window_size.0 >= sz as i32); // Update values - self.window_size -= sz; - self.available -= sz; + self.window_size.decrease_by(sz)?; + self.available.decrease_by(sz)?; } + Ok(()) } } @@ -208,6 +211,29 @@ impl Window { assert!(self.0 >= 0, "negative Window"); self.0 as WindowSize } + + pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> { + if let Some(v) = self.0.checked_sub(other as i32) { + self.0 = v; + Ok(()) + } else { + Err(Reason::FLOW_CONTROL_ERROR) + } + } + + pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> { + let other = self.add(other)?; + self.0 = other.0; + Ok(()) + } + + pub fn add(&self, other: WindowSize) -> Result<Self, Reason> { + if let Some(v) = self.0.checked_add(other as i32) { + Ok(Self(v)) + } else { + Err(Reason::FLOW_CONTROL_ERROR) + } + } } impl PartialEq<usize> for Window { @@ -230,25 +256,6 @@ impl PartialOrd<usize> for Window { } } -impl ::std::ops::SubAssign<WindowSize> for Window { - fn sub_assign(&mut self, other: WindowSize) { - self.0 -= other as i32; - } -} - -impl ::std::ops::Add<WindowSize> for Window { - type Output = Self; - fn add(self, other: WindowSize) -> Self::Output { - Window(self.0 + other as i32) - } -} - -impl ::std::ops::AddAssign<WindowSize> for Window { - fn add_assign(&mut self, other: WindowSize) { - self.0 += other as i32; - } -} - impl fmt::Display for Window { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Display::fmt(&self.0, f) diff --git a/vendor/h2/src/proto/streams/prioritize.rs b/vendor/h2/src/proto/streams/prioritize.rs index 88204ddcc..3196049a4 100644 --- a/vendor/h2/src/proto/streams/prioritize.rs +++ b/vendor/h2/src/proto/streams/prioritize.rs @@ -87,7 +87,9 @@ impl Prioritize { flow.inc_window(config.remote_init_window_sz) .expect("invalid initial window size"); - flow.assign_capacity(config.remote_init_window_sz); + // TODO: proper error handling + let _res = flow.assign_capacity(config.remote_init_window_sz); + debug_assert!(_res.is_ok()); tracing::trace!("Prioritize::new; flow={:?}", flow); @@ -253,7 +255,9 @@ impl Prioritize { if available as usize > capacity { let diff = available - capacity as WindowSize; - stream.send_flow.claim_capacity(diff); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(diff); + debug_assert!(_res.is_ok()); self.assign_connection_capacity(diff, stream, counts); } @@ -324,7 +328,9 @@ impl Prioritize { pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { let available = stream.send_flow.available().as_size(); if available > 0 { - stream.send_flow.claim_capacity(available); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(available); + debug_assert!(_res.is_ok()); // Re-assign all capacity to the connection self.assign_connection_capacity(available, stream, counts); } @@ -337,7 +343,9 @@ impl Prioritize { if stream.requested_send_capacity as usize > stream.buffered_send_data { let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; - stream.send_flow.claim_capacity(reserved); + // TODO: proper error handling + let _res = stream.send_flow.claim_capacity(reserved); + debug_assert!(_res.is_ok()); self.assign_connection_capacity(reserved, stream, counts); } } @@ -363,7 +371,9 @@ impl Prioritize { let span = tracing::trace_span!("assign_connection_capacity", inc); let _e = span.enter(); - self.flow.assign_capacity(inc); + // TODO: proper error handling + let _res = self.flow.assign_capacity(inc); + debug_assert!(_res.is_ok()); // Assign newly acquired capacity to streams pending capacity. while self.flow.available() > 0 { @@ -443,7 +453,9 @@ impl Prioritize { stream.assign_capacity(assign, self.max_buffer_size); // Claim the capacity from the connection - self.flow.claim_capacity(assign); + // TODO: proper error handling + let _res = self.flow.claim_capacity(assign); + debug_assert!(_res.is_ok()); } tracing::trace!( @@ -508,7 +520,9 @@ impl Prioritize { tracing::trace!("poll_complete"); loop { - self.schedule_pending_open(store, counts); + if let Some(mut stream) = self.pop_pending_open(store, counts) { + self.pending_send.push_front(&mut stream); + } match self.pop_frame(buffer, store, max_frame_len, counts) { Some(frame) => { @@ -763,12 +777,16 @@ impl Prioritize { // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. - self.flow.assign_capacity(len); + // TODO: proper error handling + let _res = self.flow.assign_capacity(len); + debug_assert!(_res.is_ok()); }); let (eos, len) = tracing::trace_span!("updating connection flow") .in_scope(|| { - self.flow.send_data(len); + // TODO: proper error handling + let _res = self.flow.send_data(len); + debug_assert!(_res.is_ok()); // Wrap the frame's data payload to ensure that the // correct amount of data gets written. @@ -858,20 +876,24 @@ impl Prioritize { } } - fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { + fn pop_pending_open<'s>( + &mut self, + store: &'s mut Store, + counts: &mut Counts, + ) -> Option<store::Ptr<'s>> { tracing::trace!("schedule_pending_open"); // check for any pending open streams - while counts.can_inc_num_send_streams() { + if counts.can_inc_num_send_streams() { if let Some(mut stream) = self.pending_open.pop(store) { tracing::trace!("schedule_pending_open; stream={:?}", stream.id); counts.inc_num_send_streams(&mut stream); - self.pending_send.push(&mut stream); stream.notify_send(); - } else { - return; + return Some(stream); } } + + None } } diff --git a/vendor/h2/src/proto/streams/recv.rs b/vendor/h2/src/proto/streams/recv.rs index 8c7267a9d..0063942a4 100644 --- a/vendor/h2/src/proto/streams/recv.rs +++ b/vendor/h2/src/proto/streams/recv.rs @@ -90,7 +90,7 @@ impl Recv { // settings flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) .expect("invalid initial remote window size"); - flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); + flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap(); Recv { init_window_sz: config.local_init_window_sz, @@ -229,6 +229,11 @@ impl Recv { return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); } + if pseudo.status.is_some() && counts.peer().is_server() { + proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + if !pseudo.is_informational() { let message = counts .peer() @@ -239,12 +244,14 @@ impl Recv { .pending_recv .push_back(&mut self.buffer, Event::Headers(message)); stream.notify_recv(); - } - // Only servers can receive a headers frame that initiates the stream. - // This is verified in `Streams` before calling this function. - if counts.peer().is_server() { - self.pending_accept.push(stream); + // Only servers can receive a headers frame that initiates the stream. + // This is verified in `Streams` before calling this function. + if counts.peer().is_server() { + // Correctness: never push a stream to `pending_accept` without having the + // corresponding headers frame pushed to `stream.pending_recv`. + self.pending_accept.push(stream); + } } Ok(()) @@ -252,13 +259,16 @@ impl Recv { /// Called by the server to get the request /// - /// TODO: Should this fn return `Result`? + /// # Panics + /// + /// Panics if `stream.pending_recv` has no `Event::Headers` queued. + /// pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { use super::peer::PollMessage::*; match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Headers(Server(request))) => request, - _ => panic!(), + _ => unreachable!("server stream queue must start with Headers"), } } @@ -308,7 +318,13 @@ impl Recv { Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), Some(_) => panic!("poll_response called after response returned"), None => { - stream.state.ensure_recv_open()?; + if !stream.state.ensure_recv_open()? { + proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id); + return Poll::Ready(Err(Error::library_reset( + stream.id, + Reason::PROTOCOL_ERROR, + ))); + } stream.recv_task = Some(cx.waker().clone()); Poll::Pending @@ -353,7 +369,9 @@ impl Recv { self.in_flight_data -= capacity; // Assign capacity to connection - self.flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = self.flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); if self.flow.unclaimed_capacity().is_some() { if let Some(task) = task.take() { @@ -381,7 +399,9 @@ impl Recv { stream.in_flight_recv_data -= capacity; // Assign capacity to stream - stream.recv_flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = stream.recv_flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); if stream.recv_flow.unclaimed_capacity().is_some() { // Queue the stream for sending the WINDOW_UPDATE frame. @@ -427,7 +447,11 @@ impl Recv { /// /// The `task` is an optional parked task for the `Connection` that might /// be blocked on needing more window capacity. - pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) { + pub fn set_target_connection_window( + &mut self, + target: WindowSize, + task: &mut Option<Waker>, + ) -> Result<(), Reason> { tracing::trace!( "set_target_connection_window; target={}; available={}, reserved={}", target, @@ -440,11 +464,15 @@ impl Recv { // // Update the flow controller with the difference between the new // target and the current target. - let current = (self.flow.available() + self.in_flight_data).checked_size(); + let current = self + .flow + .available() + .add(self.in_flight_data)? + .checked_size(); if target > current { - self.flow.assign_capacity(target - current); + self.flow.assign_capacity(target - current)?; } else { - self.flow.claim_capacity(current - target); + self.flow.claim_capacity(current - target)?; } // If changing the target capacity means we gained a bunch of capacity, @@ -455,6 +483,7 @@ impl Recv { task.wake(); } } + Ok(()) } pub(crate) fn apply_local_settings( @@ -494,9 +523,13 @@ impl Recv { let dec = old_sz - target; tracing::trace!("decrementing all windows; dec={}", dec); - store.for_each(|mut stream| { - stream.recv_flow.dec_recv_window(dec); - }) + store.try_for_each(|mut stream| { + stream + .recv_flow + .dec_recv_window(dec) + .map_err(proto::Error::library_go_away)?; + Ok::<_, proto::Error>(()) + })?; } Ordering::Greater => { // We must increase the (local) window on every open stream. @@ -509,7 +542,10 @@ impl Recv { .recv_flow .inc_window(inc) .map_err(proto::Error::library_go_away)?; - stream.recv_flow.assign_capacity(inc); + stream + .recv_flow + .assign_capacity(inc) + .map_err(proto::Error::library_go_away)?; Ok::<_, proto::Error>(()) })?; } @@ -616,7 +652,10 @@ impl Recv { } // Update stream level flow control - stream.recv_flow.send_data(sz); + stream + .recv_flow + .send_data(sz) + .map_err(proto::Error::library_go_away)?; // Track the data as in-flight stream.in_flight_recv_data += sz; @@ -657,7 +696,7 @@ impl Recv { } // Update connection level flow control - self.flow.send_data(sz); + self.flow.send_data(sz).map_err(Error::library_go_away)?; // Track the data as in-flight self.in_flight_data += sz; @@ -859,15 +898,6 @@ impl Recv { tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); - if !counts.can_inc_num_reset_streams() { - // try to evict 1 stream if possible - // if max allow is 0, this won't be able to evict, - // and then we'll just bail after - if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { - counts.transition_after(evicted, true); - } - } - if counts.can_inc_num_reset_streams() { counts.inc_num_reset_streams(); self.pending_reset_expired.push(stream); diff --git a/vendor/h2/src/proto/streams/send.rs b/vendor/h2/src/proto/streams/send.rs index 20aba38d4..626e61a33 100644 --- a/vendor/h2/src/proto/streams/send.rs +++ b/vendor/h2/src/proto/streams/send.rs @@ -4,7 +4,7 @@ use super::{ }; use crate::codec::UserError; use crate::frame::{self, Reason}; -use crate::proto::{Error, Initiator}; +use crate::proto::{self, Error, Initiator}; use bytes::Buf; use tokio::io::AsyncWrite; @@ -143,22 +143,27 @@ impl Send { // Update the state stream.state.send_open(end_stream)?; - if counts.peer().is_local_init(frame.stream_id()) { - // If we're waiting on a PushPromise anyway - // handle potentially queueing the stream at that point - if !stream.is_pending_push { - if counts.can_inc_num_send_streams() { - counts.inc_num_send_streams(stream); - } else { - self.prioritize.queue_open(stream); - } - } + let mut pending_open = false; + if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push { + self.prioritize.queue_open(stream); + pending_open = true; } // Queue the frame for sending + // + // This call expects that, since new streams are in the open queue, new + // streams won't be pushed on pending_send. self.prioritize .queue_frame(frame.into(), buffer, stream, task); + // Need to notify the connection when pushing onto pending_open since + // queue_frame only notifies for pending_send. + if pending_open { + if let Some(task) = task.take() { + task.wake(); + } + } + Ok(()) } @@ -458,10 +463,21 @@ impl Send { tracing::trace!("decrementing all windows; dec={}", dec); let mut total_reclaimed = 0; - store.for_each(|mut stream| { + store.try_for_each(|mut stream| { let stream = &mut *stream; - stream.send_flow.dec_send_window(dec); + tracing::trace!( + "decrementing stream window; id={:?}; decr={}; flow={:?}", + stream.id, + dec, + stream.send_flow + ); + + // TODO: this decrement can underflow based on received frames! + stream + .send_flow + .dec_send_window(dec) + .map_err(proto::Error::library_go_away)?; // It's possible that decreasing the window causes // `window_size` (the stream-specific window) to fall below @@ -474,7 +490,10 @@ impl Send { let reclaimed = if available > window_size { // Drop down to `window_size`. let reclaim = available - window_size; - stream.send_flow.claim_capacity(reclaim); + stream + .send_flow + .claim_capacity(reclaim) + .map_err(proto::Error::library_go_away)?; total_reclaimed += reclaim; reclaim } else { @@ -492,7 +511,9 @@ impl Send { // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. - }); + + Ok::<_, proto::Error>(()) + })?; self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); diff --git a/vendor/h2/src/proto/streams/state.rs b/vendor/h2/src/proto/streams/state.rs index 6f89b34c5..5256f09cf 100644 --- a/vendor/h2/src/proto/streams/state.rs +++ b/vendor/h2/src/proto/streams/state.rs @@ -64,8 +64,9 @@ enum Inner { Closed(Cause), } -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Default)] enum Peer { + #[default] AwaitingHeaders, Streaming, } @@ -361,10 +362,10 @@ impl State { } pub fn is_remote_reset(&self) -> bool { - match self.inner { - Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) => true, - _ => false, - } + matches!( + self.inner, + Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) + ) } /// Returns true if the stream is already reset. @@ -466,9 +467,3 @@ impl Default for State { State { inner: Inner::Idle } } } - -impl Default for Peer { - fn default() -> Self { - AwaitingHeaders - } -} diff --git a/vendor/h2/src/proto/streams/store.rs b/vendor/h2/src/proto/streams/store.rs index d33a01cce..67b377b12 100644 --- a/vendor/h2/src/proto/streams/store.rs +++ b/vendor/h2/src/proto/streams/store.rs @@ -256,7 +256,7 @@ where /// /// If the stream is already contained by the list, return `false`. pub fn push(&mut self, stream: &mut store::Ptr) -> bool { - tracing::trace!("Queue::push"); + tracing::trace!("Queue::push_back"); if N::is_queued(stream) { tracing::trace!(" -> already queued"); @@ -292,6 +292,46 @@ where true } + /// Queue the stream + /// + /// If the stream is already contained by the list, return `false`. + pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool { + tracing::trace!("Queue::push_front"); + + if N::is_queued(stream) { + tracing::trace!(" -> already queued"); + return false; + } + + N::set_queued(stream, true); + + // The next pointer shouldn't be set + debug_assert!(N::next(stream).is_none()); + + // Queue the stream + match self.indices { + Some(ref mut idxs) => { + tracing::trace!(" -> existing entries"); + + // Update the provided stream to point to the head node + let head_key = stream.resolve(idxs.head).key(); + N::set_next(stream, Some(head_key)); + + // Update the head pointer + idxs.head = stream.key(); + } + None => { + tracing::trace!(" -> first entry"); + self.indices = Some(store::Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + + true + } + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>> where R: Resolve, diff --git a/vendor/h2/src/proto/streams/stream.rs b/vendor/h2/src/proto/streams/stream.rs index 2888d744b..43e313647 100644 --- a/vendor/h2/src/proto/streams/stream.rs +++ b/vendor/h2/src/proto/streams/stream.rs @@ -146,7 +146,9 @@ impl Stream { recv_flow .inc_window(init_recv_window) .expect("invalid initial receive window"); - recv_flow.assign_capacity(init_recv_window); + // TODO: proper error handling? + let _res = recv_flow.assign_capacity(init_recv_window); + debug_assert!(_res.is_ok()); send_flow .inc_window(init_send_window) @@ -275,7 +277,9 @@ impl Stream { pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); debug_assert!(capacity > 0); - self.send_flow.assign_capacity(capacity); + // TODO: proper error handling + let _res = self.send_flow.assign_capacity(capacity); + debug_assert!(_res.is_ok()); tracing::trace!( " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}", @@ -294,7 +298,9 @@ impl Stream { pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { let prev_capacity = self.capacity(max_buffer_size); - self.send_flow.send_data(len); + // TODO: proper error handling + let _res = self.send_flow.send_data(len); + debug_assert!(_res.is_ok()); // Decrement the stream's buffered data counter debug_assert!(self.buffered_send_data >= len as usize); diff --git a/vendor/h2/src/proto/streams/streams.rs b/vendor/h2/src/proto/streams/streams.rs index dfc5c768b..274bf4553 100644 --- a/vendor/h2/src/proto/streams/streams.rs +++ b/vendor/h2/src/proto/streams/streams.rs @@ -118,7 +118,7 @@ where } } - pub fn set_target_connection_window_size(&mut self, size: WindowSize) { + pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -216,7 +216,7 @@ where mut request: Request<()>, end_of_stream: bool, pending: Option<&OpaqueStreamRef>, - ) -> Result<StreamRef<B>, SendError> { + ) -> Result<(StreamRef<B>, bool), SendError> { use super::stream::ContentLength; use http::Method; @@ -298,10 +298,14 @@ where // the lock, so it can't. me.refs += 1; - Ok(StreamRef { - opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), - send_buffer: self.send_buffer.clone(), - }) + let is_full = me.counts.next_send_stream_will_reach_capacity(); + Ok(( + StreamRef { + opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), + send_buffer: self.send_buffer.clone(), + }, + is_full, + )) } pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { @@ -726,7 +730,11 @@ impl Inner { } // The stream must be receive open - stream.state.ensure_recv_open()?; + if !stream.state.ensure_recv_open()? { + proto_err!(conn: "recv_push_promise: initiating stream is not opened"); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + stream.key() } None => { diff --git a/vendor/h2/src/server.rs b/vendor/h2/src/server.rs index f1f4cf470..bb20adc5d 100644 --- a/vendor/h2/src/server.rs +++ b/vendor/h2/src/server.rs @@ -937,7 +937,7 @@ impl Builder { /// stream have been written to the connection, the send buffer capacity /// will be freed up again. /// - /// The default is currently ~400MB, but may change. + /// The default is currently ~400KB, but may change. /// /// # Panics /// |