author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-19 09:25:56 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-06-19 09:25:56 +0000
commit    018c4950b9406055dec02ef0fb52f132e2bb1e2c (patch)
tree      a835ebdf2088ef88fa681f8fad45f09922c1ae9a /vendor/h2/src
parent    Adding debian version 1.75.0+dfsg1-5. (diff)
download  rustc-018c4950b9406055dec02ef0fb52f132e2bb1e2c.tar.xz
          rustc-018c4950b9406055dec02ef0fb52f132e2bb1e2c.zip
Merging upstream version 1.76.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/h2/src')
-rw-r--r--  vendor/h2/src/client.rs | 41
-rw-r--r--  vendor/h2/src/codec/framed_read.rs | 6
-rw-r--r--  vendor/h2/src/codec/framed_write.rs | 84
-rw-r--r--  vendor/h2/src/codec/mod.rs | 5
-rw-r--r--  vendor/h2/src/frame/data.rs | 2
-rw-r--r--  vendor/h2/src/frame/mod.rs | 2
-rw-r--r--  vendor/h2/src/frame/settings.rs | 2
-rw-r--r--  vendor/h2/src/hpack/decoder.rs | 6
-rw-r--r--  vendor/h2/src/hpack/table.rs | 6
-rw-r--r--  vendor/h2/src/hpack/test/fixture.rs | 2
-rw-r--r--  vendor/h2/src/lib.rs | 11
-rw-r--r--  vendor/h2/src/proto/connection.rs | 4
-rw-r--r--  vendor/h2/src/proto/mod.rs | 2
-rw-r--r--  vendor/h2/src/proto/settings.rs | 4
-rw-r--r--  vendor/h2/src/proto/streams/counts.rs | 8
-rw-r--r--  vendor/h2/src/proto/streams/flow_control.rs | 73
-rw-r--r--  vendor/h2/src/proto/streams/prioritize.rs | 50
-rw-r--r--  vendor/h2/src/proto/streams/recv.rs | 90
-rw-r--r--  vendor/h2/src/proto/streams/send.rs | 51
-rw-r--r--  vendor/h2/src/proto/streams/state.rs | 17
-rw-r--r--  vendor/h2/src/proto/streams/store.rs | 42
-rw-r--r--  vendor/h2/src/proto/streams/stream.rs | 12
-rw-r--r--  vendor/h2/src/proto/streams/streams.rs | 22
-rw-r--r--  vendor/h2/src/server.rs | 2
24 files changed, 362 insertions, 182 deletions
diff --git a/vendor/h2/src/client.rs b/vendor/h2/src/client.rs
index 4147e8a46..35cfc1414 100644
--- a/vendor/h2/src/client.rs
+++ b/vendor/h2/src/client.rs
@@ -510,8 +510,10 @@ where
self.inner
.send_request(request, end_of_stream, self.pending.as_ref())
.map_err(Into::into)
- .map(|stream| {
- if stream.is_pending_open() {
+ .map(|(stream, is_full)| {
+ if stream.is_pending_open() && is_full {
+ // Only prevent sending another request when the request
+ // queue is full.
self.pending = Some(stream.clone_to_opaque());
}
@@ -1021,7 +1023,7 @@ impl Builder {
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
- /// The default is currently ~400MB, but may change.
+ /// The default is currently ~400KB, but may change.
///
/// # Panics
///
@@ -1070,6 +1072,39 @@ impl Builder {
self
}
+ /// Sets the header table size.
+ ///
+ /// This setting informs the peer of the maximum size of the header compression
+ /// table used to encode header blocks, in octets. The encoder may select any value
+ /// equal to or less than the header table size specified by the sender.
+ ///
+ /// The default value is 4,096.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::{AsyncRead, AsyncWrite};
+ /// # use h2::client::*;
+ /// # use bytes::Bytes;
+ /// #
+ /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
+ /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
+ /// # {
+ /// // `client_fut` is a future representing the completion of the HTTP/2
+ /// // handshake.
+ /// let client_fut = Builder::new()
+ /// .header_table_size(1_000_000)
+ /// .handshake(my_io);
+ /// # client_fut.await
+ /// # }
+ /// #
+ /// # pub fn main() {}
+ /// ```
+ pub fn header_table_size(&mut self, size: u32) -> &mut Self {
+ self.settings.set_header_table_size(Some(size));
+ self
+ }
+
/// Sets the first stream ID to something other than 1.
#[cfg(feature = "unstable")]
pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
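With the change above, a pending stream is only recorded when the request queue is full, so `SendRequest::ready()` is the natural way for callers to pace requests. Below is a minimal usage sketch of that existing public API (illustrative only; the URIs and the surrounding async setup are assumptions, not part of this patch):

use bytes::Bytes;
use h2::client::SendRequest;
use http::Request;

async fn send_two(mut send_request: SendRequest<Bytes>) -> Result<(), h2::Error> {
    for &uri in &["https://example.com/a", "https://example.com/b"] {
        // `ready()` resolves once another request may be sent, e.g. after a
        // previously pending-open stream has actually opened.
        send_request = send_request.ready().await?;
        let request = Request::get(uri).body(()).unwrap();
        // `true` ends the request stream immediately (no request body).
        let (_response, _stream) = send_request.send_request(request, true)?;
    }
    Ok(())
}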
diff --git a/vendor/h2/src/codec/framed_read.rs b/vendor/h2/src/codec/framed_read.rs
index a874d7732..3b0030d93 100644
--- a/vendor/h2/src/codec/framed_read.rs
+++ b/vendor/h2/src/codec/framed_read.rs
@@ -88,6 +88,12 @@ impl<T> FramedRead<T> {
pub fn set_max_header_list_size(&mut self, val: usize) {
self.max_header_list_size = val;
}
+
+ /// Update the header table size setting.
+ #[inline]
+ pub fn set_header_table_size(&mut self, val: usize) {
+ self.hpack.queue_size_update(val);
+ }
}
/// Decodes a frame.
diff --git a/vendor/h2/src/codec/framed_write.rs b/vendor/h2/src/codec/framed_write.rs
index 4b1b4accc..c88af02da 100644
--- a/vendor/h2/src/codec/framed_write.rs
+++ b/vendor/h2/src/codec/framed_write.rs
@@ -7,8 +7,9 @@ use bytes::{Buf, BufMut, BytesMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio_util::io::poll_write_buf;
-use std::io::{self, Cursor, IoSlice};
+use std::io::{self, Cursor};
// A macro to get around a method needing to borrow &mut self
macro_rules! limited_write_buf {
@@ -45,8 +46,11 @@ struct Encoder<B> {
/// Max frame size, this is specified by the peer
max_frame_size: FrameSize,
- /// Whether or not the wrapped `AsyncWrite` supports vectored IO.
- is_write_vectored: bool,
+ /// Chain payloads bigger than this.
+ chain_threshold: usize,
+
+ /// Min buffer required to attempt to write a frame
+ min_buffer_capacity: usize,
}
#[derive(Debug)]
@@ -61,14 +65,16 @@ enum Next<B> {
/// frame that big.
const DEFAULT_BUFFER_CAPACITY: usize = 16 * 1_024;
-/// Min buffer required to attempt to write a frame
-const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD;
-
-/// Chain payloads bigger than this. The remote will never advertise a max frame
-/// size less than this (well, the spec says the max frame size can't be less
-/// than 16kb, so not even close).
+/// Chain payloads bigger than this when vectored I/O is enabled. The remote
+/// will never advertise a max frame size less than this (well, the spec says
+/// the max frame size can't be less than 16kb, so not even close).
const CHAIN_THRESHOLD: usize = 256;
+/// Chain payloads bigger than this when vectored I/O is **not** enabled.
+/// A larger value in this scenario will reduce the number of small and
+/// fragmented writes, thereby improving throughput.
+const CHAIN_THRESHOLD_WITHOUT_VECTORED_IO: usize = 1024;
+
// TODO: Make generic
impl<T, B> FramedWrite<T, B>
where
@@ -76,7 +82,11 @@ where
B: Buf,
{
pub fn new(inner: T) -> FramedWrite<T, B> {
- let is_write_vectored = inner.is_write_vectored();
+ let chain_threshold = if inner.is_write_vectored() {
+ CHAIN_THRESHOLD
+ } else {
+ CHAIN_THRESHOLD_WITHOUT_VECTORED_IO
+ };
FramedWrite {
inner,
encoder: Encoder {
@@ -85,7 +95,8 @@ where
next: None,
last_data_frame: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
- is_write_vectored,
+ chain_threshold,
+ min_buffer_capacity: chain_threshold + frame::HEADER_LEN,
},
}
}
@@ -126,23 +137,17 @@ where
Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.encoder.buf).chain(frame.payload_mut());
- ready!(write(
- &mut self.inner,
- self.encoder.is_write_vectored,
- &mut buf,
- cx,
- ))?
+ ready!(poll_write_buf(Pin::new(&mut self.inner), cx, &mut buf))?
}
_ => {
tracing::trace!(queued_data_frame = false);
- ready!(write(
- &mut self.inner,
- self.encoder.is_write_vectored,
- &mut self.encoder.buf,
+ ready!(poll_write_buf(
+ Pin::new(&mut self.inner),
cx,
+ &mut self.encoder.buf
))?
}
- }
+ };
}
match self.encoder.unset_frame() {
@@ -165,30 +170,6 @@ where
}
}
-fn write<T, B>(
- writer: &mut T,
- is_write_vectored: bool,
- buf: &mut B,
- cx: &mut Context<'_>,
-) -> Poll<io::Result<()>>
-where
- T: AsyncWrite + Unpin,
- B: Buf,
-{
- // TODO(eliza): when tokio-util 0.5.1 is released, this
- // could just use `poll_write_buf`...
- const MAX_IOVS: usize = 64;
- let n = if is_write_vectored {
- let mut bufs = [IoSlice::new(&[]); MAX_IOVS];
- let cnt = buf.chunks_vectored(&mut bufs);
- ready!(Pin::new(writer).poll_write_vectored(cx, &bufs[..cnt]))?
- } else {
- ready!(Pin::new(writer).poll_write(cx, buf.chunk()))?
- };
- buf.advance(n);
- Ok(()).into()
-}
-
#[must_use]
enum ControlFlow {
Continue,
@@ -240,12 +221,17 @@ where
return Err(PayloadTooBig);
}
- if len >= CHAIN_THRESHOLD {
+ if len >= self.chain_threshold {
let head = v.head();
// Encode the frame head to the buffer
head.encode(len, self.buf.get_mut());
+ if self.buf.get_ref().remaining() < self.chain_threshold {
+ let extra_bytes = self.chain_threshold - self.buf.remaining();
+ self.buf.get_mut().put(v.payload_mut().take(extra_bytes));
+ }
+
// Save the data frame
self.next = Some(Next::Data(v));
} else {
@@ -305,7 +291,9 @@ where
}
fn has_capacity(&self) -> bool {
- self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY
+ self.next.is_none()
+ && (self.buf.get_ref().capacity() - self.buf.get_ref().len()
+ >= self.min_buffer_capacity)
}
fn is_empty(&self) -> bool {
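The hand-rolled `write` helper removed above is replaced by `tokio_util::io::poll_write_buf`, which chooses vectored or plain writes itself and advances the buffer by the number of bytes accepted. A minimal sketch of that call pattern (assumes the `bytes`, `tokio`, and `tokio-util` crates with the `io` feature; the function name is illustrative):

use bytes::{Buf, BytesMut};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
use tokio_util::io::poll_write_buf;

fn poll_flush_frame<W, B>(
    writer: &mut W,
    head: &mut BytesMut,
    payload: &mut B,
    cx: &mut Context<'_>,
) -> Poll<io::Result<usize>>
where
    W: AsyncWrite + Unpin,
    B: Buf,
{
    // Chain the already-encoded frame head with the payload so both go out in
    // one (possibly vectored) write; the chained buffer is advanced by however
    // many bytes the writer accepted.
    let mut buf = (&mut *head).chain(payload);
    poll_write_buf(Pin::new(writer), cx, &mut buf)
}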
diff --git a/vendor/h2/src/codec/mod.rs b/vendor/h2/src/codec/mod.rs
index 359adf6e4..6cbdc1e18 100644
--- a/vendor/h2/src/codec/mod.rs
+++ b/vendor/h2/src/codec/mod.rs
@@ -95,6 +95,11 @@ impl<T, B> Codec<T, B> {
self.framed_write().set_header_table_size(val)
}
+ /// Set the decoder header table size.
+ pub fn set_recv_header_table_size(&mut self, val: usize) {
+ self.inner.set_header_table_size(val)
+ }
+
/// Set the max header list size that can be received.
pub fn set_max_recv_header_list_size(&mut self, val: usize) {
self.inner.set_max_header_list_size(val);
diff --git a/vendor/h2/src/frame/data.rs b/vendor/h2/src/frame/data.rs
index d0cdf5f69..5ed3c31b5 100644
--- a/vendor/h2/src/frame/data.rs
+++ b/vendor/h2/src/frame/data.rs
@@ -148,7 +148,7 @@ impl<T: Buf> Data<T> {
///
/// Panics if `dst` cannot contain the data frame.
pub(crate) fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) {
- let len = self.data.remaining() as usize;
+ let len = self.data.remaining();
assert!(dst.remaining_mut() >= len);
diff --git a/vendor/h2/src/frame/mod.rs b/vendor/h2/src/frame/mod.rs
index 570a162a8..0e8e7035c 100644
--- a/vendor/h2/src/frame/mod.rs
+++ b/vendor/h2/src/frame/mod.rs
@@ -69,7 +69,7 @@ pub use crate::hpack::BytesStr;
pub use self::settings::{
DEFAULT_INITIAL_WINDOW_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE,
- MAX_INITIAL_WINDOW_SIZE, MAX_MAX_FRAME_SIZE,
+ MAX_MAX_FRAME_SIZE,
};
pub type FrameSize = u32;
diff --git a/vendor/h2/src/frame/settings.rs b/vendor/h2/src/frame/settings.rs
index 0c913f059..484498a9d 100644
--- a/vendor/h2/src/frame/settings.rs
+++ b/vendor/h2/src/frame/settings.rs
@@ -121,11 +121,9 @@ impl Settings {
self.header_table_size
}
- /*
pub fn set_header_table_size(&mut self, size: Option<u32>) {
self.header_table_size = size;
}
- */
pub fn load(head: Head, payload: &[u8]) -> Result<Settings, Error> {
use self::Setting::*;
diff --git a/vendor/h2/src/hpack/decoder.rs b/vendor/h2/src/hpack/decoder.rs
index b45c37927..960cbb143 100644
--- a/vendor/h2/src/hpack/decoder.rs
+++ b/vendor/h2/src/hpack/decoder.rs
@@ -447,7 +447,7 @@ fn decode_int<B: Buf>(buf: &mut B, prefix_size: u8) -> Result<usize, DecoderErro
Err(DecoderError::NeedMore(NeedMore::IntegerUnderflow))
}
-fn peek_u8<B: Buf>(buf: &mut B) -> Option<u8> {
+fn peek_u8<B: Buf>(buf: &B) -> Option<u8> {
if buf.has_remaining() {
Some(buf.chunk()[0])
} else {
@@ -835,9 +835,9 @@ mod test {
fn test_peek_u8() {
let b = 0xff;
let mut buf = Cursor::new(vec![b]);
- assert_eq!(peek_u8(&mut buf), Some(b));
+ assert_eq!(peek_u8(&buf), Some(b));
assert_eq!(buf.get_u8(), b);
- assert_eq!(peek_u8(&mut buf), None);
+ assert_eq!(peek_u8(&buf), None);
}
#[test]
diff --git a/vendor/h2/src/hpack/table.rs b/vendor/h2/src/hpack/table.rs
index a1a780451..3e45f413b 100644
--- a/vendor/h2/src/hpack/table.rs
+++ b/vendor/h2/src/hpack/table.rs
@@ -319,7 +319,7 @@ impl Table {
let mut probe = probe + 1;
probe_loop!(probe < self.indices.len(), {
- let pos = &mut self.indices[probe as usize];
+ let pos = &mut self.indices[probe];
prev = match mem::replace(pos, Some(prev)) {
Some(p) => p,
@@ -656,12 +656,12 @@ fn to_raw_capacity(n: usize) -> usize {
#[inline]
fn desired_pos(mask: usize, hash: HashValue) -> usize {
- (hash.0 & mask) as usize
+ hash.0 & mask
}
#[inline]
fn probe_distance(mask: usize, hash: HashValue, current: usize) -> usize {
- current.wrapping_sub(desired_pos(mask, hash)) & mask as usize
+ current.wrapping_sub(desired_pos(mask, hash)) & mask
}
fn hash_header(header: &Header) -> HashValue {
diff --git a/vendor/h2/src/hpack/test/fixture.rs b/vendor/h2/src/hpack/test/fixture.rs
index 0d33ca2de..d3f76e3bf 100644
--- a/vendor/h2/src/hpack/test/fixture.rs
+++ b/vendor/h2/src/hpack/test/fixture.rs
@@ -100,7 +100,7 @@ fn test_story(story: Value) {
let mut input: Vec<_> = case
.expect
.iter()
- .map(|&(ref name, ref value)| {
+ .map(|(name, value)| {
Header::new(name.clone().into(), value.clone().into())
.unwrap()
.into()
diff --git a/vendor/h2/src/lib.rs b/vendor/h2/src/lib.rs
index 830147113..a1fde6eb4 100644
--- a/vendor/h2/src/lib.rs
+++ b/vendor/h2/src/lib.rs
@@ -78,10 +78,15 @@
//! [`server::handshake`]: server/fn.handshake.html
//! [`client::handshake`]: client/fn.handshake.html
-#![doc(html_root_url = "https://docs.rs/h2/0.3.19")]
-#![deny(missing_debug_implementations, missing_docs)]
-#![cfg_attr(test, deny(warnings))]
+#![doc(html_root_url = "https://docs.rs/h2/0.3.22")]
+#![deny(
+ missing_debug_implementations,
+ missing_docs,
+ clippy::missing_safety_doc,
+ clippy::undocumented_unsafe_blocks
+)]
#![allow(clippy::type_complexity, clippy::manual_range_contains)]
+#![cfg_attr(test, deny(warnings))]
macro_rules! proto_err {
(conn: $($msg:tt)+) => {
diff --git a/vendor/h2/src/proto/connection.rs b/vendor/h2/src/proto/connection.rs
index 727643a65..637fac358 100644
--- a/vendor/h2/src/proto/connection.rs
+++ b/vendor/h2/src/proto/connection.rs
@@ -145,7 +145,9 @@ where
/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
- self.inner.streams.set_target_connection_window_size(size);
+ let _res = self.inner.streams.set_target_connection_window_size(size);
+ // TODO: proper error handling
+ debug_assert!(_res.is_ok());
}
/// Send a new SETTINGS frame with an updated initial window size.
diff --git a/vendor/h2/src/proto/mod.rs b/vendor/h2/src/proto/mod.rs
index d71ee9c42..567d03060 100644
--- a/vendor/h2/src/proto/mod.rs
+++ b/vendor/h2/src/proto/mod.rs
@@ -30,7 +30,7 @@ pub type PingPayload = [u8; 8];
pub type WindowSize = u32;
// Constants
-pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
+pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
diff --git a/vendor/h2/src/proto/settings.rs b/vendor/h2/src/proto/settings.rs
index 6cc617209..28065cc68 100644
--- a/vendor/h2/src/proto/settings.rs
+++ b/vendor/h2/src/proto/settings.rs
@@ -60,6 +60,10 @@ impl Settings {
codec.set_max_recv_header_list_size(max as usize);
}
+ if let Some(val) = local.header_table_size() {
+ codec.set_recv_header_table_size(val as usize);
+ }
+
streams.apply_local_settings(local)?;
self.local = Local::Synced;
Ok(())
diff --git a/vendor/h2/src/proto/streams/counts.rs b/vendor/h2/src/proto/streams/counts.rs
index 6a5aa9ccd..add1312e5 100644
--- a/vendor/h2/src/proto/streams/counts.rs
+++ b/vendor/h2/src/proto/streams/counts.rs
@@ -49,6 +49,14 @@ impl Counts {
}
}
+ /// Returns true when the next opened stream will reach the capacity of outbound streams
+ ///
+ /// The number of client send streams is incremented in prioritize; send_request has to guess if
+ /// it should wait before allowing another request to be sent.
+ pub fn next_send_stream_will_reach_capacity(&self) -> bool {
+ self.max_send_streams <= (self.num_send_streams + 1)
+ }
+
/// Returns the current peer
pub fn peer(&self) -> peer::Dyn {
self.peer
diff --git a/vendor/h2/src/proto/streams/flow_control.rs b/vendor/h2/src/proto/streams/flow_control.rs
index 73a7754db..57a935825 100644
--- a/vendor/h2/src/proto/streams/flow_control.rs
+++ b/vendor/h2/src/proto/streams/flow_control.rs
@@ -75,12 +75,12 @@ impl FlowControl {
self.window_size > self.available
}
- pub fn claim_capacity(&mut self, capacity: WindowSize) {
- self.available -= capacity;
+ pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
+ self.available.decrease_by(capacity)
}
- pub fn assign_capacity(&mut self, capacity: WindowSize) {
- self.available += capacity;
+ pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
+ self.available.increase_by(capacity)
}
/// If a WINDOW_UPDATE frame should be sent, returns a positive number
@@ -136,22 +136,23 @@ impl FlowControl {
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
- pub fn dec_send_window(&mut self, sz: WindowSize) {
+ pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
- // This should not be able to overflow `window_size` from the bottom.
- self.window_size -= sz;
+ // This can in fact underflow `window_size`, so use checked subtraction.
+ self.window_size.decrease_by(sz)?;
+ Ok(())
}
/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
- pub fn dec_recv_window(&mut self, sz: WindowSize) {
+ pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
@@ -159,13 +160,14 @@ impl FlowControl {
self.available
);
// This should not be able to overflow `window_size` from the bottom.
- self.window_size -= sz;
- self.available -= sz;
+ self.window_size.decrease_by(sz)?;
+ self.available.decrease_by(sz)?;
+ Ok(())
}
/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
- pub fn send_data(&mut self, sz: WindowSize) {
+ pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"send_data; sz={}; window={}; available={}",
sz,
@@ -176,12 +178,13 @@ impl FlowControl {
// If send size is zero it's meaningless to update flow control window
if sz > 0 {
// Ensure that the argument is correct
- assert!(self.window_size >= sz as usize);
+ assert!(self.window_size.0 >= sz as i32);
// Update values
- self.window_size -= sz;
- self.available -= sz;
+ self.window_size.decrease_by(sz)?;
+ self.available.decrease_by(sz)?;
}
+ Ok(())
}
}
@@ -208,6 +211,29 @@ impl Window {
assert!(self.0 >= 0, "negative Window");
self.0 as WindowSize
}
+
+ pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
+ if let Some(v) = self.0.checked_sub(other as i32) {
+ self.0 = v;
+ Ok(())
+ } else {
+ Err(Reason::FLOW_CONTROL_ERROR)
+ }
+ }
+
+ pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
+ let other = self.add(other)?;
+ self.0 = other.0;
+ Ok(())
+ }
+
+ pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
+ if let Some(v) = self.0.checked_add(other as i32) {
+ Ok(Self(v))
+ } else {
+ Err(Reason::FLOW_CONTROL_ERROR)
+ }
+ }
}
impl PartialEq<usize> for Window {
@@ -230,25 +256,6 @@ impl PartialOrd<usize> for Window {
}
}
-impl ::std::ops::SubAssign<WindowSize> for Window {
- fn sub_assign(&mut self, other: WindowSize) {
- self.0 -= other as i32;
- }
-}
-
-impl ::std::ops::Add<WindowSize> for Window {
- type Output = Self;
- fn add(self, other: WindowSize) -> Self::Output {
- Window(self.0 + other as i32)
- }
-}
-
-impl ::std::ops::AddAssign<WindowSize> for Window {
- fn add_assign(&mut self, other: WindowSize) {
- self.0 += other as i32;
- }
-}
-
impl fmt::Display for Window {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
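The operator impls removed above are replaced by fallible methods so that window over- and underflow surface as FLOW_CONTROL_ERROR instead of panicking or silently wrapping. A self-contained sketch of the same checked-arithmetic pattern (the `Reason` enum here is a simplified stand-in for h2's type):

#[derive(Debug, PartialEq)]
enum Reason {
    FlowControlError,
}

#[derive(Debug, Clone, Copy)]
struct Window(i32);

impl Window {
    // Window sizes are capped at 2^31 - 1 by the protocol, so the `as i32`
    // casts cannot wrap for values the peer is allowed to send.
    fn increase_by(&mut self, other: u32) -> Result<(), Reason> {
        match self.0.checked_add(other as i32) {
            Some(v) => {
                self.0 = v;
                Ok(())
            }
            None => Err(Reason::FlowControlError),
        }
    }

    fn decrease_by(&mut self, other: u32) -> Result<(), Reason> {
        match self.0.checked_sub(other as i32) {
            Some(v) => {
                self.0 = v;
                Ok(())
            }
            None => Err(Reason::FlowControlError),
        }
    }
}

fn main() {
    let mut w = Window(i32::MAX - 1);
    assert!(w.increase_by(1).is_ok()); // now exactly 2^31 - 1
    assert_eq!(w.increase_by(1), Err(Reason::FlowControlError)); // would overflow
    assert!(w.decrease_by(1).is_ok());
}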
diff --git a/vendor/h2/src/proto/streams/prioritize.rs b/vendor/h2/src/proto/streams/prioritize.rs
index 88204ddcc..3196049a4 100644
--- a/vendor/h2/src/proto/streams/prioritize.rs
+++ b/vendor/h2/src/proto/streams/prioritize.rs
@@ -87,7 +87,9 @@ impl Prioritize {
flow.inc_window(config.remote_init_window_sz)
.expect("invalid initial window size");
- flow.assign_capacity(config.remote_init_window_sz);
+ // TODO: proper error handling
+ let _res = flow.assign_capacity(config.remote_init_window_sz);
+ debug_assert!(_res.is_ok());
tracing::trace!("Prioritize::new; flow={:?}", flow);
@@ -253,7 +255,9 @@ impl Prioritize {
if available as usize > capacity {
let diff = available - capacity as WindowSize;
- stream.send_flow.claim_capacity(diff);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(diff);
+ debug_assert!(_res.is_ok());
self.assign_connection_capacity(diff, stream, counts);
}
@@ -324,7 +328,9 @@ impl Prioritize {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
if available > 0 {
- stream.send_flow.claim_capacity(available);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(available);
+ debug_assert!(_res.is_ok());
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
@@ -337,7 +343,9 @@ impl Prioritize {
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
- stream.send_flow.claim_capacity(reserved);
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(reserved);
+ debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
@@ -363,7 +371,9 @@ impl Prioritize {
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();
- self.flow.assign_capacity(inc);
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(inc);
+ debug_assert!(_res.is_ok());
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
@@ -443,7 +453,9 @@ impl Prioritize {
stream.assign_capacity(assign, self.max_buffer_size);
// Claim the capacity from the connection
- self.flow.claim_capacity(assign);
+ // TODO: proper error handling
+ let _res = self.flow.claim_capacity(assign);
+ debug_assert!(_res.is_ok());
}
tracing::trace!(
@@ -508,7 +520,9 @@ impl Prioritize {
tracing::trace!("poll_complete");
loop {
- self.schedule_pending_open(store, counts);
+ if let Some(mut stream) = self.pop_pending_open(store, counts) {
+ self.pending_send.push_front(&mut stream);
+ }
match self.pop_frame(buffer, store, max_frame_len, counts) {
Some(frame) => {
@@ -763,12 +777,16 @@ impl Prioritize {
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
- self.flow.assign_capacity(len);
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(len);
+ debug_assert!(_res.is_ok());
});
let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
- self.flow.send_data(len);
+ // TODO: proper error handling
+ let _res = self.flow.send_data(len);
+ debug_assert!(_res.is_ok());
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
@@ -858,20 +876,24 @@ impl Prioritize {
}
}
- fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
+ fn pop_pending_open<'s>(
+ &mut self,
+ store: &'s mut Store,
+ counts: &mut Counts,
+ ) -> Option<store::Ptr<'s>> {
tracing::trace!("schedule_pending_open");
// check for any pending open streams
- while counts.can_inc_num_send_streams() {
+ if counts.can_inc_num_send_streams() {
if let Some(mut stream) = self.pending_open.pop(store) {
tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
counts.inc_num_send_streams(&mut stream);
- self.pending_send.push(&mut stream);
stream.notify_send();
- } else {
- return;
+ return Some(stream);
}
}
+
+ None
}
}
diff --git a/vendor/h2/src/proto/streams/recv.rs b/vendor/h2/src/proto/streams/recv.rs
index 8c7267a9d..0063942a4 100644
--- a/vendor/h2/src/proto/streams/recv.rs
+++ b/vendor/h2/src/proto/streams/recv.rs
@@ -90,7 +90,7 @@ impl Recv {
// settings
flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
.expect("invalid initial remote window size");
- flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);
+ flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
Recv {
init_window_sz: config.local_init_window_sz,
@@ -229,6 +229,11 @@ impl Recv {
return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
}
+ if pseudo.status.is_some() && counts.peer().is_server() {
+ proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
+ }
+
if !pseudo.is_informational() {
let message = counts
.peer()
@@ -239,12 +244,14 @@ impl Recv {
.pending_recv
.push_back(&mut self.buffer, Event::Headers(message));
stream.notify_recv();
- }
- // Only servers can receive a headers frame that initiates the stream.
- // This is verified in `Streams` before calling this function.
- if counts.peer().is_server() {
- self.pending_accept.push(stream);
+ // Only servers can receive a headers frame that initiates the stream.
+ // This is verified in `Streams` before calling this function.
+ if counts.peer().is_server() {
+ // Correctness: never push a stream to `pending_accept` without having the
+ // corresponding headers frame pushed to `stream.pending_recv`.
+ self.pending_accept.push(stream);
+ }
}
Ok(())
@@ -252,13 +259,16 @@ impl Recv {
/// Called by the server to get the request
///
- /// TODO: Should this fn return `Result`?
+ /// # Panics
+ ///
+ /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
+ ///
pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
use super::peer::PollMessage::*;
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Server(request))) => request,
- _ => panic!(),
+ _ => unreachable!("server stream queue must start with Headers"),
}
}
@@ -308,7 +318,13 @@ impl Recv {
Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
Some(_) => panic!("poll_response called after response returned"),
None => {
- stream.state.ensure_recv_open()?;
+ if !stream.state.ensure_recv_open()? {
+ proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
+ return Poll::Ready(Err(Error::library_reset(
+ stream.id,
+ Reason::PROTOCOL_ERROR,
+ )));
+ }
stream.recv_task = Some(cx.waker().clone());
Poll::Pending
@@ -353,7 +369,9 @@ impl Recv {
self.in_flight_data -= capacity;
// Assign capacity to connection
- self.flow.assign_capacity(capacity);
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
if self.flow.unclaimed_capacity().is_some() {
if let Some(task) = task.take() {
@@ -381,7 +399,9 @@ impl Recv {
stream.in_flight_recv_data -= capacity;
// Assign capacity to stream
- stream.recv_flow.assign_capacity(capacity);
+ // TODO: proper error handling
+ let _res = stream.recv_flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
if stream.recv_flow.unclaimed_capacity().is_some() {
// Queue the stream for sending the WINDOW_UPDATE frame.
@@ -427,7 +447,11 @@ impl Recv {
///
/// The `task` is an optional parked task for the `Connection` that might
/// be blocked on needing more window capacity.
- pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
+ pub fn set_target_connection_window(
+ &mut self,
+ target: WindowSize,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Reason> {
tracing::trace!(
"set_target_connection_window; target={}; available={}, reserved={}",
target,
@@ -440,11 +464,15 @@ impl Recv {
//
// Update the flow controller with the difference between the new
// target and the current target.
- let current = (self.flow.available() + self.in_flight_data).checked_size();
+ let current = self
+ .flow
+ .available()
+ .add(self.in_flight_data)?
+ .checked_size();
if target > current {
- self.flow.assign_capacity(target - current);
+ self.flow.assign_capacity(target - current)?;
} else {
- self.flow.claim_capacity(current - target);
+ self.flow.claim_capacity(current - target)?;
}
// If changing the target capacity means we gained a bunch of capacity,
@@ -455,6 +483,7 @@ impl Recv {
task.wake();
}
}
+ Ok(())
}
pub(crate) fn apply_local_settings(
@@ -494,9 +523,13 @@ impl Recv {
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);
- store.for_each(|mut stream| {
- stream.recv_flow.dec_recv_window(dec);
- })
+ store.try_for_each(|mut stream| {
+ stream
+ .recv_flow
+ .dec_recv_window(dec)
+ .map_err(proto::Error::library_go_away)?;
+ Ok::<_, proto::Error>(())
+ })?;
}
Ordering::Greater => {
// We must increase the (local) window on every open stream.
@@ -509,7 +542,10 @@ impl Recv {
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
- stream.recv_flow.assign_capacity(inc);
+ stream
+ .recv_flow
+ .assign_capacity(inc)
+ .map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
@@ -616,7 +652,10 @@ impl Recv {
}
// Update stream level flow control
- stream.recv_flow.send_data(sz);
+ stream
+ .recv_flow
+ .send_data(sz)
+ .map_err(proto::Error::library_go_away)?;
// Track the data as in-flight
stream.in_flight_recv_data += sz;
@@ -657,7 +696,7 @@ impl Recv {
}
// Update connection level flow control
- self.flow.send_data(sz);
+ self.flow.send_data(sz).map_err(Error::library_go_away)?;
// Track the data as in-flight
self.in_flight_data += sz;
@@ -859,15 +898,6 @@ impl Recv {
tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
- if !counts.can_inc_num_reset_streams() {
- // try to evict 1 stream if possible
- // if max allow is 0, this won't be able to evict,
- // and then we'll just bail after
- if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
- counts.transition_after(evicted, true);
- }
- }
-
if counts.can_inc_num_reset_streams() {
counts.inc_num_reset_streams();
self.pending_reset_expired.push(stream);
diff --git a/vendor/h2/src/proto/streams/send.rs b/vendor/h2/src/proto/streams/send.rs
index 20aba38d4..626e61a33 100644
--- a/vendor/h2/src/proto/streams/send.rs
+++ b/vendor/h2/src/proto/streams/send.rs
@@ -4,7 +4,7 @@ use super::{
};
use crate::codec::UserError;
use crate::frame::{self, Reason};
-use crate::proto::{Error, Initiator};
+use crate::proto::{self, Error, Initiator};
use bytes::Buf;
use tokio::io::AsyncWrite;
@@ -143,22 +143,27 @@ impl Send {
// Update the state
stream.state.send_open(end_stream)?;
- if counts.peer().is_local_init(frame.stream_id()) {
- // If we're waiting on a PushPromise anyway
- // handle potentially queueing the stream at that point
- if !stream.is_pending_push {
- if counts.can_inc_num_send_streams() {
- counts.inc_num_send_streams(stream);
- } else {
- self.prioritize.queue_open(stream);
- }
- }
+ let mut pending_open = false;
+ if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
+ self.prioritize.queue_open(stream);
+ pending_open = true;
}
// Queue the frame for sending
+ //
+ // This call expects that, since new streams are in the open queue, new
+ // streams won't be pushed on pending_send.
self.prioritize
.queue_frame(frame.into(), buffer, stream, task);
+ // Need to notify the connection when pushing onto pending_open since
+ // queue_frame only notifies for pending_send.
+ if pending_open {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+
Ok(())
}
@@ -458,10 +463,21 @@ impl Send {
tracing::trace!("decrementing all windows; dec={}", dec);
let mut total_reclaimed = 0;
- store.for_each(|mut stream| {
+ store.try_for_each(|mut stream| {
let stream = &mut *stream;
- stream.send_flow.dec_send_window(dec);
+ tracing::trace!(
+ "decrementing stream window; id={:?}; decr={}; flow={:?}",
+ stream.id,
+ dec,
+ stream.send_flow
+ );
+
+ // TODO: this decrement can underflow based on received frames!
+ stream
+ .send_flow
+ .dec_send_window(dec)
+ .map_err(proto::Error::library_go_away)?;
// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
@@ -474,7 +490,10 @@ impl Send {
let reclaimed = if available > window_size {
// Drop down to `window_size`.
let reclaim = available - window_size;
- stream.send_flow.claim_capacity(reclaim);
+ stream
+ .send_flow
+ .claim_capacity(reclaim)
+ .map_err(proto::Error::library_go_away)?;
total_reclaimed += reclaim;
reclaim
} else {
@@ -492,7 +511,9 @@ impl Send {
// TODO: Should this notify the producer when the capacity
// of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work.
- });
+
+ Ok::<_, proto::Error>(())
+ })?;
self.prioritize
.assign_connection_capacity(total_reclaimed, store, counts);
diff --git a/vendor/h2/src/proto/streams/state.rs b/vendor/h2/src/proto/streams/state.rs
index 6f89b34c5..5256f09cf 100644
--- a/vendor/h2/src/proto/streams/state.rs
+++ b/vendor/h2/src/proto/streams/state.rs
@@ -64,8 +64,9 @@ enum Inner {
Closed(Cause),
}
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, Default)]
enum Peer {
+ #[default]
AwaitingHeaders,
Streaming,
}
@@ -361,10 +362,10 @@ impl State {
}
pub fn is_remote_reset(&self) -> bool {
- match self.inner {
- Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) => true,
- _ => false,
- }
+ matches!(
+ self.inner,
+ Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
+ )
}
/// Returns true if the stream is already reset.
@@ -466,9 +467,3 @@ impl Default for State {
State { inner: Inner::Idle }
}
}
-
-impl Default for Peer {
- fn default() -> Self {
- AwaitingHeaders
- }
-}
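The manual `Default` impl is dropped in favor of `#[derive(Default)]` with a `#[default]` variant, which has been stable since Rust 1.62. A standalone illustration of that derive (outside the h2 codebase):

#[derive(Debug, Copy, Clone, Default, PartialEq)]
enum Peer {
    #[default]
    AwaitingHeaders,
    Streaming,
}

fn main() {
    // `#[default]` marks the variant that `Default::default()` returns.
    assert_eq!(Peer::default(), Peer::AwaitingHeaders);
    let _ = Peer::Streaming; // keep the second variant "used" in this sketch
}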
diff --git a/vendor/h2/src/proto/streams/store.rs b/vendor/h2/src/proto/streams/store.rs
index d33a01cce..67b377b12 100644
--- a/vendor/h2/src/proto/streams/store.rs
+++ b/vendor/h2/src/proto/streams/store.rs
@@ -256,7 +256,7 @@ where
///
/// If the stream is already contained by the list, return `false`.
pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
- tracing::trace!("Queue::push");
+ tracing::trace!("Queue::push_back");
if N::is_queued(stream) {
tracing::trace!(" -> already queued");
@@ -292,6 +292,46 @@ where
true
}
+ /// Queue the stream at the front of the list
+ ///
+ /// If the stream is already contained by the list, return `false`.
+ pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
+ tracing::trace!("Queue::push_front");
+
+ if N::is_queued(stream) {
+ tracing::trace!(" -> already queued");
+ return false;
+ }
+
+ N::set_queued(stream, true);
+
+ // The next pointer shouldn't be set
+ debug_assert!(N::next(stream).is_none());
+
+ // Queue the stream
+ match self.indices {
+ Some(ref mut idxs) => {
+ tracing::trace!(" -> existing entries");
+
+ // Update the provided stream to point to the head node
+ let head_key = stream.resolve(idxs.head).key();
+ N::set_next(stream, Some(head_key));
+
+ // Update the head pointer
+ idxs.head = stream.key();
+ }
+ None => {
+ tracing::trace!(" -> first entry");
+ self.indices = Some(store::Indices {
+ head: stream.key(),
+ tail: stream.key(),
+ });
+ }
+ }
+
+ true
+ }
+
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
where
R: Resolve,
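`push_front` mirrors the existing `push` but links the new entry in at the head of the slab-backed, index-linked queue. A simplified, self-contained sketch of that head-insertion logic (the types are stand-ins for h2's store, not its actual API):

struct Entry {
    next: Option<usize>,
}

struct Queue {
    slab: Vec<Entry>,
    head: Option<usize>,
    tail: Option<usize>,
}

impl Queue {
    fn push_front(&mut self, key: usize) {
        // The entry must not already be linked into the queue.
        debug_assert!(self.slab[key].next.is_none());
        match self.head {
            Some(head_key) => {
                // Point the new entry at the current head, then move the head.
                self.slab[key].next = Some(head_key);
                self.head = Some(key);
            }
            None => {
                // First entry: head and tail both refer to it.
                self.head = Some(key);
                self.tail = Some(key);
            }
        }
    }
}

fn main() {
    let mut q = Queue {
        slab: vec![Entry { next: None }, Entry { next: None }],
        head: None,
        tail: None,
    };
    q.push_front(0);
    q.push_front(1);
    assert_eq!(q.head, Some(1));
    assert_eq!(q.tail, Some(0));
    assert_eq!(q.slab[1].next, Some(0));
}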
diff --git a/vendor/h2/src/proto/streams/stream.rs b/vendor/h2/src/proto/streams/stream.rs
index 2888d744b..43e313647 100644
--- a/vendor/h2/src/proto/streams/stream.rs
+++ b/vendor/h2/src/proto/streams/stream.rs
@@ -146,7 +146,9 @@ impl Stream {
recv_flow
.inc_window(init_recv_window)
.expect("invalid initial receive window");
- recv_flow.assign_capacity(init_recv_window);
+ // TODO: proper error handling?
+ let _res = recv_flow.assign_capacity(init_recv_window);
+ debug_assert!(_res.is_ok());
send_flow
.inc_window(init_send_window)
@@ -275,7 +277,9 @@ impl Stream {
pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);
debug_assert!(capacity > 0);
- self.send_flow.assign_capacity(capacity);
+ // TODO: proper error handling
+ let _res = self.send_flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
tracing::trace!(
" assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
@@ -294,7 +298,9 @@ impl Stream {
pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
let prev_capacity = self.capacity(max_buffer_size);
- self.send_flow.send_data(len);
+ // TODO: proper error handling
+ let _res = self.send_flow.send_data(len);
+ debug_assert!(_res.is_ok());
// Decrement the stream's buffered data counter
debug_assert!(self.buffered_send_data >= len as usize);
diff --git a/vendor/h2/src/proto/streams/streams.rs b/vendor/h2/src/proto/streams/streams.rs
index dfc5c768b..274bf4553 100644
--- a/vendor/h2/src/proto/streams/streams.rs
+++ b/vendor/h2/src/proto/streams/streams.rs
@@ -118,7 +118,7 @@ where
}
}
- pub fn set_target_connection_window_size(&mut self, size: WindowSize) {
+ pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -216,7 +216,7 @@ where
mut request: Request<()>,
end_of_stream: bool,
pending: Option<&OpaqueStreamRef>,
- ) -> Result<StreamRef<B>, SendError> {
+ ) -> Result<(StreamRef<B>, bool), SendError> {
use super::stream::ContentLength;
use http::Method;
@@ -298,10 +298,14 @@ where
// the lock, so it can't.
me.refs += 1;
- Ok(StreamRef {
- opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
- send_buffer: self.send_buffer.clone(),
- })
+ let is_full = me.counts.next_send_stream_will_reach_capacity();
+ Ok((
+ StreamRef {
+ opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
+ send_buffer: self.send_buffer.clone(),
+ },
+ is_full,
+ ))
}
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
@@ -726,7 +730,11 @@ impl Inner {
}
// The stream must be receive open
- stream.state.ensure_recv_open()?;
+ if !stream.state.ensure_recv_open()? {
+ proto_err!(conn: "recv_push_promise: initiating stream is not opened");
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
stream.key()
}
None => {
diff --git a/vendor/h2/src/server.rs b/vendor/h2/src/server.rs
index f1f4cf470..bb20adc5d 100644
--- a/vendor/h2/src/server.rs
+++ b/vendor/h2/src/server.rs
@@ -937,7 +937,7 @@ impl Builder {
/// stream have been written to the connection, the send buffer capacity
/// will be freed up again.
///
- /// The default is currently ~400MB, but may change.
+ /// The default is currently ~400KB, but may change.
///
/// # Panics
///