diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/h2/src/proto/streams | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/h2/src/proto/streams')
-rw-r--r-- | third_party/rust/h2/src/proto/streams/buffer.rs | 95 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/counts.rs | 210 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/flow_control.rs | 258 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/mod.rs | 67 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/prioritize.rs | 903 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/recv.rs | 1097 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/send.rs | 565 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/state.rs | 472 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/store.rs | 426 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/stream.rs | 490 | ||||
-rw-r--r-- | third_party/rust/h2/src/proto/streams/streams.rs | 1576 |
11 files changed, 6159 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/buffer.rs b/third_party/rust/h2/src/proto/streams/buffer.rs new file mode 100644 index 0000000000..2648a410e4 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/buffer.rs @@ -0,0 +1,95 @@ +use slab::Slab; + +/// Buffers frames for multiple streams. +#[derive(Debug)] +pub struct Buffer<T> { + slab: Slab<Slot<T>>, +} + +/// A sequence of frames in a `Buffer` +#[derive(Debug)] +pub struct Deque { + indices: Option<Indices>, +} + +/// Tracks the head & tail for a sequence of frames in a `Buffer`. +#[derive(Debug, Default, Copy, Clone)] +struct Indices { + head: usize, + tail: usize, +} + +#[derive(Debug)] +struct Slot<T> { + value: T, + next: Option<usize>, +} + +impl<T> Buffer<T> { + pub fn new() -> Self { + Buffer { slab: Slab::new() } + } +} + +impl Deque { + pub fn new() -> Self { + Deque { indices: None } + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn push_back<T>(&mut self, buf: &mut Buffer<T>, value: T) { + let key = buf.slab.insert(Slot { value, next: None }); + + match self.indices { + Some(ref mut idxs) => { + buf.slab[idxs.tail].next = Some(key); + idxs.tail = key; + } + None => { + self.indices = Some(Indices { + head: key, + tail: key, + }); + } + } + } + + pub fn push_front<T>(&mut self, buf: &mut Buffer<T>, value: T) { + let key = buf.slab.insert(Slot { value, next: None }); + + match self.indices { + Some(ref mut idxs) => { + buf.slab[key].next = Some(idxs.head); + idxs.head = key; + } + None => { + self.indices = Some(Indices { + head: key, + tail: key, + }); + } + } + } + + pub fn pop_front<T>(&mut self, buf: &mut Buffer<T>) -> Option<T> { + match self.indices { + Some(mut idxs) => { + let mut slot = buf.slab.remove(idxs.head); + + if idxs.head == idxs.tail { + assert!(slot.next.is_none()); + self.indices = None; + } else { + idxs.head = slot.next.take().unwrap(); + self.indices = Some(idxs); + } + + Some(slot.value) + } + None => None, + } + } +} diff --git a/third_party/rust/h2/src/proto/streams/counts.rs b/third_party/rust/h2/src/proto/streams/counts.rs new file mode 100644 index 0000000000..70dfc7851d --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/counts.rs @@ -0,0 +1,210 @@ +use super::*; + +use std::usize; + +#[derive(Debug)] +pub(super) struct Counts { + /// Acting as a client or server. This allows us to track which values to + /// inc / dec. + peer: peer::Dyn, + + /// Maximum number of locally initiated streams + max_send_streams: usize, + + /// Current number of remote initiated streams + num_send_streams: usize, + + /// Maximum number of remote initiated streams + max_recv_streams: usize, + + /// Current number of locally initiated streams + num_recv_streams: usize, + + /// Maximum number of pending locally reset streams + max_reset_streams: usize, + + /// Current number of pending locally reset streams + num_reset_streams: usize, +} + +impl Counts { + /// Create a new `Counts` using the provided configuration values. + pub fn new(peer: peer::Dyn, config: &Config) -> Self { + Counts { + peer, + max_send_streams: config.initial_max_send_streams, + num_send_streams: 0, + max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), + num_recv_streams: 0, + max_reset_streams: config.local_reset_max, + num_reset_streams: 0, + } + } + + /// Returns the current peer + pub fn peer(&self) -> peer::Dyn { + self.peer + } + + pub fn has_streams(&self) -> bool { + self.num_send_streams != 0 || self.num_recv_streams != 0 + } + + /// Returns true if the receive stream concurrency can be incremented + pub fn can_inc_num_recv_streams(&self) -> bool { + self.max_recv_streams > self.num_recv_streams + } + + /// Increments the number of concurrent receive streams. + /// + /// # Panics + /// + /// Panics on failure as this should have been validated before hand. + pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { + assert!(self.can_inc_num_recv_streams()); + assert!(!stream.is_counted); + + // Increment the number of remote initiated streams + self.num_recv_streams += 1; + stream.is_counted = true; + } + + /// Returns true if the send stream concurrency can be incremented + pub fn can_inc_num_send_streams(&self) -> bool { + self.max_send_streams > self.num_send_streams + } + + /// Increments the number of concurrent send streams. + /// + /// # Panics + /// + /// Panics on failure as this should have been validated before hand. + pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { + assert!(self.can_inc_num_send_streams()); + assert!(!stream.is_counted); + + // Increment the number of remote initiated streams + self.num_send_streams += 1; + stream.is_counted = true; + } + + /// Returns true if the number of pending reset streams can be incremented. + pub fn can_inc_num_reset_streams(&self) -> bool { + self.max_reset_streams > self.num_reset_streams + } + + /// Increments the number of pending reset streams. + /// + /// # Panics + /// + /// Panics on failure as this should have been validated before hand. + pub fn inc_num_reset_streams(&mut self) { + assert!(self.can_inc_num_reset_streams()); + + self.num_reset_streams += 1; + } + + pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { + if let Some(val) = settings.max_concurrent_streams() { + self.max_send_streams = val as usize; + } + } + + /// Run a block of code that could potentially transition a stream's state. + /// + /// If the stream state transitions to closed, this function will perform + /// all necessary cleanup. + /// + /// TODO: Is this function still needed? + pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U + where + F: FnOnce(&mut Self, &mut store::Ptr) -> U, + { + // TODO: Does this need to be computed before performing the action? + let is_pending_reset = stream.is_pending_reset_expiration(); + + // Run the action + let ret = f(self, &mut stream); + + self.transition_after(stream, is_pending_reset); + + ret + } + + // TODO: move this to macro? + pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { + tracing::trace!( + "transition_after; stream={:?}; state={:?}; is_closed={:?}; \ + pending_send_empty={:?}; buffered_send_data={}; \ + num_recv={}; num_send={}", + stream.id, + stream.state, + stream.is_closed(), + stream.pending_send.is_empty(), + stream.buffered_send_data, + self.num_recv_streams, + self.num_send_streams + ); + + if stream.is_closed() { + if !stream.is_pending_reset_expiration() { + stream.unlink(); + if is_reset_counted { + self.dec_num_reset_streams(); + } + } + + if stream.is_counted { + tracing::trace!("dec_num_streams; stream={:?}", stream.id); + // Decrement the number of active streams. + self.dec_num_streams(&mut stream); + } + } + + // Release the stream if it requires releasing + if stream.is_released() { + stream.remove(); + } + } + + /// Returns the maximum number of streams that can be initiated by this + /// peer. + pub(crate) fn max_send_streams(&self) -> usize { + self.max_send_streams + } + + /// Returns the maximum number of streams that can be initiated by the + /// remote peer. + pub(crate) fn max_recv_streams(&self) -> usize { + self.max_recv_streams + } + + fn dec_num_streams(&mut self, stream: &mut store::Ptr) { + assert!(stream.is_counted); + + if self.peer.is_local_init(stream.id) { + assert!(self.num_send_streams > 0); + self.num_send_streams -= 1; + stream.is_counted = false; + } else { + assert!(self.num_recv_streams > 0); + self.num_recv_streams -= 1; + stream.is_counted = false; + } + } + + fn dec_num_reset_streams(&mut self) { + assert!(self.num_reset_streams > 0); + self.num_reset_streams -= 1; + } +} + +impl Drop for Counts { + fn drop(&mut self) { + use std::thread; + + if !thread::panicking() { + debug_assert!(!self.has_streams()); + } + } +} diff --git a/third_party/rust/h2/src/proto/streams/flow_control.rs b/third_party/rust/h2/src/proto/streams/flow_control.rs new file mode 100644 index 0000000000..4a47f08ddf --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/flow_control.rs @@ -0,0 +1,258 @@ +use crate::frame::Reason; +use crate::proto::{WindowSize, MAX_WINDOW_SIZE}; + +use std::fmt; + +// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead +// aggregate them when the changes are significant. Many implementations do +// this by keeping a "ratio" of the update version the allowed window size. +// +// While some may wish to represent this ratio as percentage, using a f32, +// we skip having to deal with float math and stick to integers. To do so, +// the "ratio" is represented by 2 i32s, split into the numerator and +// denominator. For example, a 50% ratio is simply represented as 1/2. +// +// An example applying this ratio: If a stream has an allowed window size of +// 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change +// becomes greater than 1/2, or 50 bytes. +const UNCLAIMED_NUMERATOR: i32 = 1; +const UNCLAIMED_DENOMINATOR: i32 = 2; + +#[test] +fn sanity_unclaimed_ratio() { + assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR); + assert!(UNCLAIMED_NUMERATOR >= 0); + assert!(UNCLAIMED_DENOMINATOR > 0); +} + +#[derive(Copy, Clone, Debug)] +pub struct FlowControl { + /// Window the peer knows about. + /// + /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received. + /// + /// For example, say the peer sends a request and uses 32kb of the window. + /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust + /// its understanding of the capacity of the window, and that would be: + /// + /// ```notrust + /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb + /// ``` + window_size: Window, + + /// Window that we know about. + /// + /// This can go negative if a user declares a smaller target window than + /// the peer knows about. + available: Window, +} + +impl FlowControl { + pub fn new() -> FlowControl { + FlowControl { + window_size: Window(0), + available: Window(0), + } + } + + /// Returns the window size as known by the peer + pub fn window_size(&self) -> WindowSize { + self.window_size.as_size() + } + + /// Returns the window size available to the consumer + pub fn available(&self) -> Window { + self.available + } + + /// Returns true if there is unavailable window capacity + pub fn has_unavailable(&self) -> bool { + if self.window_size < 0 { + return false; + } + + self.window_size > self.available + } + + pub fn claim_capacity(&mut self, capacity: WindowSize) { + self.available -= capacity; + } + + pub fn assign_capacity(&mut self, capacity: WindowSize) { + self.available += capacity; + } + + /// If a WINDOW_UPDATE frame should be sent, returns a positive number + /// representing the increment to be used. + /// + /// If there is no available bytes to be reclaimed, or the number of + /// available bytes does not reach the threshold, this returns `None`. + /// + /// This represents pending outbound WINDOW_UPDATE frames. + pub fn unclaimed_capacity(&self) -> Option<WindowSize> { + let available = self.available; + + if self.window_size >= available { + return None; + } + + let unclaimed = available.0 - self.window_size.0; + let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; + + if unclaimed < threshold { + None + } else { + Some(unclaimed as WindowSize) + } + } + + /// Increase the window size. + /// + /// This is called after receiving a WINDOW_UPDATE frame + pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { + let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); + + if overflow { + return Err(Reason::FLOW_CONTROL_ERROR); + } + + if val > MAX_WINDOW_SIZE as i32 { + return Err(Reason::FLOW_CONTROL_ERROR); + } + + tracing::trace!( + "inc_window; sz={}; old={}; new={}", + sz, + self.window_size, + val + ); + + self.window_size = Window(val); + Ok(()) + } + + /// Decrement the send-side window size. + /// + /// This is called after receiving a SETTINGS frame with a lower + /// INITIAL_WINDOW_SIZE value. + pub fn dec_send_window(&mut self, sz: WindowSize) { + tracing::trace!( + "dec_window; sz={}; window={}, available={}", + sz, + self.window_size, + self.available + ); + // This should not be able to overflow `window_size` from the bottom. + self.window_size -= sz; + } + + /// Decrement the recv-side window size. + /// + /// This is called after receiving a SETTINGS ACK frame with a lower + /// INITIAL_WINDOW_SIZE value. + pub fn dec_recv_window(&mut self, sz: WindowSize) { + tracing::trace!( + "dec_recv_window; sz={}; window={}, available={}", + sz, + self.window_size, + self.available + ); + // This should not be able to overflow `window_size` from the bottom. + self.window_size -= sz; + self.available -= sz; + } + + /// Decrements the window reflecting data has actually been sent. The caller + /// must ensure that the window has capacity. + pub fn send_data(&mut self, sz: WindowSize) { + tracing::trace!( + "send_data; sz={}; window={}; available={}", + sz, + self.window_size, + self.available + ); + + // Ensure that the argument is correct + assert!(self.window_size >= sz as usize); + + // Update values + self.window_size -= sz; + self.available -= sz; + } +} + +/// The current capacity of a flow-controlled Window. +/// +/// This number can go negative when either side has used a certain amount +/// of capacity when the other side advertises a reduction in size. +/// +/// This type tries to centralize the knowledge of addition and subtraction +/// to this capacity, instead of having integer casts throughout the source. +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] +pub struct Window(i32); + +impl Window { + pub fn as_size(&self) -> WindowSize { + if self.0 < 0 { + 0 + } else { + self.0 as WindowSize + } + } + + pub fn checked_size(&self) -> WindowSize { + assert!(self.0 >= 0, "negative Window"); + self.0 as WindowSize + } +} + +impl PartialEq<usize> for Window { + fn eq(&self, other: &usize) -> bool { + if self.0 < 0 { + false + } else { + (self.0 as usize).eq(other) + } + } +} + +impl PartialOrd<usize> for Window { + fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> { + if self.0 < 0 { + Some(::std::cmp::Ordering::Less) + } else { + (self.0 as usize).partial_cmp(other) + } + } +} + +impl ::std::ops::SubAssign<WindowSize> for Window { + fn sub_assign(&mut self, other: WindowSize) { + self.0 -= other as i32; + } +} + +impl ::std::ops::Add<WindowSize> for Window { + type Output = Self; + fn add(self, other: WindowSize) -> Self::Output { + Window(self.0 + other as i32) + } +} + +impl ::std::ops::AddAssign<WindowSize> for Window { + fn add_assign(&mut self, other: WindowSize) { + self.0 += other as i32; + } +} + +impl fmt::Display for Window { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +impl From<Window> for isize { + fn from(w: Window) -> isize { + w.0 as isize + } +} diff --git a/third_party/rust/h2/src/proto/streams/mod.rs b/third_party/rust/h2/src/proto/streams/mod.rs new file mode 100644 index 0000000000..de2a2c85a0 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/mod.rs @@ -0,0 +1,67 @@ +mod buffer; +mod counts; +mod flow_control; +mod prioritize; +mod recv; +mod send; +mod state; +mod store; +mod stream; +mod streams; + +pub(crate) use self::prioritize::Prioritized; +pub(crate) use self::recv::Open; +pub(crate) use self::send::PollReset; +pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams}; + +use self::buffer::Buffer; +use self::counts::Counts; +use self::flow_control::FlowControl; +use self::prioritize::Prioritize; +use self::recv::Recv; +use self::send::Send; +use self::state::State; +use self::store::Store; +use self::stream::Stream; + +use crate::frame::{StreamId, StreamIdOverflow}; +use crate::proto::*; + +use bytes::Bytes; +use std::time::Duration; + +#[derive(Debug)] +pub struct Config { + /// Initial window size of locally initiated streams + pub local_init_window_sz: WindowSize, + + /// Initial maximum number of locally initiated streams. + /// After receiving a Settings frame from the remote peer, + /// the connection will overwrite this value with the + /// MAX_CONCURRENT_STREAMS specified in the frame. + pub initial_max_send_streams: usize, + + /// Max amount of DATA bytes to buffer per stream. + pub local_max_buffer_size: usize, + + /// The stream ID to start the next local stream with + pub local_next_stream_id: StreamId, + + /// If the local peer is willing to receive push promises + pub local_push_enabled: bool, + + /// If extended connect protocol is enabled. + pub extended_connect_protocol_enabled: bool, + + /// How long a locally reset stream should ignore frames + pub local_reset_duration: Duration, + + /// Maximum number of locally reset streams to keep at a time + pub local_reset_max: usize, + + /// Initial window size of remote initiated streams + pub remote_init_window_sz: WindowSize, + + /// Maximum number of remote initiated streams + pub remote_max_initiated: Option<usize>, +} diff --git a/third_party/rust/h2/src/proto/streams/prioritize.rs b/third_party/rust/h2/src/proto/streams/prioritize.rs new file mode 100644 index 0000000000..c2904aca9b --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/prioritize.rs @@ -0,0 +1,903 @@ +use super::store::Resolve; +use super::*; + +use crate::frame::{Reason, StreamId}; + +use crate::codec::UserError; +use crate::codec::UserError::*; + +use bytes::buf::{Buf, Take}; +use std::io; +use std::task::{Context, Poll, Waker}; +use std::{cmp, fmt, mem}; + +/// # Warning +/// +/// Queued streams are ordered by stream ID, as we need to ensure that +/// lower-numbered streams are sent headers before higher-numbered ones. +/// This is because "idle" stream IDs – those which have been initiated but +/// have yet to receive frames – will be implicitly closed on receipt of a +/// frame on a higher stream ID. If these queues was not ordered by stream +/// IDs, some mechanism would be necessary to ensure that the lowest-numbered] +/// idle stream is opened first. +#[derive(Debug)] +pub(super) struct Prioritize { + /// Queue of streams waiting for socket capacity to send a frame. + pending_send: store::Queue<stream::NextSend>, + + /// Queue of streams waiting for window capacity to produce data. + pending_capacity: store::Queue<stream::NextSendCapacity>, + + /// Streams waiting for capacity due to max concurrency + /// + /// The `SendRequest` handle is `Clone`. This enables initiating requests + /// from many tasks. However, offering this capability while supporting + /// backpressure at some level is tricky. If there are many `SendRequest` + /// handles and a single stream becomes available, which handle gets + /// assigned that stream? Maybe that handle is no longer ready to send a + /// request. + /// + /// The strategy used is to allow each `SendRequest` handle one buffered + /// request. A `SendRequest` handle is ready to send a request if it has no + /// associated buffered requests. This is the same strategy as `mpsc` in the + /// futures library. + pending_open: store::Queue<stream::NextOpen>, + + /// Connection level flow control governing sent data + flow: FlowControl, + + /// Stream ID of the last stream opened. + last_opened_id: StreamId, + + /// What `DATA` frame is currently being sent in the codec. + in_flight_data_frame: InFlightData, + + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, +} + +#[derive(Debug, Eq, PartialEq)] +enum InFlightData { + /// There is no `DATA` frame in flight. + Nothing, + /// There is a `DATA` frame in flight belonging to the given stream. + DataFrame(store::Key), + /// There was a `DATA` frame, but the stream's queue was since cleared. + Drop, +} + +pub(crate) struct Prioritized<B> { + // The buffer + inner: Take<B>, + + end_of_stream: bool, + + // The stream that this is associated with + stream: store::Key, +} + +// ===== impl Prioritize ===== + +impl Prioritize { + pub fn new(config: &Config) -> Prioritize { + let mut flow = FlowControl::new(); + + flow.inc_window(config.remote_init_window_sz) + .expect("invalid initial window size"); + + flow.assign_capacity(config.remote_init_window_sz); + + tracing::trace!("Prioritize::new; flow={:?}", flow); + + Prioritize { + pending_send: store::Queue::new(), + pending_capacity: store::Queue::new(), + pending_open: store::Queue::new(), + flow, + last_opened_id: StreamId::ZERO, + in_flight_data_frame: InFlightData::Nothing, + max_buffer_size: config.local_max_buffer_size, + } + } + + pub(crate) fn max_buffer_size(&self) -> usize { + self.max_buffer_size + } + + /// Queue a frame to be sent to the remote + pub fn queue_frame<B>( + &mut self, + frame: Frame<B>, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + task: &mut Option<Waker>, + ) { + let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id); + let _e = span.enter(); + // Queue the frame in the buffer + stream.pending_send.push_back(buffer, frame); + self.schedule_send(stream, task); + } + + pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { + // If the stream is waiting to be opened, nothing more to do. + if stream.is_send_ready() { + tracing::trace!(?stream.id, "schedule_send"); + // Queue the stream + self.pending_send.push(stream); + + // Notify the connection. + if let Some(task) = task.take() { + task.wake(); + } + } + } + + pub fn queue_open(&mut self, stream: &mut store::Ptr) { + self.pending_open.push(stream); + } + + /// Send a data frame + pub fn send_data<B>( + &mut self, + frame: frame::Data<B>, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> + where + B: Buf, + { + let sz = frame.payload().remaining(); + + if sz > MAX_WINDOW_SIZE as usize { + return Err(UserError::PayloadTooBig); + } + + let sz = sz as WindowSize; + + if !stream.state.is_send_streaming() { + if stream.state.is_closed() { + return Err(InactiveStreamId); + } else { + return Err(UnexpectedFrameType); + } + } + + // Update the buffered data counter + stream.buffered_send_data += sz as usize; + + let span = + tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity); + let _e = span.enter(); + tracing::trace!(buffered = stream.buffered_send_data); + + // Implicitly request more send capacity if not enough has been + // requested yet. + if (stream.requested_send_capacity as usize) < stream.buffered_send_data { + // Update the target requested capacity + stream.requested_send_capacity = + cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; + + self.try_assign_capacity(stream); + } + + if frame.is_end_stream() { + stream.state.send_close(); + self.reserve_capacity(0, stream, counts); + } + + tracing::trace!( + available = %stream.send_flow.available(), + buffered = stream.buffered_send_data, + ); + + // The `stream.buffered_send_data == 0` check is here so that, if a zero + // length data frame is queued to the front (there is no previously + // queued data), it gets sent out immediately even if there is no + // available send window. + // + // Sending out zero length data frames can be done to signal + // end-of-stream. + // + if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { + // The stream currently has capacity to send the data frame, so + // queue it up and notify the connection task. + self.queue_frame(frame.into(), buffer, stream, task); + } else { + // The stream has no capacity to send the frame now, save it but + // don't notify the connection task. Once additional capacity + // becomes available, the frame will be flushed. + stream.pending_send.push_back(buffer, frame.into()); + } + + Ok(()) + } + + /// Request capacity to send data + pub fn reserve_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + let span = tracing::trace_span!( + "reserve_capacity", + ?stream.id, + requested = capacity, + effective = (capacity as usize) + stream.buffered_send_data, + curr = stream.requested_send_capacity + ); + let _e = span.enter(); + + // Actual capacity is `capacity` + the current amount of buffered data. + // If it were less, then we could never send out the buffered data. + let capacity = (capacity as usize) + stream.buffered_send_data; + + if capacity == stream.requested_send_capacity as usize { + // Nothing to do + } else if capacity < stream.requested_send_capacity as usize { + // Update the target requested capacity + stream.requested_send_capacity = capacity as WindowSize; + + // Currently available capacity assigned to the stream + let available = stream.send_flow.available().as_size(); + + // If the stream has more assigned capacity than requested, reclaim + // some for the connection + if available as usize > capacity { + let diff = available - capacity as WindowSize; + + stream.send_flow.claim_capacity(diff); + + self.assign_connection_capacity(diff, stream, counts); + } + } else { + // If trying to *add* capacity, but the stream send side is closed, + // there's nothing to be done. + if stream.state.is_send_closed() { + return; + } + + // Update the target requested capacity + stream.requested_send_capacity = + cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; + + // Try to assign additional capacity to the stream. If none is + // currently available, the stream will be queued to receive some + // when more becomes available. + self.try_assign_capacity(stream); + } + } + + pub fn recv_stream_window_update( + &mut self, + inc: WindowSize, + stream: &mut store::Ptr, + ) -> Result<(), Reason> { + let span = tracing::trace_span!( + "recv_stream_window_update", + ?stream.id, + ?stream.state, + inc, + flow = ?stream.send_flow + ); + let _e = span.enter(); + + if stream.state.is_send_closed() && stream.buffered_send_data == 0 { + // We can't send any data, so don't bother doing anything else. + return Ok(()); + } + + // Update the stream level flow control. + stream.send_flow.inc_window(inc)?; + + // If the stream is waiting on additional capacity, then this will + // assign it (if available on the connection) and notify the producer + self.try_assign_capacity(stream); + + Ok(()) + } + + pub fn recv_connection_window_update( + &mut self, + inc: WindowSize, + store: &mut Store, + counts: &mut Counts, + ) -> Result<(), Reason> { + // Update the connection's window + self.flow.inc_window(inc)?; + + self.assign_connection_capacity(inc, store, counts); + Ok(()) + } + + /// Reclaim all capacity assigned to the stream and re-assign it to the + /// connection + pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { + let available = stream.send_flow.available().as_size(); + stream.send_flow.claim_capacity(available); + // Re-assign all capacity to the connection + self.assign_connection_capacity(available, stream, counts); + } + + /// Reclaim just reserved capacity, not buffered capacity, and re-assign + /// it to the connection + pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { + // only reclaim requested capacity that isn't already buffered + if stream.requested_send_capacity as usize > stream.buffered_send_data { + let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; + + stream.send_flow.claim_capacity(reserved); + self.assign_connection_capacity(reserved, stream, counts); + } + } + + pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { + let span = tracing::trace_span!("clear_pending_capacity"); + let _e = span.enter(); + while let Some(stream) = self.pending_capacity.pop(store) { + counts.transition(stream, |_, stream| { + tracing::trace!(?stream.id, "clear_pending_capacity"); + }) + } + } + + pub fn assign_connection_capacity<R>( + &mut self, + inc: WindowSize, + store: &mut R, + counts: &mut Counts, + ) where + R: Resolve, + { + let span = tracing::trace_span!("assign_connection_capacity", inc); + let _e = span.enter(); + + self.flow.assign_capacity(inc); + + // Assign newly acquired capacity to streams pending capacity. + while self.flow.available() > 0 { + let stream = match self.pending_capacity.pop(store) { + Some(stream) => stream, + None => return, + }; + + // Streams pending capacity may have been reset before capacity + // became available. In that case, the stream won't want any + // capacity, and so we shouldn't "transition" on it, but just evict + // it and continue the loop. + if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { + continue; + } + + counts.transition(stream, |_, mut stream| { + // Try to assign capacity to the stream. This will also re-queue the + // stream if there isn't enough connection level capacity to fulfill + // the capacity request. + self.try_assign_capacity(&mut stream); + }) + } + } + + /// Request capacity to send data + fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { + let total_requested = stream.requested_send_capacity; + + // Total requested should never go below actual assigned + // (Note: the window size can go lower than assigned) + debug_assert!(stream.send_flow.available() <= total_requested as usize); + + // The amount of additional capacity that the stream requests. + // Don't assign more than the window has available! + let additional = cmp::min( + total_requested - stream.send_flow.available().as_size(), + // Can't assign more than what is available + stream.send_flow.window_size() - stream.send_flow.available().as_size(), + ); + let span = tracing::trace_span!("try_assign_capacity", ?stream.id); + let _e = span.enter(); + tracing::trace!( + requested = total_requested, + additional, + buffered = stream.buffered_send_data, + window = stream.send_flow.window_size(), + conn = %self.flow.available() + ); + + if additional == 0 { + // Nothing more to do + return; + } + + // If the stream has requested capacity, then it must be in the + // streaming state (more data could be sent) or there is buffered data + // waiting to be sent. + debug_assert!( + stream.state.is_send_streaming() || stream.buffered_send_data > 0, + "state={:?}", + stream.state + ); + + // The amount of currently available capacity on the connection + let conn_available = self.flow.available().as_size(); + + // First check if capacity is immediately available + if conn_available > 0 { + // The amount of capacity to assign to the stream + // TODO: Should prioritization factor into this? + let assign = cmp::min(conn_available, additional); + + tracing::trace!(capacity = assign, "assigning"); + + // Assign the capacity to the stream + stream.assign_capacity(assign, self.max_buffer_size); + + // Claim the capacity from the connection + self.flow.claim_capacity(assign); + } + + tracing::trace!( + available = %stream.send_flow.available(), + requested = stream.requested_send_capacity, + buffered = stream.buffered_send_data, + has_unavailable = %stream.send_flow.has_unavailable() + ); + + if stream.send_flow.available() < stream.requested_send_capacity as usize + && stream.send_flow.has_unavailable() + { + // The stream requires additional capacity and the stream's + // window has available capacity, but the connection window + // does not. + // + // In this case, the stream needs to be queued up for when the + // connection has more capacity. + self.pending_capacity.push(stream); + } + + // If data is buffered and the stream is send ready, then + // schedule the stream for execution + if stream.buffered_send_data > 0 && stream.is_send_ready() { + // TODO: This assertion isn't *exactly* correct. There can still be + // buffered send data while the stream's pending send queue is + // empty. This can happen when a large data frame is in the process + // of being **partially** sent. Once the window has been sent, the + // data frame will be returned to the prioritization layer to be + // re-scheduled. + // + // That said, it would be nice to figure out how to make this + // assertion correctly. + // + // debug_assert!(!stream.pending_send.is_empty()); + + self.pending_send.push(stream); + } + } + + pub fn poll_complete<T, B>( + &mut self, + cx: &mut Context, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + counts: &mut Counts, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + // Ensure codec is ready + ready!(dst.poll_ready(cx))?; + + // Reclaim any frame that has previously been written + self.reclaim_frame(buffer, store, dst); + + // The max frame length + let max_frame_len = dst.max_send_frame_size(); + + tracing::trace!("poll_complete"); + + loop { + self.schedule_pending_open(store, counts); + + match self.pop_frame(buffer, store, max_frame_len, counts) { + Some(frame) => { + tracing::trace!(?frame, "writing"); + + debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); + if let Frame::Data(ref frame) = frame { + self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); + } + dst.buffer(frame).expect("invalid frame"); + + // Ensure the codec is ready to try the loop again. + ready!(dst.poll_ready(cx))?; + + // Because, always try to reclaim... + self.reclaim_frame(buffer, store, dst); + } + None => { + // Try to flush the codec. + ready!(dst.flush(cx))?; + + // This might release a data frame... + if !self.reclaim_frame(buffer, store, dst) { + return Poll::Ready(Ok(())); + } + + // No need to poll ready as poll_complete() does this for + // us... + } + } + } + } + + /// Tries to reclaim a pending data frame from the codec. + /// + /// Returns true if a frame was reclaimed. + /// + /// When a data frame is written to the codec, it may not be written in its + /// entirety (large chunks are split up into potentially many data frames). + /// In this case, the stream needs to be reprioritized. + fn reclaim_frame<T, B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + dst: &mut Codec<T, Prioritized<B>>, + ) -> bool + where + B: Buf, + { + let span = tracing::trace_span!("try_reclaim_frame"); + let _e = span.enter(); + + // First check if there are any data chunks to take back + if let Some(frame) = dst.take_last_data_frame() { + self.reclaim_frame_inner(buffer, store, frame) + } else { + false + } + } + + fn reclaim_frame_inner<B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + frame: frame::Data<Prioritized<B>>, + ) -> bool + where + B: Buf, + { + tracing::trace!( + ?frame, + sz = frame.payload().inner.get_ref().remaining(), + "reclaimed" + ); + + let mut eos = false; + let key = frame.payload().stream; + + match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { + InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"), + InFlightData::Drop => { + tracing::trace!("not reclaiming frame for cancelled stream"); + return false; + } + InFlightData::DataFrame(k) => { + debug_assert_eq!(k, key); + } + } + + let mut frame = frame.map(|prioritized| { + // TODO: Ensure fully written + eos = prioritized.end_of_stream; + prioritized.inner.into_inner() + }); + + if frame.payload().has_remaining() { + let mut stream = store.resolve(key); + + if eos { + frame.set_end_stream(true); + } + + self.push_back_frame(frame.into(), buffer, &mut stream); + + return true; + } + + false + } + + /// Push the frame to the front of the stream's deque, scheduling the + /// stream if needed. + fn push_back_frame<B>( + &mut self, + frame: Frame<B>, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + ) { + // Push the frame to the front of the stream's deque + stream.pending_send.push_front(buffer, frame); + + // If needed, schedule the sender + if stream.send_flow.available() > 0 { + debug_assert!(!stream.pending_send.is_empty()); + self.pending_send.push(stream); + } + } + + pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { + let span = tracing::trace_span!("clear_queue", ?stream.id); + let _e = span.enter(); + + // TODO: make this more efficient? + while let Some(frame) = stream.pending_send.pop_front(buffer) { + tracing::trace!(?frame, "dropping"); + } + + stream.buffered_send_data = 0; + stream.requested_send_capacity = 0; + if let InFlightData::DataFrame(key) = self.in_flight_data_frame { + if stream.key() == key { + // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. + self.in_flight_data_frame = InFlightData::Drop; + } + } + } + + pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_send.pop(store) { + let is_pending_reset = stream.is_pending_reset_expiration(); + counts.transition_after(stream, is_pending_reset); + } + } + + pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_open.pop(store) { + let is_pending_reset = stream.is_pending_reset_expiration(); + counts.transition_after(stream, is_pending_reset); + } + } + + fn pop_frame<B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + max_len: usize, + counts: &mut Counts, + ) -> Option<Frame<Prioritized<B>>> + where + B: Buf, + { + let span = tracing::trace_span!("pop_frame"); + let _e = span.enter(); + + loop { + match self.pending_send.pop(store) { + Some(mut stream) => { + let span = tracing::trace_span!("popped", ?stream.id, ?stream.state); + let _e = span.enter(); + + // It's possible that this stream, besides having data to send, + // is also queued to send a reset, and thus is already in the queue + // to wait for "some time" after a reset. + // + // To be safe, we just always ask the stream. + let is_pending_reset = stream.is_pending_reset_expiration(); + + tracing::trace!(is_pending_reset); + + let frame = match stream.pending_send.pop_front(buffer) { + Some(Frame::Data(mut frame)) => { + // Get the amount of capacity remaining for stream's + // window. + let stream_capacity = stream.send_flow.available(); + let sz = frame.payload().remaining(); + + tracing::trace!( + sz, + eos = frame.is_end_stream(), + window = %stream_capacity, + available = %stream.send_flow.available(), + requested = stream.requested_send_capacity, + buffered = stream.buffered_send_data, + "data frame" + ); + + // Zero length data frames always have capacity to + // be sent. + if sz > 0 && stream_capacity == 0 { + tracing::trace!("stream capacity is 0"); + + // Ensure that the stream is waiting for + // connection level capacity + // + // TODO: uncomment + // debug_assert!(stream.is_pending_send_capacity); + + // The stream has no more capacity, this can + // happen if the remote reduced the stream + // window. In this case, we need to buffer the + // frame and wait for a window update... + stream.pending_send.push_front(buffer, frame.into()); + + continue; + } + + // Only send up to the max frame length + let len = cmp::min(sz, max_len); + + // Only send up to the stream's window capacity + let len = + cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; + + // There *must* be be enough connection level + // capacity at this point. + debug_assert!(len <= self.flow.window_size()); + + tracing::trace!(len, "sending data frame"); + + // Update the flow control + tracing::trace_span!("updating stream flow").in_scope(|| { + stream.send_flow.send_data(len); + + // Decrement the stream's buffered data counter + debug_assert!(stream.buffered_send_data >= len as usize); + stream.buffered_send_data -= len as usize; + stream.requested_send_capacity -= len; + + // If the capacity was limited because of the + // max_send_buffer_size, then consider waking + // the send task again... + stream.notify_if_can_buffer_more(self.max_buffer_size); + + // Assign the capacity back to the connection that + // was just consumed from the stream in the previous + // line. + self.flow.assign_capacity(len); + }); + + let (eos, len) = tracing::trace_span!("updating connection flow") + .in_scope(|| { + self.flow.send_data(len); + + // Wrap the frame's data payload to ensure that the + // correct amount of data gets written. + + let eos = frame.is_end_stream(); + let len = len as usize; + + if frame.payload().remaining() > len { + frame.set_end_stream(false); + } + (eos, len) + }); + + Frame::Data(frame.map(|buf| Prioritized { + inner: buf.take(len), + end_of_stream: eos, + stream: stream.key(), + })) + } + Some(Frame::PushPromise(pp)) => { + let mut pushed = + stream.store_mut().find_mut(&pp.promised_id()).unwrap(); + pushed.is_pending_push = false; + // Transition stream from pending_push to pending_open + // if possible + if !pushed.pending_send.is_empty() { + if counts.can_inc_num_send_streams() { + counts.inc_num_send_streams(&mut pushed); + self.pending_send.push(&mut pushed); + } else { + self.queue_open(&mut pushed); + } + } + Frame::PushPromise(pp) + } + Some(frame) => frame.map(|_| { + unreachable!( + "Frame::map closure will only be called \ + on DATA frames." + ) + }), + None => { + if let Some(reason) = stream.state.get_scheduled_reset() { + let stream_id = stream.id; + stream + .state + .set_reset(stream_id, reason, Initiator::Library); + + let frame = frame::Reset::new(stream.id, reason); + Frame::Reset(frame) + } else { + // If the stream receives a RESET from the peer, it may have + // had data buffered to be sent, but all the frames are cleared + // in clear_queue(). Instead of doing O(N) traversal through queue + // to remove, lets just ignore the stream here. + tracing::trace!("removing dangling stream from pending_send"); + // Since this should only happen as a consequence of `clear_queue`, + // we must be in a closed state of some kind. + debug_assert!(stream.state.is_closed()); + counts.transition_after(stream, is_pending_reset); + continue; + } + } + }; + + tracing::trace!("pop_frame; frame={:?}", frame); + + if cfg!(debug_assertions) && stream.state.is_idle() { + debug_assert!(stream.id > self.last_opened_id); + self.last_opened_id = stream.id; + } + + if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { + // TODO: Only requeue the sender IF it is ready to send + // the next frame. i.e. don't requeue it if the next + // frame is a data frame and the stream does not have + // any more capacity. + self.pending_send.push(&mut stream); + } + + counts.transition_after(stream, is_pending_reset); + + return Some(frame); + } + None => return None, + } + } + } + + fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { + tracing::trace!("schedule_pending_open"); + // check for any pending open streams + while counts.can_inc_num_send_streams() { + if let Some(mut stream) = self.pending_open.pop(store) { + tracing::trace!("schedule_pending_open; stream={:?}", stream.id); + + counts.inc_num_send_streams(&mut stream); + self.pending_send.push(&mut stream); + stream.notify_send(); + } else { + return; + } + } + } +} + +// ===== impl Prioritized ===== + +impl<B> Buf for Prioritized<B> +where + B: Buf, +{ + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { + self.inner.chunks_vectored(dst) + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt) + } +} + +impl<B: Buf> fmt::Debug for Prioritized<B> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Prioritized") + .field("remaining", &self.inner.get_ref().remaining()) + .field("end_of_stream", &self.end_of_stream) + .field("stream", &self.stream) + .finish() + } +} diff --git a/third_party/rust/h2/src/proto/streams/recv.rs b/third_party/rust/h2/src/proto/streams/recv.rs new file mode 100644 index 0000000000..3af1af3a1d --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/recv.rs @@ -0,0 +1,1097 @@ +use super::*; +use crate::codec::UserError; +use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE}; +use crate::proto::{self, Error}; +use std::task::Context; + +use http::{HeaderMap, Request, Response}; + +use std::io; +use std::task::{Poll, Waker}; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +pub(super) struct Recv { + /// Initial window size of remote initiated streams + init_window_sz: WindowSize, + + /// Connection level flow control governing received data + flow: FlowControl, + + /// Amount of connection window capacity currently used by outstanding streams. + in_flight_data: WindowSize, + + /// The lowest stream ID that is still idle + next_stream_id: Result<StreamId, StreamIdOverflow>, + + /// The stream ID of the last processed stream + last_processed_id: StreamId, + + /// Any streams with a higher ID are ignored. + /// + /// This starts as MAX, but is lowered when a GOAWAY is received. + /// + /// > After sending a GOAWAY frame, the sender can discard frames for + /// > streams initiated by the receiver with identifiers higher than + /// > the identified last stream. + max_stream_id: StreamId, + + /// Streams that have pending window updates + pending_window_updates: store::Queue<stream::NextWindowUpdate>, + + /// New streams to be accepted + pending_accept: store::Queue<stream::NextAccept>, + + /// Locally reset streams that should be reaped when they expire + pending_reset_expired: store::Queue<stream::NextResetExpire>, + + /// How long locally reset streams should ignore received frames + reset_duration: Duration, + + /// Holds frames that are waiting to be read + buffer: Buffer<Event>, + + /// Refused StreamId, this represents a frame that must be sent out. + refused: Option<StreamId>, + + /// If push promises are allowed to be received. + is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, +} + +#[derive(Debug)] +pub(super) enum Event { + Headers(peer::PollMessage), + Data(Bytes), + Trailers(HeaderMap), +} + +#[derive(Debug)] +pub(super) enum RecvHeaderBlockError<T> { + Oversize(T), + State(Error), +} + +#[derive(Debug)] +pub(crate) enum Open { + PushPromise, + Headers, +} + +impl Recv { + pub fn new(peer: peer::Dyn, config: &Config) -> Self { + let next_stream_id = if peer.is_server() { 1 } else { 2 }; + + let mut flow = FlowControl::new(); + + // connections always have the default window size, regardless of + // settings + flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE) + .expect("invalid initial remote window size"); + flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE); + + Recv { + init_window_sz: config.local_init_window_sz, + flow, + in_flight_data: 0 as WindowSize, + next_stream_id: Ok(next_stream_id.into()), + pending_window_updates: store::Queue::new(), + last_processed_id: StreamId::ZERO, + max_stream_id: StreamId::MAX, + pending_accept: store::Queue::new(), + pending_reset_expired: store::Queue::new(), + reset_duration: config.local_reset_duration, + buffer: Buffer::new(), + refused: None, + is_push_enabled: config.local_push_enabled, + is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled, + } + } + + /// Returns the initial receive window size + pub fn init_window_sz(&self) -> WindowSize { + self.init_window_sz + } + + /// Returns the ID of the last processed stream + pub fn last_processed_id(&self) -> StreamId { + self.last_processed_id + } + + /// Update state reflecting a new, remotely opened stream + /// + /// Returns the stream state if successful. `None` if refused + pub fn open( + &mut self, + id: StreamId, + mode: Open, + counts: &mut Counts, + ) -> Result<Option<StreamId>, Error> { + assert!(self.refused.is_none()); + + counts.peer().ensure_can_open(id, mode)?; + + let next_id = self.next_stream_id()?; + if id < next_id { + proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + self.next_stream_id = id.next_id(); + + if !counts.can_inc_num_recv_streams() { + self.refused = Some(id); + return Ok(None); + } + + Ok(Some(id)) + } + + /// Transition the stream state based on receiving headers + /// + /// The caller ensures that the frame represents headers and not trailers. + pub fn recv_headers( + &mut self, + frame: frame::Headers, + stream: &mut store::Ptr, + counts: &mut Counts, + ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> { + tracing::trace!("opening stream; init_window={}", self.init_window_sz); + let is_initial = stream.state.recv_open(&frame)?; + + if is_initial { + // TODO: be smarter about this logic + if frame.stream_id() > self.last_processed_id { + self.last_processed_id = frame.stream_id(); + } + + // Increment the number of concurrent streams + counts.inc_num_recv_streams(stream); + } + + if !stream.content_length.is_head() { + use super::stream::ContentLength; + use http::header; + + if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { + let content_length = match frame::parse_u64(content_length.as_bytes()) { + Ok(v) => v, + Err(()) => { + proto_err!(stream: "could not parse content-length; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + }; + + stream.content_length = ContentLength::Remaining(content_length); + } + } + + if frame.is_over_size() { + // A frame is over size if the decoded header block was bigger than + // SETTINGS_MAX_HEADER_LIST_SIZE. + // + // > A server that receives a larger header block than it is willing + // > to handle can send an HTTP 431 (Request Header Fields Too + // > Large) status code [RFC6585]. A client can discard responses + // > that it cannot process. + // + // So, if peer is a server, we'll send a 431. In either case, + // an error is recorded, which will send a REFUSED_STREAM, + // since we don't want any of the data frames either. + tracing::debug!( + "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \ + recv_headers: frame is over size; stream={:?}", + stream.id + ); + return if counts.peer().is_server() && is_initial { + let mut res = frame::Headers::new( + stream.id, + frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE), + HeaderMap::new(), + ); + res.set_end_stream(); + Err(RecvHeaderBlockError::Oversize(Some(res))) + } else { + Err(RecvHeaderBlockError::Oversize(None)) + }; + } + + let stream_id = frame.stream_id(); + let (pseudo, fields) = frame.into_parts(); + + if pseudo.protocol.is_some() { + if counts.peer().is_server() && !self.is_extended_connect_protocol_enabled { + proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into()); + } + } + + if !pseudo.is_informational() { + let message = counts + .peer() + .convert_poll_message(pseudo, fields, stream_id)?; + + // Push the frame onto the stream's recv buffer + stream + .pending_recv + .push_back(&mut self.buffer, Event::Headers(message)); + stream.notify_recv(); + } + + // Only servers can receive a headers frame that initiates the stream. + // This is verified in `Streams` before calling this function. + if counts.peer().is_server() { + self.pending_accept.push(stream); + } + + Ok(()) + } + + /// Called by the server to get the request + /// + /// TODO: Should this fn return `Result`? + pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { + use super::peer::PollMessage::*; + + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Server(request))) => request, + _ => panic!(), + } + } + + /// Called by the client to get pushed response + pub fn poll_pushed( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> { + use super::peer::PollMessage::*; + + let mut ppp = stream.pending_push_promises.take(); + let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| { + match pushed.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Server(headers))) => (headers, pushed.key()), + // When frames are pushed into the queue, it is verified that + // the first frame is a HEADERS frame. + _ => panic!("Headers not set on pushed stream"), + } + }); + stream.pending_push_promises = ppp; + if let Some(p) = pushed { + Poll::Ready(Some(Ok(p))) + } else { + let is_open = stream.state.ensure_recv_open()?; + + if is_open { + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending + } else { + Poll::Ready(None) + } + } + } + + /// Called by the client to get the response + pub fn poll_response( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll<Result<Response<()>, proto::Error>> { + use super::peer::PollMessage::*; + + // If the buffer is not empty, then the first frame must be a HEADERS + // frame or the user violated the contract. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)), + Some(_) => panic!("poll_response called after response returned"), + None => { + stream.state.ensure_recv_open()?; + + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending + } + } + } + + /// Transition the stream based on receiving trailers + pub fn recv_trailers( + &mut self, + frame: frame::Headers, + stream: &mut store::Ptr, + ) -> Result<(), Error> { + // Transition the state + stream.state.recv_close()?; + + if stream.ensure_content_length_zero().is_err() { + proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); + } + + let trailers = frame.into_fields(); + + // Push the frame onto the stream's recv buffer + stream + .pending_recv + .push_back(&mut self.buffer, Event::Trailers(trailers)); + stream.notify_recv(); + + Ok(()) + } + + /// Releases capacity of the connection + pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) { + tracing::trace!( + "release_connection_capacity; size={}, connection in_flight_data={}", + capacity, + self.in_flight_data, + ); + + // Decrement in-flight data + self.in_flight_data -= capacity; + + // Assign capacity to connection + self.flow.assign_capacity(capacity); + + if self.flow.unclaimed_capacity().is_some() { + if let Some(task) = task.take() { + task.wake(); + } + } + } + + /// Releases capacity back to the connection & stream + pub fn release_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + tracing::trace!("release_capacity; size={}", capacity); + + if capacity > stream.in_flight_recv_data { + return Err(UserError::ReleaseCapacityTooBig); + } + + self.release_connection_capacity(capacity, task); + + // Decrement in-flight data + stream.in_flight_recv_data -= capacity; + + // Assign capacity to stream + stream.recv_flow.assign_capacity(capacity); + + if stream.recv_flow.unclaimed_capacity().is_some() { + // Queue the stream for sending the WINDOW_UPDATE frame. + self.pending_window_updates.push(stream); + + if let Some(task) = task.take() { + task.wake(); + } + } + + Ok(()) + } + + /// Release any unclaimed capacity for a closed stream. + pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { + debug_assert_eq!(stream.ref_count, 0); + + if stream.in_flight_recv_data == 0 { + return; + } + + tracing::trace!( + "auto-release closed stream ({:?}) capacity: {:?}", + stream.id, + stream.in_flight_recv_data, + ); + + self.release_connection_capacity(stream.in_flight_recv_data, task); + stream.in_flight_recv_data = 0; + + self.clear_recv_buffer(stream); + } + + /// Set the "target" connection window size. + /// + /// By default, all new connections start with 64kb of window size. As + /// streams used and release capacity, we will send WINDOW_UPDATEs for the + /// connection to bring it back up to the initial "target". + /// + /// Setting a target means that we will try to tell the peer about + /// WINDOW_UPDATEs so the peer knows it has about `target` window to use + /// for the whole connection. + /// + /// The `task` is an optional parked task for the `Connection` that might + /// be blocked on needing more window capacity. + pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) { + tracing::trace!( + "set_target_connection_window; target={}; available={}, reserved={}", + target, + self.flow.available(), + self.in_flight_data, + ); + + // The current target connection window is our `available` plus any + // in-flight data reserved by streams. + // + // Update the flow controller with the difference between the new + // target and the current target. + let current = (self.flow.available() + self.in_flight_data).checked_size(); + if target > current { + self.flow.assign_capacity(target - current); + } else { + self.flow.claim_capacity(current - target); + } + + // If changing the target capacity means we gained a bunch of capacity, + // enough that we went over the update threshold, then schedule sending + // a connection WINDOW_UPDATE. + if self.flow.unclaimed_capacity().is_some() { + if let Some(task) = task.take() { + task.wake(); + } + } + } + + pub(crate) fn apply_local_settings( + &mut self, + settings: &frame::Settings, + store: &mut Store, + ) -> Result<(), proto::Error> { + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } + + if let Some(target) = settings.initial_window_size() { + let old_sz = self.init_window_sz; + self.init_window_sz = target; + + tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); + + // Per RFC 7540 §6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + + if target < old_sz { + // We must decrease the (local) window on every open stream. + let dec = old_sz - target; + tracing::trace!("decrementing all windows; dec={}", dec); + + store.for_each(|mut stream| { + stream.recv_flow.dec_recv_window(dec); + }) + } else if target > old_sz { + // We must increase the (local) window on every open stream. + let inc = target - old_sz; + tracing::trace!("incrementing all windows; inc={}", inc); + store.try_for_each(|mut stream| { + // XXX: Shouldn't the peer have already noticed our + // overflow and sent us a GOAWAY? + stream + .recv_flow + .inc_window(inc) + .map_err(proto::Error::library_go_away)?; + stream.recv_flow.assign_capacity(inc); + Ok::<_, proto::Error>(()) + })?; + } + } + + Ok(()) + } + + pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { + if !stream.state.is_recv_closed() { + return false; + } + + stream.pending_recv.is_empty() + } + + pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> { + let sz = frame.payload().len(); + + // This should have been enforced at the codec::FramedRead layer, so + // this is just a sanity check. + assert!(sz <= MAX_WINDOW_SIZE as usize); + + let sz = sz as WindowSize; + + let is_ignoring_frame = stream.state.is_local_reset(); + + if !is_ignoring_frame && !stream.state.is_recv_streaming() { + // TODO: There are cases where this can be a stream error of + // STREAM_CLOSED instead... + + // Receiving a DATA frame when not expecting one is a protocol + // error. + proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + tracing::trace!( + "recv_data; size={}; connection={}; stream={}", + sz, + self.flow.window_size(), + stream.recv_flow.window_size() + ); + + if is_ignoring_frame { + tracing::trace!( + "recv_data; frame ignored on locally reset {:?} for some time", + stream.id, + ); + return Ok(self.ignore_data(sz)?); + } + + // Ensure that there is enough capacity on the connection before acting + // on the stream. + self.consume_connection_window(sz)?; + + if stream.recv_flow.window_size() < sz { + // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE + // > A receiver MAY respond with a stream error (Section 5.4.2) or + // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if + // > it is unable to accept a frame. + // + // So, for violating the **stream** window, we can send either a + // stream or connection error. We've opted to send a stream + // error. + return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR)); + } + + if stream.dec_content_length(frame.payload().len()).is_err() { + proto_err!(stream: + "recv_data: content-length overflow; stream={:?}; len={:?}", + stream.id, + frame.payload().len(), + ); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); + } + + if frame.is_end_stream() { + if stream.ensure_content_length_zero().is_err() { + proto_err!(stream: + "recv_data: content-length underflow; stream={:?}; len={:?}", + stream.id, + frame.payload().len(), + ); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); + } + + if stream.state.recv_close().is_err() { + proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); + } + } + + // Update stream level flow control + stream.recv_flow.send_data(sz); + + // Track the data as in-flight + stream.in_flight_recv_data += sz; + + let event = Event::Data(frame.into_payload()); + + // Push the frame onto the recv buffer + stream.pending_recv.push_back(&mut self.buffer, event); + stream.notify_recv(); + + Ok(()) + } + + pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> { + // Ensure that there is enough capacity on the connection... + self.consume_connection_window(sz)?; + + // Since we are ignoring this frame, + // we aren't returning the frame to the user. That means they + // have no way to release the capacity back to the connection. So + // we have to release it automatically. + // + // This call doesn't send a WINDOW_UPDATE immediately, just marks + // the capacity as available to be reclaimed. When the available + // capacity meets a threshold, a WINDOW_UPDATE is then sent. + self.release_connection_capacity(sz, &mut None); + Ok(()) + } + + pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> { + if self.flow.window_size() < sz { + tracing::debug!( + "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});", + self.flow.window_size(), + sz, + ); + return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR)); + } + + // Update connection level flow control + self.flow.send_data(sz); + + // Track the data as in-flight + self.in_flight_data += sz; + Ok(()) + } + + pub fn recv_push_promise( + &mut self, + frame: frame::PushPromise, + stream: &mut store::Ptr, + ) -> Result<(), Error> { + stream.state.reserve_remote()?; + if frame.is_over_size() { + // A frame is over size if the decoded header block was bigger than + // SETTINGS_MAX_HEADER_LIST_SIZE. + // + // > A server that receives a larger header block than it is willing + // > to handle can send an HTTP 431 (Request Header Fields Too + // > Large) status code [RFC6585]. A client can discard responses + // > that it cannot process. + // + // So, if peer is a server, we'll send a 431. In either case, + // an error is recorded, which will send a REFUSED_STREAM, + // since we don't want any of the data frames either. + tracing::debug!( + "stream error REFUSED_STREAM -- recv_push_promise: \ + headers frame is over size; promised_id={:?};", + frame.promised_id(), + ); + return Err(Error::library_reset( + frame.promised_id(), + Reason::REFUSED_STREAM, + )); + } + + let promised_id = frame.promised_id(); + let (pseudo, fields) = frame.into_parts(); + let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?; + + if let Err(e) = frame::PushPromise::validate_request(&req) { + use PushPromiseHeaderError::*; + match e { + NotSafeAndCacheable => proto_err!( + stream: + "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}", + req.method(), + promised_id, + ), + InvalidContentLength(e) => proto_err!( + stream: + "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}", + e, + promised_id, + ), + } + return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR)); + } + + use super::peer::PollMessage::*; + stream + .pending_recv + .push_back(&mut self.buffer, Event::Headers(Server(req))); + stream.notify_recv(); + Ok(()) + } + + /// Ensures that `id` is not in the `Idle` state. + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { + if let Ok(next) = self.next_stream_id { + if id >= next { + tracing::debug!( + "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}", + id + ); + return Err(Reason::PROTOCOL_ERROR); + } + } + // if next_stream_id is overflowed, that's ok. + + Ok(()) + } + + /// Handle remote sending an explicit RST_STREAM. + pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { + // Notify the stream + stream.state.recv_reset(frame, stream.is_pending_send); + + stream.notify_send(); + stream.notify_recv(); + } + + /// Handle a connection-level error + pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) { + // Receive an error + stream.state.handle_error(err); + + // If a receiver is waiting, notify it + stream.notify_send(); + stream.notify_recv(); + } + + pub fn go_away(&mut self, last_processed_id: StreamId) { + assert!(self.max_stream_id >= last_processed_id); + self.max_stream_id = last_processed_id; + } + + pub fn recv_eof(&mut self, stream: &mut Stream) { + stream.state.recv_eof(); + stream.notify_send(); + stream.notify_recv(); + } + + pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { + while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) { + // drop it + } + } + + /// Get the max ID of streams we can receive. + /// + /// This gets lowered if we send a GOAWAY frame. + pub fn max_stream_id(&self) -> StreamId { + self.max_stream_id + } + + pub fn next_stream_id(&self) -> Result<StreamId, Error> { + if let Ok(id) = self.next_stream_id { + Ok(id) + } else { + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) + } + } + + pub fn may_have_created_stream(&self, id: StreamId) -> bool { + if let Ok(next_id) = self.next_stream_id { + // Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); + id < next_id + } else { + true + } + } + + pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { + if let Ok(next_id) = self.next_stream_id { + // !Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); + if id >= next_id { + self.next_stream_id = id.next_id(); + } + } + } + + /// Returns true if the remote peer can reserve a stream with the given ID. + pub fn ensure_can_reserve(&self) -> Result<(), Error> { + if !self.is_push_enabled { + proto_err!(conn: "recv_push_promise: push is disabled"); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + Ok(()) + } + + /// Add a locally reset stream to queue to be eventually reaped. + pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { + if !stream.state.is_local_reset() || stream.is_pending_reset_expiration() { + return; + } + + tracing::trace!("enqueue_reset_expiration; {:?}", stream.id); + + if !counts.can_inc_num_reset_streams() { + // try to evict 1 stream if possible + // if max allow is 0, this won't be able to evict, + // and then we'll just bail after + if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { + counts.transition_after(evicted, true); + } + } + + if counts.can_inc_num_reset_streams() { + counts.inc_num_reset_streams(); + self.pending_reset_expired.push(stream); + } + } + + /// Send any pending refusals. + pub fn send_pending_refusal<T, B>( + &mut self, + cx: &mut Context, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + if let Some(stream_id) = self.refused { + ready!(dst.poll_ready(cx))?; + + // Create the RST_STREAM frame + let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); + + // Buffer the frame + dst.buffer(frame.into()).expect("invalid RST_STREAM frame"); + } + + self.refused = None; + + Poll::Ready(Ok(())) + } + + pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { + if !self.pending_reset_expired.is_empty() { + let now = Instant::now(); + let reset_duration = self.reset_duration; + while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { + let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); + // rust-lang/rust#86470 tracks a bug in the standard library where `Instant` + // subtraction can panic (because, on some platforms, `Instant` isn't actually + // monotonic). We use a saturating operation to avoid this panic here. + now.saturating_duration_since(reset_at) > reset_duration + }) { + counts.transition_after(stream, true); + } + } + } + + pub fn clear_queues( + &mut self, + clear_pending_accept: bool, + store: &mut Store, + counts: &mut Counts, + ) { + self.clear_stream_window_update_queue(store, counts); + self.clear_all_reset_streams(store, counts); + + if clear_pending_accept { + self.clear_all_pending_accept(store, counts); + } + } + + fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_window_updates.pop(store) { + counts.transition(stream, |_, stream| { + tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id); + }) + } + } + + /// Called on EOF + fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_reset_expired.pop(store) { + counts.transition_after(stream, true); + } + } + + fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_accept.pop(store) { + counts.transition_after(stream, false); + } + } + + pub fn poll_complete<T, B>( + &mut self, + cx: &mut Context, + store: &mut Store, + counts: &mut Counts, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + // Send any pending connection level window updates + ready!(self.send_connection_window_update(cx, dst))?; + + // Send any pending stream level window updates + ready!(self.send_stream_window_updates(cx, store, counts, dst))?; + + Poll::Ready(Ok(())) + } + + /// Send connection level window update + fn send_connection_window_update<T, B>( + &mut self, + cx: &mut Context, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + if let Some(incr) = self.flow.unclaimed_capacity() { + let frame = frame::WindowUpdate::new(StreamId::zero(), incr); + + // Ensure the codec has capacity + ready!(dst.poll_ready(cx))?; + + // Buffer the WINDOW_UPDATE frame + dst.buffer(frame.into()) + .expect("invalid WINDOW_UPDATE frame"); + + // Update flow control + self.flow + .inc_window(incr) + .expect("unexpected flow control state"); + } + + Poll::Ready(Ok(())) + } + + /// Send stream level window update + pub fn send_stream_window_updates<T, B>( + &mut self, + cx: &mut Context, + store: &mut Store, + counts: &mut Counts, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + loop { + // Ensure the codec has capacity + ready!(dst.poll_ready(cx))?; + + // Get the next stream + let stream = match self.pending_window_updates.pop(store) { + Some(stream) => stream, + None => return Poll::Ready(Ok(())), + }; + + counts.transition(stream, |_, stream| { + tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id); + debug_assert!(!stream.is_pending_window_update); + + if !stream.state.is_recv_streaming() { + // No need to send window updates on the stream if the stream is + // no longer receiving data. + // + // TODO: is this correct? We could possibly send a window + // update on a ReservedRemote stream if we already know + // we want to stream the data faster... + return; + } + + // TODO: de-dup + if let Some(incr) = stream.recv_flow.unclaimed_capacity() { + // Create the WINDOW_UPDATE frame + let frame = frame::WindowUpdate::new(stream.id, incr); + + // Buffer it + dst.buffer(frame.into()) + .expect("invalid WINDOW_UPDATE frame"); + + // Update flow control + stream + .recv_flow + .inc_window(incr) + .expect("unexpected flow control state"); + } + }) + } + } + + pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> { + self.pending_accept.pop(store).map(|ptr| ptr.key()) + } + + pub fn poll_data( + &mut self, + cx: &Context, + stream: &mut Stream, + ) -> Poll<Option<Result<Bytes, proto::Error>>> { + // TODO: Return error when the stream is reset + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), + Some(event) => { + // Frame is trailer + stream.pending_recv.push_front(&mut self.buffer, event); + + // Notify the recv task. This is done just in case + // `poll_trailers` was called. + // + // It is very likely that `notify_recv` will just be a no-op (as + // the task will be None), so this isn't really much of a + // performance concern. It also means we don't have to track + // state to see if `poll_trailers` was called before `poll_data` + // returned `None`. + stream.notify_recv(); + + // No more data frames + Poll::Ready(None) + } + None => self.schedule_recv(cx, stream), + } + } + + pub fn poll_trailers( + &mut self, + cx: &Context, + stream: &mut Stream, + ) -> Poll<Option<Result<HeaderMap, proto::Error>>> { + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), + Some(event) => { + // Frame is not trailers.. not ready to poll trailers yet. + stream.pending_recv.push_front(&mut self.buffer, event); + + Poll::Pending + } + None => self.schedule_recv(cx, stream), + } + } + + fn schedule_recv<T>( + &mut self, + cx: &Context, + stream: &mut Stream, + ) -> Poll<Option<Result<T, proto::Error>>> { + if stream.state.ensure_recv_open()? { + // Request to get notified once more frames arrive + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending + } else { + // No more frames will be received + Poll::Ready(None) + } + } +} + +// ===== impl Open ===== + +impl Open { + pub fn is_push_promise(&self) -> bool { + use self::Open::*; + + match *self { + PushPromise => true, + _ => false, + } + } +} + +// ===== impl RecvHeaderBlockError ===== + +impl<T> From<Error> for RecvHeaderBlockError<T> { + fn from(err: Error) -> Self { + RecvHeaderBlockError::State(err) + } +} diff --git a/third_party/rust/h2/src/proto/streams/send.rs b/third_party/rust/h2/src/proto/streams/send.rs new file mode 100644 index 0000000000..2c5a38c801 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/send.rs @@ -0,0 +1,565 @@ +use super::{ + store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, + StreamIdOverflow, WindowSize, +}; +use crate::codec::UserError; +use crate::frame::{self, Reason}; +use crate::proto::{Error, Initiator}; + +use bytes::Buf; +use http; +use std::task::{Context, Poll, Waker}; +use tokio::io::AsyncWrite; + +use std::io; + +/// Manages state transitions related to outbound frames. +#[derive(Debug)] +pub(super) struct Send { + /// Stream identifier to use for next initialized stream. + next_stream_id: Result<StreamId, StreamIdOverflow>, + + /// Any streams with a higher ID are ignored. + /// + /// This starts as MAX, but is lowered when a GOAWAY is received. + /// + /// > After sending a GOAWAY frame, the sender can discard frames for + /// > streams initiated by the receiver with identifiers higher than + /// > the identified last stream. + max_stream_id: StreamId, + + /// Initial window size of locally initiated streams + init_window_sz: WindowSize, + + /// Prioritization layer + prioritize: Prioritize, + + is_push_enabled: bool, + + /// If extended connect protocol is enabled. + is_extended_connect_protocol_enabled: bool, +} + +/// A value to detect which public API has called `poll_reset`. +#[derive(Debug)] +pub(crate) enum PollReset { + AwaitingHeaders, + Streaming, +} + +impl Send { + /// Create a new `Send` + pub fn new(config: &Config) -> Self { + Send { + init_window_sz: config.remote_init_window_sz, + max_stream_id: StreamId::MAX, + next_stream_id: Ok(config.local_next_stream_id), + prioritize: Prioritize::new(config), + is_push_enabled: true, + is_extended_connect_protocol_enabled: false, + } + } + + /// Returns the initial send window size + pub fn init_window_sz(&self) -> WindowSize { + self.init_window_sz + } + + pub fn open(&mut self) -> Result<StreamId, UserError> { + let stream_id = self.ensure_next_stream_id()?; + self.next_stream_id = stream_id.next_id(); + Ok(stream_id) + } + + pub fn reserve_local(&mut self) -> Result<StreamId, UserError> { + let stream_id = self.ensure_next_stream_id()?; + self.next_stream_id = stream_id.next_id(); + Ok(stream_id) + } + + fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> { + // 8.1.2.2. Connection-Specific Header Fields + if fields.contains_key(http::header::CONNECTION) + || fields.contains_key(http::header::TRANSFER_ENCODING) + || fields.contains_key(http::header::UPGRADE) + || fields.contains_key("keep-alive") + || fields.contains_key("proxy-connection") + { + tracing::debug!("illegal connection-specific headers found"); + return Err(UserError::MalformedHeaders); + } else if let Some(te) = fields.get(http::header::TE) { + if te != "trailers" { + tracing::debug!("illegal connection-specific headers found"); + return Err(UserError::MalformedHeaders); + } + } + Ok(()) + } + + pub fn send_push_promise<B>( + &mut self, + frame: frame::PushPromise, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + if !self.is_push_enabled { + return Err(UserError::PeerDisabledServerPush); + } + + tracing::trace!( + "send_push_promise; frame={:?}; init_window={:?}", + frame, + self.init_window_sz + ); + + Self::check_headers(frame.fields())?; + + // Queue the frame for sending + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + + pub fn send_headers<B>( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + tracing::trace!( + "send_headers; frame={:?}; init_window={:?}", + frame, + self.init_window_sz + ); + + Self::check_headers(frame.fields())?; + + let end_stream = frame.is_end_stream(); + + // Update the state + stream.state.send_open(end_stream)?; + + if counts.peer().is_local_init(frame.stream_id()) { + // If we're waiting on a PushPromise anyway + // handle potentially queueing the stream at that point + if !stream.is_pending_push { + if counts.can_inc_num_send_streams() { + counts.inc_num_send_streams(stream); + } else { + self.prioritize.queue_open(stream); + } + } + } + + // Queue the frame for sending + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + Ok(()) + } + + /// Send an explicit RST_STREAM frame + pub fn send_reset<B>( + &mut self, + reason: Reason, + initiator: Initiator, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) { + let is_reset = stream.state.is_reset(); + let is_closed = stream.state.is_closed(); + let is_empty = stream.pending_send.is_empty(); + let stream_id = stream.id; + + tracing::trace!( + "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \ + is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ + state={:?} \ + ", + reason, + initiator, + stream_id, + is_reset, + is_closed, + is_empty, + stream.state + ); + + if is_reset { + // Don't double reset + tracing::trace!( + " -> not sending RST_STREAM ({:?} is already reset)", + stream_id + ); + return; + } + + // Transition the state to reset no matter what. + stream.state.set_reset(stream_id, reason, initiator); + + // If closed AND the send queue is flushed, then the stream cannot be + // reset explicitly, either. Implicit resets can still be queued. + if is_closed && is_empty { + tracing::trace!( + " -> not sending explicit RST_STREAM ({:?} was closed \ + and send queue was flushed)", + stream_id + ); + return; + } + + // Clear all pending outbound frames. + // Note that we don't call `self.recv_err` because we want to enqueue + // the reset frame before transitioning the stream inside + // `reclaim_all_capacity`. + self.prioritize.clear_queue(buffer, stream); + + let frame = frame::Reset::new(stream.id, reason); + + tracing::trace!("send_reset -- queueing; frame={:?}", frame); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + self.prioritize.reclaim_all_capacity(stream, counts); + } + + pub fn schedule_implicit_reset( + &mut self, + stream: &mut store::Ptr, + reason: Reason, + counts: &mut Counts, + task: &mut Option<Waker>, + ) { + if stream.state.is_closed() { + // Stream is already closed, nothing more to do + return; + } + + stream.state.set_scheduled_reset(reason); + + self.prioritize.reclaim_reserved_capacity(stream, counts); + self.prioritize.schedule_send(stream, task); + } + + pub fn send_data<B>( + &mut self, + frame: frame::Data<B>, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> + where + B: Buf, + { + self.prioritize + .send_data(frame, buffer, stream, counts, task) + } + + pub fn send_trailers<B>( + &mut self, + frame: frame::Headers, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), UserError> { + // TODO: Should this logic be moved into state.rs? + if !stream.state.is_send_streaming() { + return Err(UserError::UnexpectedFrameType); + } + + stream.state.send_close(); + + tracing::trace!("send_trailers -- queuing; frame={:?}", frame); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); + + // Release any excess capacity + self.prioritize.reserve_capacity(0, stream, counts); + + Ok(()) + } + + pub fn poll_complete<T, B>( + &mut self, + cx: &mut Context, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + counts: &mut Counts, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + self.prioritize + .poll_complete(cx, buffer, store, counts, dst) + } + + /// Request capacity to send data + pub fn reserve_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + self.prioritize.reserve_capacity(capacity, stream, counts) + } + + pub fn poll_capacity( + &mut self, + cx: &Context, + stream: &mut store::Ptr, + ) -> Poll<Option<Result<WindowSize, UserError>>> { + if !stream.state.is_send_streaming() { + return Poll::Ready(None); + } + + if !stream.send_capacity_inc { + stream.wait_send(cx); + return Poll::Pending; + } + + stream.send_capacity_inc = false; + + Poll::Ready(Some(Ok(self.capacity(stream)))) + } + + /// Current available stream send capacity + pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { + let available = stream.send_flow.available().as_size() as usize; + let buffered = stream.buffered_send_data; + + available + .min(self.prioritize.max_buffer_size()) + .saturating_sub(buffered) as WindowSize + } + + pub fn poll_reset( + &self, + cx: &Context, + stream: &mut Stream, + mode: PollReset, + ) -> Poll<Result<Reason, crate::Error>> { + match stream.state.ensure_reason(mode)? { + Some(reason) => Poll::Ready(Ok(reason)), + None => { + stream.wait_send(cx); + Poll::Pending + } + } + } + + pub fn recv_connection_window_update( + &mut self, + frame: frame::WindowUpdate, + store: &mut Store, + counts: &mut Counts, + ) -> Result<(), Reason> { + self.prioritize + .recv_connection_window_update(frame.size_increment(), store, counts) + } + + pub fn recv_stream_window_update<B>( + &mut self, + sz: WindowSize, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), Reason> { + if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { + tracing::debug!("recv_stream_window_update !!; err={:?}", e); + + self.send_reset( + Reason::FLOW_CONTROL_ERROR, + Initiator::Library, + buffer, + stream, + counts, + task, + ); + + return Err(e); + } + + Ok(()) + } + + pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { + if last_stream_id > self.max_stream_id { + // The remote endpoint sent a `GOAWAY` frame indicating a stream + // that we never sent, or that we have already terminated on account + // of previous `GOAWAY` frame. In either case, that is illegal. + // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase + // the value they send in the last stream identifier, since the + // peers might already have retried unprocessed requests on another + // connection.") + proto_err!(conn: + "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})", + last_stream_id, self.max_stream_id, + ); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + self.max_stream_id = last_stream_id; + Ok(()) + } + + pub fn handle_error<B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + // Clear all pending outbound frames + self.prioritize.clear_queue(buffer, stream); + self.prioritize.reclaim_all_capacity(stream, counts); + } + + pub fn apply_remote_settings<B>( + &mut self, + settings: &frame::Settings, + buffer: &mut Buffer<Frame<B>>, + store: &mut Store, + counts: &mut Counts, + task: &mut Option<Waker>, + ) -> Result<(), Error> { + if let Some(val) = settings.is_extended_connect_protocol_enabled() { + self.is_extended_connect_protocol_enabled = val; + } + + // Applies an update to the remote endpoint's initial window size. + // + // Per RFC 7540 §6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + if let Some(val) = settings.initial_window_size() { + let old_val = self.init_window_sz; + self.init_window_sz = val; + + if val < old_val { + // We must decrease the (remote) window on every open stream. + let dec = old_val - val; + tracing::trace!("decrementing all windows; dec={}", dec); + + let mut total_reclaimed = 0; + store.for_each(|mut stream| { + let stream = &mut *stream; + + stream.send_flow.dec_send_window(dec); + + // It's possible that decreasing the window causes + // `window_size` (the stream-specific window) to fall below + // `available` (the portion of the connection-level window + // that we have allocated to the stream). + // In this case, we should take that excess allocation away + // and reassign it to other streams. + let window_size = stream.send_flow.window_size(); + let available = stream.send_flow.available().as_size(); + let reclaimed = if available > window_size { + // Drop down to `window_size`. + let reclaim = available - window_size; + stream.send_flow.claim_capacity(reclaim); + total_reclaimed += reclaim; + reclaim + } else { + 0 + }; + + tracing::trace!( + "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}", + stream.id, + dec, + reclaimed, + stream.send_flow + ); + + // TODO: Should this notify the producer when the capacity + // of a stream is reduced? Maybe it should if the capacity + // is reduced to zero, allowing the producer to stop work. + }); + + self.prioritize + .assign_connection_capacity(total_reclaimed, store, counts); + } else if val > old_val { + let inc = val - old_val; + + store.try_for_each(|mut stream| { + self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) + .map_err(Error::library_go_away) + })?; + } + } + + if let Some(val) = settings.is_push_enabled() { + self.is_push_enabled = val + } + + Ok(()) + } + + pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { + self.prioritize.clear_pending_capacity(store, counts); + self.prioritize.clear_pending_send(store, counts); + self.prioritize.clear_pending_open(store, counts); + } + + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { + if let Ok(next) = self.next_stream_id { + if id >= next { + return Err(Reason::PROTOCOL_ERROR); + } + } + // if next_stream_id is overflowed, that's ok. + + Ok(()) + } + + pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { + self.next_stream_id + .map_err(|_| UserError::OverflowedStreamId) + } + + pub fn may_have_created_stream(&self, id: StreamId) -> bool { + if let Ok(next_id) = self.next_stream_id { + // Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); + id < next_id + } else { + true + } + } + + pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { + if let Ok(next_id) = self.next_stream_id { + // Peer::is_local_init should have been called beforehand + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); + if id >= next_id { + self.next_stream_id = id.next_id(); + } + } + } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.is_extended_connect_protocol_enabled + } +} diff --git a/third_party/rust/h2/src/proto/streams/state.rs b/third_party/rust/h2/src/proto/streams/state.rs new file mode 100644 index 0000000000..9931d41b1c --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/state.rs @@ -0,0 +1,472 @@ +use std::io; + +use crate::codec::UserError; +use crate::frame::{self, Reason, StreamId}; +use crate::proto::{self, Error, Initiator, PollReset}; + +use self::Inner::*; +use self::Peer::*; + +/// Represents the state of an H2 stream +/// +/// ```not_rust +/// +--------+ +/// send PP | | recv PP +/// ,--------| idle |--------. +/// / | | \ +/// v +--------+ v +/// +----------+ | +----------+ +/// | | | send H / | | +/// ,------| reserved | | recv H | reserved |------. +/// | | (local) | | | (remote) | | +/// | +----------+ v +----------+ | +/// | | +--------+ | | +/// | | recv ES | | send ES | | +/// | send H | ,-------| open |-------. | recv H | +/// | | / | | \ | | +/// | v v +--------+ v v | +/// | +----------+ | +----------+ | +/// | | half | | | half | | +/// | | closed | | send R / | closed | | +/// | | (remote) | | recv R | (local) | | +/// | +----------+ | +----------+ | +/// | | | | | +/// | | send ES / | recv ES / | | +/// | | send R / v send R / | | +/// | | recv R +--------+ recv R | | +/// | send R / `----------->| |<-----------' send R / | +/// | recv R | closed | recv R | +/// `----------------------->| |<----------------------' +/// +--------+ +/// +/// send: endpoint sends this frame +/// recv: endpoint receives this frame +/// +/// H: HEADERS frame (with implied CONTINUATIONs) +/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +/// ES: END_STREAM flag +/// R: RST_STREAM frame +/// ``` +#[derive(Debug, Clone)] +pub struct State { + inner: Inner, +} + +#[derive(Debug, Clone)] +enum Inner { + Idle, + // TODO: these states shouldn't count against concurrency limits: + ReservedLocal, + ReservedRemote, + Open { local: Peer, remote: Peer }, + HalfClosedLocal(Peer), // TODO: explicitly name this value + HalfClosedRemote(Peer), + Closed(Cause), +} + +#[derive(Debug, Copy, Clone)] +enum Peer { + AwaitingHeaders, + Streaming, +} + +#[derive(Debug, Clone)] +enum Cause { + EndStream, + Error(Error), + + /// This indicates to the connection that a reset frame must be sent out + /// once the send queue has been flushed. + /// + /// Examples of when this could happen: + /// - User drops all references to a stream, so we want to CANCEL the it. + /// - Header block size was too large, so we want to REFUSE, possibly + /// after sending a 431 response frame. + ScheduledLibraryReset(Reason), +} + +impl State { + /// Opens the send-half of a stream if it is not already open. + pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { + let local = Streaming; + + self.inner = match self.inner { + Idle => { + if eos { + HalfClosedLocal(AwaitingHeaders) + } else { + Open { + local, + remote: AwaitingHeaders, + } + } + } + Open { + local: AwaitingHeaders, + remote, + } => { + if eos { + HalfClosedLocal(remote) + } else { + Open { local, remote } + } + } + HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { + if eos { + Closed(Cause::EndStream) + } else { + HalfClosedRemote(local) + } + } + _ => { + // All other transitions result in a protocol error + return Err(UserError::UnexpectedFrameType); + } + }; + + Ok(()) + } + + /// Opens the receive-half of the stream when a HEADERS frame is received. + /// + /// Returns true if this transitions the state to Open. + pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { + let mut initial = false; + let eos = frame.is_end_stream(); + + self.inner = match self.inner { + Idle => { + initial = true; + + if eos { + HalfClosedRemote(AwaitingHeaders) + } else { + Open { + local: AwaitingHeaders, + remote: if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + AwaitingHeaders + } else { + Streaming + }, + } + } + } + ReservedRemote => { + initial = true; + + if eos { + Closed(Cause::EndStream) + } else if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + ReservedRemote + } else { + HalfClosedLocal(Streaming) + } + } + Open { + local, + remote: AwaitingHeaders, + } => { + if eos { + HalfClosedRemote(local) + } else { + Open { + local, + remote: if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + AwaitingHeaders + } else { + Streaming + }, + } + } + } + HalfClosedLocal(AwaitingHeaders) => { + if eos { + Closed(Cause::EndStream) + } else if frame.is_informational() { + tracing::trace!("skipping 1xx response headers"); + HalfClosedLocal(AwaitingHeaders) + } else { + HalfClosedLocal(Streaming) + } + } + ref state => { + // All other transitions result in a protocol error + proto_err!(conn: "recv_open: in unexpected state {:?}", state); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + }; + + Ok(initial) + } + + /// Transition from Idle -> ReservedRemote + pub fn reserve_remote(&mut self) -> Result<(), Error> { + match self.inner { + Idle => { + self.inner = ReservedRemote; + Ok(()) + } + ref state => { + proto_err!(conn: "reserve_remote: in unexpected state {:?}", state); + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) + } + } + } + + /// Transition from Idle -> ReservedLocal + pub fn reserve_local(&mut self) -> Result<(), UserError> { + match self.inner { + Idle => { + self.inner = ReservedLocal; + Ok(()) + } + _ => Err(UserError::UnexpectedFrameType), + } + } + + /// Indicates that the remote side will not send more data to the local. + pub fn recv_close(&mut self) -> Result<(), Error> { + match self.inner { + Open { local, .. } => { + // The remote side will continue to receive data. + tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local); + self.inner = HalfClosedRemote(local); + Ok(()) + } + HalfClosedLocal(..) => { + tracing::trace!("recv_close: HalfClosedLocal => Closed"); + self.inner = Closed(Cause::EndStream); + Ok(()) + } + ref state => { + proto_err!(conn: "recv_close: in unexpected state {:?}", state); + Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) + } + } + } + + /// The remote explicitly sent a RST_STREAM. + /// + /// # Arguments + /// - `frame`: the received RST_STREAM frame. + /// - `queued`: true if this stream has frames in the pending send queue. + pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { + match self.inner { + // If the stream is already in a `Closed` state, do nothing, + // provided that there are no frames still in the send queue. + Closed(..) if !queued => {} + // A notionally `Closed` stream may still have queued frames in + // the following cases: + // + // - if the cause is `Cause::Scheduled(..)` (i.e. we have not + // actually closed the stream yet). + // - if the cause is `Cause::EndStream`: we transition to this + // state when an EOS frame is *enqueued* (so that it's invalid + // to enqueue more frames), not when the EOS frame is *sent*; + // therefore, there may still be frames ahead of the EOS frame + // in the send queue. + // + // In either of these cases, we want to overwrite the stream's + // previous state with the received RST_STREAM, so that the queue + // will be cleared by `Prioritize::pop_frame`. + ref state => { + tracing::trace!( + "recv_reset; frame={:?}; state={:?}; queued={:?}", + frame, + state, + queued + ); + self.inner = Closed(Cause::Error(Error::remote_reset( + frame.stream_id(), + frame.reason(), + ))); + } + } + } + + /// Handle a connection-level error. + pub fn handle_error(&mut self, err: &proto::Error) { + match self.inner { + Closed(..) => {} + _ => { + tracing::trace!("handle_error; err={:?}", err); + self.inner = Closed(Cause::Error(err.clone())); + } + } + } + + pub fn recv_eof(&mut self) { + match self.inner { + Closed(..) => {} + ref state => { + tracing::trace!("recv_eof; state={:?}", state); + self.inner = Closed(Cause::Error(io::ErrorKind::BrokenPipe.into())); + } + } + } + + /// Indicates that the local side will not send more data to the local. + pub fn send_close(&mut self) { + match self.inner { + Open { remote, .. } => { + // The remote side will continue to receive data. + tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote); + self.inner = HalfClosedLocal(remote); + } + HalfClosedRemote(..) => { + tracing::trace!("send_close: HalfClosedRemote => Closed"); + self.inner = Closed(Cause::EndStream); + } + ref state => panic!("send_close: unexpected state {:?}", state), + } + } + + /// Set the stream state to reset locally. + pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { + self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); + } + + /// Set the stream state to a scheduled reset. + pub fn set_scheduled_reset(&mut self, reason: Reason) { + debug_assert!(!self.is_closed()); + self.inner = Closed(Cause::ScheduledLibraryReset(reason)); + } + + pub fn get_scheduled_reset(&self) -> Option<Reason> { + match self.inner { + Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), + _ => None, + } + } + + pub fn is_scheduled_reset(&self) -> bool { + match self.inner { + Closed(Cause::ScheduledLibraryReset(..)) => true, + _ => false, + } + } + + pub fn is_local_reset(&self) -> bool { + match self.inner { + Closed(Cause::Error(ref e)) => e.is_local(), + Closed(Cause::ScheduledLibraryReset(..)) => true, + _ => false, + } + } + + /// Returns true if the stream is already reset. + pub fn is_reset(&self) -> bool { + match self.inner { + Closed(Cause::EndStream) => false, + Closed(_) => true, + _ => false, + } + } + + pub fn is_send_streaming(&self) -> bool { + match self.inner { + Open { + local: Streaming, .. + } => true, + HalfClosedRemote(Streaming) => true, + _ => false, + } + } + + /// Returns true when the stream is in a state to receive headers + pub fn is_recv_headers(&self) -> bool { + match self.inner { + Idle => true, + Open { + remote: AwaitingHeaders, + .. + } => true, + HalfClosedLocal(AwaitingHeaders) => true, + ReservedRemote => true, + _ => false, + } + } + + pub fn is_recv_streaming(&self) -> bool { + match self.inner { + Open { + remote: Streaming, .. + } => true, + HalfClosedLocal(Streaming) => true, + _ => false, + } + } + + pub fn is_closed(&self) -> bool { + match self.inner { + Closed(_) => true, + _ => false, + } + } + + pub fn is_recv_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedRemote(..) | ReservedLocal => true, + _ => false, + } + } + + pub fn is_send_closed(&self) -> bool { + match self.inner { + Closed(..) | HalfClosedLocal(..) | ReservedRemote => true, + _ => false, + } + } + + pub fn is_idle(&self) -> bool { + match self.inner { + Idle => true, + _ => false, + } + } + + pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { + // TODO: Is this correct? + match self.inner { + Closed(Cause::Error(ref e)) => Err(e.clone()), + Closed(Cause::ScheduledLibraryReset(reason)) => { + Err(proto::Error::library_go_away(reason)) + } + Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), + _ => Ok(true), + } + } + + /// Returns a reason if the stream has been reset. + pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { + match self.inner { + Closed(Cause::Error(Error::Reset(_, reason, _))) + | Closed(Cause::Error(Error::GoAway(_, reason, _))) + | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), + Closed(Cause::Error(ref e)) => Err(e.clone().into()), + Open { + local: Streaming, .. + } + | HalfClosedRemote(Streaming) => match mode { + PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), + PollReset::Streaming => Ok(None), + }, + _ => Ok(None), + } + } +} + +impl Default for State { + fn default() -> State { + State { inner: Inner::Idle } + } +} + +impl Default for Peer { + fn default() -> Self { + AwaitingHeaders + } +} diff --git a/third_party/rust/h2/src/proto/streams/store.rs b/third_party/rust/h2/src/proto/streams/store.rs new file mode 100644 index 0000000000..3e34b7cb29 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/store.rs @@ -0,0 +1,426 @@ +use super::*; + +use slab; + +use indexmap::{self, IndexMap}; + +use std::convert::Infallible; +use std::fmt; +use std::marker::PhantomData; +use std::ops; + +/// Storage for streams +#[derive(Debug)] +pub(super) struct Store { + slab: slab::Slab<Stream>, + ids: IndexMap<StreamId, SlabIndex>, +} + +/// "Pointer" to an entry in the store +pub(super) struct Ptr<'a> { + key: Key, + store: &'a mut Store, +} + +/// References an entry in the store. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct Key { + index: SlabIndex, + /// Keep the stream ID in the key as an ABA guard, since slab indices + /// could be re-used with a new stream. + stream_id: StreamId, +} + +// We can never have more than `StreamId::MAX` streams in the store, +// so we can save a smaller index (u32 vs usize). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct SlabIndex(u32); + +#[derive(Debug)] +pub(super) struct Queue<N> { + indices: Option<store::Indices>, + _p: PhantomData<N>, +} + +pub(super) trait Next { + fn next(stream: &Stream) -> Option<Key>; + + fn set_next(stream: &mut Stream, key: Option<Key>); + + fn take_next(stream: &mut Stream) -> Option<Key>; + + fn is_queued(stream: &Stream) -> bool; + + fn set_queued(stream: &mut Stream, val: bool); +} + +/// A linked list +#[derive(Debug, Clone, Copy)] +struct Indices { + pub head: Key, + pub tail: Key, +} + +pub(super) enum Entry<'a> { + Occupied(OccupiedEntry<'a>), + Vacant(VacantEntry<'a>), +} + +pub(super) struct OccupiedEntry<'a> { + ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>, +} + +pub(super) struct VacantEntry<'a> { + ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>, + slab: &'a mut slab::Slab<Stream>, +} + +pub(super) trait Resolve { + fn resolve(&mut self, key: Key) -> Ptr; +} + +// ===== impl Store ===== + +impl Store { + pub fn new() -> Self { + Store { + slab: slab::Slab::new(), + ids: IndexMap::new(), + } + } + + pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> { + let index = match self.ids.get(id) { + Some(key) => *key, + None => return None, + }; + + Some(Ptr { + key: Key { + index, + stream_id: *id, + }, + store: self, + }) + } + + pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { + let index = SlabIndex(self.slab.insert(val) as u32); + assert!(self.ids.insert(id, index).is_none()); + + Ptr { + key: Key { + index, + stream_id: id, + }, + store: self, + } + } + + pub fn find_entry(&mut self, id: StreamId) -> Entry { + use self::indexmap::map::Entry::*; + + match self.ids.entry(id) { + Occupied(e) => Entry::Occupied(OccupiedEntry { ids: e }), + Vacant(e) => Entry::Vacant(VacantEntry { + ids: e, + slab: &mut self.slab, + }), + } + } + + pub(crate) fn for_each<F>(&mut self, mut f: F) + where + F: FnMut(Ptr), + { + match self.try_for_each(|ptr| { + f(ptr); + Ok::<_, Infallible>(()) + }) { + Ok(()) => (), + Err(infallible) => match infallible {}, + } + } + + pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E> + where + F: FnMut(Ptr) -> Result<(), E>, + { + let mut len = self.ids.len(); + let mut i = 0; + + while i < len { + // Get the key by index, this makes the borrow checker happy + let (stream_id, index) = { + let entry = self.ids.get_index(i).unwrap(); + (*entry.0, *entry.1) + }; + + f(Ptr { + key: Key { index, stream_id }, + store: self, + })?; + + // TODO: This logic probably could be better... + let new_len = self.ids.len(); + + if new_len < len { + debug_assert!(new_len == len - 1); + len -= 1; + } else { + i += 1; + } + } + + Ok(()) + } +} + +impl Resolve for Store { + fn resolve(&mut self, key: Key) -> Ptr { + Ptr { key, store: self } + } +} + +impl ops::Index<Key> for Store { + type Output = Stream; + + fn index(&self, key: Key) -> &Self::Output { + self.slab + .get(key.index.0 as usize) + .filter(|s| s.id == key.stream_id) + .unwrap_or_else(|| { + panic!("dangling store key for stream_id={:?}", key.stream_id); + }) + } +} + +impl ops::IndexMut<Key> for Store { + fn index_mut(&mut self, key: Key) -> &mut Self::Output { + self.slab + .get_mut(key.index.0 as usize) + .filter(|s| s.id == key.stream_id) + .unwrap_or_else(|| { + panic!("dangling store key for stream_id={:?}", key.stream_id); + }) + } +} + +impl Store { + #[cfg(feature = "unstable")] + pub fn num_active_streams(&self) -> usize { + self.ids.len() + } + + #[cfg(feature = "unstable")] + pub fn num_wired_streams(&self) -> usize { + self.slab.len() + } +} + +// While running h2 unit/integration tests, enable this debug assertion. +// +// In practice, we don't need to ensure this. But the integration tests +// help to make sure we've cleaned up in cases where we could (like, the +// runtime isn't suddenly dropping the task for unknown reasons). +#[cfg(feature = "unstable")] +impl Drop for Store { + fn drop(&mut self) { + use std::thread; + + if !thread::panicking() { + debug_assert!(self.slab.is_empty()); + } + } +} + +// ===== impl Queue ===== + +impl<N> Queue<N> +where + N: Next, +{ + pub fn new() -> Self { + Queue { + indices: None, + _p: PhantomData, + } + } + + pub fn take(&mut self) -> Self { + Queue { + indices: self.indices.take(), + _p: PhantomData, + } + } + + /// Queue the stream. + /// + /// If the stream is already contained by the list, return `false`. + pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + tracing::trace!("Queue::push"); + + if N::is_queued(stream) { + tracing::trace!(" -> already queued"); + return false; + } + + N::set_queued(stream, true); + + // The next pointer shouldn't be set + debug_assert!(N::next(stream).is_none()); + + // Queue the stream + match self.indices { + Some(ref mut idxs) => { + tracing::trace!(" -> existing entries"); + + // Update the current tail node to point to `stream` + let key = stream.key(); + N::set_next(&mut stream.resolve(idxs.tail), Some(key)); + + // Update the tail pointer + idxs.tail = stream.key(); + } + None => { + tracing::trace!(" -> first entry"); + self.indices = Some(store::Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + + true + } + + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>> + where + R: Resolve, + { + if let Some(mut idxs) = self.indices { + let mut stream = store.resolve(idxs.head); + + if idxs.head == idxs.tail { + assert!(N::next(&*stream).is_none()); + self.indices = None; + } else { + idxs.head = N::take_next(&mut *stream).unwrap(); + self.indices = Some(idxs); + } + + debug_assert!(N::is_queued(&*stream)); + N::set_queued(&mut *stream, false); + + return Some(stream); + } + + None + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>> + where + R: Resolve, + F: Fn(&Stream) -> bool, + { + if let Some(idxs) = self.indices { + let should_pop = f(&store.resolve(idxs.head)); + if should_pop { + return self.pop(store); + } + } + + None + } +} + +// ===== impl Ptr ===== + +impl<'a> Ptr<'a> { + /// Returns the Key associated with the stream + pub fn key(&self) -> Key { + self.key + } + + pub fn store_mut(&mut self) -> &mut Store { + &mut self.store + } + + /// Remove the stream from the store + pub fn remove(self) -> StreamId { + // The stream must have been unlinked before this point + debug_assert!(!self.store.ids.contains_key(&self.key.stream_id)); + + // Remove the stream state + let stream = self.store.slab.remove(self.key.index.0 as usize); + assert_eq!(stream.id, self.key.stream_id); + stream.id + } + + /// Remove the StreamId -> stream state association. + /// + /// This will effectively remove the stream as far as the H2 protocol is + /// concerned. + pub fn unlink(&mut self) { + let id = self.key.stream_id; + self.store.ids.swap_remove(&id); + } +} + +impl<'a> Resolve for Ptr<'a> { + fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key, + store: &mut *self.store, + } + } +} + +impl<'a> ops::Deref for Ptr<'a> { + type Target = Stream; + + fn deref(&self) -> &Stream { + &self.store[self.key] + } +} + +impl<'a> ops::DerefMut for Ptr<'a> { + fn deref_mut(&mut self) -> &mut Stream { + &mut self.store[self.key] + } +} + +impl<'a> fmt::Debug for Ptr<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(fmt) + } +} + +// ===== impl OccupiedEntry ===== + +impl<'a> OccupiedEntry<'a> { + pub fn key(&self) -> Key { + let stream_id = *self.ids.key(); + let index = *self.ids.get(); + Key { index, stream_id } + } +} + +// ===== impl VacantEntry ===== + +impl<'a> VacantEntry<'a> { + pub fn insert(self, value: Stream) -> Key { + // Insert the value in the slab + let stream_id = value.id; + let index = SlabIndex(self.slab.insert(value) as u32); + + // Insert the handle in the ID map + self.ids.insert(index); + + Key { index, stream_id } + } +} diff --git a/third_party/rust/h2/src/proto/streams/stream.rs b/third_party/rust/h2/src/proto/streams/stream.rs new file mode 100644 index 0000000000..36d515bad2 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/stream.rs @@ -0,0 +1,490 @@ +use super::*; + +use std::task::{Context, Waker}; +use std::time::Instant; +use std::usize; + +/// Tracks Stream related state +/// +/// # Reference counting +/// +/// There can be a number of outstanding handles to a single Stream. These are +/// tracked using reference counting. The `ref_count` field represents the +/// number of outstanding userspace handles that can reach this stream. +/// +/// It's important to note that when the stream is placed in an internal queue +/// (such as an accept queue), this is **not** tracked by a reference count. +/// Thus, `ref_count` can be zero and the stream still has to be kept around. +#[derive(Debug)] +pub(super) struct Stream { + /// The h2 stream identifier + pub id: StreamId, + + /// Current state of the stream + pub state: State, + + /// Set to `true` when the stream is counted against the connection's max + /// concurrent streams. + pub is_counted: bool, + + /// Number of outstanding handles pointing to this stream + pub ref_count: usize, + + // ===== Fields related to sending ===== + /// Next node in the accept linked list + pub next_pending_send: Option<store::Key>, + + /// Set to true when the stream is pending accept + pub is_pending_send: bool, + + /// Send data flow control + pub send_flow: FlowControl, + + /// Amount of send capacity that has been requested, but not yet allocated. + pub requested_send_capacity: WindowSize, + + /// Amount of data buffered at the prioritization layer. + /// TODO: Technically this could be greater than the window size... + pub buffered_send_data: usize, + + /// Task tracking additional send capacity (i.e. window updates). + send_task: Option<Waker>, + + /// Frames pending for this stream being sent to the socket + pub pending_send: buffer::Deque, + + /// Next node in the linked list of streams waiting for additional + /// connection level capacity. + pub next_pending_send_capacity: Option<store::Key>, + + /// True if the stream is waiting for outbound connection capacity + pub is_pending_send_capacity: bool, + + /// Set to true when the send capacity has been incremented + pub send_capacity_inc: bool, + + /// Next node in the open linked list + pub next_open: Option<store::Key>, + + /// Set to true when the stream is pending to be opened + pub is_pending_open: bool, + + /// Set to true when a push is pending for this stream + pub is_pending_push: bool, + + // ===== Fields related to receiving ===== + /// Next node in the accept linked list + pub next_pending_accept: Option<store::Key>, + + /// Set to true when the stream is pending accept + pub is_pending_accept: bool, + + /// Receive data flow control + pub recv_flow: FlowControl, + + pub in_flight_recv_data: WindowSize, + + /// Next node in the linked list of streams waiting to send window updates. + pub next_window_update: Option<store::Key>, + + /// True if the stream is waiting to send a window update + pub is_pending_window_update: bool, + + /// The time when this stream may have been locally reset. + pub reset_at: Option<Instant>, + + /// Next node in list of reset streams that should expire eventually + pub next_reset_expire: Option<store::Key>, + + /// Frames pending for this stream to read + pub pending_recv: buffer::Deque, + + /// Task tracking receiving frames + pub recv_task: Option<Waker>, + + /// The stream's pending push promises + pub pending_push_promises: store::Queue<NextAccept>, + + /// Validate content-length headers + pub content_length: ContentLength, +} + +/// State related to validating a stream's content-length +#[derive(Debug)] +pub enum ContentLength { + Omitted, + Head, + Remaining(u64), +} + +#[derive(Debug)] +pub(super) struct NextAccept; + +#[derive(Debug)] +pub(super) struct NextSend; + +#[derive(Debug)] +pub(super) struct NextSendCapacity; + +#[derive(Debug)] +pub(super) struct NextWindowUpdate; + +#[derive(Debug)] +pub(super) struct NextOpen; + +#[derive(Debug)] +pub(super) struct NextResetExpire; + +impl Stream { + pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { + let mut send_flow = FlowControl::new(); + let mut recv_flow = FlowControl::new(); + + recv_flow + .inc_window(init_recv_window) + .expect("invalid initial receive window"); + recv_flow.assign_capacity(init_recv_window); + + send_flow + .inc_window(init_send_window) + .expect("invalid initial send window size"); + + Stream { + id, + state: State::default(), + ref_count: 0, + is_counted: false, + + // ===== Fields related to sending ===== + next_pending_send: None, + is_pending_send: false, + send_flow, + requested_send_capacity: 0, + buffered_send_data: 0, + send_task: None, + pending_send: buffer::Deque::new(), + is_pending_send_capacity: false, + next_pending_send_capacity: None, + send_capacity_inc: false, + is_pending_open: false, + next_open: None, + is_pending_push: false, + + // ===== Fields related to receiving ===== + next_pending_accept: None, + is_pending_accept: false, + recv_flow, + in_flight_recv_data: 0, + next_window_update: None, + is_pending_window_update: false, + reset_at: None, + next_reset_expire: None, + pending_recv: buffer::Deque::new(), + recv_task: None, + pending_push_promises: store::Queue::new(), + content_length: ContentLength::Omitted, + } + } + + /// Increment the stream's ref count + pub fn ref_inc(&mut self) { + assert!(self.ref_count < usize::MAX); + self.ref_count += 1; + } + + /// Decrements the stream's ref count + pub fn ref_dec(&mut self) { + assert!(self.ref_count > 0); + self.ref_count -= 1; + } + + /// Returns true if stream is currently being held for some time because of + /// a local reset. + pub fn is_pending_reset_expiration(&self) -> bool { + self.reset_at.is_some() + } + + /// Returns true if frames for this stream are ready to be sent over the wire + pub fn is_send_ready(&self) -> bool { + // Why do we check pending_open? + // + // We allow users to call send_request() which schedules a stream to be pending_open + // if there is no room according to the concurrency limit (max_send_streams), and we + // also allow data to be buffered for send with send_data() if there is no capacity for + // the stream to send the data, which attempts to place the stream in pending_send. + // If the stream is not open, we don't want the stream to be scheduled for + // execution (pending_send). Note that if the stream is in pending_open, it will be + // pushed to pending_send when there is room for an open stream. + // + // In pending_push we track whether a PushPromise still needs to be sent + // from a different stream before we can start sending frames on this one. + // This is different from the "open" check because reserved streams don't count + // toward the concurrency limit. + // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 + !self.is_pending_open && !self.is_pending_push + } + + /// Returns true if the stream is closed + pub fn is_closed(&self) -> bool { + // The state has fully transitioned to closed. + self.state.is_closed() && + // Because outbound frames transition the stream state before being + // buffered, we have to ensure that all frames have been flushed. + self.pending_send.is_empty() && + // Sometimes large data frames are sent out in chunks. After a chunk + // of the frame is sent, the remainder is pushed back onto the send + // queue to be rescheduled. + // + // Checking for additional buffered data lets us catch this case. + self.buffered_send_data == 0 + } + + /// Returns true if the stream is no longer in use + pub fn is_released(&self) -> bool { + // The stream is closed and fully flushed + self.is_closed() && + // There are no more outstanding references to the stream + self.ref_count == 0 && + // The stream is not in any queue + !self.is_pending_send && !self.is_pending_send_capacity && + !self.is_pending_accept && !self.is_pending_window_update && + !self.is_pending_open && !self.reset_at.is_some() + } + + /// Returns true when the consumer of the stream has dropped all handles + /// (indicating no further interest in the stream) and the stream state is + /// not actually closed. + /// + /// In this case, a reset should be sent. + pub fn is_canceled_interest(&self) -> bool { + self.ref_count == 0 && !self.state.is_closed() + } + + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { + debug_assert!(capacity > 0); + self.send_flow.assign_capacity(capacity); + + tracing::trace!( + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", + self.send_flow.available(), + self.buffered_send_data, + self.id, + max_buffer_size + ); + + self.notify_if_can_buffer_more(max_buffer_size); + } + + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + + // Only notify if the capacity exceeds the amount of buffered data + if available.min(max_buffer_size) > buffered { + self.send_capacity_inc = true; + tracing::trace!(" notifying task"); + self.notify_send(); + } + } + + /// Returns `Err` when the decrement cannot be completed due to overflow. + pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { + Some(val) => *rem = val, + None => return Err(()), + }, + ContentLength::Head => { + if len != 0 { + return Err(()); + } + } + _ => {} + } + + Ok(()) + } + + pub fn ensure_content_length_zero(&self) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(0) => Ok(()), + ContentLength::Remaining(_) => Err(()), + _ => Ok(()), + } + } + + pub fn notify_send(&mut self) { + if let Some(task) = self.send_task.take() { + task.wake(); + } + } + + pub fn wait_send(&mut self, cx: &Context) { + self.send_task = Some(cx.waker().clone()); + } + + pub fn notify_recv(&mut self) { + if let Some(task) = self.recv_task.take() { + task.wake(); + } + } +} + +impl store::Next for NextAccept { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_accept + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_accept = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_accept.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_accept + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_accept = val; + } +} + +impl store::Next for NextSend { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_send + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_send = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_send.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + // ensure that stream is not queued for being opened + // if it's being put into queue for sending data + debug_assert_eq!(stream.is_pending_open, false); + } + stream.is_pending_send = val; + } +} + +impl store::Next for NextSendCapacity { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_send_capacity + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_send_capacity = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_send_capacity.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send_capacity + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_send_capacity = val; + } +} + +impl store::Next for NextWindowUpdate { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_window_update + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_window_update = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_window_update.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_window_update + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_window_update = val; + } +} + +impl store::Next for NextOpen { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_open + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_open = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_open.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_open + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + // ensure that stream is not queued for being sent + // if it's being put into queue for opening the stream + debug_assert_eq!(stream.is_pending_send, false); + } + stream.is_pending_open = val; + } +} + +impl store::Next for NextResetExpire { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_reset_expire + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_reset_expire = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_reset_expire.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.reset_at.is_some() + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + stream.reset_at = Some(Instant::now()); + } else { + stream.reset_at = None; + } + } +} + +// ===== impl ContentLength ===== + +impl ContentLength { + pub fn is_head(&self) -> bool { + match *self { + ContentLength::Head => true, + _ => false, + } + } +} diff --git a/third_party/rust/h2/src/proto/streams/streams.rs b/third_party/rust/h2/src/proto/streams/streams.rs new file mode 100644 index 0000000000..aee64ca6a3 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/streams.rs @@ -0,0 +1,1576 @@ +use super::recv::RecvHeaderBlockError; +use super::store::{self, Entry, Resolve, Store}; +use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; +use crate::codec::{Codec, SendError, UserError}; +use crate::ext::Protocol; +use crate::frame::{self, Frame, Reason}; +use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize}; +use crate::{client, proto, server}; + +use bytes::{Buf, Bytes}; +use http::{HeaderMap, Request, Response}; +use std::task::{Context, Poll, Waker}; +use tokio::io::AsyncWrite; + +use std::sync::{Arc, Mutex}; +use std::{fmt, io}; + +#[derive(Debug)] +pub(crate) struct Streams<B, P> +where + P: Peer, +{ + /// Holds most of the connection and stream related state for processing + /// HTTP/2 frames associated with streams. + inner: Arc<Mutex<Inner>>, + + /// This is the queue of frames to be written to the wire. This is split out + /// to avoid requiring a `B` generic on all public API types even if `B` is + /// not technically required. + /// + /// Currently, splitting this out requires a second `Arc` + `Mutex`. + /// However, it should be possible to avoid this duplication with a little + /// bit of unsafe code. This optimization has been postponed until it has + /// been shown to be necessary. + send_buffer: Arc<SendBuffer<B>>, + + _p: ::std::marker::PhantomData<P>, +} + +// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter. +// Ensures that the methods only get one instantiation, instead of two (client and server) +#[derive(Debug)] +pub(crate) struct DynStreams<'a, B> { + inner: &'a Mutex<Inner>, + + send_buffer: &'a SendBuffer<B>, + + peer: peer::Dyn, +} + +/// Reference to the stream state +#[derive(Debug)] +pub(crate) struct StreamRef<B> { + opaque: OpaqueStreamRef, + send_buffer: Arc<SendBuffer<B>>, +} + +/// Reference to the stream state that hides the send data chunk generic +pub(crate) struct OpaqueStreamRef { + inner: Arc<Mutex<Inner>>, + key: store::Key, +} + +/// Fields needed to manage state related to managing the set of streams. This +/// is mostly split out to make ownership happy. +/// +/// TODO: better name +#[derive(Debug)] +struct Inner { + /// Tracks send & recv stream concurrency. + counts: Counts, + + /// Connection level state and performs actions on streams + actions: Actions, + + /// Stores stream state + store: Store, + + /// The number of stream refs to this shared state. + refs: usize, +} + +#[derive(Debug)] +struct Actions { + /// Manages state transitions initiated by receiving frames + recv: Recv, + + /// Manages state transitions initiated by sending frames + send: Send, + + /// Task that calls `poll_complete`. + task: Option<Waker>, + + /// If the connection errors, a copy is kept for any StreamRefs. + conn_error: Option<proto::Error>, +} + +/// Contains the buffer of frames to be written to the wire. +#[derive(Debug)] +struct SendBuffer<B> { + inner: Mutex<Buffer<Frame<B>>>, +} + +// ===== impl Streams ===== + +impl<B, P> Streams<B, P> +where + B: Buf, + P: Peer, +{ + pub fn new(config: Config) -> Self { + let peer = P::r#dyn(); + + Streams { + inner: Inner::new(peer, config), + send_buffer: Arc::new(SendBuffer::new()), + _p: ::std::marker::PhantomData, + } + } + + pub fn set_target_connection_window_size(&mut self, size: WindowSize) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions + .recv + .set_target_connection_window(size, &mut me.actions.task) + } + + pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions.recv.next_incoming(&mut me.store).map(|key| { + let stream = &mut me.store.resolve(key); + tracing::trace!( + "next_incoming; id={:?}, state={:?}", + stream.id, + stream.state + ); + // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding + // the lock, so it can't. + me.refs += 1; + StreamRef { + opaque: OpaqueStreamRef::new(self.inner.clone(), stream), + send_buffer: self.send_buffer.clone(), + } + }) + } + + pub fn send_pending_refusal<T>( + &mut self, + cx: &mut Context, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions.recv.send_pending_refusal(cx, dst) + } + + pub fn clear_expired_reset_streams(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions + .recv + .clear_expired_reset_streams(&mut me.store, &mut me.counts); + } + + pub fn poll_complete<T>( + &mut self, + cx: &mut Context, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + { + let mut me = self.inner.lock().unwrap(); + me.poll_complete(&self.send_buffer, cx, dst) + } + + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.apply_remote_settings(frame); + + me.actions.send.apply_remote_settings( + frame, + send_buffer, + &mut me.store, + &mut me.counts, + &mut me.actions.task, + ) + } + + pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.apply_local_settings(frame, &mut me.store) + } + + pub fn send_request( + &mut self, + mut request: Request<()>, + end_of_stream: bool, + pending: Option<&OpaqueStreamRef>, + ) -> Result<StreamRef<B>, SendError> { + use super::stream::ContentLength; + use http::Method; + + let protocol = request.extensions_mut().remove::<Protocol>(); + + // Clear before taking lock, incase extensions contain a StreamRef. + request.extensions_mut().clear(); + + // TODO: There is a hazard with assigning a stream ID before the + // prioritize layer. If prioritization reorders new streams, this + // implicitly closes the earlier stream IDs. + // + // See: hyperium/h2#11 + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.actions.ensure_no_conn_error()?; + me.actions.send.ensure_next_stream_id()?; + + // The `pending` argument is provided by the `Client`, and holds + // a store `Key` of a `Stream` that may have been not been opened + // yet. + // + // If that stream is still pending, the Client isn't allowed to + // queue up another pending stream. They should use `poll_ready`. + if let Some(stream) = pending { + if me.store.resolve(stream.key).is_pending_open { + return Err(UserError::Rejected.into()); + } + } + + if me.counts.peer().is_server() { + // Servers cannot open streams. PushPromise must first be reserved. + return Err(UserError::UnexpectedFrameType.into()); + } + + let stream_id = me.actions.send.open()?; + + let mut stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz(), + ); + + if *request.method() == Method::HEAD { + stream.content_length = ContentLength::Head; + } + + // Convert the message + let headers = + client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?; + + let mut stream = me.store.insert(stream.id, stream); + + let sent = me.actions.send.send_headers( + headers, + send_buffer, + &mut stream, + &mut me.counts, + &mut me.actions.task, + ); + + // send_headers can return a UserError, if it does, + // we should forget about this stream. + if let Err(err) = sent { + stream.unlink(); + stream.remove(); + return Err(err.into()); + } + + // Given that the stream has been initialized, it should not be in the + // closed state. + debug_assert!(!stream.state.is_closed()); + + // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding + // the lock, so it can't. + me.refs += 1; + + Ok(StreamRef { + opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), + send_buffer: self.send_buffer.clone(), + }) + } + + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.inner + .lock() + .unwrap() + .actions + .send + .is_extended_connect_protocol_enabled() + } +} + +impl<B> DynStreams<'_, B> { + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + + me.recv_headers(self.peer, &self.send_buffer, frame) + } + + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + me.recv_data(self.peer, &self.send_buffer, frame) + } + + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + + me.recv_reset(&self.send_buffer, frame) + } + + /// Notify all streams that a connection-level error happened. + pub fn handle_error(&mut self, err: proto::Error) -> StreamId { + let mut me = self.inner.lock().unwrap(); + me.handle_error(&self.send_buffer, err) + } + + pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + me.recv_go_away(&self.send_buffer, frame) + } + + pub fn last_processed_id(&self) -> StreamId { + self.inner.lock().unwrap().actions.recv.last_processed_id() + } + + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + me.recv_window_update(&self.send_buffer, frame) + } + + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> { + let mut me = self.inner.lock().unwrap(); + me.recv_push_promise(&self.send_buffer, frame) + } + + pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> { + let mut me = self.inner.lock().map_err(|_| ())?; + me.recv_eof(&self.send_buffer, clear_pending_accept) + } + + pub fn send_reset(&mut self, id: StreamId, reason: Reason) { + let mut me = self.inner.lock().unwrap(); + me.send_reset(&self.send_buffer, id, reason) + } + + pub fn send_go_away(&mut self, last_processed_id: StreamId) { + let mut me = self.inner.lock().unwrap(); + me.actions.recv.go_away(last_processed_id); + } +} + +impl Inner { + fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> { + Arc::new(Mutex::new(Inner { + counts: Counts::new(peer, &config), + actions: Actions { + recv: Recv::new(peer, &config), + send: Send::new(&config), + task: None, + conn_error: None, + }, + store: Store::new(), + refs: 1, + })) + } + + fn recv_headers<B>( + &mut self, + peer: peer::Dyn, + send_buffer: &SendBuffer<B>, + frame: frame::Headers, + ) -> Result<(), Error> { + let id = frame.stream_id(); + + // The GOAWAY process has begun. All streams with a greater ID than + // specified as part of GOAWAY should be ignored. + if id > self.actions.recv.max_stream_id() { + tracing::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", + id, + self.actions.recv.max_stream_id() + ); + return Ok(()); + } + + let key = match self.store.find_entry(id) { + Entry::Occupied(e) => e.key(), + Entry::Vacant(e) => { + // Client: it's possible to send a request, and then send + // a RST_STREAM while the response HEADERS were in transit. + // + // Server: we can't reset a stream before having received + // the request headers, so don't allow. + if !peer.is_server() { + // This may be response headers for a stream we've already + // forgotten about... + if self.actions.may_have_forgotten_stream(peer, id) { + tracing::debug!( + "recv_headers for old stream={:?}, sending STREAM_CLOSED", + id, + ); + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); + } + } + + match self + .actions + .recv + .open(id, Open::Headers, &mut self.counts)? + { + Some(stream_id) => { + let stream = Stream::new( + stream_id, + self.actions.send.init_window_sz(), + self.actions.recv.init_window_sz(), + ); + + e.insert(stream) + } + None => return Ok(()), + } + } + }; + + let stream = self.store.resolve(key); + + if stream.state.is_local_reset() { + // Locally reset streams must ignore frames "for some time". + // This is because the remote may have sent trailers before + // receiving the RST_STREAM frame. + tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id); + return Ok(()); + } + + let actions = &mut self.actions; + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + self.counts.transition(stream, |counts, stream| { + tracing::trace!( + "recv_headers; stream={:?}; state={:?}", + stream.id, + stream.state + ); + + let res = if stream.state.is_recv_headers() { + match actions.recv.recv_headers(frame, stream, counts) { + Ok(()) => Ok(()), + Err(RecvHeaderBlockError::Oversize(resp)) => { + if let Some(resp) = resp { + let sent = actions.send.send_headers( + resp, send_buffer, stream, counts, &mut actions.task); + debug_assert!(sent.is_ok(), "oversize response should not fail"); + + actions.send.schedule_implicit_reset( + stream, + Reason::REFUSED_STREAM, + counts, + &mut actions.task); + + actions.recv.enqueue_reset_expiration(stream, counts); + + Ok(()) + } else { + Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM)) + } + }, + Err(RecvHeaderBlockError::State(err)) => Err(err), + } + } else { + if !frame.is_end_stream() { + // Receiving trailers that don't set EOS is a "malformed" + // message. Malformed messages are a stream error. + proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id); + return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)); + } + + actions.recv.recv_trailers(frame, stream) + }; + + actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) + }) + } + + fn recv_data<B>( + &mut self, + peer: peer::Dyn, + send_buffer: &SendBuffer<B>, + frame: frame::Data, + ) -> Result<(), Error> { + let id = frame.stream_id(); + + let stream = match self.store.find_mut(&id) { + Some(stream) => stream, + None => { + // The GOAWAY process has begun. All streams with a greater ID + // than specified as part of GOAWAY should be ignored. + if id > self.actions.recv.max_stream_id() { + tracing::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring DATA", + id, + self.actions.recv.max_stream_id() + ); + return Ok(()); + } + + if self.actions.may_have_forgotten_stream(peer, id) { + tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,); + + let sz = frame.payload().len(); + // This should have been enforced at the codec::FramedRead layer, so + // this is just a sanity check. + assert!(sz <= super::MAX_WINDOW_SIZE as usize); + let sz = sz as WindowSize; + + self.actions.recv.ignore_data(sz)?; + return Err(Error::library_reset(id, Reason::STREAM_CLOSED)); + } + + proto_err!(conn: "recv_data: stream not found; id={:?}", id); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + }; + + let actions = &mut self.actions; + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + self.counts.transition(stream, |counts, stream| { + let sz = frame.payload().len(); + let res = actions.recv.recv_data(frame, stream); + + // Any stream error after receiving a DATA frame means + // we won't give the data to the user, and so they can't + // release the capacity. We do it automatically. + if let Err(Error::Reset(..)) = res { + actions + .recv + .release_connection_capacity(sz as WindowSize, &mut None); + } + actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) + }) + } + + fn recv_reset<B>( + &mut self, + send_buffer: &SendBuffer<B>, + frame: frame::Reset, + ) -> Result<(), Error> { + let id = frame.stream_id(); + + if id.is_zero() { + proto_err!(conn: "recv_reset: invalid stream ID 0"); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); + } + + // The GOAWAY process has begun. All streams with a greater ID than + // specified as part of GOAWAY should be ignored. + if id > self.actions.recv.max_stream_id() { + tracing::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", + id, + self.actions.recv.max_stream_id() + ); + return Ok(()); + } + + let stream = match self.store.find_mut(&id) { + Some(stream) => stream, + None => { + // TODO: Are there other error cases? + self.actions + .ensure_not_idle(self.counts.peer(), id) + .map_err(Error::library_go_away)?; + + return Ok(()); + } + }; + + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + let actions = &mut self.actions; + + self.counts.transition(stream, |counts, stream| { + actions.recv.recv_reset(frame, stream); + actions.send.handle_error(send_buffer, stream, counts); + assert!(stream.state.is_closed()); + Ok(()) + }) + } + + fn recv_window_update<B>( + &mut self, + send_buffer: &SendBuffer<B>, + frame: frame::WindowUpdate, + ) -> Result<(), Error> { + let id = frame.stream_id(); + + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + if id.is_zero() { + self.actions + .send + .recv_connection_window_update(frame, &mut self.store, &mut self.counts) + .map_err(Error::library_go_away)?; + } else { + // The remote may send window updates for streams that the local now + // considers closed. It's ok... + if let Some(mut stream) = self.store.find_mut(&id) { + // This result is ignored as there is nothing to do when there + // is an error. The stream is reset by the function on error and + // the error is informational. + let _ = self.actions.send.recv_stream_window_update( + frame.size_increment(), + send_buffer, + &mut stream, + &mut self.counts, + &mut self.actions.task, + ); + } else { + self.actions + .ensure_not_idle(self.counts.peer(), id) + .map_err(Error::library_go_away)?; + } + } + + Ok(()) + } + + fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId { + let actions = &mut self.actions; + let counts = &mut self.counts; + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + let last_processed_id = actions.recv.last_processed_id(); + + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); + }) + }); + + actions.conn_error = Some(err); + + last_processed_id + } + + fn recv_go_away<B>( + &mut self, + send_buffer: &SendBuffer<B>, + frame: &frame::GoAway, + ) -> Result<(), Error> { + let actions = &mut self.actions; + let counts = &mut self.counts; + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + let last_stream_id = frame.last_stream_id(); + + actions.send.recv_go_away(last_stream_id)?; + + let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason()); + + self.store.for_each(|stream| { + if stream.id > last_stream_id { + counts.transition(stream, |counts, stream| { + actions.recv.handle_error(&err, &mut *stream); + actions.send.handle_error(send_buffer, stream, counts); + }) + } + }); + + actions.conn_error = Some(err); + + Ok(()) + } + + fn recv_push_promise<B>( + &mut self, + send_buffer: &SendBuffer<B>, + frame: frame::PushPromise, + ) -> Result<(), Error> { + let id = frame.stream_id(); + let promised_id = frame.promised_id(); + + // First, ensure that the initiating stream is still in a valid state. + let parent_key = match self.store.find_mut(&id) { + Some(stream) => { + // The GOAWAY process has begun. All streams with a greater ID + // than specified as part of GOAWAY should be ignored. + if id > self.actions.recv.max_stream_id() { + tracing::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", + id, + self.actions.recv.max_stream_id() + ); + return Ok(()); + } + + // The stream must be receive open + stream.state.ensure_recv_open()?; + stream.key() + } + None => { + proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state"); + return Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()); + } + }; + + // TODO: Streams in the reserved states do not count towards the concurrency + // limit. However, it seems like there should be a cap otherwise this + // could grow in memory indefinitely. + + // Ensure that we can reserve streams + self.actions.recv.ensure_can_reserve()?; + + // Next, open the stream. + // + // If `None` is returned, then the stream is being refused. There is no + // further work to be done. + if self + .actions + .recv + .open(promised_id, Open::PushPromise, &mut self.counts)? + .is_none() + { + return Ok(()); + } + + // Try to handle the frame and create a corresponding key for the pushed stream + // this requires a bit of indirection to make the borrow checker happy. + let child_key: Option<store::Key> = { + // Create state for the stream + let stream = self.store.insert(promised_id, { + Stream::new( + promised_id, + self.actions.send.init_window_sz(), + self.actions.recv.init_window_sz(), + ) + }); + + let actions = &mut self.actions; + + self.counts.transition(stream, |counts, stream| { + let stream_valid = actions.recv.recv_push_promise(frame, stream); + + match stream_valid { + Ok(()) => Ok(Some(stream.key())), + _ => { + let mut send_buffer = send_buffer.inner.lock().unwrap(); + actions + .reset_on_recv_stream_err( + &mut *send_buffer, + stream, + counts, + stream_valid, + ) + .map(|()| None) + } + } + })? + }; + // If we're successful, push the headers and stream... + if let Some(child) = child_key { + let mut ppp = self.store[parent_key].pending_push_promises.take(); + ppp.push(&mut self.store.resolve(child)); + + let parent = &mut self.store.resolve(parent_key); + parent.pending_push_promises = ppp; + parent.notify_recv(); + }; + + Ok(()) + } + + fn recv_eof<B>( + &mut self, + send_buffer: &SendBuffer<B>, + clear_pending_accept: bool, + ) -> Result<(), ()> { + let actions = &mut self.actions; + let counts = &mut self.counts; + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + if actions.conn_error.is_none() { + actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into()); + } + + tracing::trace!("Streams::recv_eof"); + + self.store.for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.recv_eof(stream); + + // This handles resetting send state associated with the + // stream + actions.send.handle_error(send_buffer, stream, counts); + }) + }); + + actions.clear_queues(clear_pending_accept, &mut self.store, counts); + Ok(()) + } + + fn poll_complete<T, B>( + &mut self, + send_buffer: &SendBuffer<B>, + cx: &mut Context, + dst: &mut Codec<T, Prioritized<B>>, + ) -> Poll<io::Result<()>> + where + T: AsyncWrite + Unpin, + B: Buf, + { + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + // Send WINDOW_UPDATE frames first + // + // TODO: It would probably be better to interleave updates w/ data + // frames. + ready!(self + .actions + .recv + .poll_complete(cx, &mut self.store, &mut self.counts, dst))?; + + // Send any other pending frames + ready!(self.actions.send.poll_complete( + cx, + send_buffer, + &mut self.store, + &mut self.counts, + dst + ))?; + + // Nothing else to do, track the task + self.actions.task = Some(cx.waker().clone()); + + Poll::Ready(Ok(())) + } + + fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) { + let key = match self.store.find_entry(id) { + Entry::Occupied(e) => e.key(), + Entry::Vacant(e) => { + // Resetting a stream we don't know about? That could be OK... + // + // 1. As a server, we just received a request, but that request + // was bad, so we're resetting before even accepting it. + // This is totally fine. + // + // 2. The remote may have sent us a frame on new stream that + // it's *not* supposed to have done, and thus, we don't know + // the stream. In that case, sending a reset will "open" the + // stream in our store. Maybe that should be a connection + // error instead? At least for now, we need to update what + // our vision of the next stream is. + if self.counts.peer().is_local_init(id) { + // We normally would open this stream, so update our + // next-send-id record. + self.actions.send.maybe_reset_next_stream_id(id); + } else { + // We normally would recv this stream, so update our + // next-recv-id record. + self.actions.recv.maybe_reset_next_stream_id(id); + } + + let stream = Stream::new(id, 0, 0); + + e.insert(stream) + } + }; + + let stream = self.store.resolve(key); + let mut send_buffer = send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + self.actions.send_reset( + stream, + reason, + Initiator::Library, + &mut self.counts, + send_buffer, + ); + } +} + +impl<B> Streams<B, client::Peer> +where + B: Buf, +{ + pub fn poll_pending_open( + &mut self, + cx: &Context, + pending: Option<&OpaqueStreamRef>, + ) -> Poll<Result<(), crate::Error>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.ensure_no_conn_error()?; + me.actions.send.ensure_next_stream_id()?; + + if let Some(pending) = pending { + let mut stream = me.store.resolve(pending.key); + tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); + if stream.is_pending_open { + stream.wait_send(cx); + return Poll::Pending; + } + } + Poll::Ready(Ok(())) + } +} + +impl<B, P> Streams<B, P> +where + P: Peer, +{ + pub fn as_dyn(&self) -> DynStreams<B> { + let Self { + inner, + send_buffer, + _p, + } = self; + DynStreams { + inner, + send_buffer, + peer: P::r#dyn(), + } + } + + /// This function is safe to call multiple times. + /// + /// A `Result` is returned to avoid panicking if the mutex is poisoned. + pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> { + self.as_dyn().recv_eof(clear_pending_accept) + } + + pub(crate) fn max_send_streams(&self) -> usize { + self.inner.lock().unwrap().counts.max_send_streams() + } + + pub(crate) fn max_recv_streams(&self) -> usize { + self.inner.lock().unwrap().counts.max_recv_streams() + } + + #[cfg(feature = "unstable")] + pub fn num_active_streams(&self) -> usize { + let me = self.inner.lock().unwrap(); + me.store.num_active_streams() + } + + pub fn has_streams(&self) -> bool { + let me = self.inner.lock().unwrap(); + me.counts.has_streams() + } + + pub fn has_streams_or_other_references(&self) -> bool { + let me = self.inner.lock().unwrap(); + me.counts.has_streams() || me.refs > 1 + } + + #[cfg(feature = "unstable")] + pub fn num_wired_streams(&self) -> usize { + let me = self.inner.lock().unwrap(); + me.store.num_wired_streams() + } +} + +// no derive because we don't need B and P to be Clone. +impl<B, P> Clone for Streams<B, P> +where + P: Peer, +{ + fn clone(&self) -> Self { + self.inner.lock().unwrap().refs += 1; + Streams { + inner: self.inner.clone(), + send_buffer: self.send_buffer.clone(), + _p: ::std::marker::PhantomData, + } + } +} + +impl<B, P> Drop for Streams<B, P> +where + P: Peer, +{ + fn drop(&mut self) { + if let Ok(mut inner) = self.inner.lock() { + inner.refs -= 1; + if inner.refs == 1 { + if let Some(task) = inner.actions.task.take() { + task.wake(); + } + } + } + } +} + +// ===== impl StreamRef ===== + +impl<B> StreamRef<B> { + pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> + where + B: Buf, + { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.transition(stream, |counts, stream| { + // Create the data frame + let mut frame = frame::Data::new(stream.id, data); + frame.set_end_stream(end_stream); + + // Send the data frame + actions + .send + .send_data(frame, send_buffer, stream, counts, &mut actions.task) + }) + } + + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.transition(stream, |counts, stream| { + // Create the trailers frame + let frame = frame::Headers::trailers(stream.id, trailers); + + // Send the trailers frame + actions + .send + .send_trailers(frame, send_buffer, stream, counts, &mut actions.task) + }) + } + + pub fn send_reset(&mut self, reason: Reason) { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.actions + .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer); + } + + pub fn send_response( + &mut self, + mut response: Response<()>, + end_of_stream: bool, + ) -> Result<(), UserError> { + // Clear before taking lock, incase extensions contain a StreamRef. + response.extensions_mut().clear(); + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.opaque.key); + let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + me.counts.transition(stream, |counts, stream| { + let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream); + + actions + .send + .send_headers(frame, send_buffer, stream, counts, &mut actions.task) + }) + } + + pub fn send_push_promise( + &mut self, + mut request: Request<()>, + ) -> Result<StreamRef<B>, UserError> { + // Clear before taking lock, incase extensions contain a StreamRef. + request.extensions_mut().clear(); + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + let actions = &mut me.actions; + let promised_id = actions.send.reserve_local()?; + + let child_key = { + let mut child_stream = me.store.insert( + promised_id, + Stream::new( + promised_id, + actions.send.init_window_sz(), + actions.recv.init_window_sz(), + ), + ); + child_stream.state.reserve_local()?; + child_stream.is_pending_push = true; + child_stream.key() + }; + + let pushed = { + let mut stream = me.store.resolve(self.opaque.key); + + let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?; + + actions + .send + .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task) + }; + + if let Err(err) = pushed { + let mut child_stream = me.store.resolve(child_key); + child_stream.unlink(); + child_stream.remove(); + return Err(err.into()); + } + + me.refs += 1; + let opaque = + OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key)); + + Ok(StreamRef { + opaque, + send_buffer: self.send_buffer.clone(), + }) + } + + /// Called by the server after the stream is accepted. Given that clients + /// initialize streams by sending HEADERS, the request will always be + /// available. + /// + /// # Panics + /// + /// This function panics if the request isn't present. + pub fn take_request(&self) -> Request<()> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + me.actions.recv.take_request(&mut stream) + } + + /// Called by a client to see if the current stream is pending open + pub fn is_pending_open(&self) -> bool { + let mut me = self.opaque.inner.lock().unwrap(); + me.store.resolve(self.opaque.key).is_pending_open + } + + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: WindowSize) { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions + .send + .reserve_capacity(capacity, &mut stream, &mut me.counts) + } + + /// Returns the stream's current send capacity. + pub fn capacity(&self) -> WindowSize { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.capacity(&mut stream) + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.poll_capacity(cx, &mut stream) + } + + /// Request to be notified for if a `RST_STREAM` is received for this stream. + pub(crate) fn poll_reset( + &mut self, + cx: &Context, + mode: proto::PollReset, + ) -> Poll<Result<Reason, crate::Error>> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions + .send + .poll_reset(cx, &mut stream, mode) + .map_err(From::from) + } + + pub fn clone_to_opaque(&self) -> OpaqueStreamRef + where + B: 'static, + { + self.opaque.clone() + } + + pub fn stream_id(&self) -> StreamId { + self.opaque.stream_id() + } +} + +impl<B> Clone for StreamRef<B> { + fn clone(&self) -> Self { + StreamRef { + opaque: self.opaque.clone(), + send_buffer: self.send_buffer.clone(), + } + } +} + +// ===== impl OpaqueStreamRef ===== + +impl OpaqueStreamRef { + fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef { + stream.ref_inc(); + OpaqueStreamRef { + inner, + key: stream.key(), + } + } + /// Called by a client to check for a received response. + pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_response(cx, &mut stream) + } + /// Called by a client to check for a pushed request. + pub fn poll_pushed( + &mut self, + cx: &Context, + ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + me.actions + .recv + .poll_pushed(cx, &mut stream) + .map_ok(|(h, key)| { + me.refs += 1; + let opaque_ref = + OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key)); + (h, opaque_ref) + }) + } + + pub fn is_end_stream(&self) -> bool { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let stream = me.store.resolve(self.key); + + me.actions.recv.is_end_stream(&stream) + } + + pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_data(cx, &mut stream) + } + + pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_trailers(cx, &mut stream) + } + + pub(crate) fn available_recv_capacity(&self) -> isize { + let me = self.inner.lock().unwrap(); + let me = &*me; + + let stream = &me.store[self.key]; + stream.recv_flow.available().into() + } + + pub(crate) fn used_recv_capacity(&self) -> WindowSize { + let me = self.inner.lock().unwrap(); + let me = &*me; + + let stream = &me.store[self.key]; + stream.in_flight_recv_data + } + + /// Releases recv capacity back to the peer. This may result in sending + /// WINDOW_UPDATE frames on both the stream and connection. + pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions + .recv + .release_capacity(capacity, &mut stream, &mut me.actions.task) + } + + pub(crate) fn clear_recv_buffer(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.clear_recv_buffer(&mut stream); + } + + pub fn stream_id(&self) -> StreamId { + self.inner.lock().unwrap().store[self.key].id + } +} + +impl fmt::Debug for OpaqueStreamRef { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::sync::TryLockError::*; + + match self.inner.try_lock() { + Ok(me) => { + let stream = &me.store[self.key]; + fmt.debug_struct("OpaqueStreamRef") + .field("stream_id", &stream.id) + .field("ref_count", &stream.ref_count) + .finish() + } + Err(Poisoned(_)) => fmt + .debug_struct("OpaqueStreamRef") + .field("inner", &"<Poisoned>") + .finish(), + Err(WouldBlock) => fmt + .debug_struct("OpaqueStreamRef") + .field("inner", &"<Locked>") + .finish(), + } + } +} + +impl Clone for OpaqueStreamRef { + fn clone(&self) -> Self { + // Increment the ref count + let mut inner = self.inner.lock().unwrap(); + inner.store.resolve(self.key).ref_inc(); + inner.refs += 1; + + OpaqueStreamRef { + inner: self.inner.clone(), + key: self.key.clone(), + } + } +} + +impl Drop for OpaqueStreamRef { + fn drop(&mut self) { + drop_stream_ref(&self.inner, self.key); + } +} + +// TODO: Move back in fn above +fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) { + let mut me = match inner.lock() { + Ok(inner) => inner, + Err(_) => { + if ::std::thread::panicking() { + tracing::trace!("StreamRef::drop; mutex poisoned"); + return; + } else { + panic!("StreamRef::drop; mutex poisoned"); + } + } + }; + + let me = &mut *me; + me.refs -= 1; + let mut stream = me.store.resolve(key); + + tracing::trace!("drop_stream_ref; stream={:?}", stream); + + // decrement the stream's ref count by 1. + stream.ref_dec(); + + let actions = &mut me.actions; + + // If the stream is not referenced and it is already + // closed (does not have to go through logic below + // of canceling the stream), we should notify the task + // (connection) so that it can close properly + if stream.ref_count == 0 && stream.is_closed() { + if let Some(task) = actions.task.take() { + task.wake(); + } + } + + me.counts.transition(stream, |counts, stream| { + maybe_cancel(stream, actions, counts); + + if stream.ref_count == 0 { + // Release any recv window back to connection, no one can access + // it anymore. + actions + .recv + .release_closed_capacity(stream, &mut actions.task); + + // We won't be able to reach our push promises anymore + let mut ppp = stream.pending_push_promises.take(); + while let Some(promise) = ppp.pop(stream.store_mut()) { + counts.transition(promise, |counts, stream| { + maybe_cancel(stream, actions, counts); + }); + } + } + }); +} + +fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) { + if stream.is_canceled_interest() { + // Server is allowed to early respond without fully consuming the client input stream + // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1 + // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376) + let reason = if counts.peer().is_server() + && stream.state.is_send_closed() + && stream.state.is_recv_streaming() + { + Reason::NO_ERROR + } else { + Reason::CANCEL + }; + + actions + .send + .schedule_implicit_reset(stream, reason, counts, &mut actions.task); + actions.recv.enqueue_reset_expiration(stream, counts); + } +} + +// ===== impl SendBuffer ===== + +impl<B> SendBuffer<B> { + fn new() -> Self { + let inner = Mutex::new(Buffer::new()); + SendBuffer { inner } + } +} + +// ===== impl Actions ===== + +impl Actions { + fn send_reset<B>( + &mut self, + stream: store::Ptr, + reason: Reason, + initiator: Initiator, + counts: &mut Counts, + send_buffer: &mut Buffer<Frame<B>>, + ) { + counts.transition(stream, |counts, stream| { + self.send.send_reset( + reason, + initiator, + send_buffer, + stream, + counts, + &mut self.task, + ); + self.recv.enqueue_reset_expiration(stream, counts); + // if a RecvStream is parked, ensure it's notified + stream.notify_recv(); + }); + } + + fn reset_on_recv_stream_err<B>( + &mut self, + buffer: &mut Buffer<Frame<B>>, + stream: &mut store::Ptr, + counts: &mut Counts, + res: Result<(), Error>, + ) -> Result<(), Error> { + if let Err(Error::Reset(stream_id, reason, initiator)) = res { + debug_assert_eq!(stream_id, stream.id); + // Reset the stream. + self.send + .send_reset(reason, initiator, buffer, stream, counts, &mut self.task); + Ok(()) + } else { + res + } + } + + fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> { + if peer.is_local_init(id) { + self.send.ensure_not_idle(id) + } else { + self.recv.ensure_not_idle(id) + } + } + + fn ensure_no_conn_error(&self) -> Result<(), proto::Error> { + if let Some(ref err) = self.conn_error { + Err(err.clone()) + } else { + Ok(()) + } + } + + /// Check if we possibly could have processed and since forgotten this stream. + /// + /// If we send a RST_STREAM for a stream, we will eventually "forget" about + /// the stream to free up memory. It's possible that the remote peer had + /// frames in-flight, and by the time we receive them, our own state is + /// gone. We *could* tear everything down by sending a GOAWAY, but it + /// is more likely to be latency/memory constraints that caused this, + /// and not a bad actor. So be less catastrophic, the spec allows + /// us to send another RST_STREAM of STREAM_CLOSED. + fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool { + if id.is_zero() { + return false; + } + if peer.is_local_init(id) { + self.send.may_have_created_stream(id) + } else { + self.recv.may_have_created_stream(id) + } + } + + fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) { + self.recv.clear_queues(clear_pending_accept, store, counts); + self.send.clear_queues(store, counts); + } +} |