diff options
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/state.rs')
-rw-r--r-- | third_party/rust/h2/src/proto/streams/state.rs | 472 |
1 files changed, 472 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/state.rs b/third_party/rust/h2/src/proto/streams/state.rs new file mode 100644 index 0000000000..9931d41b1c --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/state.rs @@ -0,0 +1,472 @@ +use std::io; + +use crate::codec::UserError; +use crate::frame::{self, Reason, StreamId}; +use crate::proto::{self, Error, Initiator, PollReset}; + +use self::Inner::*; +use self::Peer::*; + +/// Represents the state of an H2 stream +/// +/// ```not_rust +/// +--------+ +/// send PP | | recv PP +/// ,--------| idle |--------. +/// / | | \ +/// v +--------+ v +/// +----------+ | +----------+ +/// | | | send H / | | +/// ,------| reserved | | recv H | reserved |------. +/// | | (local) | | | (remote) | | +/// | +----------+ v +----------+ | +/// | | +--------+ | | +/// | | recv ES | | send ES | | +/// | send H | ,-------| open |-------. | recv H | +/// | | / | | \ | | +/// | v v +--------+ v v | +/// | +----------+ | +----------+ | +/// | | half | | | half | | +/// | | closed | | send R / | closed | | +/// | | (remote) | | recv R | (local) | | +/// | +----------+ | +----------+ | +/// | | | | | +/// | | send ES / | recv ES / | | +/// | | send R / v send R / | | +/// | | recv R +--------+ recv R | | +/// | send R / `----------->| |<-----------' send R / | +/// | recv R | closed | recv R | +/// `----------------------->| |<----------------------' +/// +--------+ +/// +/// send: endpoint sends this frame +/// recv: endpoint receives this frame +/// +/// H: HEADERS frame (with implied CONTINUATIONs) +/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +/// ES: END_STREAM flag +/// R: RST_STREAM frame +/// ``` +#[derive(Debug, Clone)] +pub struct State { + inner: Inner, +} + +#[derive(Debug, Clone)] +enum Inner { + Idle, + // TODO: these states shouldn't count against concurrency limits: + ReservedLocal, + ReservedRemote, + Open { local: Peer, remote: Peer }, + HalfClosedLocal(Peer), // TODO: explicitly name this value + HalfClosedRemote(Peer), + Closed(Cause), +} + +#[derive(Debug, Copy, Clone)] +enum Peer { + AwaitingHeaders, + Streaming, +} + +#[derive(Debug, Clone)] +enum Cause { + EndStream, + Error(Error), + + /// This indicates to the connection that a reset frame must be sent out + /// once the send queue has been flushed. + /// + /// Examples of when this could happen: + /// - User drops all references to a stream, so we want to CANCEL the it. + /// - Header block size was too large, so we want to REFUSE, possibly + /// after sending a 431 response frame. + ScheduledLibraryReset(Reason), +} + +impl State { + /// Opens the send-half of a stream if it is not already open. + pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { + let local = Streaming; + + self.inner = match self.inner { + Idle => { + if eos { + HalfClosedLocal(AwaitingHeaders) + } else { + Open { + local, + remote: AwaitingHeaders, + } + } + } + Open { + local: AwaitingHeaders, + remote, + } => { + if eos { + HalfClosedLocal(remote) + } else { + Open { local, remote } + } + } + HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { + if eos { + Closed(Cause::EndStream) + } else { + HalfClosedRemote(local) + } + } + _ => { + // All other transitions result in a protocol error + return Err(UserError::UnexpectedFrameType); + } + }; + + Ok(()) + } + + /// Opens the receive-half of the stream when a HEADERS frame is received. + /// + /// Returns true if this transitions the state to Open. + pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { + let mut initial = false; + let eos = frame.is_end_stream(); + + self.inner = match self.inner { + Idle => { + initial = true; + + if eos { + HalfClosedRemote(AwaitingHeaders) + } else { + Open { + local: AwaitingHeaders, + remote: if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + AwaitingHeaders + } else { + Streaming + }, + } + } + } + ReservedRemote => { + initial = true; + + if eos { + Closed(Cause::EndStream) + } else if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + ReservedRemote + } else { + HalfClosedLocal(Streaming) + } + } + Open { + local, + remote: AwaitingHeaders, + } => { + if eos { + HalfClosedRemote(local) + } else { + Open { + local, + remote: if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + AwaitingHeaders + } else { + Streaming + }, + } + } + } + HalfClosedLocal(AwaitingHeaders) => { + if eos { + Closed(Cause::EndStream) + } else if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + HalfClosedLocal(AwaitingHeaders) + } else { + HalfClosedLocal(Streaming) + } + } + ref state => { + // All other transitions result in a protocol error + proto_err!(conn: "recv_open: in unexpected state {:?}", state); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + }; + + Ok(initial) + } + + /// Transition from Idle -> ReservedRemote + pub fn reserve_remote(&mut self) -> Result<(), Error> { + match self.inner { + Idle => { + self.inner = ReservedRemote; + Ok(()) + } + ref state => { + proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) + } + } + } + + /// Transition from Idle -> ReservedLocal + pub fn reserve_local(&mut self) -> Result<(), UserError> { + match self.inner { + Idle => { + self.inner = ReservedLocal; + Ok(()) + } + _ => Err(UserError::UnexpectedFrameType), + } + } + + /// Indicates that the remote side will not send more data to the local. + pub fn recv_close(&mut self) -> Result<(), Error> { + match self.inner { + Open { local, .. } => { + // The remote side will continue to receive data. + tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); + self.inner = HalfClosedRemote(local); + Ok(()) + } + HalfClosedLocal(..) => { + tracing::trace!("recv_close: HalfClosedLocal => Closed"); + self.inner = Closed(Cause::EndStream); + Ok(()) + } + ref state => { + proto_err!(conn: "recv_close: in unexpected state {:?}", state); + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) + } + } + } + + /// The remote explicitly sent a RST_STREAM. + /// + /// # Arguments + /// - `frame`: the received RST_STREAM frame. + /// - `queued`: true if this stream has frames in the pending send queue. + pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { + match self.inner { + // If the stream is already in a `Closed` state, do nothing, + // provided that there are no frames still in the send queue. + Closed(..) if !queued => {} + // A notionally `Closed` stream may still have queued frames in + // the following cases: + // + // - if the cause is `Cause::Scheduled(..)` (i.e. we have not + // actually closed the stream yet). + // - if the cause is `Cause::EndStream`: we transition to this + // state when an EOS frame is *enqueued* (so that it's invalid + // to enqueue more frames), not when the EOS frame is *sent*; + // therefore, there may still be frames ahead of the EOS frame + // in the send queue. + // + // In either of these cases, we want to overwrite the stream's + // previous state with the received RST_STREAM, so that the queue + // will be cleared by `Prioritize::pop_frame`. + ref state => { + tracing::trace!( + "recv_reset; frame={:?}; state={:?}; queued={:?}", + frame, + state, + queued + ); + self.inner = Closed(Cause::Error(Error::remote_reset( + frame.stream_id(), + frame.reason(), + ))); + } + } + } + + /// Handle a connection-level error. + pub fn handle_error(&mut self, err: &proto::Error) { + match self.inner { + Closed(..) => {} + _ => { + tracing::trace!("handle_error; err={:?}", err); + self.inner = Closed(Cause::Error(err.clone())); + } + } + } + + pub fn recv_eof(&mut self) { + match self.inner { + Closed(..) => {} + ref state => { + tracing::trace!("recv_eof; state={:?}", state); + self.inner = Closed(Cause::Error(io::ErrorKind::BrokenPipe.into())); + } + } + } + + /// Indicates that the local side will not send more data to the local. + pub fn send_close(&mut self) { + match self.inner { + Open { remote, .. } => { + // The remote side will continue to receive data. + tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); + self.inner = HalfClosedLocal(remote); + } + HalfClosedRemote(..) => { + tracing::trace!("send_close: HalfClosedRemote => Closed"); + self.inner = Closed(Cause::EndStream); + } + ref state => panic!("send_close: unexpected state {:?}", state), + } + } + + /// Set the stream state to reset locally. + pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { + self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); + } + + /// Set the stream state to a scheduled reset. + pub fn set_scheduled_reset(&mut self, reason: Reason) { + debug_assert!(!self.is_closed()); + self.inner = Closed(Cause::ScheduledLibraryReset(reason)); + } + + pub fn get_scheduled_reset(&self) -> Option<Reason> { + match self.inner { + Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), + _ => None, + } + } + + pub fn is_scheduled_reset(&self) -> bool { + match self.inner { + Closed(Cause::ScheduledLibraryReset(..)) => true, + _ => false, + } + } + + pub fn is_local_reset(&self) -> bool { + match self.inner { + Closed(Cause::Error(ref e)) => e.is_local(), + Closed(Cause::ScheduledLibraryReset(..)) => true, + _ => false, + } + } + + /// Returns true if the stream is already reset. + pub fn is_reset(&self) -> bool { + match self.inner { + Closed(Cause::EndStream) => false, + Closed(_) => true, + _ => false, + } + } + + pub fn is_send_streaming(&self) -> bool { + match self.inner { + Open { + local: Streaming, .. + } => true, + HalfClosedRemote(Streaming) => true, + _ => false, + } + } + + /// Returns true when the stream is in a state to receive headers + pub fn is_recv_headers(&self) -> bool { + match self.inner { + Idle => true, + Open { + remote: AwaitingHeaders, + .. + } => true, + HalfClosedLocal(AwaitingHeaders) => true, + ReservedRemote => true, + _ => false, + } + } + + pub fn is_recv_streaming(&self) -> bool { + match self.inner { + Open { + remote: Streaming, .. + } => true, + HalfClosedLocal(Streaming) => true, + _ => false, + } + } + + pub fn is_closed(&self) -> bool { + match self.inner { + Closed(_) => true, + _ => false, + } + } + + pub fn is_recv_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedRemote(..) | ReservedLocal => true, + _ => false, + } + } + + pub fn is_send_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedLocal(..) | ReservedRemote => true, + _ => false, + } + } + + pub fn is_idle(&self) -> bool { + match self.inner { + Idle => true, + _ => false, + } + } + + pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { + // TODO: Is this correct? + match self.inner { + Closed(Cause::Error(ref e)) => Err(e.clone()), + Closed(Cause::ScheduledLibraryReset(reason)) => { + Err(proto::Error::library_go_away(reason)) + } + Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), + _ => Ok(true), + } + } + + /// Returns a reason if the stream has been reset. + pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { + match self.inner { + Closed(Cause::Error(Error::Reset(_, reason, _))) + | Closed(Cause::Error(Error::GoAway(_, reason, _))) + | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), + Closed(Cause::Error(ref e)) => Err(e.clone().into()), + Open { + local: Streaming, .. + } + | HalfClosedRemote(Streaming) => match mode { + PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), + PollReset::Streaming => Ok(None), + }, + _ => Ok(None), + } + } +} + +impl Default for State { + fn default() -> State { + State { inner: Inner::Idle } + } +} + +impl Default for Peer { + fn default() -> Self { + AwaitingHeaders + } +} |