diff options
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/send.rs')
-rw-r--r-- | third_party/rust/h2/src/proto/streams/send.rs | 565 |
1 files changed, 565 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/send.rs b/third_party/rust/h2/src/proto/streams/send.rs new file mode 100644 index 0000000000..2c5a38c801 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/send.rs @@ -0,0 +1,565 @@ +use super::{ + store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, + StreamIdOverflow, WindowSize, +}; +use crate::codec::UserError; +use crate::frame::{self, Reason}; +use crate::proto::{Error, Initiator}; + +use bytes::Buf; +use http; +use std::task::{Context, Poll, Waker}; +use tokio::io::AsyncWrite; + +use std::io; + +/// Manages state transitions related to outbound frames. +#[derive(Debug)] +pub(super) struct Send { + /// Stream identifier to use for next initialized stream. + next_stream_id: Result<StreamId, StreamIdOverflow>, + + /// Any streams with a higher ID are ignored. + /// + /// This starts as MAX, but is lowered when a GOAWAY is received. + /// + /// > After sending a GOAWAY frame, the sender can discard frames for + /// > streams initiated by the receiver with identifiers higher than + /// > the identified last stream. + max_stream_id: StreamId, + + /// Initial window size of locally initiated streams + init_window_sz: WindowSize, + + /// Prioritization layer + prioritize: Prioritize, + + is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, +} + +/// A value to detect which public API has called `poll_reset`. +#[derive(Debug)] +pub(crate) enum PollReset { + AwaitingHeaders, + Streaming, +} + +impl Send { + /// Create a new `Send` + pub fn new(config: &Config) -> Self { + Send { + init_window_sz: config.remote_init_window_sz, + max_stream_id: StreamId::MAX, + next_stream_id: Ok(config.local_next_stream_id), + prioritize: Prioritize::new(config), + is_push_enabled: true, + is_extended_connect_protocol_enabled: false, + } + } + + /// Returns the initial send window size + pub fn init_window_sz(&self) -> WindowSize { + self.init_window_sz + } + + pub fn open(&mut self) -> Result<StreamId, UserError> { + let stream_id = self.ensure_next_stream_id()?; + self.next_stream_id = stream_id.next_id(); + Ok(stream_id) + } + + pub fn reserve_local(&mut self) -> Result<StreamId, UserError> { + let stream_id = self.ensure_next_stream_id()?; + self.next_stream_id = stream_id.next_id(); + Ok(stream_id) + } + + fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> { + // 8.1.2.2. Connection-Specific Header Fields + if fields.contains_key(http::header::CONNECTION) + || fields.contains_key(http::header::TRANSFER_ENCODING) + || fields.contains_key(http::header::UPGRADE) + || fields.contains_key("keep-alive") + || fields.contains_key("proxy-connection") + { + tracing::debug!("illegal connection-specific headers found"); + return Err(UserError::MalformedHeaders); + } else if let Some(te) = fields.get(http::header::TE) { + if te != "trailers" { + tracing::debug!("illegal connection-specific headers found"); + return Err(UserError::MalformedHeaders); + } + } + Ok(()) + } + + pub fn send_push_promise<B>( + &mut self, + frame: frame::PushPromise, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + if !self.is_push_enabled { + return Err(UserError::PeerDisabledServerPush); + } + + tracing::trace!( + "send_push_promise; frame={:?}; init_window={:?}", + frame, + self.init_window_sz + ); + + Self::check_headers(frame.fields())?; + + // Queue the frame for sending + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + + pub fn send_headers<B>( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + tracing::trace!( + "send_headers; frame={:?}; init_window={:?}", + frame, + self.init_window_sz + ); + + Self::check_headers(frame.fields())?; + + let end_stream = frame.is_end_stream(); + + // Update the state + stream.state.send_open(end_stream)?; + + if counts.peer().is_local_init(frame.stream_id()) { + // If we're waiting on a PushPromise anyway + // handle potentially queueing the stream at that point + if !stream.is_pending_push { + if counts.can_inc_num_send_streams() { + counts.inc_num_send_streams(stream); + } else { + self.prioritize.queue_open(stream); + } + } + } + + // Queue the frame for sending + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + + /// Send an explicit RST_STREAM frame + pub fn send_reset<B>( + &mut self, + reason: Reason, + initiator: Initiator, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) { + let is_reset = stream.state.is_reset(); + let is_closed = stream.state.is_closed(); + let is_empty = stream.pending_send.is_empty(); + let stream_id = stream.id; + + tracing::trace!( + "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ + is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ + state={:?} \ + ", + reason, + initiator, + stream_id, + is_reset, + is_closed, + is_empty, + stream.state + ); + + if is_reset { + // Don't double reset + tracing::trace!( + " -> not sending RST_STREAM ({:?} is already reset)", + stream_id + ); + return; + } + + // Transition the state to reset no matter what. + stream.state.set_reset(stream_id, reason, initiator); + + // If closed AND the send queue is flushed, then the stream cannot be + // reset explicitly, either. Implicit resets can still be queued. + if is_closed && is_empty { + tracing::trace!( + " -> not sending explicit RST_STREAM ({:?} was closed \ + and send queue was flushed)", + stream_id + ); + return; + } + + // Clear all pending outbound frames. + // Note that we don't call `self.recv_err` because we want to enqueue + // the reset frame before transitioning the stream inside + // `reclaim_all_capacity`. + self.prioritize.clear_queue(buffer, stream); + + let frame = frame::Reset::new(stream.id, reason); + + tracing::trace!("send_reset -- queueing; frame={:?}", frame); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + self.prioritize.reclaim_all_capacity(stream, counts); + } + + pub fn schedule_implicit_reset( + &mut self, + stream: &mut store::Ptr, + reason: Reason, + counts: &mut Counts, + task: &mut Option<Waker>, + ) { + if stream.state.is_closed() { + // Stream is already closed, nothing more to do + return; + } + + stream.state.set_scheduled_reset(reason); + + self.prioritize.reclaim_reserved_capacity(stream, counts); + self.prioritize.schedule_send(stream, task); + } + + pub fn send_data<B>( + &mut self, + frame: frame::Data<B>, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> + where + B: Buf, + { + self.prioritize + .send_data(frame, buffer, stream, counts, task) + } + + pub fn send_trailers<B>( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + // TODO: Should this logic be moved into state.rs? + if !stream.state.is_send_streaming() { + return Err(UserError::UnexpectedFrameType); + } + + stream.state.send_close(); + + tracing::trace!("send_trailers -- queuing; frame={:?}", frame); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + // Release any excess capacity + self.prioritize.reserve_capacity(0, stream, counts); + + Ok(()) + } + + pub fn poll_complete<T, B>( + &mut self, + cx: &mut Context, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + counts: &mut Counts, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + self.prioritize + .poll_complete(cx, buffer, store, counts, dst) + } + + /// Request capacity to send data + pub fn reserve_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + self.prioritize.reserve_capacity(capacity, stream, counts) + } + + pub fn poll_capacity( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll<Option<Result<WindowSize, UserError>>> { + if !stream.state.is_send_streaming() { + return Poll::Ready(None); + } + + if !stream.send_capacity_inc { + stream.wait_send(cx); + return Poll::Pending; + } + + stream.send_capacity_inc = false; + + Poll::Ready(Some(Ok(self.capacity(stream)))) + } + + /// Current available stream send capacity + pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { + let available = stream.send_flow.available().as_size() as usize; + let buffered = stream.buffered_send_data; + + available + .min(self.prioritize.max_buffer_size()) + .saturating_sub(buffered) as WindowSize + } + + pub fn poll_reset( + &self, + cx: &Context, + stream: &mut Stream, + mode: PollReset, + ) -> Poll<Result<Reason, crate::Error>> { + match stream.state.ensure_reason(mode)? { + Some(reason) => Poll::Ready(Ok(reason)), + None => { + stream.wait_send(cx); + Poll::Pending + } + } + } + + pub fn recv_connection_window_update( + &mut self, + frame: frame::WindowUpdate, + store: &mut Store, + counts: &mut Counts, + ) -> Result<(), Reason> { + self.prioritize + .recv_connection_window_update(frame.size_increment(), store, counts) + } + + pub fn recv_stream_window_update<B>( + &mut self, + sz: WindowSize, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), Reason> { + if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { + tracing::debug!("recv_stream_window_update !!; err={:?}", e); + + self.send_reset( + Reason::FLOW_CONTROL_ERROR, + Initiator::Library, + buffer, + stream, + counts, + task, + ); + + return Err(e); + } + + Ok(()) + } + + pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { + if last_stream_id > self.max_stream_id { + // The remote endpoint sent a `GOAWAY` frame indicating a stream + // that we never sent, or that we have already terminated on account + // of previous `GOAWAY` frame. In either case, that is illegal. + // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase + // the value they send in the last stream identifier, since the + // peers might already have retried unprocessed requests on another + // connection.") + proto_err!(conn: + "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", + last_stream_id, self.max_stream_id, + ); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + self.max_stream_id = last_stream_id; + Ok(()) + } + + pub fn handle_error<B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + // Clear all pending outbound frames + self.prioritize.clear_queue(buffer, stream); + self.prioritize.reclaim_all_capacity(stream, counts); + } + + pub fn apply_remote_settings<B>( + &mut self, + settings: &frame::Settings, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), Error> { + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } + + // Applies an update to the remote endpoint's initial window size. + // + // Per RFC 7540 ยง6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + if let Some(val) = settings.initial_window_size() { + let old_val = self.init_window_sz; + self.init_window_sz = val; + + if val < old_val { + // We must decrease the (remote) window on every open stream. + let dec = old_val - val; + tracing::trace!("decrementing all windows; dec={}", dec); + + let mut total_reclaimed = 0; + store.for_each(|mut stream| { + let stream = &mut *stream; + + stream.send_flow.dec_send_window(dec); + + // It's possible that decreasing the window causes + // `window_size` (the stream-specific window) to fall below + // `available` (the portion of the connection-level window + // that we have allocated to the stream). + // In this case, we should take that excess allocation away + // and reassign it to other streams. + let window_size = stream.send_flow.window_size(); + let available = stream.send_flow.available().as_size(); + let reclaimed = if available > window_size { + // Drop down to `window_size`. + let reclaim = available - window_size; + stream.send_flow.claim_capacity(reclaim); + total_reclaimed += reclaim; + reclaim + } else { + 0 + }; + + tracing::trace!( + "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}", + stream.id, + dec, + reclaimed, + stream.send_flow + ); + + // TODO: Should this notify the producer when the capacity + // of a stream is reduced? Maybe it should if the capacity + // is reduced to zero, allowing the producer to stop work. + }); + + self.prioritize + .assign_connection_capacity(total_reclaimed, store, counts); + } else if val > old_val { + let inc = val - old_val; + + store.try_for_each(|mut stream| { + self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) + .map_err(Error::library_go_away) + })?; + } + } + + if let Some(val) = settings.is_push_enabled() { + self.is_push_enabled = val + } + + Ok(()) + } + + pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { + self.prioritize.clear_pending_capacity(store, counts); + self.prioritize.clear_pending_send(store, counts); + self.prioritize.clear_pending_open(store, counts); + } + + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { + if let Ok(next) = self.next_stream_id { + if id >= next { + return Err(Reason::PROTOCOL_ERROR); + } + } + // if next_stream_id is overflowed, that's ok. + + Ok(()) + } + + pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { + self.next_stream_id + .map_err(|_| UserError::OverflowedStreamId) + } + + pub fn may_have_created_stream(&self, id: StreamId) -> bool { + if let Ok(next_id) = self.next_stream_id { + // Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); + id < next_id + } else { + true + } + } + + pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { + if let Ok(next_id) = self.next_stream_id { + // Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); + if id >= next_id { + self.next_stream_id = id.next_id(); + } + } + } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.is_extended_connect_protocol_enabled + } +} |