summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/rust/h2/src/proto
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/h2/src/proto')
-rw-r--r--third_party/rust/h2/src/proto/connection.rs598
-rw-r--r--third_party/rust/h2/src/proto/error.rs91
-rw-r--r--third_party/rust/h2/src/proto/go_away.rs154
-rw-r--r--third_party/rust/h2/src/proto/mod.rs37
-rw-r--r--third_party/rust/h2/src/proto/peer.rs93
-rw-r--r--third_party/rust/h2/src/proto/ping_pong.rs291
-rw-r--r--third_party/rust/h2/src/proto/settings.rs155
-rw-r--r--third_party/rust/h2/src/proto/streams/buffer.rs95
-rw-r--r--third_party/rust/h2/src/proto/streams/counts.rs253
-rw-r--r--third_party/rust/h2/src/proto/streams/flow_control.rs269
-rw-r--r--third_party/rust/h2/src/proto/streams/mod.rs72
-rw-r--r--third_party/rust/h2/src/proto/streams/prioritize.rs931
-rw-r--r--third_party/rust/h2/src/proto/streams/recv.rs1166
-rw-r--r--third_party/rust/h2/src/proto/streams/send.rs585
-rw-r--r--third_party/rust/h2/src/proto/streams/state.rs469
-rw-r--r--third_party/rust/h2/src/proto/streams/store.rs464
-rw-r--r--third_party/rust/h2/src/proto/streams/stream.rs527
-rw-r--r--third_party/rust/h2/src/proto/streams/streams.rs1594
18 files changed, 7844 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/connection.rs b/third_party/rust/h2/src/proto/connection.rs
new file mode 100644
index 0000000000..637fac3588
--- /dev/null
+++ b/third_party/rust/h2/src/proto/connection.rs
@@ -0,0 +1,598 @@
+use crate::codec::UserError;
+use crate::frame::{Reason, StreamId};
+use crate::{client, frame, server};
+
+use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
+use crate::proto::*;
+
+use bytes::{Buf, Bytes};
+use futures_core::Stream;
+use std::io;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+/// An H2 connection
+#[derive(Debug)]
+pub(crate) struct Connection<T, P, B: Buf = Bytes>
+where
+ P: Peer,
+{
+ /// Read / write frame values
+ codec: Codec<T, Prioritized<B>>,
+
+ inner: ConnectionInner<P, B>,
+}
+
+// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
+// method instantiations.
+#[derive(Debug)]
+struct ConnectionInner<P, B: Buf = Bytes>
+where
+ P: Peer,
+{
+ /// Tracks the connection level state transitions.
+ state: State,
+
+ /// An error to report back once complete.
+ ///
+ /// This exists separately from State in order to support
+ /// graceful shutdown.
+ error: Option<frame::GoAway>,
+
+ /// Pending GOAWAY frames to write.
+ go_away: GoAway,
+
+ /// Ping/pong handler
+ ping_pong: PingPong,
+
+ /// Connection settings
+ settings: Settings,
+
+ /// Stream state handler
+ streams: Streams<B, P>,
+
+ /// A `tracing` span tracking the lifetime of the connection.
+ span: tracing::Span,
+
+ /// Client or server
+ _phantom: PhantomData<P>,
+}
+
+struct DynConnection<'a, B: Buf = Bytes> {
+ state: &'a mut State,
+
+ go_away: &'a mut GoAway,
+
+ streams: DynStreams<'a, B>,
+
+ error: &'a mut Option<frame::GoAway>,
+
+ ping_pong: &'a mut PingPong,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct Config {
+ pub next_stream_id: StreamId,
+ pub initial_max_send_streams: usize,
+ pub max_send_buffer_size: usize,
+ pub reset_stream_duration: Duration,
+ pub reset_stream_max: usize,
+ pub remote_reset_stream_max: usize,
+ pub settings: frame::Settings,
+}
+
+#[derive(Debug)]
+enum State {
+ /// Currently open in a sane state
+ Open,
+
+ /// The codec must be flushed
+ Closing(Reason, Initiator),
+
+ /// In a closed state
+ Closed(Reason, Initiator),
+}
+
+impl<T, P, B> Connection<T, P, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ P: Peer,
+ B: Buf,
+{
+ pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
+ fn streams_config(config: &Config) -> streams::Config {
+ streams::Config {
+ local_init_window_sz: config
+ .settings
+ .initial_window_size()
+ .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
+ initial_max_send_streams: config.initial_max_send_streams,
+ local_max_buffer_size: config.max_send_buffer_size,
+ local_next_stream_id: config.next_stream_id,
+ local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
+ extended_connect_protocol_enabled: config
+ .settings
+ .is_extended_connect_protocol_enabled()
+ .unwrap_or(false),
+ local_reset_duration: config.reset_stream_duration,
+ local_reset_max: config.reset_stream_max,
+ remote_reset_max: config.remote_reset_stream_max,
+ remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
+ remote_max_initiated: config
+ .settings
+ .max_concurrent_streams()
+ .map(|max| max as usize),
+ }
+ }
+ let streams = Streams::new(streams_config(&config));
+ Connection {
+ codec,
+ inner: ConnectionInner {
+ state: State::Open,
+ error: None,
+ go_away: GoAway::new(),
+ ping_pong: PingPong::new(),
+ settings: Settings::new(config.settings),
+ streams,
+ span: tracing::debug_span!("Connection", peer = %P::NAME),
+ _phantom: PhantomData,
+ },
+ }
+ }
+
+ /// connection flow control
+ pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
+ let _res = self.inner.streams.set_target_connection_window_size(size);
+ // TODO: proper error handling
+ debug_assert!(_res.is_ok());
+ }
+
+ /// Send a new SETTINGS frame with an updated initial window size.
+ pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
+ let mut settings = frame::Settings::default();
+ settings.set_initial_window_size(Some(size));
+ self.inner.settings.send_settings(settings)
+ }
+
+ /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
+ pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
+ let mut settings = frame::Settings::default();
+ settings.set_enable_connect_protocol(Some(1));
+ self.inner.settings.send_settings(settings)
+ }
+
+ /// Returns the maximum number of concurrent streams that may be initiated
+ /// by this peer.
+ pub(crate) fn max_send_streams(&self) -> usize {
+ self.inner.streams.max_send_streams()
+ }
+
+ /// Returns the maximum number of concurrent streams that may be initiated
+ /// by the remote peer.
+ pub(crate) fn max_recv_streams(&self) -> usize {
+ self.inner.streams.max_recv_streams()
+ }
+
+ #[cfg(feature = "unstable")]
+ pub fn num_wired_streams(&self) -> usize {
+ self.inner.streams.num_wired_streams()
+ }
+
+ /// Returns `Ready` when the connection is ready to receive a frame.
+ ///
+ /// Returns `Error` as this may raise errors that are caused by delayed
+ /// processing of received frames.
+ fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
+ let _e = self.inner.span.enter();
+ let span = tracing::trace_span!("poll_ready");
+ let _e = span.enter();
+ // The order of these calls don't really matter too much
+ ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
+ ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
+ ready!(self
+ .inner
+ .settings
+ .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
+ ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;
+
+ Poll::Ready(Ok(()))
+ }
+
+ /// Send any pending GOAWAY frames.
+ ///
+ /// This will return `Some(reason)` if the connection should be closed
+ /// afterwards. If this is a graceful shutdown, this returns `None`.
+ fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
+ self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
+ }
+
+ pub fn go_away_from_user(&mut self, e: Reason) {
+ self.inner.as_dyn().go_away_from_user(e)
+ }
+
+ fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
+ let (debug_data, theirs) = self
+ .inner
+ .error
+ .take()
+ .as_ref()
+ .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
+ (frame.debug_data().clone(), frame.reason())
+ });
+
+ match (ours, theirs) {
+ (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
+ (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
+ // If both sides reported an error, give their
+ // error back to th user. We assume our error
+ // was a consequence of their error, and less
+ // important.
+ (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
+ }
+ }
+
+ /// Closes the connection by transitioning to a GOAWAY state
+ /// iff there are no streams or references
+ pub fn maybe_close_connection_if_no_streams(&mut self) {
+ // If we poll() and realize that there are no streams or references
+ // then we can close the connection by transitioning to GOAWAY
+ if !self.inner.streams.has_streams_or_other_references() {
+ self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
+ }
+ }
+
+ pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
+ self.inner.ping_pong.take_user_pings()
+ }
+
+ /// Advances the internal state of the connection.
+ pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
+ // XXX(eliza): cloning the span is unfortunately necessary here in
+ // order to placate the borrow checker — `self` is mutably borrowed by
+ // `poll2`, which means that we can't borrow `self.span` to enter it.
+ // The clone is just an atomic ref bump.
+ let span = self.inner.span.clone();
+ let _e = span.enter();
+ let span = tracing::trace_span!("poll");
+ let _e = span.enter();
+
+ loop {
+ tracing::trace!(connection.state = ?self.inner.state);
+ // TODO: probably clean up this glob of code
+ match self.inner.state {
+ // When open, continue to poll a frame
+ State::Open => {
+ let result = match self.poll2(cx) {
+ Poll::Ready(result) => result,
+ // The connection is not ready to make progress
+ Poll::Pending => {
+ // Ensure all window updates have been sent.
+ //
+ // This will also handle flushing `self.codec`
+ ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;
+
+ if (self.inner.error.is_some()
+ || self.inner.go_away.should_close_on_idle())
+ && !self.inner.streams.has_streams()
+ {
+ self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
+ continue;
+ }
+
+ return Poll::Pending;
+ }
+ };
+
+ self.inner.as_dyn().handle_poll2_result(result)?
+ }
+ State::Closing(reason, initiator) => {
+ tracing::trace!("connection closing after flush");
+ // Flush/shutdown the codec
+ ready!(self.codec.shutdown(cx))?;
+
+ // Transition the state to error
+ self.inner.state = State::Closed(reason, initiator);
+ }
+ State::Closed(reason, initiator) => {
+ return Poll::Ready(self.take_error(reason, initiator));
+ }
+ }
+ }
+ }
+
+ fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
+ // This happens outside of the loop to prevent needing to do a clock
+ // check and then comparison of the queue possibly multiple times a
+ // second (and thus, the clock wouldn't have changed enough to matter).
+ self.clear_expired_reset_streams();
+
+ loop {
+ // First, ensure that the `Connection` is able to receive a frame
+ //
+ // The order here matters:
+ // - poll_go_away may buffer a graceful shutdown GOAWAY frame
+ // - If it has, we've also added a PING to be sent in poll_ready
+ if let Some(reason) = ready!(self.poll_go_away(cx)?) {
+ if self.inner.go_away.should_close_now() {
+ if self.inner.go_away.is_user_initiated() {
+ // A user initiated abrupt shutdown shouldn't return
+ // the same error back to the user.
+ return Poll::Ready(Ok(()));
+ } else {
+ return Poll::Ready(Err(Error::library_go_away(reason)));
+ }
+ }
+ // Only NO_ERROR should be waiting for idle
+ debug_assert_eq!(
+ reason,
+ Reason::NO_ERROR,
+ "graceful GOAWAY should be NO_ERROR"
+ );
+ }
+ ready!(self.poll_ready(cx))?;
+
+ match self
+ .inner
+ .as_dyn()
+ .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
+ {
+ ReceivedFrame::Settings(frame) => {
+ self.inner.settings.recv_settings(
+ frame,
+ &mut self.codec,
+ &mut self.inner.streams,
+ )?;
+ }
+ ReceivedFrame::Continue => (),
+ ReceivedFrame::Done => {
+ return Poll::Ready(Ok(()));
+ }
+ }
+ }
+ }
+
+ fn clear_expired_reset_streams(&mut self) {
+ self.inner.streams.clear_expired_reset_streams();
+ }
+}
+
+impl<P, B> ConnectionInner<P, B>
+where
+ P: Peer,
+ B: Buf,
+{
+ fn as_dyn(&mut self) -> DynConnection<'_, B> {
+ let ConnectionInner {
+ state,
+ go_away,
+ streams,
+ error,
+ ping_pong,
+ ..
+ } = self;
+ let streams = streams.as_dyn();
+ DynConnection {
+ state,
+ go_away,
+ streams,
+ error,
+ ping_pong,
+ }
+ }
+}
+
+impl<B> DynConnection<'_, B>
+where
+ B: Buf,
+{
+ fn go_away(&mut self, id: StreamId, e: Reason) {
+ let frame = frame::GoAway::new(id, e);
+ self.streams.send_go_away(id);
+ self.go_away.go_away(frame);
+ }
+
+ fn go_away_now(&mut self, e: Reason) {
+ let last_processed_id = self.streams.last_processed_id();
+ let frame = frame::GoAway::new(last_processed_id, e);
+ self.go_away.go_away_now(frame);
+ }
+
+ fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
+ let last_processed_id = self.streams.last_processed_id();
+ let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
+ self.go_away.go_away_now(frame);
+ }
+
+ fn go_away_from_user(&mut self, e: Reason) {
+ let last_processed_id = self.streams.last_processed_id();
+ let frame = frame::GoAway::new(last_processed_id, e);
+ self.go_away.go_away_from_user(frame);
+
+ // Notify all streams of reason we're abruptly closing.
+ self.streams.handle_error(Error::user_go_away(e));
+ }
+
+ fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
+ match result {
+ // The connection has shutdown normally
+ Ok(()) => {
+ *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
+ Ok(())
+ }
+ // Attempting to read a frame resulted in a connection level
+ // error. This is handled by setting a GOAWAY frame followed by
+ // terminating the connection.
+ Err(Error::GoAway(debug_data, reason, initiator)) => {
+ let e = Error::GoAway(debug_data.clone(), reason, initiator);
+ tracing::debug!(error = ?e, "Connection::poll; connection error");
+
+ // We may have already sent a GOAWAY for this error,
+ // if so, don't send another, just flush and close up.
+ if self
+ .go_away
+ .going_away()
+ .map_or(false, |frame| frame.reason() == reason)
+ {
+ tracing::trace!(" -> already going away");
+ *self.state = State::Closing(reason, initiator);
+ return Ok(());
+ }
+
+ // Reset all active streams
+ self.streams.handle_error(e);
+ self.go_away_now_data(reason, debug_data);
+ Ok(())
+ }
+ // Attempting to read a frame resulted in a stream level error.
+ // This is handled by resetting the frame then trying to read
+ // another frame.
+ Err(Error::Reset(id, reason, initiator)) => {
+ debug_assert_eq!(initiator, Initiator::Library);
+ tracing::trace!(?id, ?reason, "stream error");
+ self.streams.send_reset(id, reason);
+ Ok(())
+ }
+ // Attempting to read a frame resulted in an I/O error. All
+ // active streams must be reset.
+ //
+ // TODO: Are I/O errors recoverable?
+ Err(Error::Io(e, inner)) => {
+ tracing::debug!(error = ?e, "Connection::poll; IO error");
+ let e = Error::Io(e, inner);
+
+ // Reset all active streams
+ self.streams.handle_error(e.clone());
+
+ // Return the error
+ Err(e)
+ }
+ }
+ }
+
+ fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
+ use crate::frame::Frame::*;
+ match frame {
+ Some(Headers(frame)) => {
+ tracing::trace!(?frame, "recv HEADERS");
+ self.streams.recv_headers(frame)?;
+ }
+ Some(Data(frame)) => {
+ tracing::trace!(?frame, "recv DATA");
+ self.streams.recv_data(frame)?;
+ }
+ Some(Reset(frame)) => {
+ tracing::trace!(?frame, "recv RST_STREAM");
+ self.streams.recv_reset(frame)?;
+ }
+ Some(PushPromise(frame)) => {
+ tracing::trace!(?frame, "recv PUSH_PROMISE");
+ self.streams.recv_push_promise(frame)?;
+ }
+ Some(Settings(frame)) => {
+ tracing::trace!(?frame, "recv SETTINGS");
+ return Ok(ReceivedFrame::Settings(frame));
+ }
+ Some(GoAway(frame)) => {
+ tracing::trace!(?frame, "recv GOAWAY");
+ // This should prevent starting new streams,
+ // but should allow continuing to process current streams
+ // until they are all EOS. Once they are, State should
+ // transition to GoAway.
+ self.streams.recv_go_away(&frame)?;
+ *self.error = Some(frame);
+ }
+ Some(Ping(frame)) => {
+ tracing::trace!(?frame, "recv PING");
+ let status = self.ping_pong.recv_ping(frame);
+ if status.is_shutdown() {
+ assert!(
+ self.go_away.is_going_away(),
+ "received unexpected shutdown ping"
+ );
+
+ let last_processed_id = self.streams.last_processed_id();
+ self.go_away(last_processed_id, Reason::NO_ERROR);
+ }
+ }
+ Some(WindowUpdate(frame)) => {
+ tracing::trace!(?frame, "recv WINDOW_UPDATE");
+ self.streams.recv_window_update(frame)?;
+ }
+ Some(Priority(frame)) => {
+ tracing::trace!(?frame, "recv PRIORITY");
+ // TODO: handle
+ }
+ None => {
+ tracing::trace!("codec closed");
+ self.streams.recv_eof(false).expect("mutex poisoned");
+ return Ok(ReceivedFrame::Done);
+ }
+ }
+ Ok(ReceivedFrame::Continue)
+ }
+}
+
+enum ReceivedFrame {
+ Settings(frame::Settings),
+ Continue,
+ Done,
+}
+
+impl<T, B> Connection<T, client::Peer, B>
+where
+ T: AsyncRead + AsyncWrite,
+ B: Buf,
+{
+ pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
+ &self.inner.streams
+ }
+}
+
+impl<T, B> Connection<T, server::Peer, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ B: Buf,
+{
+ pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
+ self.inner.streams.next_incoming()
+ }
+
+ // Graceful shutdown only makes sense for server peers.
+ pub fn go_away_gracefully(&mut self) {
+ if self.inner.go_away.is_going_away() {
+ // No reason to start a new one.
+ return;
+ }
+
+ // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
+ //
+ // > A server that is attempting to gracefully shut down a connection
+ // > SHOULD send an initial GOAWAY frame with the last stream
+ // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
+ // > client that a shutdown is imminent and that initiating further
+ // > requests is prohibited. After allowing time for any in-flight
+ // > stream creation (at least one round-trip time), the server can
+ // > send another GOAWAY frame with an updated last stream identifier.
+ // > This ensures that a connection can be cleanly shut down without
+ // > losing requests.
+ self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);
+
+ // We take the advice of waiting 1 RTT literally, and wait
+ // for a pong before proceeding.
+ self.inner.ping_pong.ping_shutdown();
+ }
+}
+
+impl<T, P, B> Drop for Connection<T, P, B>
+where
+ P: Peer,
+ B: Buf,
+{
+ fn drop(&mut self) {
+ // Ignore errors as this indicates that the mutex is poisoned.
+ let _ = self.inner.streams.recv_eof(true);
+ }
+}
diff --git a/third_party/rust/h2/src/proto/error.rs b/third_party/rust/h2/src/proto/error.rs
new file mode 100644
index 0000000000..ad023317ee
--- /dev/null
+++ b/third_party/rust/h2/src/proto/error.rs
@@ -0,0 +1,91 @@
+use crate::codec::SendError;
+use crate::frame::{Reason, StreamId};
+
+use bytes::Bytes;
+use std::fmt;
+use std::io;
+
+/// Either an H2 reason or an I/O error
+#[derive(Clone, Debug)]
+pub enum Error {
+ Reset(StreamId, Reason, Initiator),
+ GoAway(Bytes, Reason, Initiator),
+ Io(io::ErrorKind, Option<String>),
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Initiator {
+ User,
+ Library,
+ Remote,
+}
+
+impl Error {
+ pub(crate) fn is_local(&self) -> bool {
+ match *self {
+ Self::Reset(_, _, initiator) | Self::GoAway(_, _, initiator) => initiator.is_local(),
+ Self::Io(..) => true,
+ }
+ }
+
+ pub(crate) fn user_go_away(reason: Reason) -> Self {
+ Self::GoAway(Bytes::new(), reason, Initiator::User)
+ }
+
+ pub(crate) fn library_reset(stream_id: StreamId, reason: Reason) -> Self {
+ Self::Reset(stream_id, reason, Initiator::Library)
+ }
+
+ pub(crate) fn library_go_away(reason: Reason) -> Self {
+ Self::GoAway(Bytes::new(), reason, Initiator::Library)
+ }
+
+ pub(crate) fn library_go_away_data(reason: Reason, debug_data: impl Into<Bytes>) -> Self {
+ Self::GoAway(debug_data.into(), reason, Initiator::Library)
+ }
+
+ pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self {
+ Self::Reset(stream_id, reason, Initiator::Remote)
+ }
+
+ pub(crate) fn remote_go_away(debug_data: Bytes, reason: Reason) -> Self {
+ Self::GoAway(debug_data, reason, Initiator::Remote)
+ }
+}
+
+impl Initiator {
+ fn is_local(&self) -> bool {
+ match *self {
+ Self::User | Self::Library => true,
+ Self::Remote => false,
+ }
+ }
+}
+
+impl fmt::Display for Error {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ Self::Reset(_, reason, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt),
+ Self::Io(_, Some(ref inner)) => inner.fmt(fmt),
+ Self::Io(kind, None) => io::Error::from(kind).fmt(fmt),
+ }
+ }
+}
+
+impl From<io::ErrorKind> for Error {
+ fn from(src: io::ErrorKind) -> Self {
+ Error::Io(src, None)
+ }
+}
+
+impl From<io::Error> for Error {
+ fn from(src: io::Error) -> Self {
+ Error::Io(src.kind(), src.get_ref().map(|inner| inner.to_string()))
+ }
+}
+
+impl From<Error> for SendError {
+ fn from(src: Error) -> Self {
+ Self::Connection(src)
+ }
+}
diff --git a/third_party/rust/h2/src/proto/go_away.rs b/third_party/rust/h2/src/proto/go_away.rs
new file mode 100644
index 0000000000..d52252cd74
--- /dev/null
+++ b/third_party/rust/h2/src/proto/go_away.rs
@@ -0,0 +1,154 @@
+use crate::codec::Codec;
+use crate::frame::{self, Reason, StreamId};
+
+use bytes::Buf;
+use std::io;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+
+/// Manages our sending of GOAWAY frames.
+#[derive(Debug)]
+pub(super) struct GoAway {
+ /// Whether the connection should close now, or wait until idle.
+ close_now: bool,
+ /// Records if we've sent any GOAWAY before.
+ going_away: Option<GoingAway>,
+ /// Whether the user started the GOAWAY by calling `abrupt_shutdown`.
+ is_user_initiated: bool,
+ /// A GOAWAY frame that must be buffered in the Codec immediately.
+ pending: Option<frame::GoAway>,
+}
+
+/// Keeps a memory of any GOAWAY frames we've sent before.
+///
+/// This looks very similar to a `frame::GoAway`, but is a separate type. Why?
+/// Mostly for documentation purposes. This type is to record status. If it
+/// were a `frame::GoAway`, it might appear like we eventually wanted to
+/// serialize it. We **only** want to be able to look up these fields at a
+/// later time.
+#[derive(Debug)]
+pub(crate) struct GoingAway {
+ /// Stores the highest stream ID of a GOAWAY that has been sent.
+ ///
+ /// It's illegal to send a subsequent GOAWAY with a higher ID.
+ last_processed_id: StreamId,
+
+ /// Records the error code of any GOAWAY frame sent.
+ reason: Reason,
+}
+
+impl GoAway {
+ pub fn new() -> Self {
+ GoAway {
+ close_now: false,
+ going_away: None,
+ is_user_initiated: false,
+ pending: None,
+ }
+ }
+
+ /// Enqueue a GOAWAY frame to be written.
+ ///
+ /// The connection is expected to continue to run until idle.
+ pub fn go_away(&mut self, f: frame::GoAway) {
+ if let Some(ref going_away) = self.going_away {
+ assert!(
+ f.last_stream_id() <= going_away.last_processed_id,
+ "GOAWAY stream IDs shouldn't be higher; \
+ last_processed_id = {:?}, f.last_stream_id() = {:?}",
+ going_away.last_processed_id,
+ f.last_stream_id(),
+ );
+ }
+
+ self.going_away = Some(GoingAway {
+ last_processed_id: f.last_stream_id(),
+ reason: f.reason(),
+ });
+ self.pending = Some(f);
+ }
+
+ pub fn go_away_now(&mut self, f: frame::GoAway) {
+ self.close_now = true;
+ if let Some(ref going_away) = self.going_away {
+ // Prevent sending the same GOAWAY twice.
+ if going_away.last_processed_id == f.last_stream_id() && going_away.reason == f.reason()
+ {
+ return;
+ }
+ }
+ self.go_away(f);
+ }
+
+ pub fn go_away_from_user(&mut self, f: frame::GoAway) {
+ self.is_user_initiated = true;
+ self.go_away_now(f);
+ }
+
+ /// Return if a GOAWAY has ever been scheduled.
+ pub fn is_going_away(&self) -> bool {
+ self.going_away.is_some()
+ }
+
+ pub fn is_user_initiated(&self) -> bool {
+ self.is_user_initiated
+ }
+
+ /// Returns the going away info, if any.
+ pub fn going_away(&self) -> Option<&GoingAway> {
+ self.going_away.as_ref()
+ }
+
+ /// Returns if the connection should close now, or wait until idle.
+ pub fn should_close_now(&self) -> bool {
+ self.pending.is_none() && self.close_now
+ }
+
+ /// Returns if the connection should be closed when idle.
+ pub fn should_close_on_idle(&self) -> bool {
+ !self.close_now
+ && self
+ .going_away
+ .as_ref()
+ .map(|g| g.last_processed_id != StreamId::MAX)
+ .unwrap_or(false)
+ }
+
+ /// Try to write a pending GOAWAY frame to the buffer.
+ ///
+ /// If a frame is written, the `Reason` of the GOAWAY is returned.
+ pub fn send_pending_go_away<T, B>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, B>,
+ ) -> Poll<Option<io::Result<Reason>>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ if let Some(frame) = self.pending.take() {
+ if !dst.poll_ready(cx)?.is_ready() {
+ self.pending = Some(frame);
+ return Poll::Pending;
+ }
+
+ let reason = frame.reason();
+ dst.buffer(frame.into()).expect("invalid GOAWAY frame");
+
+ return Poll::Ready(Some(Ok(reason)));
+ } else if self.should_close_now() {
+ return match self.going_away().map(|going_away| going_away.reason) {
+ Some(reason) => Poll::Ready(Some(Ok(reason))),
+ None => Poll::Ready(None),
+ };
+ }
+
+ Poll::Ready(None)
+ }
+}
+
+impl GoingAway {
+ pub(crate) fn reason(&self) -> Reason {
+ self.reason
+ }
+}
diff --git a/third_party/rust/h2/src/proto/mod.rs b/third_party/rust/h2/src/proto/mod.rs
new file mode 100644
index 0000000000..567d030606
--- /dev/null
+++ b/third_party/rust/h2/src/proto/mod.rs
@@ -0,0 +1,37 @@
+mod connection;
+mod error;
+mod go_away;
+mod peer;
+mod ping_pong;
+mod settings;
+mod streams;
+
+pub(crate) use self::connection::{Config, Connection};
+pub use self::error::{Error, Initiator};
+pub(crate) use self::peer::{Dyn as DynPeer, Peer};
+pub(crate) use self::ping_pong::UserPings;
+pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams};
+pub(crate) use self::streams::{Open, PollReset, Prioritized};
+
+use crate::codec::Codec;
+
+use self::go_away::GoAway;
+use self::ping_pong::PingPong;
+use self::settings::Settings;
+
+use crate::frame::{self, Frame};
+
+use bytes::Buf;
+
+use tokio::io::AsyncWrite;
+
+pub type PingPayload = [u8; 8];
+
+pub type WindowSize = u32;
+
+// Constants
+pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32
+pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
+pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
+pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
+pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
diff --git a/third_party/rust/h2/src/proto/peer.rs b/third_party/rust/h2/src/proto/peer.rs
new file mode 100644
index 0000000000..d62d9e24e0
--- /dev/null
+++ b/third_party/rust/h2/src/proto/peer.rs
@@ -0,0 +1,93 @@
+use crate::error::Reason;
+use crate::frame::{Pseudo, StreamId};
+use crate::proto::{Error, Open};
+
+use http::{HeaderMap, Request, Response};
+
+use std::fmt;
+
+/// Either a Client or a Server
+pub(crate) trait Peer {
+ /// Message type polled from the transport
+ type Poll: fmt::Debug;
+ const NAME: &'static str;
+
+ fn r#dyn() -> Dyn;
+
+ fn is_server() -> bool;
+
+ fn convert_poll_message(
+ pseudo: Pseudo,
+ fields: HeaderMap,
+ stream_id: StreamId,
+ ) -> Result<Self::Poll, Error>;
+
+ fn is_local_init(id: StreamId) -> bool {
+ assert!(!id.is_zero());
+ Self::is_server() == id.is_server_initiated()
+ }
+}
+
+/// A dynamic representation of `Peer`.
+///
+/// This is used internally to avoid incurring a generic on all internal types.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub(crate) enum Dyn {
+ Client,
+ Server,
+}
+
+#[derive(Debug)]
+pub enum PollMessage {
+ Client(Response<()>),
+ Server(Request<()>),
+}
+
+// ===== impl Dyn =====
+
+impl Dyn {
+ pub fn is_server(&self) -> bool {
+ *self == Dyn::Server
+ }
+
+ pub fn is_local_init(&self, id: StreamId) -> bool {
+ assert!(!id.is_zero());
+ self.is_server() == id.is_server_initiated()
+ }
+
+ pub fn convert_poll_message(
+ &self,
+ pseudo: Pseudo,
+ fields: HeaderMap,
+ stream_id: StreamId,
+ ) -> Result<PollMessage, Error> {
+ if self.is_server() {
+ crate::server::Peer::convert_poll_message(pseudo, fields, stream_id)
+ .map(PollMessage::Server)
+ } else {
+ crate::client::Peer::convert_poll_message(pseudo, fields, stream_id)
+ .map(PollMessage::Client)
+ }
+ }
+
+ /// Returns true if the remote peer can initiate a stream with the given ID.
+ pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), Error> {
+ if self.is_server() {
+ // Ensure that the ID is a valid client initiated ID
+ if mode.is_push_promise() || !id.is_client_initiated() {
+ proto_err!(conn: "cannot open stream {:?} - not client initiated", id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ Ok(())
+ } else {
+ // Ensure that the ID is a valid server initiated ID
+ if !mode.is_push_promise() || !id.is_server_initiated() {
+ proto_err!(conn: "cannot open stream {:?} - not server initiated", id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ Ok(())
+ }
+ }
+}
diff --git a/third_party/rust/h2/src/proto/ping_pong.rs b/third_party/rust/h2/src/proto/ping_pong.rs
new file mode 100644
index 0000000000..59023e26a2
--- /dev/null
+++ b/third_party/rust/h2/src/proto/ping_pong.rs
@@ -0,0 +1,291 @@
+use crate::codec::Codec;
+use crate::frame::Ping;
+use crate::proto::{self, PingPayload};
+
+use bytes::Buf;
+use futures_util::task::AtomicWaker;
+use std::io;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+
+/// Acknowledges ping requests from the remote.
+#[derive(Debug)]
+pub(crate) struct PingPong {
+ pending_ping: Option<PendingPing>,
+ pending_pong: Option<PingPayload>,
+ user_pings: Option<UserPingsRx>,
+}
+
+#[derive(Debug)]
+pub(crate) struct UserPings(Arc<UserPingsInner>);
+
+#[derive(Debug)]
+struct UserPingsRx(Arc<UserPingsInner>);
+
+#[derive(Debug)]
+struct UserPingsInner {
+ state: AtomicUsize,
+ /// Task to wake up the main `Connection`.
+ ping_task: AtomicWaker,
+ /// Task to wake up `share::PingPong::poll_pong`.
+ pong_task: AtomicWaker,
+}
+
+#[derive(Debug)]
+struct PendingPing {
+ payload: PingPayload,
+ sent: bool,
+}
+
+/// Status returned from `PingPong::recv_ping`.
+#[derive(Debug)]
+pub(crate) enum ReceivedPing {
+ MustAck,
+ Unknown,
+ Shutdown,
+}
+
+/// No user ping pending.
+const USER_STATE_EMPTY: usize = 0;
+/// User has called `send_ping`, but PING hasn't been written yet.
+const USER_STATE_PENDING_PING: usize = 1;
+/// User PING has been written, waiting for PONG.
+const USER_STATE_PENDING_PONG: usize = 2;
+/// We've received user PONG, waiting for user to `poll_pong`.
+const USER_STATE_RECEIVED_PONG: usize = 3;
+/// The connection is closed.
+const USER_STATE_CLOSED: usize = 4;
+
+// ===== impl PingPong =====
+
+impl PingPong {
+ pub(crate) fn new() -> Self {
+ PingPong {
+ pending_ping: None,
+ pending_pong: None,
+ user_pings: None,
+ }
+ }
+
+ /// Can only be called once. If called a second time, returns `None`.
+ pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
+ if self.user_pings.is_some() {
+ return None;
+ }
+
+ let user_pings = Arc::new(UserPingsInner {
+ state: AtomicUsize::new(USER_STATE_EMPTY),
+ ping_task: AtomicWaker::new(),
+ pong_task: AtomicWaker::new(),
+ });
+ self.user_pings = Some(UserPingsRx(user_pings.clone()));
+ Some(UserPings(user_pings))
+ }
+
+ pub(crate) fn ping_shutdown(&mut self) {
+ assert!(self.pending_ping.is_none());
+
+ self.pending_ping = Some(PendingPing {
+ payload: Ping::SHUTDOWN,
+ sent: false,
+ });
+ }
+
+ /// Process a ping
+ pub(crate) fn recv_ping(&mut self, ping: Ping) -> ReceivedPing {
+ // The caller should always check that `send_pongs` returns ready before
+ // calling `recv_ping`.
+ assert!(self.pending_pong.is_none());
+
+ if ping.is_ack() {
+ if let Some(pending) = self.pending_ping.take() {
+ if &pending.payload == ping.payload() {
+ assert_eq!(
+ &pending.payload,
+ &Ping::SHUTDOWN,
+ "pending_ping should be for shutdown",
+ );
+ tracing::trace!("recv PING SHUTDOWN ack");
+ return ReceivedPing::Shutdown;
+ }
+
+ // if not the payload we expected, put it back.
+ self.pending_ping = Some(pending);
+ }
+
+ if let Some(ref users) = self.user_pings {
+ if ping.payload() == &Ping::USER && users.receive_pong() {
+ tracing::trace!("recv PING USER ack");
+ return ReceivedPing::Unknown;
+ }
+ }
+
+ // else we were acked a ping we didn't send?
+ // The spec doesn't require us to do anything about this,
+ // so for resiliency, just ignore it for now.
+ tracing::warn!("recv PING ack that we never sent: {:?}", ping);
+ ReceivedPing::Unknown
+ } else {
+ // Save the ping's payload to be sent as an acknowledgement.
+ self.pending_pong = Some(ping.into_payload());
+ ReceivedPing::MustAck
+ }
+ }
+
+ /// Send any pending pongs.
+ pub(crate) fn send_pending_pong<T, B>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, B>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ if let Some(pong) = self.pending_pong.take() {
+ if !dst.poll_ready(cx)?.is_ready() {
+ self.pending_pong = Some(pong);
+ return Poll::Pending;
+ }
+
+ dst.buffer(Ping::pong(pong).into())
+ .expect("invalid pong frame");
+ }
+
+ Poll::Ready(Ok(()))
+ }
+
+ /// Send any pending pings.
+ pub(crate) fn send_pending_ping<T, B>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, B>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ if let Some(ref mut ping) = self.pending_ping {
+ if !ping.sent {
+ if !dst.poll_ready(cx)?.is_ready() {
+ return Poll::Pending;
+ }
+
+ dst.buffer(Ping::new(ping.payload).into())
+ .expect("invalid ping frame");
+ ping.sent = true;
+ }
+ } else if let Some(ref users) = self.user_pings {
+ if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING {
+ if !dst.poll_ready(cx)?.is_ready() {
+ return Poll::Pending;
+ }
+
+ dst.buffer(Ping::new(Ping::USER).into())
+ .expect("invalid ping frame");
+ users
+ .0
+ .state
+ .store(USER_STATE_PENDING_PONG, Ordering::Release);
+ } else {
+ users.0.ping_task.register(cx.waker());
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl ReceivedPing {
+ pub(crate) fn is_shutdown(&self) -> bool {
+ matches!(*self, Self::Shutdown)
+ }
+}
+
+// ===== impl UserPings =====
+
+impl UserPings {
+ pub(crate) fn send_ping(&self) -> Result<(), Option<proto::Error>> {
+ let prev = self
+ .0
+ .state
+ .compare_exchange(
+ USER_STATE_EMPTY, // current
+ USER_STATE_PENDING_PING, // new
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .unwrap_or_else(|v| v);
+
+ match prev {
+ USER_STATE_EMPTY => {
+ self.0.ping_task.wake();
+ Ok(())
+ }
+ USER_STATE_CLOSED => Err(Some(broken_pipe().into())),
+ _ => {
+ // Was already pending, user error!
+ Err(None)
+ }
+ }
+ }
+
+ pub(crate) fn poll_pong(&self, cx: &mut Context) -> Poll<Result<(), proto::Error>> {
+ // Must register before checking state, in case state were to change
+ // before we could register, and then the ping would just be lost.
+ self.0.pong_task.register(cx.waker());
+ let prev = self
+ .0
+ .state
+ .compare_exchange(
+ USER_STATE_RECEIVED_PONG, // current
+ USER_STATE_EMPTY, // new
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .unwrap_or_else(|v| v);
+
+ match prev {
+ USER_STATE_RECEIVED_PONG => Poll::Ready(Ok(())),
+ USER_STATE_CLOSED => Poll::Ready(Err(broken_pipe().into())),
+ _ => Poll::Pending,
+ }
+ }
+}
+
+// ===== impl UserPingsRx =====
+
+impl UserPingsRx {
+ fn receive_pong(&self) -> bool {
+ let prev = self
+ .0
+ .state
+ .compare_exchange(
+ USER_STATE_PENDING_PONG, // current
+ USER_STATE_RECEIVED_PONG, // new
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .unwrap_or_else(|v| v);
+
+ if prev == USER_STATE_PENDING_PONG {
+ self.0.pong_task.wake();
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl Drop for UserPingsRx {
+ fn drop(&mut self) {
+ self.0.state.store(USER_STATE_CLOSED, Ordering::Release);
+ self.0.pong_task.wake();
+ }
+}
+
+fn broken_pipe() -> io::Error {
+ io::ErrorKind::BrokenPipe.into()
+}
diff --git a/third_party/rust/h2/src/proto/settings.rs b/third_party/rust/h2/src/proto/settings.rs
new file mode 100644
index 0000000000..28065cc689
--- /dev/null
+++ b/third_party/rust/h2/src/proto/settings.rs
@@ -0,0 +1,155 @@
+use crate::codec::UserError;
+use crate::error::Reason;
+use crate::frame;
+use crate::proto::*;
+use std::task::{Context, Poll};
+
+#[derive(Debug)]
+pub(crate) struct Settings {
+ /// Our local SETTINGS sync state with the remote.
+ local: Local,
+ /// Received SETTINGS frame pending processing. The ACK must be written to
+ /// the socket first then the settings applied **before** receiving any
+ /// further frames.
+ remote: Option<frame::Settings>,
+}
+
+#[derive(Debug)]
+enum Local {
+ /// We want to send these SETTINGS to the remote when the socket is ready.
+ ToSend(frame::Settings),
+ /// We have sent these SETTINGS and are waiting for the remote to ACK
+ /// before we apply them.
+ WaitingAck(frame::Settings),
+ /// Our local settings are in sync with the remote.
+ Synced,
+}
+
+impl Settings {
+ pub(crate) fn new(local: frame::Settings) -> Self {
+ Settings {
+ // We assume the initial local SETTINGS were flushed during
+ // the handshake process.
+ local: Local::WaitingAck(local),
+ remote: None,
+ }
+ }
+
+ pub(crate) fn recv_settings<T, B, C, P>(
+ &mut self,
+ frame: frame::Settings,
+ codec: &mut Codec<T, B>,
+ streams: &mut Streams<C, P>,
+ ) -> Result<(), Error>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ C: Buf,
+ P: Peer,
+ {
+ if frame.is_ack() {
+ match &self.local {
+ Local::WaitingAck(local) => {
+ tracing::debug!("received settings ACK; applying {:?}", local);
+
+ if let Some(max) = local.max_frame_size() {
+ codec.set_max_recv_frame_size(max as usize);
+ }
+
+ if let Some(max) = local.max_header_list_size() {
+ codec.set_max_recv_header_list_size(max as usize);
+ }
+
+ if let Some(val) = local.header_table_size() {
+ codec.set_recv_header_table_size(val as usize);
+ }
+
+ streams.apply_local_settings(local)?;
+ self.local = Local::Synced;
+ Ok(())
+ }
+ Local::ToSend(..) | Local::Synced => {
+ // We haven't sent any SETTINGS frames to be ACKed, so
+ // this is very bizarre! Remote is either buggy or malicious.
+ proto_err!(conn: "received unexpected settings ack");
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+ } else {
+ // We always ACK before reading more frames, so `remote` should
+ // always be none!
+ assert!(self.remote.is_none());
+ self.remote = Some(frame);
+ Ok(())
+ }
+ }
+
+ pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
+ assert!(!frame.is_ack());
+ match &self.local {
+ Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
+ Local::Synced => {
+ tracing::trace!("queue to send local settings: {:?}", frame);
+ self.local = Local::ToSend(frame);
+ Ok(())
+ }
+ }
+ }
+
+ pub(crate) fn poll_send<T, B, C, P>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, B>,
+ streams: &mut Streams<C, P>,
+ ) -> Poll<Result<(), Error>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ C: Buf,
+ P: Peer,
+ {
+ if let Some(settings) = &self.remote {
+ if !dst.poll_ready(cx)?.is_ready() {
+ return Poll::Pending;
+ }
+
+ // Create an ACK settings frame
+ let frame = frame::Settings::ack();
+
+ // Buffer the settings frame
+ dst.buffer(frame.into()).expect("invalid settings frame");
+
+ tracing::trace!("ACK sent; applying settings");
+
+ streams.apply_remote_settings(settings)?;
+
+ if let Some(val) = settings.header_table_size() {
+ dst.set_send_header_table_size(val as usize);
+ }
+
+ if let Some(val) = settings.max_frame_size() {
+ dst.set_max_send_frame_size(val as usize);
+ }
+ }
+
+ self.remote = None;
+
+ match &self.local {
+ Local::ToSend(settings) => {
+ if !dst.poll_ready(cx)?.is_ready() {
+ return Poll::Pending;
+ }
+
+ // Buffer the settings frame
+ dst.buffer(settings.clone().into())
+ .expect("invalid settings frame");
+ tracing::trace!("local settings sent; waiting for ack: {:?}", settings);
+
+ self.local = Local::WaitingAck(settings.clone());
+ }
+ Local::WaitingAck(..) | Local::Synced => {}
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/buffer.rs b/third_party/rust/h2/src/proto/streams/buffer.rs
new file mode 100644
index 0000000000..2648a410e4
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/buffer.rs
@@ -0,0 +1,95 @@
+use slab::Slab;
+
+/// Buffers frames for multiple streams.
+#[derive(Debug)]
+pub struct Buffer<T> {
+ slab: Slab<Slot<T>>,
+}
+
+/// A sequence of frames in a `Buffer`
+#[derive(Debug)]
+pub struct Deque {
+ indices: Option<Indices>,
+}
+
+/// Tracks the head & tail for a sequence of frames in a `Buffer`.
+#[derive(Debug, Default, Copy, Clone)]
+struct Indices {
+ head: usize,
+ tail: usize,
+}
+
+#[derive(Debug)]
+struct Slot<T> {
+ value: T,
+ next: Option<usize>,
+}
+
+impl<T> Buffer<T> {
+ pub fn new() -> Self {
+ Buffer { slab: Slab::new() }
+ }
+}
+
+impl Deque {
+ pub fn new() -> Self {
+ Deque { indices: None }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.indices.is_none()
+ }
+
+ pub fn push_back<T>(&mut self, buf: &mut Buffer<T>, value: T) {
+ let key = buf.slab.insert(Slot { value, next: None });
+
+ match self.indices {
+ Some(ref mut idxs) => {
+ buf.slab[idxs.tail].next = Some(key);
+ idxs.tail = key;
+ }
+ None => {
+ self.indices = Some(Indices {
+ head: key,
+ tail: key,
+ });
+ }
+ }
+ }
+
+ pub fn push_front<T>(&mut self, buf: &mut Buffer<T>, value: T) {
+ let key = buf.slab.insert(Slot { value, next: None });
+
+ match self.indices {
+ Some(ref mut idxs) => {
+ buf.slab[key].next = Some(idxs.head);
+ idxs.head = key;
+ }
+ None => {
+ self.indices = Some(Indices {
+ head: key,
+ tail: key,
+ });
+ }
+ }
+ }
+
+ pub fn pop_front<T>(&mut self, buf: &mut Buffer<T>) -> Option<T> {
+ match self.indices {
+ Some(mut idxs) => {
+ let mut slot = buf.slab.remove(idxs.head);
+
+ if idxs.head == idxs.tail {
+ assert!(slot.next.is_none());
+ self.indices = None;
+ } else {
+ idxs.head = slot.next.take().unwrap();
+ self.indices = Some(idxs);
+ }
+
+ Some(slot.value)
+ }
+ None => None,
+ }
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/counts.rs b/third_party/rust/h2/src/proto/streams/counts.rs
new file mode 100644
index 0000000000..add1312e55
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/counts.rs
@@ -0,0 +1,253 @@
+use super::*;
+
+use std::usize;
+
+#[derive(Debug)]
+pub(super) struct Counts {
+ /// Acting as a client or server. This allows us to track which values to
+ /// inc / dec.
+ peer: peer::Dyn,
+
+ /// Maximum number of locally initiated streams
+ max_send_streams: usize,
+
+ /// Current number of remote initiated streams
+ num_send_streams: usize,
+
+ /// Maximum number of remote initiated streams
+ max_recv_streams: usize,
+
+ /// Current number of locally initiated streams
+ num_recv_streams: usize,
+
+ /// Maximum number of pending locally reset streams
+ max_local_reset_streams: usize,
+
+ /// Current number of pending locally reset streams
+ num_local_reset_streams: usize,
+
+ /// Max number of "pending accept" streams that were remotely reset
+ max_remote_reset_streams: usize,
+
+ /// Current number of "pending accept" streams that were remotely reset
+ num_remote_reset_streams: usize,
+}
+
+impl Counts {
+ /// Create a new `Counts` using the provided configuration values.
+ pub fn new(peer: peer::Dyn, config: &Config) -> Self {
+ Counts {
+ peer,
+ max_send_streams: config.initial_max_send_streams,
+ num_send_streams: 0,
+ max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
+ num_recv_streams: 0,
+ max_local_reset_streams: config.local_reset_max,
+ num_local_reset_streams: 0,
+ max_remote_reset_streams: config.remote_reset_max,
+ num_remote_reset_streams: 0,
+ }
+ }
+
+ /// Returns true when the next opened stream will reach capacity of outbound streams
+ ///
+ /// The number of client send streams is incremented in prioritize; send_request has to guess if
+ /// it should wait before allowing another request to be sent.
+ pub fn next_send_stream_will_reach_capacity(&self) -> bool {
+ self.max_send_streams <= (self.num_send_streams + 1)
+ }
+
+ /// Returns the current peer
+ pub fn peer(&self) -> peer::Dyn {
+ self.peer
+ }
+
+ pub fn has_streams(&self) -> bool {
+ self.num_send_streams != 0 || self.num_recv_streams != 0
+ }
+
+ /// Returns true if the receive stream concurrency can be incremented
+ pub fn can_inc_num_recv_streams(&self) -> bool {
+ self.max_recv_streams > self.num_recv_streams
+ }
+
+ /// Increments the number of concurrent receive streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(self.can_inc_num_recv_streams());
+ assert!(!stream.is_counted);
+
+ // Increment the number of remote initiated streams
+ self.num_recv_streams += 1;
+ stream.is_counted = true;
+ }
+
+ /// Returns true if the send stream concurrency can be incremented
+ pub fn can_inc_num_send_streams(&self) -> bool {
+ self.max_send_streams > self.num_send_streams
+ }
+
+ /// Increments the number of concurrent send streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(self.can_inc_num_send_streams());
+ assert!(!stream.is_counted);
+
+ // Increment the number of remote initiated streams
+ self.num_send_streams += 1;
+ stream.is_counted = true;
+ }
+
+ /// Returns true if the number of pending reset streams can be incremented.
+ pub fn can_inc_num_reset_streams(&self) -> bool {
+ self.max_local_reset_streams > self.num_local_reset_streams
+ }
+
+ /// Increments the number of pending reset streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_reset_streams(&mut self) {
+ assert!(self.can_inc_num_reset_streams());
+
+ self.num_local_reset_streams += 1;
+ }
+
+ pub(crate) fn max_remote_reset_streams(&self) -> usize {
+ self.max_remote_reset_streams
+ }
+
+ /// Returns true if the number of pending REMOTE reset streams can be
+ /// incremented.
+ pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
+ self.max_remote_reset_streams > self.num_remote_reset_streams
+ }
+
+ /// Increments the number of pending REMOTE reset streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub(crate) fn inc_num_remote_reset_streams(&mut self) {
+ assert!(self.can_inc_num_remote_reset_streams());
+
+ self.num_remote_reset_streams += 1;
+ }
+
+ pub(crate) fn dec_num_remote_reset_streams(&mut self) {
+ assert!(self.num_remote_reset_streams > 0);
+
+ self.num_remote_reset_streams -= 1;
+ }
+
+ pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
+ if let Some(val) = settings.max_concurrent_streams() {
+ self.max_send_streams = val as usize;
+ }
+ }
+
+ /// Run a block of code that could potentially transition a stream's state.
+ ///
+ /// If the stream state transitions to closed, this function will perform
+ /// all necessary cleanup.
+ ///
+ /// TODO: Is this function still needed?
+ pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
+ where
+ F: FnOnce(&mut Self, &mut store::Ptr) -> U,
+ {
+ // TODO: Does this need to be computed before performing the action?
+ let is_pending_reset = stream.is_pending_reset_expiration();
+
+ // Run the action
+ let ret = f(self, &mut stream);
+
+ self.transition_after(stream, is_pending_reset);
+
+ ret
+ }
+
+ // TODO: move this to macro?
+ pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
+ tracing::trace!(
+ "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
+ pending_send_empty={:?}; buffered_send_data={}; \
+ num_recv={}; num_send={}",
+ stream.id,
+ stream.state,
+ stream.is_closed(),
+ stream.pending_send.is_empty(),
+ stream.buffered_send_data,
+ self.num_recv_streams,
+ self.num_send_streams
+ );
+
+ if stream.is_closed() {
+ if !stream.is_pending_reset_expiration() {
+ stream.unlink();
+ if is_reset_counted {
+ self.dec_num_reset_streams();
+ }
+ }
+
+ if stream.is_counted {
+ tracing::trace!("dec_num_streams; stream={:?}", stream.id);
+ // Decrement the number of active streams.
+ self.dec_num_streams(&mut stream);
+ }
+ }
+
+ // Release the stream if it requires releasing
+ if stream.is_released() {
+ stream.remove();
+ }
+ }
+
+ /// Returns the maximum number of streams that can be initiated by this
+ /// peer.
+ pub(crate) fn max_send_streams(&self) -> usize {
+ self.max_send_streams
+ }
+
+ /// Returns the maximum number of streams that can be initiated by the
+ /// remote peer.
+ pub(crate) fn max_recv_streams(&self) -> usize {
+ self.max_recv_streams
+ }
+
+ fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(stream.is_counted);
+
+ if self.peer.is_local_init(stream.id) {
+ assert!(self.num_send_streams > 0);
+ self.num_send_streams -= 1;
+ stream.is_counted = false;
+ } else {
+ assert!(self.num_recv_streams > 0);
+ self.num_recv_streams -= 1;
+ stream.is_counted = false;
+ }
+ }
+
+ fn dec_num_reset_streams(&mut self) {
+ assert!(self.num_local_reset_streams > 0);
+ self.num_local_reset_streams -= 1;
+ }
+}
+
+impl Drop for Counts {
+ fn drop(&mut self) {
+ use std::thread;
+
+ if !thread::panicking() {
+ debug_assert!(!self.has_streams());
+ }
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/flow_control.rs b/third_party/rust/h2/src/proto/streams/flow_control.rs
new file mode 100644
index 0000000000..57a9358252
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/flow_control.rs
@@ -0,0 +1,269 @@
+use crate::frame::Reason;
+use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
+
+use std::fmt;
+
+// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
+// aggregate them when the changes are significant. Many implementations do
+// this by keeping a "ratio" of the update version the allowed window size.
+//
+// While some may wish to represent this ratio as percentage, using a f32,
+// we skip having to deal with float math and stick to integers. To do so,
+// the "ratio" is represented by 2 i32s, split into the numerator and
+// denominator. For example, a 50% ratio is simply represented as 1/2.
+//
+// An example applying this ratio: If a stream has an allowed window size of
+// 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change
+// becomes greater than 1/2, or 50 bytes.
+const UNCLAIMED_NUMERATOR: i32 = 1;
+const UNCLAIMED_DENOMINATOR: i32 = 2;
+
+#[test]
+#[allow(clippy::assertions_on_constants)]
+fn sanity_unclaimed_ratio() {
+ assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
+ assert!(UNCLAIMED_NUMERATOR >= 0);
+ assert!(UNCLAIMED_DENOMINATOR > 0);
+}
+
+#[derive(Copy, Clone, Debug)]
+pub struct FlowControl {
+ /// Window the peer knows about.
+ ///
+ /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
+ ///
+ /// For example, say the peer sends a request and uses 32kb of the window.
+ /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
+ /// its understanding of the capacity of the window, and that would be:
+ ///
+ /// ```notrust
+ /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
+ /// ```
+ window_size: Window,
+
+ /// Window that we know about.
+ ///
+ /// This can go negative if a user declares a smaller target window than
+ /// the peer knows about.
+ available: Window,
+}
+
+impl FlowControl {
+ pub fn new() -> FlowControl {
+ FlowControl {
+ window_size: Window(0),
+ available: Window(0),
+ }
+ }
+
+ /// Returns the window size as known by the peer
+ pub fn window_size(&self) -> WindowSize {
+ self.window_size.as_size()
+ }
+
+ /// Returns the window size available to the consumer
+ pub fn available(&self) -> Window {
+ self.available
+ }
+
+ /// Returns true if there is unavailable window capacity
+ pub fn has_unavailable(&self) -> bool {
+ if self.window_size < 0 {
+ return false;
+ }
+
+ self.window_size > self.available
+ }
+
+ pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
+ self.available.decrease_by(capacity)
+ }
+
+ pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
+ self.available.increase_by(capacity)
+ }
+
+ /// If a WINDOW_UPDATE frame should be sent, returns a positive number
+ /// representing the increment to be used.
+ ///
+ /// If there is no available bytes to be reclaimed, or the number of
+ /// available bytes does not reach the threshold, this returns `None`.
+ ///
+ /// This represents pending outbound WINDOW_UPDATE frames.
+ pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
+ let available = self.available;
+
+ if self.window_size >= available {
+ return None;
+ }
+
+ let unclaimed = available.0 - self.window_size.0;
+ let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
+
+ if unclaimed < threshold {
+ None
+ } else {
+ Some(unclaimed as WindowSize)
+ }
+ }
+
+ /// Increase the window size.
+ ///
+ /// This is called after receiving a WINDOW_UPDATE frame
+ pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
+ let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
+
+ if overflow {
+ return Err(Reason::FLOW_CONTROL_ERROR);
+ }
+
+ if val > MAX_WINDOW_SIZE as i32 {
+ return Err(Reason::FLOW_CONTROL_ERROR);
+ }
+
+ tracing::trace!(
+ "inc_window; sz={}; old={}; new={}",
+ sz,
+ self.window_size,
+ val
+ );
+
+ self.window_size = Window(val);
+ Ok(())
+ }
+
+ /// Decrement the send-side window size.
+ ///
+ /// This is called after receiving a SETTINGS frame with a lower
+ /// INITIAL_WINDOW_SIZE value.
+ pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
+ tracing::trace!(
+ "dec_window; sz={}; window={}, available={}",
+ sz,
+ self.window_size,
+ self.available
+ );
+ // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
+ self.window_size.decrease_by(sz)?;
+ Ok(())
+ }
+
+ /// Decrement the recv-side window size.
+ ///
+ /// This is called after receiving a SETTINGS ACK frame with a lower
+ /// INITIAL_WINDOW_SIZE value.
+ pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
+ tracing::trace!(
+ "dec_recv_window; sz={}; window={}, available={}",
+ sz,
+ self.window_size,
+ self.available
+ );
+ // This should not be able to overflow `window_size` from the bottom.
+ self.window_size.decrease_by(sz)?;
+ self.available.decrease_by(sz)?;
+ Ok(())
+ }
+
+ /// Decrements the window reflecting data has actually been sent. The caller
+ /// must ensure that the window has capacity.
+ pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
+ tracing::trace!(
+ "send_data; sz={}; window={}; available={}",
+ sz,
+ self.window_size,
+ self.available
+ );
+
+ // If send size is zero it's meaningless to update flow control window
+ if sz > 0 {
+ // Ensure that the argument is correct
+ assert!(self.window_size.0 >= sz as i32);
+
+ // Update values
+ self.window_size.decrease_by(sz)?;
+ self.available.decrease_by(sz)?;
+ }
+ Ok(())
+ }
+}
+
+/// The current capacity of a flow-controlled Window.
+///
+/// This number can go negative when either side has used a certain amount
+/// of capacity when the other side advertises a reduction in size.
+///
+/// This type tries to centralize the knowledge of addition and subtraction
+/// to this capacity, instead of having integer casts throughout the source.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
+pub struct Window(i32);
+
+impl Window {
+ pub fn as_size(&self) -> WindowSize {
+ if self.0 < 0 {
+ 0
+ } else {
+ self.0 as WindowSize
+ }
+ }
+
+ pub fn checked_size(&self) -> WindowSize {
+ assert!(self.0 >= 0, "negative Window");
+ self.0 as WindowSize
+ }
+
+ pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
+ if let Some(v) = self.0.checked_sub(other as i32) {
+ self.0 = v;
+ Ok(())
+ } else {
+ Err(Reason::FLOW_CONTROL_ERROR)
+ }
+ }
+
+ pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
+ let other = self.add(other)?;
+ self.0 = other.0;
+ Ok(())
+ }
+
+ pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
+ if let Some(v) = self.0.checked_add(other as i32) {
+ Ok(Self(v))
+ } else {
+ Err(Reason::FLOW_CONTROL_ERROR)
+ }
+ }
+}
+
+impl PartialEq<usize> for Window {
+ fn eq(&self, other: &usize) -> bool {
+ if self.0 < 0 {
+ false
+ } else {
+ (self.0 as usize).eq(other)
+ }
+ }
+}
+
+impl PartialOrd<usize> for Window {
+ fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
+ if self.0 < 0 {
+ Some(::std::cmp::Ordering::Less)
+ } else {
+ (self.0 as usize).partial_cmp(other)
+ }
+ }
+}
+
+impl fmt::Display for Window {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ fmt::Display::fmt(&self.0, f)
+ }
+}
+
+impl From<Window> for isize {
+ fn from(w: Window) -> isize {
+ w.0 as isize
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/mod.rs b/third_party/rust/h2/src/proto/streams/mod.rs
new file mode 100644
index 0000000000..fbe32c7b01
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/mod.rs
@@ -0,0 +1,72 @@
+mod buffer;
+mod counts;
+mod flow_control;
+mod prioritize;
+mod recv;
+mod send;
+mod state;
+mod store;
+mod stream;
+#[allow(clippy::module_inception)]
+mod streams;
+
+pub(crate) use self::prioritize::Prioritized;
+pub(crate) use self::recv::Open;
+pub(crate) use self::send::PollReset;
+pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams};
+
+use self::buffer::Buffer;
+use self::counts::Counts;
+use self::flow_control::FlowControl;
+use self::prioritize::Prioritize;
+use self::recv::Recv;
+use self::send::Send;
+use self::state::State;
+use self::store::Store;
+use self::stream::Stream;
+
+use crate::frame::{StreamId, StreamIdOverflow};
+use crate::proto::*;
+
+use bytes::Bytes;
+use std::time::Duration;
+
+#[derive(Debug)]
+pub struct Config {
+ /// Initial window size of locally initiated streams
+ pub local_init_window_sz: WindowSize,
+
+ /// Initial maximum number of locally initiated streams.
+ /// After receiving a Settings frame from the remote peer,
+ /// the connection will overwrite this value with the
+ /// MAX_CONCURRENT_STREAMS specified in the frame.
+ pub initial_max_send_streams: usize,
+
+ /// Max amount of DATA bytes to buffer per stream.
+ pub local_max_buffer_size: usize,
+
+ /// The stream ID to start the next local stream with
+ pub local_next_stream_id: StreamId,
+
+ /// If the local peer is willing to receive push promises
+ pub local_push_enabled: bool,
+
+ /// If extended connect protocol is enabled.
+ pub extended_connect_protocol_enabled: bool,
+
+ /// How long a locally reset stream should ignore frames
+ pub local_reset_duration: Duration,
+
+ /// Maximum number of locally reset streams to keep at a time
+ pub local_reset_max: usize,
+
+ /// Maximum number of remotely reset "pending accept" streams to keep at a
+ /// time. Going over this number results in a connection error.
+ pub remote_reset_max: usize,
+
+ /// Initial window size of remote initiated streams
+ pub remote_init_window_sz: WindowSize,
+
+ /// Maximum number of remote initiated streams
+ pub remote_max_initiated: Option<usize>,
+}
diff --git a/third_party/rust/h2/src/proto/streams/prioritize.rs b/third_party/rust/h2/src/proto/streams/prioritize.rs
new file mode 100644
index 0000000000..3196049a48
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/prioritize.rs
@@ -0,0 +1,931 @@
+use super::store::Resolve;
+use super::*;
+
+use crate::frame::{Reason, StreamId};
+
+use crate::codec::UserError;
+use crate::codec::UserError::*;
+
+use bytes::buf::{Buf, Take};
+use std::{
+ cmp::{self, Ordering},
+ fmt, io, mem,
+ task::{Context, Poll, Waker},
+};
+
+/// # Warning
+///
+/// Queued streams are ordered by stream ID, as we need to ensure that
+/// lower-numbered streams are sent headers before higher-numbered ones.
+/// This is because "idle" stream IDs – those which have been initiated but
+/// have yet to receive frames – will be implicitly closed on receipt of a
+/// frame on a higher stream ID. If these queues was not ordered by stream
+/// IDs, some mechanism would be necessary to ensure that the lowest-numbered]
+/// idle stream is opened first.
+#[derive(Debug)]
+pub(super) struct Prioritize {
+ /// Queue of streams waiting for socket capacity to send a frame.
+ pending_send: store::Queue<stream::NextSend>,
+
+ /// Queue of streams waiting for window capacity to produce data.
+ pending_capacity: store::Queue<stream::NextSendCapacity>,
+
+ /// Streams waiting for capacity due to max concurrency
+ ///
+ /// The `SendRequest` handle is `Clone`. This enables initiating requests
+ /// from many tasks. However, offering this capability while supporting
+ /// backpressure at some level is tricky. If there are many `SendRequest`
+ /// handles and a single stream becomes available, which handle gets
+ /// assigned that stream? Maybe that handle is no longer ready to send a
+ /// request.
+ ///
+ /// The strategy used is to allow each `SendRequest` handle one buffered
+ /// request. A `SendRequest` handle is ready to send a request if it has no
+ /// associated buffered requests. This is the same strategy as `mpsc` in the
+ /// futures library.
+ pending_open: store::Queue<stream::NextOpen>,
+
+ /// Connection level flow control governing sent data
+ flow: FlowControl,
+
+ /// Stream ID of the last stream opened.
+ last_opened_id: StreamId,
+
+ /// What `DATA` frame is currently being sent in the codec.
+ in_flight_data_frame: InFlightData,
+
+ /// The maximum amount of bytes a stream should buffer.
+ max_buffer_size: usize,
+}
+
+#[derive(Debug, Eq, PartialEq)]
+enum InFlightData {
+ /// There is no `DATA` frame in flight.
+ Nothing,
+ /// There is a `DATA` frame in flight belonging to the given stream.
+ DataFrame(store::Key),
+ /// There was a `DATA` frame, but the stream's queue was since cleared.
+ Drop,
+}
+
+pub(crate) struct Prioritized<B> {
+ // The buffer
+ inner: Take<B>,
+
+ end_of_stream: bool,
+
+ // The stream that this is associated with
+ stream: store::Key,
+}
+
+// ===== impl Prioritize =====
+
+impl Prioritize {
+ pub fn new(config: &Config) -> Prioritize {
+ let mut flow = FlowControl::new();
+
+ flow.inc_window(config.remote_init_window_sz)
+ .expect("invalid initial window size");
+
+ // TODO: proper error handling
+ let _res = flow.assign_capacity(config.remote_init_window_sz);
+ debug_assert!(_res.is_ok());
+
+ tracing::trace!("Prioritize::new; flow={:?}", flow);
+
+ Prioritize {
+ pending_send: store::Queue::new(),
+ pending_capacity: store::Queue::new(),
+ pending_open: store::Queue::new(),
+ flow,
+ last_opened_id: StreamId::ZERO,
+ in_flight_data_frame: InFlightData::Nothing,
+ max_buffer_size: config.local_max_buffer_size,
+ }
+ }
+
+ pub(crate) fn max_buffer_size(&self) -> usize {
+ self.max_buffer_size
+ }
+
+ /// Queue a frame to be sent to the remote
+ pub fn queue_frame<B>(
+ &mut self,
+ frame: Frame<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ task: &mut Option<Waker>,
+ ) {
+ let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
+ let _e = span.enter();
+ // Queue the frame in the buffer
+ stream.pending_send.push_back(buffer, frame);
+ self.schedule_send(stream, task);
+ }
+
+ pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
+ // If the stream is waiting to be opened, nothing more to do.
+ if stream.is_send_ready() {
+ tracing::trace!(?stream.id, "schedule_send");
+ // Queue the stream
+ self.pending_send.push(stream);
+
+ // Notify the connection.
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+ }
+
+ pub fn queue_open(&mut self, stream: &mut store::Ptr) {
+ self.pending_open.push(stream);
+ }
+
+ /// Send a data frame
+ pub fn send_data<B>(
+ &mut self,
+ frame: frame::Data<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError>
+ where
+ B: Buf,
+ {
+ let sz = frame.payload().remaining();
+
+ if sz > MAX_WINDOW_SIZE as usize {
+ return Err(UserError::PayloadTooBig);
+ }
+
+ let sz = sz as WindowSize;
+
+ if !stream.state.is_send_streaming() {
+ if stream.state.is_closed() {
+ return Err(InactiveStreamId);
+ } else {
+ return Err(UnexpectedFrameType);
+ }
+ }
+
+ // Update the buffered data counter
+ stream.buffered_send_data += sz as usize;
+
+ let span =
+ tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
+ let _e = span.enter();
+ tracing::trace!(buffered = stream.buffered_send_data);
+
+ // Implicitly request more send capacity if not enough has been
+ // requested yet.
+ if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
+ // Update the target requested capacity
+ stream.requested_send_capacity =
+ cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
+
+ self.try_assign_capacity(stream);
+ }
+
+ if frame.is_end_stream() {
+ stream.state.send_close();
+ self.reserve_capacity(0, stream, counts);
+ }
+
+ tracing::trace!(
+ available = %stream.send_flow.available(),
+ buffered = stream.buffered_send_data,
+ );
+
+ // The `stream.buffered_send_data == 0` check is here so that, if a zero
+ // length data frame is queued to the front (there is no previously
+ // queued data), it gets sent out immediately even if there is no
+ // available send window.
+ //
+ // Sending out zero length data frames can be done to signal
+ // end-of-stream.
+ //
+ if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
+ // The stream currently has capacity to send the data frame, so
+ // queue it up and notify the connection task.
+ self.queue_frame(frame.into(), buffer, stream, task);
+ } else {
+ // The stream has no capacity to send the frame now, save it but
+ // don't notify the connection task. Once additional capacity
+ // becomes available, the frame will be flushed.
+ stream.pending_send.push_back(buffer, frame.into());
+ }
+
+ Ok(())
+ }
+
+ /// Request capacity to send data
+ pub fn reserve_capacity(
+ &mut self,
+ capacity: WindowSize,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ let span = tracing::trace_span!(
+ "reserve_capacity",
+ ?stream.id,
+ requested = capacity,
+ effective = (capacity as usize) + stream.buffered_send_data,
+ curr = stream.requested_send_capacity
+ );
+ let _e = span.enter();
+
+ // Actual capacity is `capacity` + the current amount of buffered data.
+ // If it were less, then we could never send out the buffered data.
+ let capacity = (capacity as usize) + stream.buffered_send_data;
+
+ match capacity.cmp(&(stream.requested_send_capacity as usize)) {
+ Ordering::Equal => {
+ // Nothing to do
+ }
+ Ordering::Less => {
+ // Update the target requested capacity
+ stream.requested_send_capacity = capacity as WindowSize;
+
+ // Currently available capacity assigned to the stream
+ let available = stream.send_flow.available().as_size();
+
+ // If the stream has more assigned capacity than requested, reclaim
+ // some for the connection
+ if available as usize > capacity {
+ let diff = available - capacity as WindowSize;
+
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(diff);
+ debug_assert!(_res.is_ok());
+
+ self.assign_connection_capacity(diff, stream, counts);
+ }
+ }
+ Ordering::Greater => {
+ // If trying to *add* capacity, but the stream send side is closed,
+ // there's nothing to be done.
+ if stream.state.is_send_closed() {
+ return;
+ }
+
+ // Update the target requested capacity
+ stream.requested_send_capacity =
+ cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
+
+ // Try to assign additional capacity to the stream. If none is
+ // currently available, the stream will be queued to receive some
+ // when more becomes available.
+ self.try_assign_capacity(stream);
+ }
+ }
+ }
+
+ pub fn recv_stream_window_update(
+ &mut self,
+ inc: WindowSize,
+ stream: &mut store::Ptr,
+ ) -> Result<(), Reason> {
+ let span = tracing::trace_span!(
+ "recv_stream_window_update",
+ ?stream.id,
+ ?stream.state,
+ inc,
+ flow = ?stream.send_flow
+ );
+ let _e = span.enter();
+
+ if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
+ // We can't send any data, so don't bother doing anything else.
+ return Ok(());
+ }
+
+ // Update the stream level flow control.
+ stream.send_flow.inc_window(inc)?;
+
+ // If the stream is waiting on additional capacity, then this will
+ // assign it (if available on the connection) and notify the producer
+ self.try_assign_capacity(stream);
+
+ Ok(())
+ }
+
+ pub fn recv_connection_window_update(
+ &mut self,
+ inc: WindowSize,
+ store: &mut Store,
+ counts: &mut Counts,
+ ) -> Result<(), Reason> {
+ // Update the connection's window
+ self.flow.inc_window(inc)?;
+
+ self.assign_connection_capacity(inc, store, counts);
+ Ok(())
+ }
+
+ /// Reclaim all capacity assigned to the stream and re-assign it to the
+ /// connection
+ pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
+ let available = stream.send_flow.available().as_size();
+ if available > 0 {
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(available);
+ debug_assert!(_res.is_ok());
+ // Re-assign all capacity to the connection
+ self.assign_connection_capacity(available, stream, counts);
+ }
+ }
+
+ /// Reclaim just reserved capacity, not buffered capacity, and re-assign
+ /// it to the connection
+ pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
+ // only reclaim requested capacity that isn't already buffered
+ if stream.requested_send_capacity as usize > stream.buffered_send_data {
+ let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
+
+ // TODO: proper error handling
+ let _res = stream.send_flow.claim_capacity(reserved);
+ debug_assert!(_res.is_ok());
+ self.assign_connection_capacity(reserved, stream, counts);
+ }
+ }
+
+ pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
+ let span = tracing::trace_span!("clear_pending_capacity");
+ let _e = span.enter();
+ while let Some(stream) = self.pending_capacity.pop(store) {
+ counts.transition(stream, |_, stream| {
+ tracing::trace!(?stream.id, "clear_pending_capacity");
+ })
+ }
+ }
+
+ pub fn assign_connection_capacity<R>(
+ &mut self,
+ inc: WindowSize,
+ store: &mut R,
+ counts: &mut Counts,
+ ) where
+ R: Resolve,
+ {
+ let span = tracing::trace_span!("assign_connection_capacity", inc);
+ let _e = span.enter();
+
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(inc);
+ debug_assert!(_res.is_ok());
+
+ // Assign newly acquired capacity to streams pending capacity.
+ while self.flow.available() > 0 {
+ let stream = match self.pending_capacity.pop(store) {
+ Some(stream) => stream,
+ None => return,
+ };
+
+ // Streams pending capacity may have been reset before capacity
+ // became available. In that case, the stream won't want any
+ // capacity, and so we shouldn't "transition" on it, but just evict
+ // it and continue the loop.
+ if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
+ continue;
+ }
+
+ counts.transition(stream, |_, stream| {
+ // Try to assign capacity to the stream. This will also re-queue the
+ // stream if there isn't enough connection level capacity to fulfill
+ // the capacity request.
+ self.try_assign_capacity(stream);
+ })
+ }
+ }
+
+ /// Request capacity to send data
+ fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
+ let total_requested = stream.requested_send_capacity;
+
+ // Total requested should never go below actual assigned
+ // (Note: the window size can go lower than assigned)
+ debug_assert!(stream.send_flow.available() <= total_requested as usize);
+
+ // The amount of additional capacity that the stream requests.
+ // Don't assign more than the window has available!
+ let additional = cmp::min(
+ total_requested - stream.send_flow.available().as_size(),
+ // Can't assign more than what is available
+ stream.send_flow.window_size() - stream.send_flow.available().as_size(),
+ );
+ let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
+ let _e = span.enter();
+ tracing::trace!(
+ requested = total_requested,
+ additional,
+ buffered = stream.buffered_send_data,
+ window = stream.send_flow.window_size(),
+ conn = %self.flow.available()
+ );
+
+ if additional == 0 {
+ // Nothing more to do
+ return;
+ }
+
+ // If the stream has requested capacity, then it must be in the
+ // streaming state (more data could be sent) or there is buffered data
+ // waiting to be sent.
+ debug_assert!(
+ stream.state.is_send_streaming() || stream.buffered_send_data > 0,
+ "state={:?}",
+ stream.state
+ );
+
+ // The amount of currently available capacity on the connection
+ let conn_available = self.flow.available().as_size();
+
+ // First check if capacity is immediately available
+ if conn_available > 0 {
+ // The amount of capacity to assign to the stream
+ // TODO: Should prioritization factor into this?
+ let assign = cmp::min(conn_available, additional);
+
+ tracing::trace!(capacity = assign, "assigning");
+
+ // Assign the capacity to the stream
+ stream.assign_capacity(assign, self.max_buffer_size);
+
+ // Claim the capacity from the connection
+ // TODO: proper error handling
+ let _res = self.flow.claim_capacity(assign);
+ debug_assert!(_res.is_ok());
+ }
+
+ tracing::trace!(
+ available = %stream.send_flow.available(),
+ requested = stream.requested_send_capacity,
+ buffered = stream.buffered_send_data,
+ has_unavailable = %stream.send_flow.has_unavailable()
+ );
+
+ if stream.send_flow.available() < stream.requested_send_capacity as usize
+ && stream.send_flow.has_unavailable()
+ {
+ // The stream requires additional capacity and the stream's
+ // window has available capacity, but the connection window
+ // does not.
+ //
+ // In this case, the stream needs to be queued up for when the
+ // connection has more capacity.
+ self.pending_capacity.push(stream);
+ }
+
+ // If data is buffered and the stream is send ready, then
+ // schedule the stream for execution
+ if stream.buffered_send_data > 0 && stream.is_send_ready() {
+ // TODO: This assertion isn't *exactly* correct. There can still be
+ // buffered send data while the stream's pending send queue is
+ // empty. This can happen when a large data frame is in the process
+ // of being **partially** sent. Once the window has been sent, the
+ // data frame will be returned to the prioritization layer to be
+ // re-scheduled.
+ //
+ // That said, it would be nice to figure out how to make this
+ // assertion correctly.
+ //
+ // debug_assert!(!stream.pending_send.is_empty());
+
+ self.pending_send.push(stream);
+ }
+ }
+
+ pub fn poll_complete<T, B>(
+ &mut self,
+ cx: &mut Context,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ // Ensure codec is ready
+ ready!(dst.poll_ready(cx))?;
+
+ // Reclaim any frame that has previously been written
+ self.reclaim_frame(buffer, store, dst);
+
+ // The max frame length
+ let max_frame_len = dst.max_send_frame_size();
+
+ tracing::trace!("poll_complete");
+
+ loop {
+ if let Some(mut stream) = self.pop_pending_open(store, counts) {
+ self.pending_send.push_front(&mut stream);
+ }
+
+ match self.pop_frame(buffer, store, max_frame_len, counts) {
+ Some(frame) => {
+ tracing::trace!(?frame, "writing");
+
+ debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
+ if let Frame::Data(ref frame) = frame {
+ self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
+ }
+ dst.buffer(frame).expect("invalid frame");
+
+ // Ensure the codec is ready to try the loop again.
+ ready!(dst.poll_ready(cx))?;
+
+ // Because, always try to reclaim...
+ self.reclaim_frame(buffer, store, dst);
+ }
+ None => {
+ // Try to flush the codec.
+ ready!(dst.flush(cx))?;
+
+ // This might release a data frame...
+ if !self.reclaim_frame(buffer, store, dst) {
+ return Poll::Ready(Ok(()));
+ }
+
+ // No need to poll ready as poll_complete() does this for
+ // us...
+ }
+ }
+ }
+ }
+
+ /// Tries to reclaim a pending data frame from the codec.
+ ///
+ /// Returns true if a frame was reclaimed.
+ ///
+ /// When a data frame is written to the codec, it may not be written in its
+ /// entirety (large chunks are split up into potentially many data frames).
+ /// In this case, the stream needs to be reprioritized.
+ fn reclaim_frame<T, B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> bool
+ where
+ B: Buf,
+ {
+ let span = tracing::trace_span!("try_reclaim_frame");
+ let _e = span.enter();
+
+ // First check if there are any data chunks to take back
+ if let Some(frame) = dst.take_last_data_frame() {
+ self.reclaim_frame_inner(buffer, store, frame)
+ } else {
+ false
+ }
+ }
+
+ fn reclaim_frame_inner<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ frame: frame::Data<Prioritized<B>>,
+ ) -> bool
+ where
+ B: Buf,
+ {
+ tracing::trace!(
+ ?frame,
+ sz = frame.payload().inner.get_ref().remaining(),
+ "reclaimed"
+ );
+
+ let mut eos = false;
+ let key = frame.payload().stream;
+
+ match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
+ InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
+ InFlightData::Drop => {
+ tracing::trace!("not reclaiming frame for cancelled stream");
+ return false;
+ }
+ InFlightData::DataFrame(k) => {
+ debug_assert_eq!(k, key);
+ }
+ }
+
+ let mut frame = frame.map(|prioritized| {
+ // TODO: Ensure fully written
+ eos = prioritized.end_of_stream;
+ prioritized.inner.into_inner()
+ });
+
+ if frame.payload().has_remaining() {
+ let mut stream = store.resolve(key);
+
+ if eos {
+ frame.set_end_stream(true);
+ }
+
+ self.push_back_frame(frame.into(), buffer, &mut stream);
+
+ return true;
+ }
+
+ false
+ }
+
+ /// Push the frame to the front of the stream's deque, scheduling the
+ /// stream if needed.
+ fn push_back_frame<B>(
+ &mut self,
+ frame: Frame<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ ) {
+ // Push the frame to the front of the stream's deque
+ stream.pending_send.push_front(buffer, frame);
+
+ // If needed, schedule the sender
+ if stream.send_flow.available() > 0 {
+ debug_assert!(!stream.pending_send.is_empty());
+ self.pending_send.push(stream);
+ }
+ }
+
+ pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
+ let span = tracing::trace_span!("clear_queue", ?stream.id);
+ let _e = span.enter();
+
+ // TODO: make this more efficient?
+ while let Some(frame) = stream.pending_send.pop_front(buffer) {
+ tracing::trace!(?frame, "dropping");
+ }
+
+ stream.buffered_send_data = 0;
+ stream.requested_send_capacity = 0;
+ if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
+ if stream.key() == key {
+ // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
+ self.in_flight_data_frame = InFlightData::Drop;
+ }
+ }
+ }
+
+ pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_send.pop(store) {
+ let is_pending_reset = stream.is_pending_reset_expiration();
+ counts.transition_after(stream, is_pending_reset);
+ }
+ }
+
+ pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_open.pop(store) {
+ let is_pending_reset = stream.is_pending_reset_expiration();
+ counts.transition_after(stream, is_pending_reset);
+ }
+ }
+
+ fn pop_frame<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ max_len: usize,
+ counts: &mut Counts,
+ ) -> Option<Frame<Prioritized<B>>>
+ where
+ B: Buf,
+ {
+ let span = tracing::trace_span!("pop_frame");
+ let _e = span.enter();
+
+ loop {
+ match self.pending_send.pop(store) {
+ Some(mut stream) => {
+ let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
+ let _e = span.enter();
+
+ // It's possible that this stream, besides having data to send,
+ // is also queued to send a reset, and thus is already in the queue
+ // to wait for "some time" after a reset.
+ //
+ // To be safe, we just always ask the stream.
+ let is_pending_reset = stream.is_pending_reset_expiration();
+
+ tracing::trace!(is_pending_reset);
+
+ let frame = match stream.pending_send.pop_front(buffer) {
+ Some(Frame::Data(mut frame)) => {
+ // Get the amount of capacity remaining for stream's
+ // window.
+ let stream_capacity = stream.send_flow.available();
+ let sz = frame.payload().remaining();
+
+ tracing::trace!(
+ sz,
+ eos = frame.is_end_stream(),
+ window = %stream_capacity,
+ available = %stream.send_flow.available(),
+ requested = stream.requested_send_capacity,
+ buffered = stream.buffered_send_data,
+ "data frame"
+ );
+
+ // Zero length data frames always have capacity to
+ // be sent.
+ if sz > 0 && stream_capacity == 0 {
+ tracing::trace!("stream capacity is 0");
+
+ // Ensure that the stream is waiting for
+ // connection level capacity
+ //
+ // TODO: uncomment
+ // debug_assert!(stream.is_pending_send_capacity);
+
+ // The stream has no more capacity, this can
+ // happen if the remote reduced the stream
+ // window. In this case, we need to buffer the
+ // frame and wait for a window update...
+ stream.pending_send.push_front(buffer, frame.into());
+
+ continue;
+ }
+
+ // Only send up to the max frame length
+ let len = cmp::min(sz, max_len);
+
+ // Only send up to the stream's window capacity
+ let len =
+ cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
+
+ // There *must* be be enough connection level
+ // capacity at this point.
+ debug_assert!(len <= self.flow.window_size());
+
+ // Check if the stream level window the peer knows is available. In some
+ // scenarios, maybe the window we know is available but the window which
+ // peer knows is not.
+ if len > 0 && len > stream.send_flow.window_size() {
+ stream.pending_send.push_front(buffer, frame.into());
+ continue;
+ }
+
+ tracing::trace!(len, "sending data frame");
+
+ // Update the flow control
+ tracing::trace_span!("updating stream flow").in_scope(|| {
+ stream.send_data(len, self.max_buffer_size);
+
+ // Assign the capacity back to the connection that
+ // was just consumed from the stream in the previous
+ // line.
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(len);
+ debug_assert!(_res.is_ok());
+ });
+
+ let (eos, len) = tracing::trace_span!("updating connection flow")
+ .in_scope(|| {
+ // TODO: proper error handling
+ let _res = self.flow.send_data(len);
+ debug_assert!(_res.is_ok());
+
+ // Wrap the frame's data payload to ensure that the
+ // correct amount of data gets written.
+
+ let eos = frame.is_end_stream();
+ let len = len as usize;
+
+ if frame.payload().remaining() > len {
+ frame.set_end_stream(false);
+ }
+ (eos, len)
+ });
+
+ Frame::Data(frame.map(|buf| Prioritized {
+ inner: buf.take(len),
+ end_of_stream: eos,
+ stream: stream.key(),
+ }))
+ }
+ Some(Frame::PushPromise(pp)) => {
+ let mut pushed =
+ stream.store_mut().find_mut(&pp.promised_id()).unwrap();
+ pushed.is_pending_push = false;
+ // Transition stream from pending_push to pending_open
+ // if possible
+ if !pushed.pending_send.is_empty() {
+ if counts.can_inc_num_send_streams() {
+ counts.inc_num_send_streams(&mut pushed);
+ self.pending_send.push(&mut pushed);
+ } else {
+ self.queue_open(&mut pushed);
+ }
+ }
+ Frame::PushPromise(pp)
+ }
+ Some(frame) => frame.map(|_| {
+ unreachable!(
+ "Frame::map closure will only be called \
+ on DATA frames."
+ )
+ }),
+ None => {
+ if let Some(reason) = stream.state.get_scheduled_reset() {
+ let stream_id = stream.id;
+ stream
+ .state
+ .set_reset(stream_id, reason, Initiator::Library);
+
+ let frame = frame::Reset::new(stream.id, reason);
+ Frame::Reset(frame)
+ } else {
+ // If the stream receives a RESET from the peer, it may have
+ // had data buffered to be sent, but all the frames are cleared
+ // in clear_queue(). Instead of doing O(N) traversal through queue
+ // to remove, lets just ignore the stream here.
+ tracing::trace!("removing dangling stream from pending_send");
+ // Since this should only happen as a consequence of `clear_queue`,
+ // we must be in a closed state of some kind.
+ debug_assert!(stream.state.is_closed());
+ counts.transition_after(stream, is_pending_reset);
+ continue;
+ }
+ }
+ };
+
+ tracing::trace!("pop_frame; frame={:?}", frame);
+
+ if cfg!(debug_assertions) && stream.state.is_idle() {
+ debug_assert!(stream.id > self.last_opened_id);
+ self.last_opened_id = stream.id;
+ }
+
+ if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
+ // TODO: Only requeue the sender IF it is ready to send
+ // the next frame. i.e. don't requeue it if the next
+ // frame is a data frame and the stream does not have
+ // any more capacity.
+ self.pending_send.push(&mut stream);
+ }
+
+ counts.transition_after(stream, is_pending_reset);
+
+ return Some(frame);
+ }
+ None => return None,
+ }
+ }
+ }
+
+ fn pop_pending_open<'s>(
+ &mut self,
+ store: &'s mut Store,
+ counts: &mut Counts,
+ ) -> Option<store::Ptr<'s>> {
+ tracing::trace!("schedule_pending_open");
+ // check for any pending open streams
+ if counts.can_inc_num_send_streams() {
+ if let Some(mut stream) = self.pending_open.pop(store) {
+ tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
+
+ counts.inc_num_send_streams(&mut stream);
+ stream.notify_send();
+ return Some(stream);
+ }
+ }
+
+ None
+ }
+}
+
+// ===== impl Prioritized =====
+
+impl<B> Buf for Prioritized<B>
+where
+ B: Buf,
+{
+ fn remaining(&self) -> usize {
+ self.inner.remaining()
+ }
+
+ fn chunk(&self) -> &[u8] {
+ self.inner.chunk()
+ }
+
+ fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
+ self.inner.chunks_vectored(dst)
+ }
+
+ fn advance(&mut self, cnt: usize) {
+ self.inner.advance(cnt)
+ }
+}
+
+impl<B: Buf> fmt::Debug for Prioritized<B> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("Prioritized")
+ .field("remaining", &self.inner.get_ref().remaining())
+ .field("end_of_stream", &self.end_of_stream)
+ .field("stream", &self.stream)
+ .finish()
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/recv.rs b/third_party/rust/h2/src/proto/streams/recv.rs
new file mode 100644
index 0000000000..0063942a45
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/recv.rs
@@ -0,0 +1,1166 @@
+use super::*;
+use crate::codec::UserError;
+use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
+use crate::proto::{self, Error};
+
+use http::{HeaderMap, Request, Response};
+
+use std::cmp::Ordering;
+use std::io;
+use std::task::{Context, Poll, Waker};
+use std::time::{Duration, Instant};
+
+#[derive(Debug)]
+pub(super) struct Recv {
+ /// Initial window size of remote initiated streams
+ init_window_sz: WindowSize,
+
+ /// Connection level flow control governing received data
+ flow: FlowControl,
+
+ /// Amount of connection window capacity currently used by outstanding streams.
+ in_flight_data: WindowSize,
+
+ /// The lowest stream ID that is still idle
+ next_stream_id: Result<StreamId, StreamIdOverflow>,
+
+ /// The stream ID of the last processed stream
+ last_processed_id: StreamId,
+
+ /// Any streams with a higher ID are ignored.
+ ///
+ /// This starts as MAX, but is lowered when a GOAWAY is received.
+ ///
+ /// > After sending a GOAWAY frame, the sender can discard frames for
+ /// > streams initiated by the receiver with identifiers higher than
+ /// > the identified last stream.
+ max_stream_id: StreamId,
+
+ /// Streams that have pending window updates
+ pending_window_updates: store::Queue<stream::NextWindowUpdate>,
+
+ /// New streams to be accepted
+ pending_accept: store::Queue<stream::NextAccept>,
+
+ /// Locally reset streams that should be reaped when they expire
+ pending_reset_expired: store::Queue<stream::NextResetExpire>,
+
+ /// How long locally reset streams should ignore received frames
+ reset_duration: Duration,
+
+ /// Holds frames that are waiting to be read
+ buffer: Buffer<Event>,
+
+ /// Refused StreamId, this represents a frame that must be sent out.
+ refused: Option<StreamId>,
+
+ /// If push promises are allowed to be received.
+ is_push_enabled: bool,
+
+ /// If extended connect protocol is enabled.
+ is_extended_connect_protocol_enabled: bool,
+}
+
+#[derive(Debug)]
+pub(super) enum Event {
+ Headers(peer::PollMessage),
+ Data(Bytes),
+ Trailers(HeaderMap),
+}
+
+#[derive(Debug)]
+pub(super) enum RecvHeaderBlockError<T> {
+ Oversize(T),
+ State(Error),
+}
+
+#[derive(Debug)]
+pub(crate) enum Open {
+ PushPromise,
+ Headers,
+}
+
+impl Recv {
+ pub fn new(peer: peer::Dyn, config: &Config) -> Self {
+ let next_stream_id = if peer.is_server() { 1 } else { 2 };
+
+ let mut flow = FlowControl::new();
+
+ // connections always have the default window size, regardless of
+ // settings
+ flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
+ .expect("invalid initial remote window size");
+ flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
+
+ Recv {
+ init_window_sz: config.local_init_window_sz,
+ flow,
+ in_flight_data: 0 as WindowSize,
+ next_stream_id: Ok(next_stream_id.into()),
+ pending_window_updates: store::Queue::new(),
+ last_processed_id: StreamId::ZERO,
+ max_stream_id: StreamId::MAX,
+ pending_accept: store::Queue::new(),
+ pending_reset_expired: store::Queue::new(),
+ reset_duration: config.local_reset_duration,
+ buffer: Buffer::new(),
+ refused: None,
+ is_push_enabled: config.local_push_enabled,
+ is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
+ }
+ }
+
+ /// Returns the initial receive window size
+ pub fn init_window_sz(&self) -> WindowSize {
+ self.init_window_sz
+ }
+
+ /// Returns the ID of the last processed stream
+ pub fn last_processed_id(&self) -> StreamId {
+ self.last_processed_id
+ }
+
+ /// Update state reflecting a new, remotely opened stream
+ ///
+ /// Returns the stream state if successful. `None` if refused
+ pub fn open(
+ &mut self,
+ id: StreamId,
+ mode: Open,
+ counts: &mut Counts,
+ ) -> Result<Option<StreamId>, Error> {
+ assert!(self.refused.is_none());
+
+ counts.peer().ensure_can_open(id, mode)?;
+
+ let next_id = self.next_stream_id()?;
+ if id < next_id {
+ proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ self.next_stream_id = id.next_id();
+
+ if !counts.can_inc_num_recv_streams() {
+ self.refused = Some(id);
+ return Ok(None);
+ }
+
+ Ok(Some(id))
+ }
+
+ /// Transition the stream state based on receiving headers
+ ///
+ /// The caller ensures that the frame represents headers and not trailers.
+ pub fn recv_headers(
+ &mut self,
+ frame: frame::Headers,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
+ tracing::trace!("opening stream; init_window={}", self.init_window_sz);
+ let is_initial = stream.state.recv_open(&frame)?;
+
+ if is_initial {
+ // TODO: be smarter about this logic
+ if frame.stream_id() > self.last_processed_id {
+ self.last_processed_id = frame.stream_id();
+ }
+
+ // Increment the number of concurrent streams
+ counts.inc_num_recv_streams(stream);
+ }
+
+ if !stream.content_length.is_head() {
+ use super::stream::ContentLength;
+ use http::header;
+
+ if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
+ let content_length = match frame::parse_u64(content_length.as_bytes()) {
+ Ok(v) => v,
+ Err(_) => {
+ proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
+ }
+ };
+
+ stream.content_length = ContentLength::Remaining(content_length);
+ }
+ }
+
+ if frame.is_over_size() {
+ // A frame is over size if the decoded header block was bigger than
+ // SETTINGS_MAX_HEADER_LIST_SIZE.
+ //
+ // > A server that receives a larger header block than it is willing
+ // > to handle can send an HTTP 431 (Request Header Fields Too
+ // > Large) status code [RFC6585]. A client can discard responses
+ // > that it cannot process.
+ //
+ // So, if peer is a server, we'll send a 431. In either case,
+ // an error is recorded, which will send a REFUSED_STREAM,
+ // since we don't want any of the data frames either.
+ tracing::debug!(
+ "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
+ recv_headers: frame is over size; stream={:?}",
+ stream.id
+ );
+ return if counts.peer().is_server() && is_initial {
+ let mut res = frame::Headers::new(
+ stream.id,
+ frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
+ HeaderMap::new(),
+ );
+ res.set_end_stream();
+ Err(RecvHeaderBlockError::Oversize(Some(res)))
+ } else {
+ Err(RecvHeaderBlockError::Oversize(None))
+ };
+ }
+
+ let stream_id = frame.stream_id();
+ let (pseudo, fields) = frame.into_parts();
+
+ if pseudo.protocol.is_some()
+ && counts.peer().is_server()
+ && !self.is_extended_connect_protocol_enabled
+ {
+ proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
+ }
+
+ if pseudo.status.is_some() && counts.peer().is_server() {
+ proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
+ }
+
+ if !pseudo.is_informational() {
+ let message = counts
+ .peer()
+ .convert_poll_message(pseudo, fields, stream_id)?;
+
+ // Push the frame onto the stream's recv buffer
+ stream
+ .pending_recv
+ .push_back(&mut self.buffer, Event::Headers(message));
+ stream.notify_recv();
+
+ // Only servers can receive a headers frame that initiates the stream.
+ // This is verified in `Streams` before calling this function.
+ if counts.peer().is_server() {
+ // Correctness: never push a stream to `pending_accept` without having the
+ // corresponding headers frame pushed to `stream.pending_recv`.
+ self.pending_accept.push(stream);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Called by the server to get the request
+ ///
+ /// # Panics
+ ///
+ /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
+ ///
+ pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
+ use super::peer::PollMessage::*;
+
+ match stream.pending_recv.pop_front(&mut self.buffer) {
+ Some(Event::Headers(Server(request))) => request,
+ _ => unreachable!("server stream queue must start with Headers"),
+ }
+ }
+
+ /// Called by the client to get pushed response
+ pub fn poll_pushed(
+ &mut self,
+ cx: &Context,
+ stream: &mut store::Ptr,
+ ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
+ use super::peer::PollMessage::*;
+
+ let mut ppp = stream.pending_push_promises.take();
+ let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
+ match pushed.pending_recv.pop_front(&mut self.buffer) {
+ Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
+ // When frames are pushed into the queue, it is verified that
+ // the first frame is a HEADERS frame.
+ _ => panic!("Headers not set on pushed stream"),
+ }
+ });
+ stream.pending_push_promises = ppp;
+ if let Some(p) = pushed {
+ Poll::Ready(Some(Ok(p)))
+ } else {
+ let is_open = stream.state.ensure_recv_open()?;
+
+ if is_open {
+ stream.recv_task = Some(cx.waker().clone());
+ Poll::Pending
+ } else {
+ Poll::Ready(None)
+ }
+ }
+ }
+
+ /// Called by the client to get the response
+ pub fn poll_response(
+ &mut self,
+ cx: &Context,
+ stream: &mut store::Ptr,
+ ) -> Poll<Result<Response<()>, proto::Error>> {
+ use super::peer::PollMessage::*;
+
+ // If the buffer is not empty, then the first frame must be a HEADERS
+ // frame or the user violated the contract.
+ match stream.pending_recv.pop_front(&mut self.buffer) {
+ Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
+ Some(_) => panic!("poll_response called after response returned"),
+ None => {
+ if !stream.state.ensure_recv_open()? {
+ proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
+ return Poll::Ready(Err(Error::library_reset(
+ stream.id,
+ Reason::PROTOCOL_ERROR,
+ )));
+ }
+
+ stream.recv_task = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+ }
+
+ /// Transition the stream based on receiving trailers
+ pub fn recv_trailers(
+ &mut self,
+ frame: frame::Headers,
+ stream: &mut store::Ptr,
+ ) -> Result<(), Error> {
+ // Transition the state
+ stream.state.recv_close()?;
+
+ if stream.ensure_content_length_zero().is_err() {
+ proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
+ }
+
+ let trailers = frame.into_fields();
+
+ // Push the frame onto the stream's recv buffer
+ stream
+ .pending_recv
+ .push_back(&mut self.buffer, Event::Trailers(trailers));
+ stream.notify_recv();
+
+ Ok(())
+ }
+
+ /// Releases capacity of the connection
+ pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
+ tracing::trace!(
+ "release_connection_capacity; size={}, connection in_flight_data={}",
+ capacity,
+ self.in_flight_data,
+ );
+
+ // Decrement in-flight data
+ self.in_flight_data -= capacity;
+
+ // Assign capacity to connection
+ // TODO: proper error handling
+ let _res = self.flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
+
+ if self.flow.unclaimed_capacity().is_some() {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+ }
+
+ /// Releases capacity back to the connection & stream
+ pub fn release_capacity(
+ &mut self,
+ capacity: WindowSize,
+ stream: &mut store::Ptr,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ tracing::trace!("release_capacity; size={}", capacity);
+
+ if capacity > stream.in_flight_recv_data {
+ return Err(UserError::ReleaseCapacityTooBig);
+ }
+
+ self.release_connection_capacity(capacity, task);
+
+ // Decrement in-flight data
+ stream.in_flight_recv_data -= capacity;
+
+ // Assign capacity to stream
+ // TODO: proper error handling
+ let _res = stream.recv_flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
+
+ if stream.recv_flow.unclaimed_capacity().is_some() {
+ // Queue the stream for sending the WINDOW_UPDATE frame.
+ self.pending_window_updates.push(stream);
+
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Release any unclaimed capacity for a closed stream.
+ pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
+ debug_assert_eq!(stream.ref_count, 0);
+
+ if stream.in_flight_recv_data == 0 {
+ return;
+ }
+
+ tracing::trace!(
+ "auto-release closed stream ({:?}) capacity: {:?}",
+ stream.id,
+ stream.in_flight_recv_data,
+ );
+
+ self.release_connection_capacity(stream.in_flight_recv_data, task);
+ stream.in_flight_recv_data = 0;
+
+ self.clear_recv_buffer(stream);
+ }
+
+ /// Set the "target" connection window size.
+ ///
+ /// By default, all new connections start with 64kb of window size. As
+ /// streams used and release capacity, we will send WINDOW_UPDATEs for the
+ /// connection to bring it back up to the initial "target".
+ ///
+ /// Setting a target means that we will try to tell the peer about
+ /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
+ /// for the whole connection.
+ ///
+ /// The `task` is an optional parked task for the `Connection` that might
+ /// be blocked on needing more window capacity.
+ pub fn set_target_connection_window(
+ &mut self,
+ target: WindowSize,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Reason> {
+ tracing::trace!(
+ "set_target_connection_window; target={}; available={}, reserved={}",
+ target,
+ self.flow.available(),
+ self.in_flight_data,
+ );
+
+ // The current target connection window is our `available` plus any
+ // in-flight data reserved by streams.
+ //
+ // Update the flow controller with the difference between the new
+ // target and the current target.
+ let current = self
+ .flow
+ .available()
+ .add(self.in_flight_data)?
+ .checked_size();
+ if target > current {
+ self.flow.assign_capacity(target - current)?;
+ } else {
+ self.flow.claim_capacity(current - target)?;
+ }
+
+ // If changing the target capacity means we gained a bunch of capacity,
+ // enough that we went over the update threshold, then schedule sending
+ // a connection WINDOW_UPDATE.
+ if self.flow.unclaimed_capacity().is_some() {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+ Ok(())
+ }
+
+ pub(crate) fn apply_local_settings(
+ &mut self,
+ settings: &frame::Settings,
+ store: &mut Store,
+ ) -> Result<(), proto::Error> {
+ if let Some(val) = settings.is_extended_connect_protocol_enabled() {
+ self.is_extended_connect_protocol_enabled = val;
+ }
+
+ if let Some(target) = settings.initial_window_size() {
+ let old_sz = self.init_window_sz;
+ self.init_window_sz = target;
+
+ tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
+
+ // Per RFC 7540 §6.9.2:
+ //
+ // In addition to changing the flow-control window for streams that are
+ // not yet active, a SETTINGS frame can alter the initial flow-control
+ // window size for streams with active flow-control windows (that is,
+ // streams in the "open" or "half-closed (remote)" state). When the
+ // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
+ // the size of all stream flow-control windows that it maintains by the
+ // difference between the new value and the old value.
+ //
+ // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
+ // space in a flow-control window to become negative. A sender MUST
+ // track the negative flow-control window and MUST NOT send new
+ // flow-controlled frames until it receives WINDOW_UPDATE frames that
+ // cause the flow-control window to become positive.
+
+ match target.cmp(&old_sz) {
+ Ordering::Less => {
+ // We must decrease the (local) window on every open stream.
+ let dec = old_sz - target;
+ tracing::trace!("decrementing all windows; dec={}", dec);
+
+ store.try_for_each(|mut stream| {
+ stream
+ .recv_flow
+ .dec_recv_window(dec)
+ .map_err(proto::Error::library_go_away)?;
+ Ok::<_, proto::Error>(())
+ })?;
+ }
+ Ordering::Greater => {
+ // We must increase the (local) window on every open stream.
+ let inc = target - old_sz;
+ tracing::trace!("incrementing all windows; inc={}", inc);
+ store.try_for_each(|mut stream| {
+ // XXX: Shouldn't the peer have already noticed our
+ // overflow and sent us a GOAWAY?
+ stream
+ .recv_flow
+ .inc_window(inc)
+ .map_err(proto::Error::library_go_away)?;
+ stream
+ .recv_flow
+ .assign_capacity(inc)
+ .map_err(proto::Error::library_go_away)?;
+ Ok::<_, proto::Error>(())
+ })?;
+ }
+ Ordering::Equal => (),
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
+ if !stream.state.is_recv_closed() {
+ return false;
+ }
+
+ stream.pending_recv.is_empty()
+ }
+
+ pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
+ let sz = frame.payload().len();
+
+ // This should have been enforced at the codec::FramedRead layer, so
+ // this is just a sanity check.
+ assert!(sz <= MAX_WINDOW_SIZE as usize);
+
+ let sz = sz as WindowSize;
+
+ let is_ignoring_frame = stream.state.is_local_error();
+
+ if !is_ignoring_frame && !stream.state.is_recv_streaming() {
+ // TODO: There are cases where this can be a stream error of
+ // STREAM_CLOSED instead...
+
+ // Receiving a DATA frame when not expecting one is a protocol
+ // error.
+ proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ tracing::trace!(
+ "recv_data; size={}; connection={}; stream={}",
+ sz,
+ self.flow.window_size(),
+ stream.recv_flow.window_size()
+ );
+
+ if is_ignoring_frame {
+ tracing::trace!(
+ "recv_data; frame ignored on locally reset {:?} for some time",
+ stream.id,
+ );
+ return self.ignore_data(sz);
+ }
+
+ // Ensure that there is enough capacity on the connection before acting
+ // on the stream.
+ self.consume_connection_window(sz)?;
+
+ if stream.recv_flow.window_size() < sz {
+ // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
+ // > A receiver MAY respond with a stream error (Section 5.4.2) or
+ // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
+ // > it is unable to accept a frame.
+ //
+ // So, for violating the **stream** window, we can send either a
+ // stream or connection error. We've opted to send a stream
+ // error.
+ return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
+ }
+
+ if stream.dec_content_length(frame.payload().len()).is_err() {
+ proto_err!(stream:
+ "recv_data: content-length overflow; stream={:?}; len={:?}",
+ stream.id,
+ frame.payload().len(),
+ );
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
+ }
+
+ if frame.is_end_stream() {
+ if stream.ensure_content_length_zero().is_err() {
+ proto_err!(stream:
+ "recv_data: content-length underflow; stream={:?}; len={:?}",
+ stream.id,
+ frame.payload().len(),
+ );
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
+ }
+
+ if stream.state.recv_close().is_err() {
+ proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+ }
+
+ // Received a frame, but no one cared about it. fix issue#648
+ if !stream.is_recv {
+ tracing::trace!(
+ "recv_data; frame ignored on stream release {:?} for some time",
+ stream.id,
+ );
+ self.release_connection_capacity(sz, &mut None);
+ return Ok(());
+ }
+
+ // Update stream level flow control
+ stream
+ .recv_flow
+ .send_data(sz)
+ .map_err(proto::Error::library_go_away)?;
+
+ // Track the data as in-flight
+ stream.in_flight_recv_data += sz;
+
+ let event = Event::Data(frame.into_payload());
+
+ // Push the frame onto the recv buffer
+ stream.pending_recv.push_back(&mut self.buffer, event);
+ stream.notify_recv();
+
+ Ok(())
+ }
+
+ pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
+ // Ensure that there is enough capacity on the connection...
+ self.consume_connection_window(sz)?;
+
+ // Since we are ignoring this frame,
+ // we aren't returning the frame to the user. That means they
+ // have no way to release the capacity back to the connection. So
+ // we have to release it automatically.
+ //
+ // This call doesn't send a WINDOW_UPDATE immediately, just marks
+ // the capacity as available to be reclaimed. When the available
+ // capacity meets a threshold, a WINDOW_UPDATE is then sent.
+ self.release_connection_capacity(sz, &mut None);
+ Ok(())
+ }
+
+ pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
+ if self.flow.window_size() < sz {
+ tracing::debug!(
+ "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
+ self.flow.window_size(),
+ sz,
+ );
+ return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
+ }
+
+ // Update connection level flow control
+ self.flow.send_data(sz).map_err(Error::library_go_away)?;
+
+ // Track the data as in-flight
+ self.in_flight_data += sz;
+ Ok(())
+ }
+
+ pub fn recv_push_promise(
+ &mut self,
+ frame: frame::PushPromise,
+ stream: &mut store::Ptr,
+ ) -> Result<(), Error> {
+ stream.state.reserve_remote()?;
+ if frame.is_over_size() {
+ // A frame is over size if the decoded header block was bigger than
+ // SETTINGS_MAX_HEADER_LIST_SIZE.
+ //
+ // > A server that receives a larger header block than it is willing
+ // > to handle can send an HTTP 431 (Request Header Fields Too
+ // > Large) status code [RFC6585]. A client can discard responses
+ // > that it cannot process.
+ //
+ // So, if peer is a server, we'll send a 431. In either case,
+ // an error is recorded, which will send a REFUSED_STREAM,
+ // since we don't want any of the data frames either.
+ tracing::debug!(
+ "stream error REFUSED_STREAM -- recv_push_promise: \
+ headers frame is over size; promised_id={:?};",
+ frame.promised_id(),
+ );
+ return Err(Error::library_reset(
+ frame.promised_id(),
+ Reason::REFUSED_STREAM,
+ ));
+ }
+
+ let promised_id = frame.promised_id();
+ let (pseudo, fields) = frame.into_parts();
+ let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
+
+ if let Err(e) = frame::PushPromise::validate_request(&req) {
+ use PushPromiseHeaderError::*;
+ match e {
+ NotSafeAndCacheable => proto_err!(
+ stream:
+ "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
+ req.method(),
+ promised_id,
+ ),
+ InvalidContentLength(e) => proto_err!(
+ stream:
+ "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
+ e,
+ promised_id,
+ ),
+ }
+ return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
+ }
+
+ use super::peer::PollMessage::*;
+ stream
+ .pending_recv
+ .push_back(&mut self.buffer, Event::Headers(Server(req)));
+ stream.notify_recv();
+ Ok(())
+ }
+
+ /// Ensures that `id` is not in the `Idle` state.
+ pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
+ if let Ok(next) = self.next_stream_id {
+ if id >= next {
+ tracing::debug!(
+ "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
+ id
+ );
+ return Err(Reason::PROTOCOL_ERROR);
+ }
+ }
+ // if next_stream_id is overflowed, that's ok.
+
+ Ok(())
+ }
+
+ /// Handle remote sending an explicit RST_STREAM.
+ pub fn recv_reset(
+ &mut self,
+ frame: frame::Reset,
+ stream: &mut Stream,
+ counts: &mut Counts,
+ ) -> Result<(), Error> {
+ // Reseting a stream that the user hasn't accepted is possible,
+ // but should be done with care. These streams will continue
+ // to take up memory in the accept queue, but will no longer be
+ // counted as "concurrent" streams.
+ //
+ // So, we have a separate limit for these.
+ //
+ // See https://github.com/hyperium/hyper/issues/2877
+ if stream.is_pending_accept {
+ if counts.can_inc_num_remote_reset_streams() {
+ counts.inc_num_remote_reset_streams();
+ } else {
+ tracing::warn!(
+ "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
+ counts.max_remote_reset_streams(),
+ );
+ return Err(Error::library_go_away_data(
+ Reason::ENHANCE_YOUR_CALM,
+ "too_many_resets",
+ ));
+ }
+ }
+
+ // Notify the stream
+ stream.state.recv_reset(frame, stream.is_pending_send);
+
+ stream.notify_send();
+ stream.notify_recv();
+
+ Ok(())
+ }
+
+ /// Handle a connection-level error
+ pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
+ // Receive an error
+ stream.state.handle_error(err);
+
+ // If a receiver is waiting, notify it
+ stream.notify_send();
+ stream.notify_recv();
+ }
+
+ pub fn go_away(&mut self, last_processed_id: StreamId) {
+ assert!(self.max_stream_id >= last_processed_id);
+ self.max_stream_id = last_processed_id;
+ }
+
+ pub fn recv_eof(&mut self, stream: &mut Stream) {
+ stream.state.recv_eof();
+ stream.notify_send();
+ stream.notify_recv();
+ }
+
+ pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
+ while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
+ // drop it
+ }
+ }
+
+ /// Get the max ID of streams we can receive.
+ ///
+ /// This gets lowered if we send a GOAWAY frame.
+ pub fn max_stream_id(&self) -> StreamId {
+ self.max_stream_id
+ }
+
+ pub fn next_stream_id(&self) -> Result<StreamId, Error> {
+ if let Ok(id) = self.next_stream_id {
+ Ok(id)
+ } else {
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+
+ pub fn may_have_created_stream(&self, id: StreamId) -> bool {
+ if let Ok(next_id) = self.next_stream_id {
+ // Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
+ id < next_id
+ } else {
+ true
+ }
+ }
+
+ pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
+ if let Ok(next_id) = self.next_stream_id {
+ // !Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
+ if id >= next_id {
+ self.next_stream_id = id.next_id();
+ }
+ }
+ }
+
+ /// Returns true if the remote peer can reserve a stream with the given ID.
+ pub fn ensure_can_reserve(&self) -> Result<(), Error> {
+ if !self.is_push_enabled {
+ proto_err!(conn: "recv_push_promise: push is disabled");
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ Ok(())
+ }
+
+ /// Add a locally reset stream to queue to be eventually reaped.
+ pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
+ if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
+ return;
+ }
+
+ tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);
+
+ if counts.can_inc_num_reset_streams() {
+ counts.inc_num_reset_streams();
+ self.pending_reset_expired.push(stream);
+ }
+ }
+
+ /// Send any pending refusals.
+ pub fn send_pending_refusal<T, B>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ if let Some(stream_id) = self.refused {
+ ready!(dst.poll_ready(cx))?;
+
+ // Create the RST_STREAM frame
+ let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);
+
+ // Buffer the frame
+ dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
+ }
+
+ self.refused = None;
+
+ Poll::Ready(Ok(()))
+ }
+
+ pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
+ if !self.pending_reset_expired.is_empty() {
+ let now = Instant::now();
+ let reset_duration = self.reset_duration;
+ while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
+ let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
+ // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
+ // subtraction can panic (because, on some platforms, `Instant` isn't actually
+ // monotonic). We use a saturating operation to avoid this panic here.
+ now.saturating_duration_since(reset_at) > reset_duration
+ }) {
+ counts.transition_after(stream, true);
+ }
+ }
+ }
+
+ pub fn clear_queues(
+ &mut self,
+ clear_pending_accept: bool,
+ store: &mut Store,
+ counts: &mut Counts,
+ ) {
+ self.clear_stream_window_update_queue(store, counts);
+ self.clear_all_reset_streams(store, counts);
+
+ if clear_pending_accept {
+ self.clear_all_pending_accept(store, counts);
+ }
+ }
+
+ fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_window_updates.pop(store) {
+ counts.transition(stream, |_, stream| {
+ tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
+ })
+ }
+ }
+
+ /// Called on EOF
+ fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_reset_expired.pop(store) {
+ counts.transition_after(stream, true);
+ }
+ }
+
+ fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
+ while let Some(stream) = self.pending_accept.pop(store) {
+ counts.transition_after(stream, false);
+ }
+ }
+
+ pub fn poll_complete<T, B>(
+ &mut self,
+ cx: &mut Context,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ // Send any pending connection level window updates
+ ready!(self.send_connection_window_update(cx, dst))?;
+
+ // Send any pending stream level window updates
+ ready!(self.send_stream_window_updates(cx, store, counts, dst))?;
+
+ Poll::Ready(Ok(()))
+ }
+
+ /// Send connection level window update
+ fn send_connection_window_update<T, B>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ if let Some(incr) = self.flow.unclaimed_capacity() {
+ let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
+
+ // Ensure the codec has capacity
+ ready!(dst.poll_ready(cx))?;
+
+ // Buffer the WINDOW_UPDATE frame
+ dst.buffer(frame.into())
+ .expect("invalid WINDOW_UPDATE frame");
+
+ // Update flow control
+ self.flow
+ .inc_window(incr)
+ .expect("unexpected flow control state");
+ }
+
+ Poll::Ready(Ok(()))
+ }
+
+ /// Send stream level window update
+ pub fn send_stream_window_updates<T, B>(
+ &mut self,
+ cx: &mut Context,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ loop {
+ // Ensure the codec has capacity
+ ready!(dst.poll_ready(cx))?;
+
+ // Get the next stream
+ let stream = match self.pending_window_updates.pop(store) {
+ Some(stream) => stream,
+ None => return Poll::Ready(Ok(())),
+ };
+
+ counts.transition(stream, |_, stream| {
+ tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
+ debug_assert!(!stream.is_pending_window_update);
+
+ if !stream.state.is_recv_streaming() {
+ // No need to send window updates on the stream if the stream is
+ // no longer receiving data.
+ //
+ // TODO: is this correct? We could possibly send a window
+ // update on a ReservedRemote stream if we already know
+ // we want to stream the data faster...
+ return;
+ }
+
+ // TODO: de-dup
+ if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
+ // Create the WINDOW_UPDATE frame
+ let frame = frame::WindowUpdate::new(stream.id, incr);
+
+ // Buffer it
+ dst.buffer(frame.into())
+ .expect("invalid WINDOW_UPDATE frame");
+
+ // Update flow control
+ stream
+ .recv_flow
+ .inc_window(incr)
+ .expect("unexpected flow control state");
+ }
+ })
+ }
+ }
+
+ pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
+ self.pending_accept.pop(store).map(|ptr| ptr.key())
+ }
+
+ pub fn poll_data(
+ &mut self,
+ cx: &Context,
+ stream: &mut Stream,
+ ) -> Poll<Option<Result<Bytes, proto::Error>>> {
+ match stream.pending_recv.pop_front(&mut self.buffer) {
+ Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
+ Some(event) => {
+ // Frame is trailer
+ stream.pending_recv.push_front(&mut self.buffer, event);
+
+ // Notify the recv task. This is done just in case
+ // `poll_trailers` was called.
+ //
+ // It is very likely that `notify_recv` will just be a no-op (as
+ // the task will be None), so this isn't really much of a
+ // performance concern. It also means we don't have to track
+ // state to see if `poll_trailers` was called before `poll_data`
+ // returned `None`.
+ stream.notify_recv();
+
+ // No more data frames
+ Poll::Ready(None)
+ }
+ None => self.schedule_recv(cx, stream),
+ }
+ }
+
+ pub fn poll_trailers(
+ &mut self,
+ cx: &Context,
+ stream: &mut Stream,
+ ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
+ match stream.pending_recv.pop_front(&mut self.buffer) {
+ Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
+ Some(event) => {
+ // Frame is not trailers.. not ready to poll trailers yet.
+ stream.pending_recv.push_front(&mut self.buffer, event);
+
+ Poll::Pending
+ }
+ None => self.schedule_recv(cx, stream),
+ }
+ }
+
+ fn schedule_recv<T>(
+ &mut self,
+ cx: &Context,
+ stream: &mut Stream,
+ ) -> Poll<Option<Result<T, proto::Error>>> {
+ if stream.state.ensure_recv_open()? {
+ // Request to get notified once more frames arrive
+ stream.recv_task = Some(cx.waker().clone());
+ Poll::Pending
+ } else {
+ // No more frames will be received
+ Poll::Ready(None)
+ }
+ }
+}
+
+// ===== impl Open =====
+
+impl Open {
+ pub fn is_push_promise(&self) -> bool {
+ matches!(*self, Self::PushPromise)
+ }
+}
+
+// ===== impl RecvHeaderBlockError =====
+
+impl<T> From<Error> for RecvHeaderBlockError<T> {
+ fn from(err: Error) -> Self {
+ RecvHeaderBlockError::State(err)
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/send.rs b/third_party/rust/h2/src/proto/streams/send.rs
new file mode 100644
index 0000000000..626e61a330
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/send.rs
@@ -0,0 +1,585 @@
+use super::{
+ store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
+ StreamIdOverflow, WindowSize,
+};
+use crate::codec::UserError;
+use crate::frame::{self, Reason};
+use crate::proto::{self, Error, Initiator};
+
+use bytes::Buf;
+use tokio::io::AsyncWrite;
+
+use std::cmp::Ordering;
+use std::io;
+use std::task::{Context, Poll, Waker};
+
+/// Manages state transitions related to outbound frames.
+#[derive(Debug)]
+pub(super) struct Send {
+ /// Stream identifier to use for next initialized stream.
+ next_stream_id: Result<StreamId, StreamIdOverflow>,
+
+ /// Any streams with a higher ID are ignored.
+ ///
+ /// This starts as MAX, but is lowered when a GOAWAY is received.
+ ///
+ /// > After sending a GOAWAY frame, the sender can discard frames for
+ /// > streams initiated by the receiver with identifiers higher than
+ /// > the identified last stream.
+ max_stream_id: StreamId,
+
+ /// Initial window size of locally initiated streams
+ init_window_sz: WindowSize,
+
+ /// Prioritization layer
+ prioritize: Prioritize,
+
+ is_push_enabled: bool,
+
+ /// If extended connect protocol is enabled.
+ is_extended_connect_protocol_enabled: bool,
+}
+
+/// A value to detect which public API has called `poll_reset`.
+#[derive(Debug)]
+pub(crate) enum PollReset {
+ AwaitingHeaders,
+ Streaming,
+}
+
+impl Send {
+ /// Create a new `Send`
+ pub fn new(config: &Config) -> Self {
+ Send {
+ init_window_sz: config.remote_init_window_sz,
+ max_stream_id: StreamId::MAX,
+ next_stream_id: Ok(config.local_next_stream_id),
+ prioritize: Prioritize::new(config),
+ is_push_enabled: true,
+ is_extended_connect_protocol_enabled: false,
+ }
+ }
+
+ /// Returns the initial send window size
+ pub fn init_window_sz(&self) -> WindowSize {
+ self.init_window_sz
+ }
+
+ pub fn open(&mut self) -> Result<StreamId, UserError> {
+ let stream_id = self.ensure_next_stream_id()?;
+ self.next_stream_id = stream_id.next_id();
+ Ok(stream_id)
+ }
+
+ pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
+ let stream_id = self.ensure_next_stream_id()?;
+ self.next_stream_id = stream_id.next_id();
+ Ok(stream_id)
+ }
+
+ fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
+ // 8.1.2.2. Connection-Specific Header Fields
+ if fields.contains_key(http::header::CONNECTION)
+ || fields.contains_key(http::header::TRANSFER_ENCODING)
+ || fields.contains_key(http::header::UPGRADE)
+ || fields.contains_key("keep-alive")
+ || fields.contains_key("proxy-connection")
+ {
+ tracing::debug!("illegal connection-specific headers found");
+ return Err(UserError::MalformedHeaders);
+ } else if let Some(te) = fields.get(http::header::TE) {
+ if te != "trailers" {
+ tracing::debug!("illegal connection-specific headers found");
+ return Err(UserError::MalformedHeaders);
+ }
+ }
+ Ok(())
+ }
+
+ pub fn send_push_promise<B>(
+ &mut self,
+ frame: frame::PushPromise,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ if !self.is_push_enabled {
+ return Err(UserError::PeerDisabledServerPush);
+ }
+
+ tracing::trace!(
+ "send_push_promise; frame={:?}; init_window={:?}",
+ frame,
+ self.init_window_sz
+ );
+
+ Self::check_headers(frame.fields())?;
+
+ // Queue the frame for sending
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ Ok(())
+ }
+
+ pub fn send_headers<B>(
+ &mut self,
+ frame: frame::Headers,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ tracing::trace!(
+ "send_headers; frame={:?}; init_window={:?}",
+ frame,
+ self.init_window_sz
+ );
+
+ Self::check_headers(frame.fields())?;
+
+ let end_stream = frame.is_end_stream();
+
+ // Update the state
+ stream.state.send_open(end_stream)?;
+
+ let mut pending_open = false;
+ if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
+ self.prioritize.queue_open(stream);
+ pending_open = true;
+ }
+
+ // Queue the frame for sending
+ //
+ // This call expects that, since new streams are in the open queue, new
+ // streams won't be pushed on pending_send.
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ // Need to notify the connection when pushing onto pending_open since
+ // queue_frame only notifies for pending_send.
+ if pending_open {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send an explicit RST_STREAM frame
+ pub fn send_reset<B>(
+ &mut self,
+ reason: Reason,
+ initiator: Initiator,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) {
+ let is_reset = stream.state.is_reset();
+ let is_closed = stream.state.is_closed();
+ let is_empty = stream.pending_send.is_empty();
+ let stream_id = stream.id;
+
+ tracing::trace!(
+ "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
+ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
+ state={:?} \
+ ",
+ reason,
+ initiator,
+ stream_id,
+ is_reset,
+ is_closed,
+ is_empty,
+ stream.state
+ );
+
+ if is_reset {
+ // Don't double reset
+ tracing::trace!(
+ " -> not sending RST_STREAM ({:?} is already reset)",
+ stream_id
+ );
+ return;
+ }
+
+ // Transition the state to reset no matter what.
+ stream.state.set_reset(stream_id, reason, initiator);
+
+ // If closed AND the send queue is flushed, then the stream cannot be
+ // reset explicitly, either. Implicit resets can still be queued.
+ if is_closed && is_empty {
+ tracing::trace!(
+ " -> not sending explicit RST_STREAM ({:?} was closed \
+ and send queue was flushed)",
+ stream_id
+ );
+ return;
+ }
+
+ // Clear all pending outbound frames.
+ // Note that we don't call `self.recv_err` because we want to enqueue
+ // the reset frame before transitioning the stream inside
+ // `reclaim_all_capacity`.
+ self.prioritize.clear_queue(buffer, stream);
+
+ let frame = frame::Reset::new(stream.id, reason);
+
+ tracing::trace!("send_reset -- queueing; frame={:?}", frame);
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+ self.prioritize.reclaim_all_capacity(stream, counts);
+ }
+
+ pub fn schedule_implicit_reset(
+ &mut self,
+ stream: &mut store::Ptr,
+ reason: Reason,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) {
+ if stream.state.is_closed() {
+ // Stream is already closed, nothing more to do
+ return;
+ }
+
+ stream.state.set_scheduled_reset(reason);
+
+ self.prioritize.reclaim_reserved_capacity(stream, counts);
+ self.prioritize.schedule_send(stream, task);
+ }
+
+ pub fn send_data<B>(
+ &mut self,
+ frame: frame::Data<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError>
+ where
+ B: Buf,
+ {
+ self.prioritize
+ .send_data(frame, buffer, stream, counts, task)
+ }
+
+ pub fn send_trailers<B>(
+ &mut self,
+ frame: frame::Headers,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ // TODO: Should this logic be moved into state.rs?
+ if !stream.state.is_send_streaming() {
+ return Err(UserError::UnexpectedFrameType);
+ }
+
+ stream.state.send_close();
+
+ tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ // Release any excess capacity
+ self.prioritize.reserve_capacity(0, stream, counts);
+
+ Ok(())
+ }
+
+ pub fn poll_complete<T, B>(
+ &mut self,
+ cx: &mut Context,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ self.prioritize
+ .poll_complete(cx, buffer, store, counts, dst)
+ }
+
+ /// Request capacity to send data
+ pub fn reserve_capacity(
+ &mut self,
+ capacity: WindowSize,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ self.prioritize.reserve_capacity(capacity, stream, counts)
+ }
+
+ pub fn poll_capacity(
+ &mut self,
+ cx: &Context,
+ stream: &mut store::Ptr,
+ ) -> Poll<Option<Result<WindowSize, UserError>>> {
+ if !stream.state.is_send_streaming() {
+ return Poll::Ready(None);
+ }
+
+ if !stream.send_capacity_inc {
+ stream.wait_send(cx);
+ return Poll::Pending;
+ }
+
+ stream.send_capacity_inc = false;
+
+ Poll::Ready(Some(Ok(self.capacity(stream))))
+ }
+
+ /// Current available stream send capacity
+ pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
+ stream.capacity(self.prioritize.max_buffer_size())
+ }
+
+ pub fn poll_reset(
+ &self,
+ cx: &Context,
+ stream: &mut Stream,
+ mode: PollReset,
+ ) -> Poll<Result<Reason, crate::Error>> {
+ match stream.state.ensure_reason(mode)? {
+ Some(reason) => Poll::Ready(Ok(reason)),
+ None => {
+ stream.wait_send(cx);
+ Poll::Pending
+ }
+ }
+ }
+
+ pub fn recv_connection_window_update(
+ &mut self,
+ frame: frame::WindowUpdate,
+ store: &mut Store,
+ counts: &mut Counts,
+ ) -> Result<(), Reason> {
+ self.prioritize
+ .recv_connection_window_update(frame.size_increment(), store, counts)
+ }
+
+ pub fn recv_stream_window_update<B>(
+ &mut self,
+ sz: WindowSize,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Reason> {
+ if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
+ tracing::debug!("recv_stream_window_update !!; err={:?}", e);
+
+ self.send_reset(
+ Reason::FLOW_CONTROL_ERROR,
+ Initiator::Library,
+ buffer,
+ stream,
+ counts,
+ task,
+ );
+
+ return Err(e);
+ }
+
+ Ok(())
+ }
+
+ pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
+ if last_stream_id > self.max_stream_id {
+ // The remote endpoint sent a `GOAWAY` frame indicating a stream
+ // that we never sent, or that we have already terminated on account
+ // of previous `GOAWAY` frame. In either case, that is illegal.
+ // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
+ // the value they send in the last stream identifier, since the
+ // peers might already have retried unprocessed requests on another
+ // connection.")
+ proto_err!(conn:
+ "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
+ last_stream_id, self.max_stream_id,
+ );
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ self.max_stream_id = last_stream_id;
+ Ok(())
+ }
+
+ pub fn handle_error<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ // Clear all pending outbound frames
+ self.prioritize.clear_queue(buffer, stream);
+ self.prioritize.reclaim_all_capacity(stream, counts);
+ }
+
+ pub fn apply_remote_settings<B>(
+ &mut self,
+ settings: &frame::Settings,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Error> {
+ if let Some(val) = settings.is_extended_connect_protocol_enabled() {
+ self.is_extended_connect_protocol_enabled = val;
+ }
+
+ // Applies an update to the remote endpoint's initial window size.
+ //
+ // Per RFC 7540 §6.9.2:
+ //
+ // In addition to changing the flow-control window for streams that are
+ // not yet active, a SETTINGS frame can alter the initial flow-control
+ // window size for streams with active flow-control windows (that is,
+ // streams in the "open" or "half-closed (remote)" state). When the
+ // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
+ // the size of all stream flow-control windows that it maintains by the
+ // difference between the new value and the old value.
+ //
+ // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
+ // space in a flow-control window to become negative. A sender MUST
+ // track the negative flow-control window and MUST NOT send new
+ // flow-controlled frames until it receives WINDOW_UPDATE frames that
+ // cause the flow-control window to become positive.
+ if let Some(val) = settings.initial_window_size() {
+ let old_val = self.init_window_sz;
+ self.init_window_sz = val;
+
+ match val.cmp(&old_val) {
+ Ordering::Less => {
+ // We must decrease the (remote) window on every open stream.
+ let dec = old_val - val;
+ tracing::trace!("decrementing all windows; dec={}", dec);
+
+ let mut total_reclaimed = 0;
+ store.try_for_each(|mut stream| {
+ let stream = &mut *stream;
+
+ tracing::trace!(
+ "decrementing stream window; id={:?}; decr={}; flow={:?}",
+ stream.id,
+ dec,
+ stream.send_flow
+ );
+
+ // TODO: this decrement can underflow based on received frames!
+ stream
+ .send_flow
+ .dec_send_window(dec)
+ .map_err(proto::Error::library_go_away)?;
+
+ // It's possible that decreasing the window causes
+ // `window_size` (the stream-specific window) to fall below
+ // `available` (the portion of the connection-level window
+ // that we have allocated to the stream).
+ // In this case, we should take that excess allocation away
+ // and reassign it to other streams.
+ let window_size = stream.send_flow.window_size();
+ let available = stream.send_flow.available().as_size();
+ let reclaimed = if available > window_size {
+ // Drop down to `window_size`.
+ let reclaim = available - window_size;
+ stream
+ .send_flow
+ .claim_capacity(reclaim)
+ .map_err(proto::Error::library_go_away)?;
+ total_reclaimed += reclaim;
+ reclaim
+ } else {
+ 0
+ };
+
+ tracing::trace!(
+ "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
+ stream.id,
+ dec,
+ reclaimed,
+ stream.send_flow
+ );
+
+ // TODO: Should this notify the producer when the capacity
+ // of a stream is reduced? Maybe it should if the capacity
+ // is reduced to zero, allowing the producer to stop work.
+
+ Ok::<_, proto::Error>(())
+ })?;
+
+ self.prioritize
+ .assign_connection_capacity(total_reclaimed, store, counts);
+ }
+ Ordering::Greater => {
+ let inc = val - old_val;
+
+ store.try_for_each(|mut stream| {
+ self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
+ .map_err(Error::library_go_away)
+ })?;
+ }
+ Ordering::Equal => (),
+ }
+ }
+
+ if let Some(val) = settings.is_push_enabled() {
+ self.is_push_enabled = val
+ }
+
+ Ok(())
+ }
+
+ pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
+ self.prioritize.clear_pending_capacity(store, counts);
+ self.prioritize.clear_pending_send(store, counts);
+ self.prioritize.clear_pending_open(store, counts);
+ }
+
+ pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
+ if let Ok(next) = self.next_stream_id {
+ if id >= next {
+ return Err(Reason::PROTOCOL_ERROR);
+ }
+ }
+ // if next_stream_id is overflowed, that's ok.
+
+ Ok(())
+ }
+
+ pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
+ self.next_stream_id
+ .map_err(|_| UserError::OverflowedStreamId)
+ }
+
+ pub fn may_have_created_stream(&self, id: StreamId) -> bool {
+ if let Ok(next_id) = self.next_stream_id {
+ // Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
+ id < next_id
+ } else {
+ true
+ }
+ }
+
+ pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
+ if let Ok(next_id) = self.next_stream_id {
+ // Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
+ if id >= next_id {
+ self.next_stream_id = id.next_id();
+ }
+ }
+ }
+
+ pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
+ self.is_extended_connect_protocol_enabled
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/state.rs b/third_party/rust/h2/src/proto/streams/state.rs
new file mode 100644
index 0000000000..5256f09cf4
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/state.rs
@@ -0,0 +1,469 @@
+use std::io;
+
+use crate::codec::UserError;
+use crate::frame::{self, Reason, StreamId};
+use crate::proto::{self, Error, Initiator, PollReset};
+
+use self::Inner::*;
+use self::Peer::*;
+
+/// Represents the state of an H2 stream
+///
+/// ```not_rust
+/// +--------+
+/// send PP | | recv PP
+/// ,--------| idle |--------.
+/// / | | \
+/// v +--------+ v
+/// +----------+ | +----------+
+/// | | | send H / | |
+/// ,------| reserved | | recv H | reserved |------.
+/// | | (local) | | | (remote) | |
+/// | +----------+ v +----------+ |
+/// | | +--------+ | |
+/// | | recv ES | | send ES | |
+/// | send H | ,-------| open |-------. | recv H |
+/// | | / | | \ | |
+/// | v v +--------+ v v |
+/// | +----------+ | +----------+ |
+/// | | half | | | half | |
+/// | | closed | | send R / | closed | |
+/// | | (remote) | | recv R | (local) | |
+/// | +----------+ | +----------+ |
+/// | | | | |
+/// | | send ES / | recv ES / | |
+/// | | send R / v send R / | |
+/// | | recv R +--------+ recv R | |
+/// | send R / `----------->| |<-----------' send R / |
+/// | recv R | closed | recv R |
+/// `----------------------->| |<----------------------'
+/// +--------+
+///
+/// send: endpoint sends this frame
+/// recv: endpoint receives this frame
+///
+/// H: HEADERS frame (with implied CONTINUATIONs)
+/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
+/// ES: END_STREAM flag
+/// R: RST_STREAM frame
+/// ```
+#[derive(Debug, Clone)]
+pub struct State {
+ inner: Inner,
+}
+
+#[derive(Debug, Clone)]
+enum Inner {
+ Idle,
+ // TODO: these states shouldn't count against concurrency limits:
+ ReservedLocal,
+ ReservedRemote,
+ Open { local: Peer, remote: Peer },
+ HalfClosedLocal(Peer), // TODO: explicitly name this value
+ HalfClosedRemote(Peer),
+ Closed(Cause),
+}
+
+#[derive(Debug, Copy, Clone, Default)]
+enum Peer {
+ #[default]
+ AwaitingHeaders,
+ Streaming,
+}
+
+#[derive(Debug, Clone)]
+enum Cause {
+ EndStream,
+ Error(Error),
+
+ /// This indicates to the connection that a reset frame must be sent out
+ /// once the send queue has been flushed.
+ ///
+ /// Examples of when this could happen:
+ /// - User drops all references to a stream, so we want to CANCEL the it.
+ /// - Header block size was too large, so we want to REFUSE, possibly
+ /// after sending a 431 response frame.
+ ScheduledLibraryReset(Reason),
+}
+
+impl State {
+ /// Opens the send-half of a stream if it is not already open.
+ pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
+ let local = Streaming;
+
+ self.inner = match self.inner {
+ Idle => {
+ if eos {
+ HalfClosedLocal(AwaitingHeaders)
+ } else {
+ Open {
+ local,
+ remote: AwaitingHeaders,
+ }
+ }
+ }
+ Open {
+ local: AwaitingHeaders,
+ remote,
+ } => {
+ if eos {
+ HalfClosedLocal(remote)
+ } else {
+ Open { local, remote }
+ }
+ }
+ HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
+ if eos {
+ Closed(Cause::EndStream)
+ } else {
+ HalfClosedRemote(local)
+ }
+ }
+ _ => {
+ // All other transitions result in a protocol error
+ return Err(UserError::UnexpectedFrameType);
+ }
+ };
+
+ Ok(())
+ }
+
+ /// Opens the receive-half of the stream when a HEADERS frame is received.
+ ///
+ /// Returns true if this transitions the state to Open.
+ pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
+ let mut initial = false;
+ let eos = frame.is_end_stream();
+
+ self.inner = match self.inner {
+ Idle => {
+ initial = true;
+
+ if eos {
+ HalfClosedRemote(AwaitingHeaders)
+ } else {
+ Open {
+ local: AwaitingHeaders,
+ remote: if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ AwaitingHeaders
+ } else {
+ Streaming
+ },
+ }
+ }
+ }
+ ReservedRemote => {
+ initial = true;
+
+ if eos {
+ Closed(Cause::EndStream)
+ } else if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ ReservedRemote
+ } else {
+ HalfClosedLocal(Streaming)
+ }
+ }
+ Open {
+ local,
+ remote: AwaitingHeaders,
+ } => {
+ if eos {
+ HalfClosedRemote(local)
+ } else {
+ Open {
+ local,
+ remote: if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ AwaitingHeaders
+ } else {
+ Streaming
+ },
+ }
+ }
+ }
+ HalfClosedLocal(AwaitingHeaders) => {
+ if eos {
+ Closed(Cause::EndStream)
+ } else if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ HalfClosedLocal(AwaitingHeaders)
+ } else {
+ HalfClosedLocal(Streaming)
+ }
+ }
+ ref state => {
+ // All other transitions result in a protocol error
+ proto_err!(conn: "recv_open: in unexpected state {:?}", state);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+ };
+
+ Ok(initial)
+ }
+
+ /// Transition from Idle -> ReservedRemote
+ pub fn reserve_remote(&mut self) -> Result<(), Error> {
+ match self.inner {
+ Idle => {
+ self.inner = ReservedRemote;
+ Ok(())
+ }
+ ref state => {
+ proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+ }
+
+ /// Transition from Idle -> ReservedLocal
+ pub fn reserve_local(&mut self) -> Result<(), UserError> {
+ match self.inner {
+ Idle => {
+ self.inner = ReservedLocal;
+ Ok(())
+ }
+ _ => Err(UserError::UnexpectedFrameType),
+ }
+ }
+
+ /// Indicates that the remote side will not send more data to the local.
+ pub fn recv_close(&mut self) -> Result<(), Error> {
+ match self.inner {
+ Open { local, .. } => {
+ // The remote side will continue to receive data.
+ tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
+ self.inner = HalfClosedRemote(local);
+ Ok(())
+ }
+ HalfClosedLocal(..) => {
+ tracing::trace!("recv_close: HalfClosedLocal => Closed");
+ self.inner = Closed(Cause::EndStream);
+ Ok(())
+ }
+ ref state => {
+ proto_err!(conn: "recv_close: in unexpected state {:?}", state);
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+ }
+
+ /// The remote explicitly sent a RST_STREAM.
+ ///
+ /// # Arguments
+ /// - `frame`: the received RST_STREAM frame.
+ /// - `queued`: true if this stream has frames in the pending send queue.
+ pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
+ match self.inner {
+ // If the stream is already in a `Closed` state, do nothing,
+ // provided that there are no frames still in the send queue.
+ Closed(..) if !queued => {}
+ // A notionally `Closed` stream may still have queued frames in
+ // the following cases:
+ //
+ // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
+ // actually closed the stream yet).
+ // - if the cause is `Cause::EndStream`: we transition to this
+ // state when an EOS frame is *enqueued* (so that it's invalid
+ // to enqueue more frames), not when the EOS frame is *sent*;
+ // therefore, there may still be frames ahead of the EOS frame
+ // in the send queue.
+ //
+ // In either of these cases, we want to overwrite the stream's
+ // previous state with the received RST_STREAM, so that the queue
+ // will be cleared by `Prioritize::pop_frame`.
+ ref state => {
+ tracing::trace!(
+ "recv_reset; frame={:?}; state={:?}; queued={:?}",
+ frame,
+ state,
+ queued
+ );
+ self.inner = Closed(Cause::Error(Error::remote_reset(
+ frame.stream_id(),
+ frame.reason(),
+ )));
+ }
+ }
+ }
+
+ /// Handle a connection-level error.
+ pub fn handle_error(&mut self, err: &proto::Error) {
+ match self.inner {
+ Closed(..) => {}
+ _ => {
+ tracing::trace!("handle_error; err={:?}", err);
+ self.inner = Closed(Cause::Error(err.clone()));
+ }
+ }
+ }
+
+ pub fn recv_eof(&mut self) {
+ match self.inner {
+ Closed(..) => {}
+ ref state => {
+ tracing::trace!("recv_eof; state={:?}", state);
+ self.inner = Closed(Cause::Error(
+ io::Error::new(
+ io::ErrorKind::BrokenPipe,
+ "stream closed because of a broken pipe",
+ )
+ .into(),
+ ));
+ }
+ }
+ }
+
+ /// Indicates that the local side will not send more data to the local.
+ pub fn send_close(&mut self) {
+ match self.inner {
+ Open { remote, .. } => {
+ // The remote side will continue to receive data.
+ tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
+ self.inner = HalfClosedLocal(remote);
+ }
+ HalfClosedRemote(..) => {
+ tracing::trace!("send_close: HalfClosedRemote => Closed");
+ self.inner = Closed(Cause::EndStream);
+ }
+ ref state => panic!("send_close: unexpected state {:?}", state),
+ }
+ }
+
+ /// Set the stream state to reset locally.
+ pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
+ self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
+ }
+
+ /// Set the stream state to a scheduled reset.
+ pub fn set_scheduled_reset(&mut self, reason: Reason) {
+ debug_assert!(!self.is_closed());
+ self.inner = Closed(Cause::ScheduledLibraryReset(reason));
+ }
+
+ pub fn get_scheduled_reset(&self) -> Option<Reason> {
+ match self.inner {
+ Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
+ _ => None,
+ }
+ }
+
+ pub fn is_scheduled_reset(&self) -> bool {
+ matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
+ }
+
+ pub fn is_local_error(&self) -> bool {
+ match self.inner {
+ Closed(Cause::Error(ref e)) => e.is_local(),
+ Closed(Cause::ScheduledLibraryReset(..)) => true,
+ _ => false,
+ }
+ }
+
+ pub fn is_remote_reset(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
+ )
+ }
+
+ /// Returns true if the stream is already reset.
+ pub fn is_reset(&self) -> bool {
+ match self.inner {
+ Closed(Cause::EndStream) => false,
+ Closed(_) => true,
+ _ => false,
+ }
+ }
+
+ pub fn is_send_streaming(&self) -> bool {
+ matches!(
+ self.inner,
+ Open {
+ local: Streaming,
+ ..
+ } | HalfClosedRemote(Streaming)
+ )
+ }
+
+ /// Returns true when the stream is in a state to receive headers
+ pub fn is_recv_headers(&self) -> bool {
+ matches!(
+ self.inner,
+ Idle | Open {
+ remote: AwaitingHeaders,
+ ..
+ } | HalfClosedLocal(AwaitingHeaders)
+ | ReservedRemote
+ )
+ }
+
+ pub fn is_recv_streaming(&self) -> bool {
+ matches!(
+ self.inner,
+ Open {
+ remote: Streaming,
+ ..
+ } | HalfClosedLocal(Streaming)
+ )
+ }
+
+ pub fn is_closed(&self) -> bool {
+ matches!(self.inner, Closed(_))
+ }
+
+ pub fn is_recv_closed(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(..) | HalfClosedRemote(..) | ReservedLocal
+ )
+ }
+
+ pub fn is_send_closed(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(..) | HalfClosedLocal(..) | ReservedRemote
+ )
+ }
+
+ pub fn is_idle(&self) -> bool {
+ matches!(self.inner, Idle)
+ }
+
+ pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
+ // TODO: Is this correct?
+ match self.inner {
+ Closed(Cause::Error(ref e)) => Err(e.clone()),
+ Closed(Cause::ScheduledLibraryReset(reason)) => {
+ Err(proto::Error::library_go_away(reason))
+ }
+ Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
+ _ => Ok(true),
+ }
+ }
+
+ /// Returns a reason if the stream has been reset.
+ pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
+ match self.inner {
+ Closed(Cause::Error(Error::Reset(_, reason, _)))
+ | Closed(Cause::Error(Error::GoAway(_, reason, _)))
+ | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
+ Closed(Cause::Error(ref e)) => Err(e.clone().into()),
+ Open {
+ local: Streaming, ..
+ }
+ | HalfClosedRemote(Streaming) => match mode {
+ PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
+ PollReset::Streaming => Ok(None),
+ },
+ _ => Ok(None),
+ }
+ }
+}
+
+impl Default for State {
+ fn default() -> State {
+ State { inner: Inner::Idle }
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/store.rs b/third_party/rust/h2/src/proto/streams/store.rs
new file mode 100644
index 0000000000..67b377b12c
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/store.rs
@@ -0,0 +1,464 @@
+use super::*;
+
+use indexmap::{self, IndexMap};
+
+use std::convert::Infallible;
+use std::fmt;
+use std::marker::PhantomData;
+use std::ops;
+
+/// Storage for streams
+#[derive(Debug)]
+pub(super) struct Store {
+ slab: slab::Slab<Stream>,
+ ids: IndexMap<StreamId, SlabIndex>,
+}
+
+/// "Pointer" to an entry in the store
+pub(super) struct Ptr<'a> {
+ key: Key,
+ store: &'a mut Store,
+}
+
+/// References an entry in the store.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct Key {
+ index: SlabIndex,
+ /// Keep the stream ID in the key as an ABA guard, since slab indices
+ /// could be re-used with a new stream.
+ stream_id: StreamId,
+}
+
+// We can never have more than `StreamId::MAX` streams in the store,
+// so we can save a smaller index (u32 vs usize).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct SlabIndex(u32);
+
+#[derive(Debug)]
+pub(super) struct Queue<N> {
+ indices: Option<store::Indices>,
+ _p: PhantomData<N>,
+}
+
+pub(super) trait Next {
+ fn next(stream: &Stream) -> Option<Key>;
+
+ fn set_next(stream: &mut Stream, key: Option<Key>);
+
+ fn take_next(stream: &mut Stream) -> Option<Key>;
+
+ fn is_queued(stream: &Stream) -> bool;
+
+ fn set_queued(stream: &mut Stream, val: bool);
+}
+
+/// A linked list
+#[derive(Debug, Clone, Copy)]
+struct Indices {
+ pub head: Key,
+ pub tail: Key,
+}
+
+pub(super) enum Entry<'a> {
+ Occupied(OccupiedEntry<'a>),
+ Vacant(VacantEntry<'a>),
+}
+
+pub(super) struct OccupiedEntry<'a> {
+ ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
+}
+
+pub(super) struct VacantEntry<'a> {
+ ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
+ slab: &'a mut slab::Slab<Stream>,
+}
+
+pub(super) trait Resolve {
+ fn resolve(&mut self, key: Key) -> Ptr;
+}
+
+// ===== impl Store =====
+
+impl Store {
+ pub fn new() -> Self {
+ Store {
+ slab: slab::Slab::new(),
+ ids: IndexMap::new(),
+ }
+ }
+
+ pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
+ let index = match self.ids.get(id) {
+ Some(key) => *key,
+ None => return None,
+ };
+
+ Some(Ptr {
+ key: Key {
+ index,
+ stream_id: *id,
+ },
+ store: self,
+ })
+ }
+
+ pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
+ let index = SlabIndex(self.slab.insert(val) as u32);
+ assert!(self.ids.insert(id, index).is_none());
+
+ Ptr {
+ key: Key {
+ index,
+ stream_id: id,
+ },
+ store: self,
+ }
+ }
+
+ pub fn find_entry(&mut self, id: StreamId) -> Entry {
+ use self::indexmap::map::Entry::*;
+
+ match self.ids.entry(id) {
+ Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }),
+ Vacant(e) => Entry::Vacant(VacantEntry {
+ ids: e,
+ slab: &mut self.slab,
+ }),
+ }
+ }
+
+ pub(crate) fn for_each<F>(&mut self, mut f: F)
+ where
+ F: FnMut(Ptr),
+ {
+ match self.try_for_each(|ptr| {
+ f(ptr);
+ Ok::<_, Infallible>(())
+ }) {
+ Ok(()) => (),
+ Err(infallible) => match infallible {},
+ }
+ }
+
+ pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
+ where
+ F: FnMut(Ptr) -> Result<(), E>,
+ {
+ let mut len = self.ids.len();
+ let mut i = 0;
+
+ while i < len {
+ // Get the key by index, this makes the borrow checker happy
+ let (stream_id, index) = {
+ let entry = self.ids.get_index(i).unwrap();
+ (*entry.0, *entry.1)
+ };
+
+ f(Ptr {
+ key: Key { index, stream_id },
+ store: self,
+ })?;
+
+ // TODO: This logic probably could be better...
+ let new_len = self.ids.len();
+
+ if new_len < len {
+ debug_assert!(new_len == len - 1);
+ len -= 1;
+ } else {
+ i += 1;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl Resolve for Store {
+ fn resolve(&mut self, key: Key) -> Ptr {
+ Ptr { key, store: self }
+ }
+}
+
+impl ops::Index<Key> for Store {
+ type Output = Stream;
+
+ fn index(&self, key: Key) -> &Self::Output {
+ self.slab
+ .get(key.index.0 as usize)
+ .filter(|s| s.id == key.stream_id)
+ .unwrap_or_else(|| {
+ panic!("dangling store key for stream_id={:?}", key.stream_id);
+ })
+ }
+}
+
+impl ops::IndexMut<Key> for Store {
+ fn index_mut(&mut self, key: Key) -> &mut Self::Output {
+ self.slab
+ .get_mut(key.index.0 as usize)
+ .filter(|s| s.id == key.stream_id)
+ .unwrap_or_else(|| {
+ panic!("dangling store key for stream_id={:?}", key.stream_id);
+ })
+ }
+}
+
+impl Store {
+ #[cfg(feature = "unstable")]
+ pub fn num_active_streams(&self) -> usize {
+ self.ids.len()
+ }
+
+ #[cfg(feature = "unstable")]
+ pub fn num_wired_streams(&self) -> usize {
+ self.slab.len()
+ }
+}
+
+// While running h2 unit/integration tests, enable this debug assertion.
+//
+// In practice, we don't need to ensure this. But the integration tests
+// help to make sure we've cleaned up in cases where we could (like, the
+// runtime isn't suddenly dropping the task for unknown reasons).
+#[cfg(feature = "unstable")]
+impl Drop for Store {
+ fn drop(&mut self) {
+ use std::thread;
+
+ if !thread::panicking() {
+ debug_assert!(self.slab.is_empty());
+ }
+ }
+}
+
+// ===== impl Queue =====
+
+impl<N> Queue<N>
+where
+ N: Next,
+{
+ pub fn new() -> Self {
+ Queue {
+ indices: None,
+ _p: PhantomData,
+ }
+ }
+
+ pub fn take(&mut self) -> Self {
+ Queue {
+ indices: self.indices.take(),
+ _p: PhantomData,
+ }
+ }
+
+ /// Queue the stream.
+ ///
+ /// If the stream is already contained by the list, return `false`.
+ pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
+ tracing::trace!("Queue::push_back");
+
+ if N::is_queued(stream) {
+ tracing::trace!(" -> already queued");
+ return false;
+ }
+
+ N::set_queued(stream, true);
+
+ // The next pointer shouldn't be set
+ debug_assert!(N::next(stream).is_none());
+
+ // Queue the stream
+ match self.indices {
+ Some(ref mut idxs) => {
+ tracing::trace!(" -> existing entries");
+
+ // Update the current tail node to point to `stream`
+ let key = stream.key();
+ N::set_next(&mut stream.resolve(idxs.tail), Some(key));
+
+ // Update the tail pointer
+ idxs.tail = stream.key();
+ }
+ None => {
+ tracing::trace!(" -> first entry");
+ self.indices = Some(store::Indices {
+ head: stream.key(),
+ tail: stream.key(),
+ });
+ }
+ }
+
+ true
+ }
+
+ /// Queue the stream
+ ///
+ /// If the stream is already contained by the list, return `false`.
+ pub fn push_front(&mut self, stream: &mut store::Ptr) -> bool {
+ tracing::trace!("Queue::push_front");
+
+ if N::is_queued(stream) {
+ tracing::trace!(" -> already queued");
+ return false;
+ }
+
+ N::set_queued(stream, true);
+
+ // The next pointer shouldn't be set
+ debug_assert!(N::next(stream).is_none());
+
+ // Queue the stream
+ match self.indices {
+ Some(ref mut idxs) => {
+ tracing::trace!(" -> existing entries");
+
+ // Update the provided stream to point to the head node
+ let head_key = stream.resolve(idxs.head).key();
+ N::set_next(stream, Some(head_key));
+
+ // Update the head pointer
+ idxs.head = stream.key();
+ }
+ None => {
+ tracing::trace!(" -> first entry");
+ self.indices = Some(store::Indices {
+ head: stream.key(),
+ tail: stream.key(),
+ });
+ }
+ }
+
+ true
+ }
+
+ pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
+ where
+ R: Resolve,
+ {
+ if let Some(mut idxs) = self.indices {
+ let mut stream = store.resolve(idxs.head);
+
+ if idxs.head == idxs.tail {
+ assert!(N::next(&stream).is_none());
+ self.indices = None;
+ } else {
+ idxs.head = N::take_next(&mut stream).unwrap();
+ self.indices = Some(idxs);
+ }
+
+ debug_assert!(N::is_queued(&stream));
+ N::set_queued(&mut stream, false);
+
+ return Some(stream);
+ }
+
+ None
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.indices.is_none()
+ }
+
+ pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
+ where
+ R: Resolve,
+ F: Fn(&Stream) -> bool,
+ {
+ if let Some(idxs) = self.indices {
+ let should_pop = f(&store.resolve(idxs.head));
+ if should_pop {
+ return self.pop(store);
+ }
+ }
+
+ None
+ }
+}
+
+// ===== impl Ptr =====
+
+impl<'a> Ptr<'a> {
+ /// Returns the Key associated with the stream
+ pub fn key(&self) -> Key {
+ self.key
+ }
+
+ pub fn store_mut(&mut self) -> &mut Store {
+ self.store
+ }
+
+ /// Remove the stream from the store
+ pub fn remove(self) -> StreamId {
+ // The stream must have been unlinked before this point
+ debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
+
+ // Remove the stream state
+ let stream = self.store.slab.remove(self.key.index.0 as usize);
+ assert_eq!(stream.id, self.key.stream_id);
+ stream.id
+ }
+
+ /// Remove the StreamId -> stream state association.
+ ///
+ /// This will effectively remove the stream as far as the H2 protocol is
+ /// concerned.
+ pub fn unlink(&mut self) {
+ let id = self.key.stream_id;
+ self.store.ids.swap_remove(&id);
+ }
+}
+
+impl<'a> Resolve for Ptr<'a> {
+ fn resolve(&mut self, key: Key) -> Ptr {
+ Ptr {
+ key,
+ store: &mut *self.store,
+ }
+ }
+}
+
+impl<'a> ops::Deref for Ptr<'a> {
+ type Target = Stream;
+
+ fn deref(&self) -> &Stream {
+ &self.store[self.key]
+ }
+}
+
+impl<'a> ops::DerefMut for Ptr<'a> {
+ fn deref_mut(&mut self) -> &mut Stream {
+ &mut self.store[self.key]
+ }
+}
+
+impl<'a> fmt::Debug for Ptr<'a> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ (**self).fmt(fmt)
+ }
+}
+
+// ===== impl OccupiedEntry =====
+
+impl<'a> OccupiedEntry<'a> {
+ pub fn key(&self) -> Key {
+ let stream_id = *self.ids.key();
+ let index = *self.ids.get();
+ Key { index, stream_id }
+ }
+}
+
+// ===== impl VacantEntry =====
+
+impl<'a> VacantEntry<'a> {
+ pub fn insert(self, value: Stream) -> Key {
+ // Insert the value in the slab
+ let stream_id = value.id;
+ let index = SlabIndex(self.slab.insert(value) as u32);
+
+ // Insert the handle in the ID map
+ self.ids.insert(index);
+
+ Key { index, stream_id }
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/stream.rs b/third_party/rust/h2/src/proto/streams/stream.rs
new file mode 100644
index 0000000000..43e313647a
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/stream.rs
@@ -0,0 +1,527 @@
+use super::*;
+
+use std::task::{Context, Waker};
+use std::time::Instant;
+use std::usize;
+
+/// Tracks Stream related state
+///
+/// # Reference counting
+///
+/// There can be a number of outstanding handles to a single Stream. These are
+/// tracked using reference counting. The `ref_count` field represents the
+/// number of outstanding userspace handles that can reach this stream.
+///
+/// It's important to note that when the stream is placed in an internal queue
+/// (such as an accept queue), this is **not** tracked by a reference count.
+/// Thus, `ref_count` can be zero and the stream still has to be kept around.
+#[derive(Debug)]
+pub(super) struct Stream {
+ /// The h2 stream identifier
+ pub id: StreamId,
+
+ /// Current state of the stream
+ pub state: State,
+
+ /// Set to `true` when the stream is counted against the connection's max
+ /// concurrent streams.
+ pub is_counted: bool,
+
+ /// Number of outstanding handles pointing to this stream
+ pub ref_count: usize,
+
+ // ===== Fields related to sending =====
+ /// Next node in the accept linked list
+ pub next_pending_send: Option<store::Key>,
+
+ /// Set to true when the stream is pending accept
+ pub is_pending_send: bool,
+
+ /// Send data flow control
+ pub send_flow: FlowControl,
+
+ /// Amount of send capacity that has been requested, but not yet allocated.
+ pub requested_send_capacity: WindowSize,
+
+ /// Amount of data buffered at the prioritization layer.
+ /// TODO: Technically this could be greater than the window size...
+ pub buffered_send_data: usize,
+
+ /// Task tracking additional send capacity (i.e. window updates).
+ send_task: Option<Waker>,
+
+ /// Frames pending for this stream being sent to the socket
+ pub pending_send: buffer::Deque,
+
+ /// Next node in the linked list of streams waiting for additional
+ /// connection level capacity.
+ pub next_pending_send_capacity: Option<store::Key>,
+
+ /// True if the stream is waiting for outbound connection capacity
+ pub is_pending_send_capacity: bool,
+
+ /// Set to true when the send capacity has been incremented
+ pub send_capacity_inc: bool,
+
+ /// Next node in the open linked list
+ pub next_open: Option<store::Key>,
+
+ /// Set to true when the stream is pending to be opened
+ pub is_pending_open: bool,
+
+ /// Set to true when a push is pending for this stream
+ pub is_pending_push: bool,
+
+ // ===== Fields related to receiving =====
+ /// Next node in the accept linked list
+ pub next_pending_accept: Option<store::Key>,
+
+ /// Set to true when the stream is pending accept
+ pub is_pending_accept: bool,
+
+ /// Receive data flow control
+ pub recv_flow: FlowControl,
+
+ pub in_flight_recv_data: WindowSize,
+
+ /// Next node in the linked list of streams waiting to send window updates.
+ pub next_window_update: Option<store::Key>,
+
+ /// True if the stream is waiting to send a window update
+ pub is_pending_window_update: bool,
+
+ /// The time when this stream may have been locally reset.
+ pub reset_at: Option<Instant>,
+
+ /// Next node in list of reset streams that should expire eventually
+ pub next_reset_expire: Option<store::Key>,
+
+ /// Frames pending for this stream to read
+ pub pending_recv: buffer::Deque,
+
+ /// When the RecvStream drop occurs, no data should be received.
+ pub is_recv: bool,
+
+ /// Task tracking receiving frames
+ pub recv_task: Option<Waker>,
+
+ /// The stream's pending push promises
+ pub pending_push_promises: store::Queue<NextAccept>,
+
+ /// Validate content-length headers
+ pub content_length: ContentLength,
+}
+
+/// State related to validating a stream's content-length
+#[derive(Debug)]
+pub enum ContentLength {
+ Omitted,
+ Head,
+ Remaining(u64),
+}
+
+#[derive(Debug)]
+pub(super) struct NextAccept;
+
+#[derive(Debug)]
+pub(super) struct NextSend;
+
+#[derive(Debug)]
+pub(super) struct NextSendCapacity;
+
+#[derive(Debug)]
+pub(super) struct NextWindowUpdate;
+
+#[derive(Debug)]
+pub(super) struct NextOpen;
+
+#[derive(Debug)]
+pub(super) struct NextResetExpire;
+
+impl Stream {
+ pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
+ let mut send_flow = FlowControl::new();
+ let mut recv_flow = FlowControl::new();
+
+ recv_flow
+ .inc_window(init_recv_window)
+ .expect("invalid initial receive window");
+ // TODO: proper error handling?
+ let _res = recv_flow.assign_capacity(init_recv_window);
+ debug_assert!(_res.is_ok());
+
+ send_flow
+ .inc_window(init_send_window)
+ .expect("invalid initial send window size");
+
+ Stream {
+ id,
+ state: State::default(),
+ ref_count: 0,
+ is_counted: false,
+
+ // ===== Fields related to sending =====
+ next_pending_send: None,
+ is_pending_send: false,
+ send_flow,
+ requested_send_capacity: 0,
+ buffered_send_data: 0,
+ send_task: None,
+ pending_send: buffer::Deque::new(),
+ is_pending_send_capacity: false,
+ next_pending_send_capacity: None,
+ send_capacity_inc: false,
+ is_pending_open: false,
+ next_open: None,
+ is_pending_push: false,
+
+ // ===== Fields related to receiving =====
+ next_pending_accept: None,
+ is_pending_accept: false,
+ recv_flow,
+ in_flight_recv_data: 0,
+ next_window_update: None,
+ is_pending_window_update: false,
+ reset_at: None,
+ next_reset_expire: None,
+ pending_recv: buffer::Deque::new(),
+ is_recv: true,
+ recv_task: None,
+ pending_push_promises: store::Queue::new(),
+ content_length: ContentLength::Omitted,
+ }
+ }
+
+ /// Increment the stream's ref count
+ pub fn ref_inc(&mut self) {
+ assert!(self.ref_count < usize::MAX);
+ self.ref_count += 1;
+ }
+
+ /// Decrements the stream's ref count
+ pub fn ref_dec(&mut self) {
+ assert!(self.ref_count > 0);
+ self.ref_count -= 1;
+ }
+
+ /// Returns true if stream is currently being held for some time because of
+ /// a local reset.
+ pub fn is_pending_reset_expiration(&self) -> bool {
+ self.reset_at.is_some()
+ }
+
+ /// Returns true if frames for this stream are ready to be sent over the wire
+ pub fn is_send_ready(&self) -> bool {
+ // Why do we check pending_open?
+ //
+ // We allow users to call send_request() which schedules a stream to be pending_open
+ // if there is no room according to the concurrency limit (max_send_streams), and we
+ // also allow data to be buffered for send with send_data() if there is no capacity for
+ // the stream to send the data, which attempts to place the stream in pending_send.
+ // If the stream is not open, we don't want the stream to be scheduled for
+ // execution (pending_send). Note that if the stream is in pending_open, it will be
+ // pushed to pending_send when there is room for an open stream.
+ //
+ // In pending_push we track whether a PushPromise still needs to be sent
+ // from a different stream before we can start sending frames on this one.
+ // This is different from the "open" check because reserved streams don't count
+ // toward the concurrency limit.
+ // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
+ !self.is_pending_open && !self.is_pending_push
+ }
+
+ /// Returns true if the stream is closed
+ pub fn is_closed(&self) -> bool {
+ // The state has fully transitioned to closed.
+ self.state.is_closed() &&
+ // Because outbound frames transition the stream state before being
+ // buffered, we have to ensure that all frames have been flushed.
+ self.pending_send.is_empty() &&
+ // Sometimes large data frames are sent out in chunks. After a chunk
+ // of the frame is sent, the remainder is pushed back onto the send
+ // queue to be rescheduled.
+ //
+ // Checking for additional buffered data lets us catch this case.
+ self.buffered_send_data == 0
+ }
+
+ /// Returns true if the stream is no longer in use
+ pub fn is_released(&self) -> bool {
+ // The stream is closed and fully flushed
+ self.is_closed() &&
+ // There are no more outstanding references to the stream
+ self.ref_count == 0 &&
+ // The stream is not in any queue
+ !self.is_pending_send && !self.is_pending_send_capacity &&
+ !self.is_pending_accept && !self.is_pending_window_update &&
+ !self.is_pending_open && self.reset_at.is_none()
+ }
+
+ /// Returns true when the consumer of the stream has dropped all handles
+ /// (indicating no further interest in the stream) and the stream state is
+ /// not actually closed.
+ ///
+ /// In this case, a reset should be sent.
+ pub fn is_canceled_interest(&self) -> bool {
+ self.ref_count == 0 && !self.state.is_closed()
+ }
+
+ /// Current available stream send capacity
+ pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
+ let available = self.send_flow.available().as_size() as usize;
+ let buffered = self.buffered_send_data;
+
+ available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
+ }
+
+ pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
+ let prev_capacity = self.capacity(max_buffer_size);
+ debug_assert!(capacity > 0);
+ // TODO: proper error handling
+ let _res = self.send_flow.assign_capacity(capacity);
+ debug_assert!(_res.is_ok());
+
+ tracing::trace!(
+ " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
+ self.send_flow.available(),
+ self.buffered_send_data,
+ self.id,
+ max_buffer_size,
+ prev_capacity,
+ );
+
+ if prev_capacity < self.capacity(max_buffer_size) {
+ self.notify_capacity();
+ }
+ }
+
+ pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
+ let prev_capacity = self.capacity(max_buffer_size);
+
+ // TODO: proper error handling
+ let _res = self.send_flow.send_data(len);
+ debug_assert!(_res.is_ok());
+
+ // Decrement the stream's buffered data counter
+ debug_assert!(self.buffered_send_data >= len as usize);
+ self.buffered_send_data -= len as usize;
+ self.requested_send_capacity -= len;
+
+ tracing::trace!(
+ " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
+ self.send_flow.available(),
+ self.buffered_send_data,
+ self.id,
+ max_buffer_size,
+ prev_capacity,
+ );
+
+ if prev_capacity < self.capacity(max_buffer_size) {
+ self.notify_capacity();
+ }
+ }
+
+ /// If the capacity was limited because of the max_send_buffer_size,
+ /// then consider waking the send task again...
+ pub fn notify_capacity(&mut self) {
+ self.send_capacity_inc = true;
+ tracing::trace!(" notifying task");
+ self.notify_send();
+ }
+
+ /// Returns `Err` when the decrement cannot be completed due to overflow.
+ pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
+ match self.content_length {
+ ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
+ Some(val) => *rem = val,
+ None => return Err(()),
+ },
+ ContentLength::Head => {
+ if len != 0 {
+ return Err(());
+ }
+ }
+ _ => {}
+ }
+
+ Ok(())
+ }
+
+ pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
+ match self.content_length {
+ ContentLength::Remaining(0) => Ok(()),
+ ContentLength::Remaining(_) => Err(()),
+ _ => Ok(()),
+ }
+ }
+
+ pub fn notify_send(&mut self) {
+ if let Some(task) = self.send_task.take() {
+ task.wake();
+ }
+ }
+
+ pub fn wait_send(&mut self, cx: &Context) {
+ self.send_task = Some(cx.waker().clone());
+ }
+
+ pub fn notify_recv(&mut self) {
+ if let Some(task) = self.recv_task.take() {
+ task.wake();
+ }
+ }
+}
+
+impl store::Next for NextAccept {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_accept
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_accept = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_accept.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_accept
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_accept = val;
+ }
+}
+
+impl store::Next for NextSend {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_send
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_send = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_send.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_send
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ // ensure that stream is not queued for being opened
+ // if it's being put into queue for sending data
+ debug_assert!(!stream.is_pending_open);
+ }
+ stream.is_pending_send = val;
+ }
+}
+
+impl store::Next for NextSendCapacity {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_pending_send_capacity
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_pending_send_capacity = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_pending_send_capacity.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_send_capacity
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_send_capacity = val;
+ }
+}
+
+impl store::Next for NextWindowUpdate {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_window_update
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_window_update = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_window_update.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_window_update
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ stream.is_pending_window_update = val;
+ }
+}
+
+impl store::Next for NextOpen {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_open
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_open = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_open.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.is_pending_open
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ // ensure that stream is not queued for being sent
+ // if it's being put into queue for opening the stream
+ debug_assert!(!stream.is_pending_send);
+ }
+ stream.is_pending_open = val;
+ }
+}
+
+impl store::Next for NextResetExpire {
+ fn next(stream: &Stream) -> Option<store::Key> {
+ stream.next_reset_expire
+ }
+
+ fn set_next(stream: &mut Stream, key: Option<store::Key>) {
+ stream.next_reset_expire = key;
+ }
+
+ fn take_next(stream: &mut Stream) -> Option<store::Key> {
+ stream.next_reset_expire.take()
+ }
+
+ fn is_queued(stream: &Stream) -> bool {
+ stream.reset_at.is_some()
+ }
+
+ fn set_queued(stream: &mut Stream, val: bool) {
+ if val {
+ stream.reset_at = Some(Instant::now());
+ } else {
+ stream.reset_at = None;
+ }
+ }
+}
+
+// ===== impl ContentLength =====
+
+impl ContentLength {
+ pub fn is_head(&self) -> bool {
+ matches!(*self, Self::Head)
+ }
+}
diff --git a/third_party/rust/h2/src/proto/streams/streams.rs b/third_party/rust/h2/src/proto/streams/streams.rs
new file mode 100644
index 0000000000..274bf45538
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/streams.rs
@@ -0,0 +1,1594 @@
+use super::recv::RecvHeaderBlockError;
+use super::store::{self, Entry, Resolve, Store};
+use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
+use crate::codec::{Codec, SendError, UserError};
+use crate::ext::Protocol;
+use crate::frame::{self, Frame, Reason};
+use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
+use crate::{client, proto, server};
+
+use bytes::{Buf, Bytes};
+use http::{HeaderMap, Request, Response};
+use std::task::{Context, Poll, Waker};
+use tokio::io::AsyncWrite;
+
+use std::sync::{Arc, Mutex};
+use std::{fmt, io};
+
+#[derive(Debug)]
+pub(crate) struct Streams<B, P>
+where
+ P: Peer,
+{
+ /// Holds most of the connection and stream related state for processing
+ /// HTTP/2 frames associated with streams.
+ inner: Arc<Mutex<Inner>>,
+
+ /// This is the queue of frames to be written to the wire. This is split out
+ /// to avoid requiring a `B` generic on all public API types even if `B` is
+ /// not technically required.
+ ///
+ /// Currently, splitting this out requires a second `Arc` + `Mutex`.
+ /// However, it should be possible to avoid this duplication with a little
+ /// bit of unsafe code. This optimization has been postponed until it has
+ /// been shown to be necessary.
+ send_buffer: Arc<SendBuffer<B>>,
+
+ _p: ::std::marker::PhantomData<P>,
+}
+
+// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
+// Ensures that the methods only get one instantiation, instead of two (client and server)
+#[derive(Debug)]
+pub(crate) struct DynStreams<'a, B> {
+ inner: &'a Mutex<Inner>,
+
+ send_buffer: &'a SendBuffer<B>,
+
+ peer: peer::Dyn,
+}
+
+/// Reference to the stream state
+#[derive(Debug)]
+pub(crate) struct StreamRef<B> {
+ opaque: OpaqueStreamRef,
+ send_buffer: Arc<SendBuffer<B>>,
+}
+
+/// Reference to the stream state that hides the send data chunk generic
+pub(crate) struct OpaqueStreamRef {
+ inner: Arc<Mutex<Inner>>,
+ key: store::Key,
+}
+
+/// Fields needed to manage state related to managing the set of streams. This
+/// is mostly split out to make ownership happy.
+///
+/// TODO: better name
+#[derive(Debug)]
+struct Inner {
+ /// Tracks send & recv stream concurrency.
+ counts: Counts,
+
+ /// Connection level state and performs actions on streams
+ actions: Actions,
+
+ /// Stores stream state
+ store: Store,
+
+ /// The number of stream refs to this shared state.
+ refs: usize,
+}
+
+#[derive(Debug)]
+struct Actions {
+ /// Manages state transitions initiated by receiving frames
+ recv: Recv,
+
+ /// Manages state transitions initiated by sending frames
+ send: Send,
+
+ /// Task that calls `poll_complete`.
+ task: Option<Waker>,
+
+ /// If the connection errors, a copy is kept for any StreamRefs.
+ conn_error: Option<proto::Error>,
+}
+
+/// Contains the buffer of frames to be written to the wire.
+#[derive(Debug)]
+struct SendBuffer<B> {
+ inner: Mutex<Buffer<Frame<B>>>,
+}
+
+// ===== impl Streams =====
+
+impl<B, P> Streams<B, P>
+where
+ B: Buf,
+ P: Peer,
+{
+ pub fn new(config: Config) -> Self {
+ let peer = P::r#dyn();
+
+ Streams {
+ inner: Inner::new(peer, config),
+ send_buffer: Arc::new(SendBuffer::new()),
+ _p: ::std::marker::PhantomData,
+ }
+ }
+
+ pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ me.actions
+ .recv
+ .set_target_connection_window(size, &mut me.actions.task)
+ }
+
+ pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+ me.actions.recv.next_incoming(&mut me.store).map(|key| {
+ let stream = &mut me.store.resolve(key);
+ tracing::trace!(
+ "next_incoming; id={:?}, state={:?}",
+ stream.id,
+ stream.state
+ );
+ // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
+ // the lock, so it can't.
+ me.refs += 1;
+
+ // Pending-accepted remotely-reset streams are counted.
+ if stream.state.is_remote_reset() {
+ me.counts.dec_num_remote_reset_streams();
+ }
+
+ StreamRef {
+ opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
+ send_buffer: self.send_buffer.clone(),
+ }
+ })
+ }
+
+ pub fn send_pending_refusal<T>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+ me.actions.recv.send_pending_refusal(cx, dst)
+ }
+
+ pub fn clear_expired_reset_streams(&mut self) {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+ me.actions
+ .recv
+ .clear_expired_reset_streams(&mut me.store, &mut me.counts);
+ }
+
+ pub fn poll_complete<T>(
+ &mut self,
+ cx: &mut Context,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ {
+ let mut me = self.inner.lock().unwrap();
+ me.poll_complete(&self.send_buffer, cx, dst)
+ }
+
+ pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.counts.apply_remote_settings(frame);
+
+ me.actions.send.apply_remote_settings(
+ frame,
+ send_buffer,
+ &mut me.store,
+ &mut me.counts,
+ &mut me.actions.task,
+ )
+ }
+
+ pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ me.actions.recv.apply_local_settings(frame, &mut me.store)
+ }
+
+ pub fn send_request(
+ &mut self,
+ mut request: Request<()>,
+ end_of_stream: bool,
+ pending: Option<&OpaqueStreamRef>,
+ ) -> Result<(StreamRef<B>, bool), SendError> {
+ use super::stream::ContentLength;
+ use http::Method;
+
+ let protocol = request.extensions_mut().remove::<Protocol>();
+
+ // Clear before taking lock, incase extensions contain a StreamRef.
+ request.extensions_mut().clear();
+
+ // TODO: There is a hazard with assigning a stream ID before the
+ // prioritize layer. If prioritization reorders new streams, this
+ // implicitly closes the earlier stream IDs.
+ //
+ // See: hyperium/h2#11
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.actions.ensure_no_conn_error()?;
+ me.actions.send.ensure_next_stream_id()?;
+
+ // The `pending` argument is provided by the `Client`, and holds
+ // a store `Key` of a `Stream` that may have been not been opened
+ // yet.
+ //
+ // If that stream is still pending, the Client isn't allowed to
+ // queue up another pending stream. They should use `poll_ready`.
+ if let Some(stream) = pending {
+ if me.store.resolve(stream.key).is_pending_open {
+ return Err(UserError::Rejected.into());
+ }
+ }
+
+ if me.counts.peer().is_server() {
+ // Servers cannot open streams. PushPromise must first be reserved.
+ return Err(UserError::UnexpectedFrameType.into());
+ }
+
+ let stream_id = me.actions.send.open()?;
+
+ let mut stream = Stream::new(
+ stream_id,
+ me.actions.send.init_window_sz(),
+ me.actions.recv.init_window_sz(),
+ );
+
+ if *request.method() == Method::HEAD {
+ stream.content_length = ContentLength::Head;
+ }
+
+ // Convert the message
+ let headers =
+ client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
+
+ let mut stream = me.store.insert(stream.id, stream);
+
+ let sent = me.actions.send.send_headers(
+ headers,
+ send_buffer,
+ &mut stream,
+ &mut me.counts,
+ &mut me.actions.task,
+ );
+
+ // send_headers can return a UserError, if it does,
+ // we should forget about this stream.
+ if let Err(err) = sent {
+ stream.unlink();
+ stream.remove();
+ return Err(err.into());
+ }
+
+ // Given that the stream has been initialized, it should not be in the
+ // closed state.
+ debug_assert!(!stream.state.is_closed());
+
+ // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
+ // the lock, so it can't.
+ me.refs += 1;
+
+ let is_full = me.counts.next_send_stream_will_reach_capacity();
+ Ok((
+ StreamRef {
+ opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
+ send_buffer: self.send_buffer.clone(),
+ },
+ is_full,
+ ))
+ }
+
+ pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
+ self.inner
+ .lock()
+ .unwrap()
+ .actions
+ .send
+ .is_extended_connect_protocol_enabled()
+ }
+}
+
+impl<B> DynStreams<'_, B> {
+ pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+
+ me.recv_headers(self.peer, self.send_buffer, frame)
+ }
+
+ pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ me.recv_data(self.peer, self.send_buffer, frame)
+ }
+
+ pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+
+ me.recv_reset(self.send_buffer, frame)
+ }
+
+ /// Notify all streams that a connection-level error happened.
+ pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
+ let mut me = self.inner.lock().unwrap();
+ me.handle_error(self.send_buffer, err)
+ }
+
+ pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ me.recv_go_away(self.send_buffer, frame)
+ }
+
+ pub fn last_processed_id(&self) -> StreamId {
+ self.inner.lock().unwrap().actions.recv.last_processed_id()
+ }
+
+ pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ me.recv_window_update(self.send_buffer, frame)
+ }
+
+ pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
+ let mut me = self.inner.lock().unwrap();
+ me.recv_push_promise(self.send_buffer, frame)
+ }
+
+ pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
+ let mut me = self.inner.lock().map_err(|_| ())?;
+ me.recv_eof(self.send_buffer, clear_pending_accept)
+ }
+
+ pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
+ let mut me = self.inner.lock().unwrap();
+ me.send_reset(self.send_buffer, id, reason)
+ }
+
+ pub fn send_go_away(&mut self, last_processed_id: StreamId) {
+ let mut me = self.inner.lock().unwrap();
+ me.actions.recv.go_away(last_processed_id);
+ }
+}
+
+impl Inner {
+ fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
+ Arc::new(Mutex::new(Inner {
+ counts: Counts::new(peer, &config),
+ actions: Actions {
+ recv: Recv::new(peer, &config),
+ send: Send::new(&config),
+ task: None,
+ conn_error: None,
+ },
+ store: Store::new(),
+ refs: 1,
+ }))
+ }
+
+ fn recv_headers<B>(
+ &mut self,
+ peer: peer::Dyn,
+ send_buffer: &SendBuffer<B>,
+ frame: frame::Headers,
+ ) -> Result<(), Error> {
+ let id = frame.stream_id();
+
+ // The GOAWAY process has begun. All streams with a greater ID than
+ // specified as part of GOAWAY should be ignored.
+ if id > self.actions.recv.max_stream_id() {
+ tracing::trace!(
+ "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
+ id,
+ self.actions.recv.max_stream_id()
+ );
+ return Ok(());
+ }
+
+ let key = match self.store.find_entry(id) {
+ Entry::Occupied(e) => e.key(),
+ Entry::Vacant(e) => {
+ // Client: it's possible to send a request, and then send
+ // a RST_STREAM while the response HEADERS were in transit.
+ //
+ // Server: we can't reset a stream before having received
+ // the request headers, so don't allow.
+ if !peer.is_server() {
+ // This may be response headers for a stream we've already
+ // forgotten about...
+ if self.actions.may_have_forgotten_stream(peer, id) {
+ tracing::debug!(
+ "recv_headers for old stream={:?}, sending STREAM_CLOSED",
+ id,
+ );
+ return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
+ }
+ }
+
+ match self
+ .actions
+ .recv
+ .open(id, Open::Headers, &mut self.counts)?
+ {
+ Some(stream_id) => {
+ let stream = Stream::new(
+ stream_id,
+ self.actions.send.init_window_sz(),
+ self.actions.recv.init_window_sz(),
+ );
+
+ e.insert(stream)
+ }
+ None => return Ok(()),
+ }
+ }
+ };
+
+ let stream = self.store.resolve(key);
+
+ if stream.state.is_local_error() {
+ // Locally reset streams must ignore frames "for some time".
+ // This is because the remote may have sent trailers before
+ // receiving the RST_STREAM frame.
+ tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
+ return Ok(());
+ }
+
+ let actions = &mut self.actions;
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ self.counts.transition(stream, |counts, stream| {
+ tracing::trace!(
+ "recv_headers; stream={:?}; state={:?}",
+ stream.id,
+ stream.state
+ );
+
+ let res = if stream.state.is_recv_headers() {
+ match actions.recv.recv_headers(frame, stream, counts) {
+ Ok(()) => Ok(()),
+ Err(RecvHeaderBlockError::Oversize(resp)) => {
+ if let Some(resp) = resp {
+ let sent = actions.send.send_headers(
+ resp, send_buffer, stream, counts, &mut actions.task);
+ debug_assert!(sent.is_ok(), "oversize response should not fail");
+
+ actions.send.schedule_implicit_reset(
+ stream,
+ Reason::REFUSED_STREAM,
+ counts,
+ &mut actions.task);
+
+ actions.recv.enqueue_reset_expiration(stream, counts);
+
+ Ok(())
+ } else {
+ Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
+ }
+ },
+ Err(RecvHeaderBlockError::State(err)) => Err(err),
+ }
+ } else {
+ if !frame.is_end_stream() {
+ // Receiving trailers that don't set EOS is a "malformed"
+ // message. Malformed messages are a stream error.
+ proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
+ return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
+ }
+
+ actions.recv.recv_trailers(frame, stream)
+ };
+
+ actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
+ })
+ }
+
+ fn recv_data<B>(
+ &mut self,
+ peer: peer::Dyn,
+ send_buffer: &SendBuffer<B>,
+ frame: frame::Data,
+ ) -> Result<(), Error> {
+ let id = frame.stream_id();
+
+ let stream = match self.store.find_mut(&id) {
+ Some(stream) => stream,
+ None => {
+ // The GOAWAY process has begun. All streams with a greater ID
+ // than specified as part of GOAWAY should be ignored.
+ if id > self.actions.recv.max_stream_id() {
+ tracing::trace!(
+ "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
+ id,
+ self.actions.recv.max_stream_id()
+ );
+ return Ok(());
+ }
+
+ if self.actions.may_have_forgotten_stream(peer, id) {
+ tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
+
+ let sz = frame.payload().len();
+ // This should have been enforced at the codec::FramedRead layer, so
+ // this is just a sanity check.
+ assert!(sz <= super::MAX_WINDOW_SIZE as usize);
+ let sz = sz as WindowSize;
+
+ self.actions.recv.ignore_data(sz)?;
+ return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
+ }
+
+ proto_err!(conn: "recv_data: stream not found; id={:?}", id);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+ };
+
+ let actions = &mut self.actions;
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ self.counts.transition(stream, |counts, stream| {
+ let sz = frame.payload().len();
+ let res = actions.recv.recv_data(frame, stream);
+
+ // Any stream error after receiving a DATA frame means
+ // we won't give the data to the user, and so they can't
+ // release the capacity. We do it automatically.
+ if let Err(Error::Reset(..)) = res {
+ actions
+ .recv
+ .release_connection_capacity(sz as WindowSize, &mut None);
+ }
+ actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
+ })
+ }
+
+ fn recv_reset<B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ frame: frame::Reset,
+ ) -> Result<(), Error> {
+ let id = frame.stream_id();
+
+ if id.is_zero() {
+ proto_err!(conn: "recv_reset: invalid stream ID 0");
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ // The GOAWAY process has begun. All streams with a greater ID than
+ // specified as part of GOAWAY should be ignored.
+ if id > self.actions.recv.max_stream_id() {
+ tracing::trace!(
+ "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
+ id,
+ self.actions.recv.max_stream_id()
+ );
+ return Ok(());
+ }
+
+ let stream = match self.store.find_mut(&id) {
+ Some(stream) => stream,
+ None => {
+ // TODO: Are there other error cases?
+ self.actions
+ .ensure_not_idle(self.counts.peer(), id)
+ .map_err(Error::library_go_away)?;
+
+ return Ok(());
+ }
+ };
+
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ let actions = &mut self.actions;
+
+ self.counts.transition(stream, |counts, stream| {
+ actions.recv.recv_reset(frame, stream, counts)?;
+ actions.send.handle_error(send_buffer, stream, counts);
+ assert!(stream.state.is_closed());
+ Ok(())
+ })
+ }
+
+ fn recv_window_update<B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ frame: frame::WindowUpdate,
+ ) -> Result<(), Error> {
+ let id = frame.stream_id();
+
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ if id.is_zero() {
+ self.actions
+ .send
+ .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
+ .map_err(Error::library_go_away)?;
+ } else {
+ // The remote may send window updates for streams that the local now
+ // considers closed. It's ok...
+ if let Some(mut stream) = self.store.find_mut(&id) {
+ // This result is ignored as there is nothing to do when there
+ // is an error. The stream is reset by the function on error and
+ // the error is informational.
+ let _ = self.actions.send.recv_stream_window_update(
+ frame.size_increment(),
+ send_buffer,
+ &mut stream,
+ &mut self.counts,
+ &mut self.actions.task,
+ );
+ } else {
+ self.actions
+ .ensure_not_idle(self.counts.peer(), id)
+ .map_err(Error::library_go_away)?;
+ }
+ }
+
+ Ok(())
+ }
+
+ fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
+ let actions = &mut self.actions;
+ let counts = &mut self.counts;
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ let last_processed_id = actions.recv.last_processed_id();
+
+ self.store.for_each(|stream| {
+ counts.transition(stream, |counts, stream| {
+ actions.recv.handle_error(&err, &mut *stream);
+ actions.send.handle_error(send_buffer, stream, counts);
+ })
+ });
+
+ actions.conn_error = Some(err);
+
+ last_processed_id
+ }
+
+ fn recv_go_away<B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ frame: &frame::GoAway,
+ ) -> Result<(), Error> {
+ let actions = &mut self.actions;
+ let counts = &mut self.counts;
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ let last_stream_id = frame.last_stream_id();
+
+ actions.send.recv_go_away(last_stream_id)?;
+
+ let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
+
+ self.store.for_each(|stream| {
+ if stream.id > last_stream_id {
+ counts.transition(stream, |counts, stream| {
+ actions.recv.handle_error(&err, &mut *stream);
+ actions.send.handle_error(send_buffer, stream, counts);
+ })
+ }
+ });
+
+ actions.conn_error = Some(err);
+
+ Ok(())
+ }
+
+ fn recv_push_promise<B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ frame: frame::PushPromise,
+ ) -> Result<(), Error> {
+ let id = frame.stream_id();
+ let promised_id = frame.promised_id();
+
+ // First, ensure that the initiating stream is still in a valid state.
+ let parent_key = match self.store.find_mut(&id) {
+ Some(stream) => {
+ // The GOAWAY process has begun. All streams with a greater ID
+ // than specified as part of GOAWAY should be ignored.
+ if id > self.actions.recv.max_stream_id() {
+ tracing::trace!(
+ "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
+ id,
+ self.actions.recv.max_stream_id()
+ );
+ return Ok(());
+ }
+
+ // The stream must be receive open
+ if !stream.state.ensure_recv_open()? {
+ proto_err!(conn: "recv_push_promise: initiating stream is not opened");
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ stream.key()
+ }
+ None => {
+ proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+ };
+
+ // TODO: Streams in the reserved states do not count towards the concurrency
+ // limit. However, it seems like there should be a cap otherwise this
+ // could grow in memory indefinitely.
+
+ // Ensure that we can reserve streams
+ self.actions.recv.ensure_can_reserve()?;
+
+ // Next, open the stream.
+ //
+ // If `None` is returned, then the stream is being refused. There is no
+ // further work to be done.
+ if self
+ .actions
+ .recv
+ .open(promised_id, Open::PushPromise, &mut self.counts)?
+ .is_none()
+ {
+ return Ok(());
+ }
+
+ // Try to handle the frame and create a corresponding key for the pushed stream
+ // this requires a bit of indirection to make the borrow checker happy.
+ let child_key: Option<store::Key> = {
+ // Create state for the stream
+ let stream = self.store.insert(promised_id, {
+ Stream::new(
+ promised_id,
+ self.actions.send.init_window_sz(),
+ self.actions.recv.init_window_sz(),
+ )
+ });
+
+ let actions = &mut self.actions;
+
+ self.counts.transition(stream, |counts, stream| {
+ let stream_valid = actions.recv.recv_push_promise(frame, stream);
+
+ match stream_valid {
+ Ok(()) => Ok(Some(stream.key())),
+ _ => {
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ actions
+ .reset_on_recv_stream_err(
+ &mut *send_buffer,
+ stream,
+ counts,
+ stream_valid,
+ )
+ .map(|()| None)
+ }
+ }
+ })?
+ };
+ // If we're successful, push the headers and stream...
+ if let Some(child) = child_key {
+ let mut ppp = self.store[parent_key].pending_push_promises.take();
+ ppp.push(&mut self.store.resolve(child));
+
+ let parent = &mut self.store.resolve(parent_key);
+ parent.pending_push_promises = ppp;
+ parent.notify_recv();
+ };
+
+ Ok(())
+ }
+
+ fn recv_eof<B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ clear_pending_accept: bool,
+ ) -> Result<(), ()> {
+ let actions = &mut self.actions;
+ let counts = &mut self.counts;
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ if actions.conn_error.is_none() {
+ actions.conn_error = Some(
+ io::Error::new(
+ io::ErrorKind::BrokenPipe,
+ "connection closed because of a broken pipe",
+ )
+ .into(),
+ );
+ }
+
+ tracing::trace!("Streams::recv_eof");
+
+ self.store.for_each(|stream| {
+ counts.transition(stream, |counts, stream| {
+ actions.recv.recv_eof(stream);
+
+ // This handles resetting send state associated with the
+ // stream
+ actions.send.handle_error(send_buffer, stream, counts);
+ })
+ });
+
+ actions.clear_queues(clear_pending_accept, &mut self.store, counts);
+ Ok(())
+ }
+
+ fn poll_complete<T, B>(
+ &mut self,
+ send_buffer: &SendBuffer<B>,
+ cx: &mut Context,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ // Send WINDOW_UPDATE frames first
+ //
+ // TODO: It would probably be better to interleave updates w/ data
+ // frames.
+ ready!(self
+ .actions
+ .recv
+ .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
+
+ // Send any other pending frames
+ ready!(self.actions.send.poll_complete(
+ cx,
+ send_buffer,
+ &mut self.store,
+ &mut self.counts,
+ dst
+ ))?;
+
+ // Nothing else to do, track the task
+ self.actions.task = Some(cx.waker().clone());
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
+ let key = match self.store.find_entry(id) {
+ Entry::Occupied(e) => e.key(),
+ Entry::Vacant(e) => {
+ // Resetting a stream we don't know about? That could be OK...
+ //
+ // 1. As a server, we just received a request, but that request
+ // was bad, so we're resetting before even accepting it.
+ // This is totally fine.
+ //
+ // 2. The remote may have sent us a frame on new stream that
+ // it's *not* supposed to have done, and thus, we don't know
+ // the stream. In that case, sending a reset will "open" the
+ // stream in our store. Maybe that should be a connection
+ // error instead? At least for now, we need to update what
+ // our vision of the next stream is.
+ if self.counts.peer().is_local_init(id) {
+ // We normally would open this stream, so update our
+ // next-send-id record.
+ self.actions.send.maybe_reset_next_stream_id(id);
+ } else {
+ // We normally would recv this stream, so update our
+ // next-recv-id record.
+ self.actions.recv.maybe_reset_next_stream_id(id);
+ }
+
+ let stream = Stream::new(id, 0, 0);
+
+ e.insert(stream)
+ }
+ };
+
+ let stream = self.store.resolve(key);
+ let mut send_buffer = send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+ self.actions.send_reset(
+ stream,
+ reason,
+ Initiator::Library,
+ &mut self.counts,
+ send_buffer,
+ );
+ }
+}
+
+impl<B> Streams<B, client::Peer>
+where
+ B: Buf,
+{
+ pub fn poll_pending_open(
+ &mut self,
+ cx: &Context,
+ pending: Option<&OpaqueStreamRef>,
+ ) -> Poll<Result<(), crate::Error>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ me.actions.ensure_no_conn_error()?;
+ me.actions.send.ensure_next_stream_id()?;
+
+ if let Some(pending) = pending {
+ let mut stream = me.store.resolve(pending.key);
+ tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
+ if stream.is_pending_open {
+ stream.wait_send(cx);
+ return Poll::Pending;
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<B, P> Streams<B, P>
+where
+ P: Peer,
+{
+ pub fn as_dyn(&self) -> DynStreams<B> {
+ let Self {
+ inner,
+ send_buffer,
+ _p,
+ } = self;
+ DynStreams {
+ inner,
+ send_buffer,
+ peer: P::r#dyn(),
+ }
+ }
+
+ /// This function is safe to call multiple times.
+ ///
+ /// A `Result` is returned to avoid panicking if the mutex is poisoned.
+ pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
+ self.as_dyn().recv_eof(clear_pending_accept)
+ }
+
+ pub(crate) fn max_send_streams(&self) -> usize {
+ self.inner.lock().unwrap().counts.max_send_streams()
+ }
+
+ pub(crate) fn max_recv_streams(&self) -> usize {
+ self.inner.lock().unwrap().counts.max_recv_streams()
+ }
+
+ #[cfg(feature = "unstable")]
+ pub fn num_active_streams(&self) -> usize {
+ let me = self.inner.lock().unwrap();
+ me.store.num_active_streams()
+ }
+
+ pub fn has_streams(&self) -> bool {
+ let me = self.inner.lock().unwrap();
+ me.counts.has_streams()
+ }
+
+ pub fn has_streams_or_other_references(&self) -> bool {
+ let me = self.inner.lock().unwrap();
+ me.counts.has_streams() || me.refs > 1
+ }
+
+ #[cfg(feature = "unstable")]
+ pub fn num_wired_streams(&self) -> usize {
+ let me = self.inner.lock().unwrap();
+ me.store.num_wired_streams()
+ }
+}
+
+// no derive because we don't need B and P to be Clone.
+impl<B, P> Clone for Streams<B, P>
+where
+ P: Peer,
+{
+ fn clone(&self) -> Self {
+ self.inner.lock().unwrap().refs += 1;
+ Streams {
+ inner: self.inner.clone(),
+ send_buffer: self.send_buffer.clone(),
+ _p: ::std::marker::PhantomData,
+ }
+ }
+}
+
+impl<B, P> Drop for Streams<B, P>
+where
+ P: Peer,
+{
+ fn drop(&mut self) {
+ if let Ok(mut inner) = self.inner.lock() {
+ inner.refs -= 1;
+ if inner.refs == 1 {
+ if let Some(task) = inner.actions.task.take() {
+ task.wake();
+ }
+ }
+ }
+ }
+}
+
+// ===== impl StreamRef =====
+
+impl<B> StreamRef<B> {
+ pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
+ where
+ B: Buf,
+ {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let stream = me.store.resolve(self.opaque.key);
+ let actions = &mut me.actions;
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.counts.transition(stream, |counts, stream| {
+ // Create the data frame
+ let mut frame = frame::Data::new(stream.id, data);
+ frame.set_end_stream(end_stream);
+
+ // Send the data frame
+ actions
+ .send
+ .send_data(frame, send_buffer, stream, counts, &mut actions.task)
+ })
+ }
+
+ pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let stream = me.store.resolve(self.opaque.key);
+ let actions = &mut me.actions;
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.counts.transition(stream, |counts, stream| {
+ // Create the trailers frame
+ let frame = frame::Headers::trailers(stream.id, trailers);
+
+ // Send the trailers frame
+ actions
+ .send
+ .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
+ })
+ }
+
+ pub fn send_reset(&mut self, reason: Reason) {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let stream = me.store.resolve(self.opaque.key);
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.actions
+ .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
+ }
+
+ pub fn send_response(
+ &mut self,
+ mut response: Response<()>,
+ end_of_stream: bool,
+ ) -> Result<(), UserError> {
+ // Clear before taking lock, incase extensions contain a StreamRef.
+ response.extensions_mut().clear();
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let stream = me.store.resolve(self.opaque.key);
+ let actions = &mut me.actions;
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ me.counts.transition(stream, |counts, stream| {
+ let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
+
+ actions
+ .send
+ .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
+ })
+ }
+
+ pub fn send_push_promise(
+ &mut self,
+ mut request: Request<()>,
+ ) -> Result<StreamRef<B>, UserError> {
+ // Clear before taking lock, incase extensions contain a StreamRef.
+ request.extensions_mut().clear();
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut send_buffer = self.send_buffer.inner.lock().unwrap();
+ let send_buffer = &mut *send_buffer;
+
+ let actions = &mut me.actions;
+ let promised_id = actions.send.reserve_local()?;
+
+ let child_key = {
+ let mut child_stream = me.store.insert(
+ promised_id,
+ Stream::new(
+ promised_id,
+ actions.send.init_window_sz(),
+ actions.recv.init_window_sz(),
+ ),
+ );
+ child_stream.state.reserve_local()?;
+ child_stream.is_pending_push = true;
+ child_stream.key()
+ };
+
+ let pushed = {
+ let mut stream = me.store.resolve(self.opaque.key);
+
+ let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
+
+ actions
+ .send
+ .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
+ };
+
+ if let Err(err) = pushed {
+ let mut child_stream = me.store.resolve(child_key);
+ child_stream.unlink();
+ child_stream.remove();
+ return Err(err);
+ }
+
+ me.refs += 1;
+ let opaque =
+ OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
+
+ Ok(StreamRef {
+ opaque,
+ send_buffer: self.send_buffer.clone(),
+ })
+ }
+
+ /// Called by the server after the stream is accepted. Given that clients
+ /// initialize streams by sending HEADERS, the request will always be
+ /// available.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the request isn't present.
+ pub fn take_request(&self) -> Request<()> {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.opaque.key);
+ me.actions.recv.take_request(&mut stream)
+ }
+
+ /// Called by a client to see if the current stream is pending open
+ pub fn is_pending_open(&self) -> bool {
+ let mut me = self.opaque.inner.lock().unwrap();
+ me.store.resolve(self.opaque.key).is_pending_open
+ }
+
+ /// Request capacity to send data
+ pub fn reserve_capacity(&mut self, capacity: WindowSize) {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.opaque.key);
+
+ me.actions
+ .send
+ .reserve_capacity(capacity, &mut stream, &mut me.counts)
+ }
+
+ /// Returns the stream's current send capacity.
+ pub fn capacity(&self) -> WindowSize {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.opaque.key);
+
+ me.actions.send.capacity(&mut stream)
+ }
+
+ /// Request to be notified when the stream's capacity increases
+ pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.opaque.key);
+
+ me.actions.send.poll_capacity(cx, &mut stream)
+ }
+
+ /// Request to be notified for if a `RST_STREAM` is received for this stream.
+ pub(crate) fn poll_reset(
+ &mut self,
+ cx: &Context,
+ mode: proto::PollReset,
+ ) -> Poll<Result<Reason, crate::Error>> {
+ let mut me = self.opaque.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.opaque.key);
+
+ me.actions
+ .send
+ .poll_reset(cx, &mut stream, mode)
+ .map_err(From::from)
+ }
+
+ pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
+ self.opaque.clone()
+ }
+
+ pub fn stream_id(&self) -> StreamId {
+ self.opaque.stream_id()
+ }
+}
+
+impl<B> Clone for StreamRef<B> {
+ fn clone(&self) -> Self {
+ StreamRef {
+ opaque: self.opaque.clone(),
+ send_buffer: self.send_buffer.clone(),
+ }
+ }
+}
+
+// ===== impl OpaqueStreamRef =====
+
+impl OpaqueStreamRef {
+ fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
+ stream.ref_inc();
+ OpaqueStreamRef {
+ inner,
+ key: stream.key(),
+ }
+ }
+ /// Called by a client to check for a received response.
+ pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+
+ me.actions.recv.poll_response(cx, &mut stream)
+ }
+ /// Called by a client to check for a pushed request.
+ pub fn poll_pushed(
+ &mut self,
+ cx: &Context,
+ ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+ me.actions
+ .recv
+ .poll_pushed(cx, &mut stream)
+ .map_ok(|(h, key)| {
+ me.refs += 1;
+ let opaque_ref =
+ OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
+ (h, opaque_ref)
+ })
+ }
+
+ pub fn is_end_stream(&self) -> bool {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let stream = me.store.resolve(self.key);
+
+ me.actions.recv.is_end_stream(&stream)
+ }
+
+ pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+
+ me.actions.recv.poll_data(cx, &mut stream)
+ }
+
+ pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+
+ me.actions.recv.poll_trailers(cx, &mut stream)
+ }
+
+ pub(crate) fn available_recv_capacity(&self) -> isize {
+ let me = self.inner.lock().unwrap();
+ let me = &*me;
+
+ let stream = &me.store[self.key];
+ stream.recv_flow.available().into()
+ }
+
+ pub(crate) fn used_recv_capacity(&self) -> WindowSize {
+ let me = self.inner.lock().unwrap();
+ let me = &*me;
+
+ let stream = &me.store[self.key];
+ stream.in_flight_recv_data
+ }
+
+ /// Releases recv capacity back to the peer. This may result in sending
+ /// WINDOW_UPDATE frames on both the stream and connection.
+ pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+
+ me.actions
+ .recv
+ .release_capacity(capacity, &mut stream, &mut me.actions.task)
+ }
+
+ /// Clear the receive queue and set the status to no longer receive data frames.
+ pub(crate) fn clear_recv_buffer(&mut self) {
+ let mut me = self.inner.lock().unwrap();
+ let me = &mut *me;
+
+ let mut stream = me.store.resolve(self.key);
+ stream.is_recv = false;
+ me.actions.recv.clear_recv_buffer(&mut stream);
+ }
+
+ pub fn stream_id(&self) -> StreamId {
+ self.inner.lock().unwrap().store[self.key].id
+ }
+}
+
+impl fmt::Debug for OpaqueStreamRef {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ use std::sync::TryLockError::*;
+
+ match self.inner.try_lock() {
+ Ok(me) => {
+ let stream = &me.store[self.key];
+ fmt.debug_struct("OpaqueStreamRef")
+ .field("stream_id", &stream.id)
+ .field("ref_count", &stream.ref_count)
+ .finish()
+ }
+ Err(Poisoned(_)) => fmt
+ .debug_struct("OpaqueStreamRef")
+ .field("inner", &"<Poisoned>")
+ .finish(),
+ Err(WouldBlock) => fmt
+ .debug_struct("OpaqueStreamRef")
+ .field("inner", &"<Locked>")
+ .finish(),
+ }
+ }
+}
+
+impl Clone for OpaqueStreamRef {
+ fn clone(&self) -> Self {
+ // Increment the ref count
+ let mut inner = self.inner.lock().unwrap();
+ inner.store.resolve(self.key).ref_inc();
+ inner.refs += 1;
+
+ OpaqueStreamRef {
+ inner: self.inner.clone(),
+ key: self.key,
+ }
+ }
+}
+
+impl Drop for OpaqueStreamRef {
+ fn drop(&mut self) {
+ drop_stream_ref(&self.inner, self.key);
+ }
+}
+
+// TODO: Move back in fn above
+fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
+ let mut me = match inner.lock() {
+ Ok(inner) => inner,
+ Err(_) => {
+ if ::std::thread::panicking() {
+ tracing::trace!("StreamRef::drop; mutex poisoned");
+ return;
+ } else {
+ panic!("StreamRef::drop; mutex poisoned");
+ }
+ }
+ };
+
+ let me = &mut *me;
+ me.refs -= 1;
+ let mut stream = me.store.resolve(key);
+
+ tracing::trace!("drop_stream_ref; stream={:?}", stream);
+
+ // decrement the stream's ref count by 1.
+ stream.ref_dec();
+
+ let actions = &mut me.actions;
+
+ // If the stream is not referenced and it is already
+ // closed (does not have to go through logic below
+ // of canceling the stream), we should notify the task
+ // (connection) so that it can close properly
+ if stream.ref_count == 0 && stream.is_closed() {
+ if let Some(task) = actions.task.take() {
+ task.wake();
+ }
+ }
+
+ me.counts.transition(stream, |counts, stream| {
+ maybe_cancel(stream, actions, counts);
+
+ if stream.ref_count == 0 {
+ // Release any recv window back to connection, no one can access
+ // it anymore.
+ actions
+ .recv
+ .release_closed_capacity(stream, &mut actions.task);
+
+ // We won't be able to reach our push promises anymore
+ let mut ppp = stream.pending_push_promises.take();
+ while let Some(promise) = ppp.pop(stream.store_mut()) {
+ counts.transition(promise, |counts, stream| {
+ maybe_cancel(stream, actions, counts);
+ });
+ }
+ }
+ });
+}
+
+fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
+ if stream.is_canceled_interest() {
+ // Server is allowed to early respond without fully consuming the client input stream
+ // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
+ // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376)
+ let reason = if counts.peer().is_server()
+ && stream.state.is_send_closed()
+ && stream.state.is_recv_streaming()
+ {
+ Reason::NO_ERROR
+ } else {
+ Reason::CANCEL
+ };
+
+ actions
+ .send
+ .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
+ actions.recv.enqueue_reset_expiration(stream, counts);
+ }
+}
+
+// ===== impl SendBuffer =====
+
+impl<B> SendBuffer<B> {
+ fn new() -> Self {
+ let inner = Mutex::new(Buffer::new());
+ SendBuffer { inner }
+ }
+}
+
+// ===== impl Actions =====
+
+impl Actions {
+ fn send_reset<B>(
+ &mut self,
+ stream: store::Ptr,
+ reason: Reason,
+ initiator: Initiator,
+ counts: &mut Counts,
+ send_buffer: &mut Buffer<Frame<B>>,
+ ) {
+ counts.transition(stream, |counts, stream| {
+ self.send.send_reset(
+ reason,
+ initiator,
+ send_buffer,
+ stream,
+ counts,
+ &mut self.task,
+ );
+ self.recv.enqueue_reset_expiration(stream, counts);
+ // if a RecvStream is parked, ensure it's notified
+ stream.notify_recv();
+ });
+ }
+
+ fn reset_on_recv_stream_err<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ res: Result<(), Error>,
+ ) -> Result<(), Error> {
+ if let Err(Error::Reset(stream_id, reason, initiator)) = res {
+ debug_assert_eq!(stream_id, stream.id);
+ // Reset the stream.
+ self.send
+ .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
+ Ok(())
+ } else {
+ res
+ }
+ }
+
+ fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
+ if peer.is_local_init(id) {
+ self.send.ensure_not_idle(id)
+ } else {
+ self.recv.ensure_not_idle(id)
+ }
+ }
+
+ fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
+ if let Some(ref err) = self.conn_error {
+ Err(err.clone())
+ } else {
+ Ok(())
+ }
+ }
+
+ /// Check if we possibly could have processed and since forgotten this stream.
+ ///
+ /// If we send a RST_STREAM for a stream, we will eventually "forget" about
+ /// the stream to free up memory. It's possible that the remote peer had
+ /// frames in-flight, and by the time we receive them, our own state is
+ /// gone. We *could* tear everything down by sending a GOAWAY, but it
+ /// is more likely to be latency/memory constraints that caused this,
+ /// and not a bad actor. So be less catastrophic, the spec allows
+ /// us to send another RST_STREAM of STREAM_CLOSED.
+ fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
+ if id.is_zero() {
+ return false;
+ }
+ if peer.is_local_init(id) {
+ self.send.may_have_created_stream(id)
+ } else {
+ self.recv.may_have_created_stream(id)
+ }
+ }
+
+ fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
+ self.recv.clear_queues(clear_pending_accept, store, counts);
+ self.send.clear_queues(store, counts);
+ }
+}