use super::{
    store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
    StreamIdOverflow, WindowSize,
};
use crate::codec::UserError;
use crate::frame::{self, Reason};
use crate::proto::{Error, Initiator};

use bytes::Buf;
use http;
use std::task::{Context, Poll, Waker};
use tokio::io::AsyncWrite;

use std::io;

/// Manages state transitions related to outbound frames.
#[derive(Debug)]
pub(super) struct Send {
    /// Stream identifier to use for next initialized stream.
    ///
    /// Holds `Err` once advancing the identifier has overflowed; callers
    /// that need a fresh identifier then receive
    /// `UserError::OverflowedStreamId`.
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is received.
    ///
    /// > After sending a GOAWAY frame, the sender can discard frames for
    /// > streams initiated by the receiver with identifiers higher than
    /// > the identified last stream.
    max_stream_id: StreamId,

    /// Initial window size of locally initiated streams
    init_window_sz: WindowSize,

    /// Prioritization layer
    prioritize: Prioritize,

    /// Whether PUSH_PROMISE frames may currently be sent.
    ///
    /// Starts as `true` and is overwritten whenever the remote SETTINGS
    /// carry an explicit push-enabled value.
    is_push_enabled: bool,

    /// If extended connect protocol is enabled.
    is_extended_connect_protocol_enabled: bool,
}

/// A value to detect which public API has called `poll_reset`.
#[derive(Debug)] pub(crate) enum PollReset { AwaitingHeaders, Streaming, } impl Send { /// Create a new `Send` pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), is_push_enabled: true, is_extended_connect_protocol_enabled: false, } } /// Returns the initial send window size pub fn init_window_sz(&self) -> WindowSize { self.init_window_sz } pub fn open(&mut self) -> Result { let stream_id = self.ensure_next_stream_id()?; self.next_stream_id = stream_id.next_id(); Ok(stream_id) } pub fn reserve_local(&mut self) -> Result { let stream_id = self.ensure_next_stream_id()?; self.next_stream_id = stream_id.next_id(); Ok(stream_id) } fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> { // 8.1.2.2. Connection-Specific Header Fields if fields.contains_key(http::header::CONNECTION) || fields.contains_key(http::header::TRANSFER_ENCODING) || fields.contains_key(http::header::UPGRADE) || fields.contains_key("keep-alive") || fields.contains_key("proxy-connection") { tracing::debug!("illegal connection-specific headers found"); return Err(UserError::MalformedHeaders); } else if let Some(te) = fields.get(http::header::TE) { if te != "trailers" { tracing::debug!("illegal connection-specific headers found"); return Err(UserError::MalformedHeaders); } } Ok(()) } pub fn send_push_promise( &mut self, frame: frame::PushPromise, buffer: &mut Buffer>, stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> { if !self.is_push_enabled { return Err(UserError::PeerDisabledServerPush); } tracing::trace!( "send_push_promise; frame={:?}; init_window={:?}", frame, self.init_window_sz ); Self::check_headers(frame.fields())?; // Queue the frame for sending self.prioritize .queue_frame(frame.into(), buffer, stream, task); Ok(()) } pub fn send_headers( &mut self, frame: frame::Headers, buffer: &mut Buffer>, stream: 
&mut store::Ptr, counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> { tracing::trace!( "send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz ); Self::check_headers(frame.fields())?; let end_stream = frame.is_end_stream(); // Update the state stream.state.send_open(end_stream)?; if counts.peer().is_local_init(frame.stream_id()) { // If we're waiting on a PushPromise anyway // handle potentially queueing the stream at that point if !stream.is_pending_push { if counts.can_inc_num_send_streams() { counts.inc_num_send_streams(stream); } else { self.prioritize.queue_open(stream); } } } // Queue the frame for sending self.prioritize .queue_frame(frame.into(), buffer, stream, task); Ok(()) } /// Send an explicit RST_STREAM frame pub fn send_reset( &mut self, reason: Reason, initiator: Initiator, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option, ) { let is_reset = stream.state.is_reset(); let is_closed = stream.state.is_closed(); let is_empty = stream.pending_send.is_empty(); let stream_id = stream.id; tracing::trace!( "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ state={:?} \ ", reason, initiator, stream_id, is_reset, is_closed, is_empty, stream.state ); if is_reset { // Don't double reset tracing::trace!( " -> not sending RST_STREAM ({:?} is already reset)", stream_id ); return; } // Transition the state to reset no matter what. stream.state.set_reset(stream_id, reason, initiator); // If closed AND the send queue is flushed, then the stream cannot be // reset explicitly, either. Implicit resets can still be queued. if is_closed && is_empty { tracing::trace!( " -> not sending explicit RST_STREAM ({:?} was closed \ and send queue was flushed)", stream_id ); return; } // Clear all pending outbound frames. 
// Note that we don't call `self.recv_err` because we want to enqueue // the reset frame before transitioning the stream inside // `reclaim_all_capacity`. self.prioritize.clear_queue(buffer, stream); let frame = frame::Reset::new(stream.id, reason); tracing::trace!("send_reset -- queueing; frame={:?}", frame); self.prioritize .queue_frame(frame.into(), buffer, stream, task); self.prioritize.reclaim_all_capacity(stream, counts); } pub fn schedule_implicit_reset( &mut self, stream: &mut store::Ptr, reason: Reason, counts: &mut Counts, task: &mut Option, ) { if stream.state.is_closed() { // Stream is already closed, nothing more to do return; } stream.state.set_scheduled_reset(reason); self.prioritize.reclaim_reserved_capacity(stream, counts); self.prioritize.schedule_send(stream, task); } pub fn send_data( &mut self, frame: frame::Data, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> where B: Buf, { self.prioritize .send_data(frame, buffer, stream, counts, task) } pub fn send_trailers( &mut self, frame: frame::Headers, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? 
if !stream.state.is_send_streaming() { return Err(UserError::UnexpectedFrameType); } stream.state.send_close(); tracing::trace!("send_trailers -- queuing; frame={:?}", frame); self.prioritize .queue_frame(frame.into(), buffer, stream, task); // Release any excess capacity self.prioritize.reserve_capacity(0, stream, counts); Ok(()) } pub fn poll_complete( &mut self, cx: &mut Context, buffer: &mut Buffer>, store: &mut Store, counts: &mut Counts, dst: &mut Codec>, ) -> Poll> where T: AsyncWrite + Unpin, B: Buf, { self.prioritize .poll_complete(cx, buffer, store, counts, dst) } /// Request capacity to send data pub fn reserve_capacity( &mut self, capacity: WindowSize, stream: &mut store::Ptr, counts: &mut Counts, ) { self.prioritize.reserve_capacity(capacity, stream, counts) } pub fn poll_capacity( &mut self, cx: &Context, stream: &mut store::Ptr, ) -> Poll>> { if !stream.state.is_send_streaming() { return Poll::Ready(None); } if !stream.send_capacity_inc { stream.wait_send(cx); return Poll::Pending; } stream.send_capacity_inc = false; Poll::Ready(Some(Ok(self.capacity(stream)))) } /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; available .min(self.prioritize.max_buffer_size()) .saturating_sub(buffered) as WindowSize } pub fn poll_reset( &self, cx: &Context, stream: &mut Stream, mode: PollReset, ) -> Poll> { match stream.state.ensure_reason(mode)? 
{ Some(reason) => Poll::Ready(Ok(reason)), None => { stream.wait_send(cx); Poll::Pending } } } pub fn recv_connection_window_update( &mut self, frame: frame::WindowUpdate, store: &mut Store, counts: &mut Counts, ) -> Result<(), Reason> { self.prioritize .recv_connection_window_update(frame.size_increment(), store, counts) } pub fn recv_stream_window_update( &mut self, sz: WindowSize, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, task: &mut Option, ) -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { tracing::debug!("recv_stream_window_update !!; err={:?}", e); self.send_reset( Reason::FLOW_CONTROL_ERROR, Initiator::Library, buffer, stream, counts, task, ); return Err(e); } Ok(()) } pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { if last_stream_id > self.max_stream_id { // The remote endpoint sent a `GOAWAY` frame indicating a stream // that we never sent, or that we have already terminated on account // of previous `GOAWAY` frame. In either case, that is illegal. 
// (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase // the value they send in the last stream identifier, since the // peers might already have retried unprocessed requests on another // connection.") proto_err!(conn: "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", last_stream_id, self.max_stream_id, ); return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); } self.max_stream_id = last_stream_id; Ok(()) } pub fn handle_error( &mut self, buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, ) { // Clear all pending outbound frames self.prioritize.clear_queue(buffer, stream); self.prioritize.reclaim_all_capacity(stream, counts); } pub fn apply_remote_settings( &mut self, settings: &frame::Settings, buffer: &mut Buffer>, store: &mut Store, counts: &mut Counts, task: &mut Option, ) -> Result<(), Error> { if let Some(val) = settings.is_extended_connect_protocol_enabled() { self.is_extended_connect_protocol_enabled = val; } // Applies an update to the remote endpoint's initial window size. // // Per RFC 7540 ยง6.9.2: // // In addition to changing the flow-control window for streams that are // not yet active, a SETTINGS frame can alter the initial flow-control // window size for streams with active flow-control windows (that is, // streams in the "open" or "half-closed (remote)" state). When the // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust // the size of all stream flow-control windows that it maintains by the // difference between the new value and the old value. // // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available // space in a flow-control window to become negative. A sender MUST // track the negative flow-control window and MUST NOT send new // flow-controlled frames until it receives WINDOW_UPDATE frames that // cause the flow-control window to become positive. 
if let Some(val) = settings.initial_window_size() { let old_val = self.init_window_sz; self.init_window_sz = val; if val < old_val { // We must decrease the (remote) window on every open stream. let dec = old_val - val; tracing::trace!("decrementing all windows; dec={}", dec); let mut total_reclaimed = 0; store.for_each(|mut stream| { let stream = &mut *stream; stream.send_flow.dec_send_window(dec); // It's possible that decreasing the window causes // `window_size` (the stream-specific window) to fall below // `available` (the portion of the connection-level window // that we have allocated to the stream). // In this case, we should take that excess allocation away // and reassign it to other streams. let window_size = stream.send_flow.window_size(); let available = stream.send_flow.available().as_size(); let reclaimed = if available > window_size { // Drop down to `window_size`. let reclaim = available - window_size; stream.send_flow.claim_capacity(reclaim); total_reclaimed += reclaim; reclaim } else { 0 }; tracing::trace!( "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}", stream.id, dec, reclaimed, stream.send_flow ); // TODO: Should this notify the producer when the capacity // of a stream is reduced? Maybe it should if the capacity // is reduced to zero, allowing the producer to stop work. 
}); self.prioritize .assign_connection_capacity(total_reclaimed, store, counts); } else if val > old_val { let inc = val - old_val; store.try_for_each(|mut stream| { self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) .map_err(Error::library_go_away) })?; } } if let Some(val) = settings.is_push_enabled() { self.is_push_enabled = val } Ok(()) } pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { self.prioritize.clear_pending_capacity(store, counts); self.prioritize.clear_pending_send(store, counts); self.prioritize.clear_pending_open(store, counts); } pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if let Ok(next) = self.next_stream_id { if id >= next { return Err(Reason::PROTOCOL_ERROR); } } // if next_stream_id is overflowed, that's ok. Ok(()) } pub fn ensure_next_stream_id(&self) -> Result { self.next_stream_id .map_err(|_| UserError::OverflowedStreamId) } pub fn may_have_created_stream(&self, id: StreamId) -> bool { if let Ok(next_id) = self.next_stream_id { // Peer::is_local_init should have been called beforehand debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); id < next_id } else { true } } pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { if let Ok(next_id) = self.next_stream_id { // Peer::is_local_init should have been called beforehand debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); if id >= next_id { self.next_stream_id = id.next_id(); } } } pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { self.is_extended_connect_protocol_enabled } }