summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto/streams/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/state.rs')
-rw-r--r--third_party/rust/h2/src/proto/streams/state.rs469
1 files changed, 469 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/state.rs b/third_party/rust/h2/src/proto/streams/state.rs
new file mode 100644
index 0000000000..5256f09cf4
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/state.rs
@@ -0,0 +1,469 @@
+use std::io;
+
+use crate::codec::UserError;
+use crate::frame::{self, Reason, StreamId};
+use crate::proto::{self, Error, Initiator, PollReset};
+
+use self::Inner::*;
+use self::Peer::*;
+
+/// Represents the state of an H2 stream
+///
+/// ```not_rust
+/// +--------+
+/// send PP | | recv PP
+/// ,--------| idle |--------.
+/// / | | \
+/// v +--------+ v
+/// +----------+ | +----------+
+/// | | | send H / | |
+/// ,------| reserved | | recv H | reserved |------.
+/// | | (local) | | | (remote) | |
+/// | +----------+ v +----------+ |
+/// | | +--------+ | |
+/// | | recv ES | | send ES | |
+/// | send H | ,-------| open |-------. | recv H |
+/// | | / | | \ | |
+/// | v v +--------+ v v |
+/// | +----------+ | +----------+ |
+/// | | half | | | half | |
+/// | | closed | | send R / | closed | |
+/// | | (remote) | | recv R | (local) | |
+/// | +----------+ | +----------+ |
+/// | | | | |
+/// | | send ES / | recv ES / | |
+/// | | send R / v send R / | |
+/// | | recv R +--------+ recv R | |
+/// | send R / `----------->| |<-----------' send R / |
+/// | recv R | closed | recv R |
+/// `----------------------->| |<----------------------'
+/// +--------+
+///
+/// send: endpoint sends this frame
+/// recv: endpoint receives this frame
+///
+/// H: HEADERS frame (with implied CONTINUATIONs)
+/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
+/// ES: END_STREAM flag
+/// R: RST_STREAM frame
+/// ```
+#[derive(Debug, Clone)]
+pub struct State {
+ inner: Inner,
+}
+
+#[derive(Debug, Clone)]
+enum Inner {
+ Idle,
+ // TODO: these states shouldn't count against concurrency limits:
+ ReservedLocal,
+ ReservedRemote,
+ Open { local: Peer, remote: Peer },
+ HalfClosedLocal(Peer), // TODO: explicitly name this value
+ HalfClosedRemote(Peer),
+ Closed(Cause),
+}
+
+#[derive(Debug, Copy, Clone, Default)]
+enum Peer {
+ #[default]
+ AwaitingHeaders,
+ Streaming,
+}
+
+#[derive(Debug, Clone)]
+enum Cause {
+ EndStream,
+ Error(Error),
+
+ /// This indicates to the connection that a reset frame must be sent out
+ /// once the send queue has been flushed.
+ ///
+ /// Examples of when this could happen:
+ /// - User drops all references to a stream, so we want to CANCEL the it.
+ /// - Header block size was too large, so we want to REFUSE, possibly
+ /// after sending a 431 response frame.
+ ScheduledLibraryReset(Reason),
+}
+
+impl State {
+ /// Opens the send-half of a stream if it is not already open.
+ pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
+ let local = Streaming;
+
+ self.inner = match self.inner {
+ Idle => {
+ if eos {
+ HalfClosedLocal(AwaitingHeaders)
+ } else {
+ Open {
+ local,
+ remote: AwaitingHeaders,
+ }
+ }
+ }
+ Open {
+ local: AwaitingHeaders,
+ remote,
+ } => {
+ if eos {
+ HalfClosedLocal(remote)
+ } else {
+ Open { local, remote }
+ }
+ }
+ HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
+ if eos {
+ Closed(Cause::EndStream)
+ } else {
+ HalfClosedRemote(local)
+ }
+ }
+ _ => {
+ // All other transitions result in a protocol error
+ return Err(UserError::UnexpectedFrameType);
+ }
+ };
+
+ Ok(())
+ }
+
+ /// Opens the receive-half of the stream when a HEADERS frame is received.
+ ///
+ /// Returns true if this transitions the state to Open.
+ pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
+ let mut initial = false;
+ let eos = frame.is_end_stream();
+
+ self.inner = match self.inner {
+ Idle => {
+ initial = true;
+
+ if eos {
+ HalfClosedRemote(AwaitingHeaders)
+ } else {
+ Open {
+ local: AwaitingHeaders,
+ remote: if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ AwaitingHeaders
+ } else {
+ Streaming
+ },
+ }
+ }
+ }
+ ReservedRemote => {
+ initial = true;
+
+ if eos {
+ Closed(Cause::EndStream)
+ } else if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ ReservedRemote
+ } else {
+ HalfClosedLocal(Streaming)
+ }
+ }
+ Open {
+ local,
+ remote: AwaitingHeaders,
+ } => {
+ if eos {
+ HalfClosedRemote(local)
+ } else {
+ Open {
+ local,
+ remote: if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ AwaitingHeaders
+ } else {
+ Streaming
+ },
+ }
+ }
+ }
+ HalfClosedLocal(AwaitingHeaders) => {
+ if eos {
+ Closed(Cause::EndStream)
+ } else if frame.is_informational() {
+ tracing::trace!("skipping 1xx response headers");
+ HalfClosedLocal(AwaitingHeaders)
+ } else {
+ HalfClosedLocal(Streaming)
+ }
+ }
+ ref state => {
+ // All other transitions result in a protocol error
+ proto_err!(conn: "recv_open: in unexpected state {:?}", state);
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+ };
+
+ Ok(initial)
+ }
+
+ /// Transition from Idle -> ReservedRemote
+ pub fn reserve_remote(&mut self) -> Result<(), Error> {
+ match self.inner {
+ Idle => {
+ self.inner = ReservedRemote;
+ Ok(())
+ }
+ ref state => {
+ proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+ }
+
+ /// Transition from Idle -> ReservedLocal
+ pub fn reserve_local(&mut self) -> Result<(), UserError> {
+ match self.inner {
+ Idle => {
+ self.inner = ReservedLocal;
+ Ok(())
+ }
+ _ => Err(UserError::UnexpectedFrameType),
+ }
+ }
+
+ /// Indicates that the remote side will not send more data to the local.
+ pub fn recv_close(&mut self) -> Result<(), Error> {
+ match self.inner {
+ Open { local, .. } => {
+ // The remote side will continue to receive data.
+ tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
+ self.inner = HalfClosedRemote(local);
+ Ok(())
+ }
+ HalfClosedLocal(..) => {
+ tracing::trace!("recv_close: HalfClosedLocal => Closed");
+ self.inner = Closed(Cause::EndStream);
+ Ok(())
+ }
+ ref state => {
+ proto_err!(conn: "recv_close: in unexpected state {:?}", state);
+ Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
+ }
+ }
+ }
+
+ /// The remote explicitly sent a RST_STREAM.
+ ///
+ /// # Arguments
+ /// - `frame`: the received RST_STREAM frame.
+ /// - `queued`: true if this stream has frames in the pending send queue.
+ pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
+ match self.inner {
+ // If the stream is already in a `Closed` state, do nothing,
+ // provided that there are no frames still in the send queue.
+ Closed(..) if !queued => {}
+ // A notionally `Closed` stream may still have queued frames in
+ // the following cases:
+ //
+ // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
+ // actually closed the stream yet).
+ // - if the cause is `Cause::EndStream`: we transition to this
+ // state when an EOS frame is *enqueued* (so that it's invalid
+ // to enqueue more frames), not when the EOS frame is *sent*;
+ // therefore, there may still be frames ahead of the EOS frame
+ // in the send queue.
+ //
+ // In either of these cases, we want to overwrite the stream's
+ // previous state with the received RST_STREAM, so that the queue
+ // will be cleared by `Prioritize::pop_frame`.
+ ref state => {
+ tracing::trace!(
+ "recv_reset; frame={:?}; state={:?}; queued={:?}",
+ frame,
+ state,
+ queued
+ );
+ self.inner = Closed(Cause::Error(Error::remote_reset(
+ frame.stream_id(),
+ frame.reason(),
+ )));
+ }
+ }
+ }
+
+ /// Handle a connection-level error.
+ pub fn handle_error(&mut self, err: &proto::Error) {
+ match self.inner {
+ Closed(..) => {}
+ _ => {
+ tracing::trace!("handle_error; err={:?}", err);
+ self.inner = Closed(Cause::Error(err.clone()));
+ }
+ }
+ }
+
+ pub fn recv_eof(&mut self) {
+ match self.inner {
+ Closed(..) => {}
+ ref state => {
+ tracing::trace!("recv_eof; state={:?}", state);
+ self.inner = Closed(Cause::Error(
+ io::Error::new(
+ io::ErrorKind::BrokenPipe,
+ "stream closed because of a broken pipe",
+ )
+ .into(),
+ ));
+ }
+ }
+ }
+
+ /// Indicates that the local side will not send more data to the local.
+ pub fn send_close(&mut self) {
+ match self.inner {
+ Open { remote, .. } => {
+ // The remote side will continue to receive data.
+ tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
+ self.inner = HalfClosedLocal(remote);
+ }
+ HalfClosedRemote(..) => {
+ tracing::trace!("send_close: HalfClosedRemote => Closed");
+ self.inner = Closed(Cause::EndStream);
+ }
+ ref state => panic!("send_close: unexpected state {:?}", state),
+ }
+ }
+
+ /// Set the stream state to reset locally.
+ pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
+ self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
+ }
+
+ /// Set the stream state to a scheduled reset.
+ pub fn set_scheduled_reset(&mut self, reason: Reason) {
+ debug_assert!(!self.is_closed());
+ self.inner = Closed(Cause::ScheduledLibraryReset(reason));
+ }
+
+ pub fn get_scheduled_reset(&self) -> Option<Reason> {
+ match self.inner {
+ Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
+ _ => None,
+ }
+ }
+
+ pub fn is_scheduled_reset(&self) -> bool {
+ matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
+ }
+
+ pub fn is_local_error(&self) -> bool {
+ match self.inner {
+ Closed(Cause::Error(ref e)) => e.is_local(),
+ Closed(Cause::ScheduledLibraryReset(..)) => true,
+ _ => false,
+ }
+ }
+
+ pub fn is_remote_reset(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
+ )
+ }
+
+ /// Returns true if the stream is already reset.
+ pub fn is_reset(&self) -> bool {
+ match self.inner {
+ Closed(Cause::EndStream) => false,
+ Closed(_) => true,
+ _ => false,
+ }
+ }
+
+ pub fn is_send_streaming(&self) -> bool {
+ matches!(
+ self.inner,
+ Open {
+ local: Streaming,
+ ..
+ } | HalfClosedRemote(Streaming)
+ )
+ }
+
+ /// Returns true when the stream is in a state to receive headers
+ pub fn is_recv_headers(&self) -> bool {
+ matches!(
+ self.inner,
+ Idle | Open {
+ remote: AwaitingHeaders,
+ ..
+ } | HalfClosedLocal(AwaitingHeaders)
+ | ReservedRemote
+ )
+ }
+
+ pub fn is_recv_streaming(&self) -> bool {
+ matches!(
+ self.inner,
+ Open {
+ remote: Streaming,
+ ..
+ } | HalfClosedLocal(Streaming)
+ )
+ }
+
+ pub fn is_closed(&self) -> bool {
+ matches!(self.inner, Closed(_))
+ }
+
+ pub fn is_recv_closed(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(..) | HalfClosedRemote(..) | ReservedLocal
+ )
+ }
+
+ pub fn is_send_closed(&self) -> bool {
+ matches!(
+ self.inner,
+ Closed(..) | HalfClosedLocal(..) | ReservedRemote
+ )
+ }
+
+ pub fn is_idle(&self) -> bool {
+ matches!(self.inner, Idle)
+ }
+
+ pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
+ // TODO: Is this correct?
+ match self.inner {
+ Closed(Cause::Error(ref e)) => Err(e.clone()),
+ Closed(Cause::ScheduledLibraryReset(reason)) => {
+ Err(proto::Error::library_go_away(reason))
+ }
+ Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
+ _ => Ok(true),
+ }
+ }
+
+ /// Returns a reason if the stream has been reset.
+ pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
+ match self.inner {
+ Closed(Cause::Error(Error::Reset(_, reason, _)))
+ | Closed(Cause::Error(Error::GoAway(_, reason, _)))
+ | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
+ Closed(Cause::Error(ref e)) => Err(e.clone().into()),
+ Open {
+ local: Streaming, ..
+ }
+ | HalfClosedRemote(Streaming) => match mode {
+ PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
+ PollReset::Streaming => Ok(None),
+ },
+ _ => Ok(None),
+ }
+ }
+}
+
+impl Default for State {
+ fn default() -> State {
+ State { inner: Inner::Idle }
+ }
+}