summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto/streams/send.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/send.rs')
-rw-r--r--third_party/rust/h2/src/proto/streams/send.rs585
1 files changed, 585 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/send.rs b/third_party/rust/h2/src/proto/streams/send.rs
new file mode 100644
index 0000000000..626e61a330
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/send.rs
@@ -0,0 +1,585 @@
+use super::{
+ store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
+ StreamIdOverflow, WindowSize,
+};
+use crate::codec::UserError;
+use crate::frame::{self, Reason};
+use crate::proto::{self, Error, Initiator};
+
+use bytes::Buf;
+use tokio::io::AsyncWrite;
+
+use std::cmp::Ordering;
+use std::io;
+use std::task::{Context, Poll, Waker};
+
+/// Manages state transitions related to outbound frames.
+#[derive(Debug)]
+pub(super) struct Send {
+ /// Stream identifier to use for next initialized stream.
+ next_stream_id: Result<StreamId, StreamIdOverflow>,
+
+ /// Any streams with a higher ID are ignored.
+ ///
+ /// This starts as MAX, but is lowered when a GOAWAY is received.
+ ///
+ /// > After sending a GOAWAY frame, the sender can discard frames for
+ /// > streams initiated by the receiver with identifiers higher than
+ /// > the identified last stream.
+ max_stream_id: StreamId,
+
+ /// Initial window size of locally initiated streams
+ init_window_sz: WindowSize,
+
+ /// Prioritization layer
+ prioritize: Prioritize,
+
+ is_push_enabled: bool,
+
+ /// If extended connect protocol is enabled.
+ is_extended_connect_protocol_enabled: bool,
+}
+
+/// A value to detect which public API has called `poll_reset`.
+#[derive(Debug)]
+pub(crate) enum PollReset {
+ AwaitingHeaders,
+ Streaming,
+}
+
+impl Send {
+ /// Create a new `Send`
+ pub fn new(config: &Config) -> Self {
+ Send {
+ init_window_sz: config.remote_init_window_sz,
+ max_stream_id: StreamId::MAX,
+ next_stream_id: Ok(config.local_next_stream_id),
+ prioritize: Prioritize::new(config),
+ is_push_enabled: true,
+ is_extended_connect_protocol_enabled: false,
+ }
+ }
+
+ /// Returns the initial send window size
+ pub fn init_window_sz(&self) -> WindowSize {
+ self.init_window_sz
+ }
+
+ pub fn open(&mut self) -> Result<StreamId, UserError> {
+ let stream_id = self.ensure_next_stream_id()?;
+ self.next_stream_id = stream_id.next_id();
+ Ok(stream_id)
+ }
+
+ pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
+ let stream_id = self.ensure_next_stream_id()?;
+ self.next_stream_id = stream_id.next_id();
+ Ok(stream_id)
+ }
+
+ fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
+ // 8.1.2.2. Connection-Specific Header Fields
+ if fields.contains_key(http::header::CONNECTION)
+ || fields.contains_key(http::header::TRANSFER_ENCODING)
+ || fields.contains_key(http::header::UPGRADE)
+ || fields.contains_key("keep-alive")
+ || fields.contains_key("proxy-connection")
+ {
+ tracing::debug!("illegal connection-specific headers found");
+ return Err(UserError::MalformedHeaders);
+ } else if let Some(te) = fields.get(http::header::TE) {
+ if te != "trailers" {
+ tracing::debug!("illegal connection-specific headers found");
+ return Err(UserError::MalformedHeaders);
+ }
+ }
+ Ok(())
+ }
+
+ pub fn send_push_promise<B>(
+ &mut self,
+ frame: frame::PushPromise,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ if !self.is_push_enabled {
+ return Err(UserError::PeerDisabledServerPush);
+ }
+
+ tracing::trace!(
+ "send_push_promise; frame={:?}; init_window={:?}",
+ frame,
+ self.init_window_sz
+ );
+
+ Self::check_headers(frame.fields())?;
+
+ // Queue the frame for sending
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ Ok(())
+ }
+
+ pub fn send_headers<B>(
+ &mut self,
+ frame: frame::Headers,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ tracing::trace!(
+ "send_headers; frame={:?}; init_window={:?}",
+ frame,
+ self.init_window_sz
+ );
+
+ Self::check_headers(frame.fields())?;
+
+ let end_stream = frame.is_end_stream();
+
+ // Update the state
+ stream.state.send_open(end_stream)?;
+
+ let mut pending_open = false;
+ if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
+ self.prioritize.queue_open(stream);
+ pending_open = true;
+ }
+
+ // Queue the frame for sending
+ //
+ // This call expects that, since new streams are in the open queue, new
+ // streams won't be pushed on pending_send.
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ // Need to notify the connection when pushing onto pending_open since
+ // queue_frame only notifies for pending_send.
+ if pending_open {
+ if let Some(task) = task.take() {
+ task.wake();
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send an explicit RST_STREAM frame
+ pub fn send_reset<B>(
+ &mut self,
+ reason: Reason,
+ initiator: Initiator,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) {
+ let is_reset = stream.state.is_reset();
+ let is_closed = stream.state.is_closed();
+ let is_empty = stream.pending_send.is_empty();
+ let stream_id = stream.id;
+
+ tracing::trace!(
+ "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
+ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
+ state={:?} \
+ ",
+ reason,
+ initiator,
+ stream_id,
+ is_reset,
+ is_closed,
+ is_empty,
+ stream.state
+ );
+
+ if is_reset {
+ // Don't double reset
+ tracing::trace!(
+ " -> not sending RST_STREAM ({:?} is already reset)",
+ stream_id
+ );
+ return;
+ }
+
+ // Transition the state to reset no matter what.
+ stream.state.set_reset(stream_id, reason, initiator);
+
+ // If closed AND the send queue is flushed, then the stream cannot be
+ // reset explicitly, either. Implicit resets can still be queued.
+ if is_closed && is_empty {
+ tracing::trace!(
+ " -> not sending explicit RST_STREAM ({:?} was closed \
+ and send queue was flushed)",
+ stream_id
+ );
+ return;
+ }
+
+ // Clear all pending outbound frames.
+ // Note that we don't call `self.recv_err` because we want to enqueue
+ // the reset frame before transitioning the stream inside
+ // `reclaim_all_capacity`.
+ self.prioritize.clear_queue(buffer, stream);
+
+ let frame = frame::Reset::new(stream.id, reason);
+
+ tracing::trace!("send_reset -- queueing; frame={:?}", frame);
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+ self.prioritize.reclaim_all_capacity(stream, counts);
+ }
+
+ pub fn schedule_implicit_reset(
+ &mut self,
+ stream: &mut store::Ptr,
+ reason: Reason,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) {
+ if stream.state.is_closed() {
+ // Stream is already closed, nothing more to do
+ return;
+ }
+
+ stream.state.set_scheduled_reset(reason);
+
+ self.prioritize.reclaim_reserved_capacity(stream, counts);
+ self.prioritize.schedule_send(stream, task);
+ }
+
+ pub fn send_data<B>(
+ &mut self,
+ frame: frame::Data<B>,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError>
+ where
+ B: Buf,
+ {
+ self.prioritize
+ .send_data(frame, buffer, stream, counts, task)
+ }
+
+ pub fn send_trailers<B>(
+ &mut self,
+ frame: frame::Headers,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), UserError> {
+ // TODO: Should this logic be moved into state.rs?
+ if !stream.state.is_send_streaming() {
+ return Err(UserError::UnexpectedFrameType);
+ }
+
+ stream.state.send_close();
+
+ tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
+ self.prioritize
+ .queue_frame(frame.into(), buffer, stream, task);
+
+ // Release any excess capacity
+ self.prioritize.reserve_capacity(0, stream, counts);
+
+ Ok(())
+ }
+
+ pub fn poll_complete<T, B>(
+ &mut self,
+ cx: &mut Context,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ dst: &mut Codec<T, Prioritized<B>>,
+ ) -> Poll<io::Result<()>>
+ where
+ T: AsyncWrite + Unpin,
+ B: Buf,
+ {
+ self.prioritize
+ .poll_complete(cx, buffer, store, counts, dst)
+ }
+
+ /// Request capacity to send data
+ pub fn reserve_capacity(
+ &mut self,
+ capacity: WindowSize,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ self.prioritize.reserve_capacity(capacity, stream, counts)
+ }
+
+ pub fn poll_capacity(
+ &mut self,
+ cx: &Context,
+ stream: &mut store::Ptr,
+ ) -> Poll<Option<Result<WindowSize, UserError>>> {
+ if !stream.state.is_send_streaming() {
+ return Poll::Ready(None);
+ }
+
+ if !stream.send_capacity_inc {
+ stream.wait_send(cx);
+ return Poll::Pending;
+ }
+
+ stream.send_capacity_inc = false;
+
+ Poll::Ready(Some(Ok(self.capacity(stream))))
+ }
+
+ /// Current available stream send capacity
+ pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
+ stream.capacity(self.prioritize.max_buffer_size())
+ }
+
+ pub fn poll_reset(
+ &self,
+ cx: &Context,
+ stream: &mut Stream,
+ mode: PollReset,
+ ) -> Poll<Result<Reason, crate::Error>> {
+ match stream.state.ensure_reason(mode)? {
+ Some(reason) => Poll::Ready(Ok(reason)),
+ None => {
+ stream.wait_send(cx);
+ Poll::Pending
+ }
+ }
+ }
+
+ pub fn recv_connection_window_update(
+ &mut self,
+ frame: frame::WindowUpdate,
+ store: &mut Store,
+ counts: &mut Counts,
+ ) -> Result<(), Reason> {
+ self.prioritize
+ .recv_connection_window_update(frame.size_increment(), store, counts)
+ }
+
+ pub fn recv_stream_window_update<B>(
+ &mut self,
+ sz: WindowSize,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Reason> {
+ if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
+ tracing::debug!("recv_stream_window_update !!; err={:?}", e);
+
+ self.send_reset(
+ Reason::FLOW_CONTROL_ERROR,
+ Initiator::Library,
+ buffer,
+ stream,
+ counts,
+ task,
+ );
+
+ return Err(e);
+ }
+
+ Ok(())
+ }
+
+ pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
+ if last_stream_id > self.max_stream_id {
+ // The remote endpoint sent a `GOAWAY` frame indicating a stream
+ // that we never sent, or that we have already terminated on account
+ // of previous `GOAWAY` frame. In either case, that is illegal.
+ // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
+ // the value they send in the last stream identifier, since the
+ // peers might already have retried unprocessed requests on another
+ // connection.")
+ proto_err!(conn:
+ "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
+ last_stream_id, self.max_stream_id,
+ );
+ return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
+ }
+
+ self.max_stream_id = last_stream_id;
+ Ok(())
+ }
+
+ pub fn handle_error<B>(
+ &mut self,
+ buffer: &mut Buffer<Frame<B>>,
+ stream: &mut store::Ptr,
+ counts: &mut Counts,
+ ) {
+ // Clear all pending outbound frames
+ self.prioritize.clear_queue(buffer, stream);
+ self.prioritize.reclaim_all_capacity(stream, counts);
+ }
+
+ pub fn apply_remote_settings<B>(
+ &mut self,
+ settings: &frame::Settings,
+ buffer: &mut Buffer<Frame<B>>,
+ store: &mut Store,
+ counts: &mut Counts,
+ task: &mut Option<Waker>,
+ ) -> Result<(), Error> {
+ if let Some(val) = settings.is_extended_connect_protocol_enabled() {
+ self.is_extended_connect_protocol_enabled = val;
+ }
+
+ // Applies an update to the remote endpoint's initial window size.
+ //
+ // Per RFC 7540 ยง6.9.2:
+ //
+ // In addition to changing the flow-control window for streams that are
+ // not yet active, a SETTINGS frame can alter the initial flow-control
+ // window size for streams with active flow-control windows (that is,
+ // streams in the "open" or "half-closed (remote)" state). When the
+ // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
+ // the size of all stream flow-control windows that it maintains by the
+ // difference between the new value and the old value.
+ //
+ // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
+ // space in a flow-control window to become negative. A sender MUST
+ // track the negative flow-control window and MUST NOT send new
+ // flow-controlled frames until it receives WINDOW_UPDATE frames that
+ // cause the flow-control window to become positive.
+ if let Some(val) = settings.initial_window_size() {
+ let old_val = self.init_window_sz;
+ self.init_window_sz = val;
+
+ match val.cmp(&old_val) {
+ Ordering::Less => {
+ // We must decrease the (remote) window on every open stream.
+ let dec = old_val - val;
+ tracing::trace!("decrementing all windows; dec={}", dec);
+
+ let mut total_reclaimed = 0;
+ store.try_for_each(|mut stream| {
+ let stream = &mut *stream;
+
+ tracing::trace!(
+ "decrementing stream window; id={:?}; decr={}; flow={:?}",
+ stream.id,
+ dec,
+ stream.send_flow
+ );
+
+ // TODO: this decrement can underflow based on received frames!
+ stream
+ .send_flow
+ .dec_send_window(dec)
+ .map_err(proto::Error::library_go_away)?;
+
+ // It's possible that decreasing the window causes
+ // `window_size` (the stream-specific window) to fall below
+ // `available` (the portion of the connection-level window
+ // that we have allocated to the stream).
+ // In this case, we should take that excess allocation away
+ // and reassign it to other streams.
+ let window_size = stream.send_flow.window_size();
+ let available = stream.send_flow.available().as_size();
+ let reclaimed = if available > window_size {
+ // Drop down to `window_size`.
+ let reclaim = available - window_size;
+ stream
+ .send_flow
+ .claim_capacity(reclaim)
+ .map_err(proto::Error::library_go_away)?;
+ total_reclaimed += reclaim;
+ reclaim
+ } else {
+ 0
+ };
+
+ tracing::trace!(
+ "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
+ stream.id,
+ dec,
+ reclaimed,
+ stream.send_flow
+ );
+
+ // TODO: Should this notify the producer when the capacity
+ // of a stream is reduced? Maybe it should if the capacity
+ // is reduced to zero, allowing the producer to stop work.
+
+ Ok::<_, proto::Error>(())
+ })?;
+
+ self.prioritize
+ .assign_connection_capacity(total_reclaimed, store, counts);
+ }
+ Ordering::Greater => {
+ let inc = val - old_val;
+
+ store.try_for_each(|mut stream| {
+ self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
+ .map_err(Error::library_go_away)
+ })?;
+ }
+ Ordering::Equal => (),
+ }
+ }
+
+ if let Some(val) = settings.is_push_enabled() {
+ self.is_push_enabled = val
+ }
+
+ Ok(())
+ }
+
+ pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
+ self.prioritize.clear_pending_capacity(store, counts);
+ self.prioritize.clear_pending_send(store, counts);
+ self.prioritize.clear_pending_open(store, counts);
+ }
+
+ pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
+ if let Ok(next) = self.next_stream_id {
+ if id >= next {
+ return Err(Reason::PROTOCOL_ERROR);
+ }
+ }
+ // if next_stream_id is overflowed, that's ok.
+
+ Ok(())
+ }
+
+ pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
+ self.next_stream_id
+ .map_err(|_| UserError::OverflowedStreamId)
+ }
+
+ pub fn may_have_created_stream(&self, id: StreamId) -> bool {
+ if let Ok(next_id) = self.next_stream_id {
+ // Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
+ id < next_id
+ } else {
+ true
+ }
+ }
+
+ pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
+ if let Ok(next_id) = self.next_stream_id {
+ // Peer::is_local_init should have been called beforehand
+ debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
+ if id >= next_id {
+ self.next_stream_id = id.next_id();
+ }
+ }
+ }
+
+ pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
+ self.is_extended_connect_protocol_enabled
+ }
+}