diff options
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/stream.rs')
-rw-r--r-- | third_party/rust/h2/src/proto/streams/stream.rs | 490 |
1 files changed, 490 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/stream.rs b/third_party/rust/h2/src/proto/streams/stream.rs new file mode 100644 index 0000000000..36d515bad2 --- /dev/null +++ b/third_party/rust/h2/src/proto/streams/stream.rs @@ -0,0 +1,490 @@ +use super::*; + +use std::task::{Context, Waker}; +use std::time::Instant; +use std::usize; + +/// Tracks Stream related state +/// +/// # Reference counting +/// +/// There can be a number of outstanding handles to a single Stream. These are +/// tracked using reference counting. The `ref_count` field represents the +/// number of outstanding userspace handles that can reach this stream. +/// +/// It's important to note that when the stream is placed in an internal queue +/// (such as an accept queue), this is **not** tracked by a reference count. +/// Thus, `ref_count` can be zero and the stream still has to be kept around. +#[derive(Debug)] +pub(super) struct Stream { + /// The h2 stream identifier + pub id: StreamId, + + /// Current state of the stream + pub state: State, + + /// Set to `true` when the stream is counted against the connection's max + /// concurrent streams. + pub is_counted: bool, + + /// Number of outstanding handles pointing to this stream + pub ref_count: usize, + + // ===== Fields related to sending ===== + /// Next node in the accept linked list + pub next_pending_send: Option<store::Key>, + + /// Set to true when the stream is pending accept + pub is_pending_send: bool, + + /// Send data flow control + pub send_flow: FlowControl, + + /// Amount of send capacity that has been requested, but not yet allocated. + pub requested_send_capacity: WindowSize, + + /// Amount of data buffered at the prioritization layer. + /// TODO: Technically this could be greater than the window size... + pub buffered_send_data: usize, + + /// Task tracking additional send capacity (i.e. window updates). + send_task: Option<Waker>, + + /// Frames pending for this stream being sent to the socket + pub pending_send: buffer::Deque, + + /// Next node in the linked list of streams waiting for additional + /// connection level capacity. + pub next_pending_send_capacity: Option<store::Key>, + + /// True if the stream is waiting for outbound connection capacity + pub is_pending_send_capacity: bool, + + /// Set to true when the send capacity has been incremented + pub send_capacity_inc: bool, + + /// Next node in the open linked list + pub next_open: Option<store::Key>, + + /// Set to true when the stream is pending to be opened + pub is_pending_open: bool, + + /// Set to true when a push is pending for this stream + pub is_pending_push: bool, + + // ===== Fields related to receiving ===== + /// Next node in the accept linked list + pub next_pending_accept: Option<store::Key>, + + /// Set to true when the stream is pending accept + pub is_pending_accept: bool, + + /// Receive data flow control + pub recv_flow: FlowControl, + + pub in_flight_recv_data: WindowSize, + + /// Next node in the linked list of streams waiting to send window updates. + pub next_window_update: Option<store::Key>, + + /// True if the stream is waiting to send a window update + pub is_pending_window_update: bool, + + /// The time when this stream may have been locally reset. + pub reset_at: Option<Instant>, + + /// Next node in list of reset streams that should expire eventually + pub next_reset_expire: Option<store::Key>, + + /// Frames pending for this stream to read + pub pending_recv: buffer::Deque, + + /// Task tracking receiving frames + pub recv_task: Option<Waker>, + + /// The stream's pending push promises + pub pending_push_promises: store::Queue<NextAccept>, + + /// Validate content-length headers + pub content_length: ContentLength, +} + +/// State related to validating a stream's content-length +#[derive(Debug)] +pub enum ContentLength { + Omitted, + Head, + Remaining(u64), +} + +#[derive(Debug)] +pub(super) struct NextAccept; + +#[derive(Debug)] +pub(super) struct NextSend; + +#[derive(Debug)] +pub(super) struct NextSendCapacity; + +#[derive(Debug)] +pub(super) struct NextWindowUpdate; + +#[derive(Debug)] +pub(super) struct NextOpen; + +#[derive(Debug)] +pub(super) struct NextResetExpire; + +impl Stream { + pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { + let mut send_flow = FlowControl::new(); + let mut recv_flow = FlowControl::new(); + + recv_flow + .inc_window(init_recv_window) + .expect("invalid initial receive window"); + recv_flow.assign_capacity(init_recv_window); + + send_flow + .inc_window(init_send_window) + .expect("invalid initial send window size"); + + Stream { + id, + state: State::default(), + ref_count: 0, + is_counted: false, + + // ===== Fields related to sending ===== + next_pending_send: None, + is_pending_send: false, + send_flow, + requested_send_capacity: 0, + buffered_send_data: 0, + send_task: None, + pending_send: buffer::Deque::new(), + is_pending_send_capacity: false, + next_pending_send_capacity: None, + send_capacity_inc: false, + is_pending_open: false, + next_open: None, + is_pending_push: false, + + // ===== Fields related to receiving ===== + next_pending_accept: None, + is_pending_accept: false, + recv_flow, + in_flight_recv_data: 0, + next_window_update: None, + is_pending_window_update: false, + reset_at: None, + next_reset_expire: None, + pending_recv: buffer::Deque::new(), + recv_task: None, + pending_push_promises: store::Queue::new(), + content_length: ContentLength::Omitted, + } + } + + /// Increment the stream's ref count + pub fn ref_inc(&mut self) { + assert!(self.ref_count < usize::MAX); + self.ref_count += 1; + } + + /// Decrements the stream's ref count + pub fn ref_dec(&mut self) { + assert!(self.ref_count > 0); + self.ref_count -= 1; + } + + /// Returns true if stream is currently being held for some time because of + /// a local reset. + pub fn is_pending_reset_expiration(&self) -> bool { + self.reset_at.is_some() + } + + /// Returns true if frames for this stream are ready to be sent over the wire + pub fn is_send_ready(&self) -> bool { + // Why do we check pending_open? + // + // We allow users to call send_request() which schedules a stream to be pending_open + // if there is no room according to the concurrency limit (max_send_streams), and we + // also allow data to be buffered for send with send_data() if there is no capacity for + // the stream to send the data, which attempts to place the stream in pending_send. + // If the stream is not open, we don't want the stream to be scheduled for + // execution (pending_send). Note that if the stream is in pending_open, it will be + // pushed to pending_send when there is room for an open stream. + // + // In pending_push we track whether a PushPromise still needs to be sent + // from a different stream before we can start sending frames on this one. + // This is different from the "open" check because reserved streams don't count + // toward the concurrency limit. + // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 + !self.is_pending_open && !self.is_pending_push + } + + /// Returns true if the stream is closed + pub fn is_closed(&self) -> bool { + // The state has fully transitioned to closed. + self.state.is_closed() && + // Because outbound frames transition the stream state before being + // buffered, we have to ensure that all frames have been flushed. + self.pending_send.is_empty() && + // Sometimes large data frames are sent out in chunks. After a chunk + // of the frame is sent, the remainder is pushed back onto the send + // queue to be rescheduled. + // + // Checking for additional buffered data lets us catch this case. + self.buffered_send_data == 0 + } + + /// Returns true if the stream is no longer in use + pub fn is_released(&self) -> bool { + // The stream is closed and fully flushed + self.is_closed() && + // There are no more outstanding references to the stream + self.ref_count == 0 && + // The stream is not in any queue + !self.is_pending_send && !self.is_pending_send_capacity && + !self.is_pending_accept && !self.is_pending_window_update && + !self.is_pending_open && !self.reset_at.is_some() + } + + /// Returns true when the consumer of the stream has dropped all handles + /// (indicating no further interest in the stream) and the stream state is + /// not actually closed. + /// + /// In this case, a reset should be sent. + pub fn is_canceled_interest(&self) -> bool { + self.ref_count == 0 && !self.state.is_closed() + } + + pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { + debug_assert!(capacity > 0); + self.send_flow.assign_capacity(capacity); + + tracing::trace!( + " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={}", + self.send_flow.available(), + self.buffered_send_data, + self.id, + max_buffer_size + ); + + self.notify_if_can_buffer_more(max_buffer_size); + } + + /// If the capacity was limited because of the max_send_buffer_size, + /// then consider waking the send task again... + pub fn notify_if_can_buffer_more(&mut self, max_buffer_size: usize) { + let available = self.send_flow.available().as_size() as usize; + let buffered = self.buffered_send_data; + + // Only notify if the capacity exceeds the amount of buffered data + if available.min(max_buffer_size) > buffered { + self.send_capacity_inc = true; + tracing::trace!(" notifying task"); + self.notify_send(); + } + } + + /// Returns `Err` when the decrement cannot be completed due to overflow. + pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { + Some(val) => *rem = val, + None => return Err(()), + }, + ContentLength::Head => { + if len != 0 { + return Err(()); + } + } + _ => {} + } + + Ok(()) + } + + pub fn ensure_content_length_zero(&self) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(0) => Ok(()), + ContentLength::Remaining(_) => Err(()), + _ => Ok(()), + } + } + + pub fn notify_send(&mut self) { + if let Some(task) = self.send_task.take() { + task.wake(); + } + } + + pub fn wait_send(&mut self, cx: &Context) { + self.send_task = Some(cx.waker().clone()); + } + + pub fn notify_recv(&mut self) { + if let Some(task) = self.recv_task.take() { + task.wake(); + } + } +} + +impl store::Next for NextAccept { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_accept + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_accept = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_accept.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_accept + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_accept = val; + } +} + +impl store::Next for NextSend { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_send + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_send = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_send.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + // ensure that stream is not queued for being opened + // if it's being put into queue for sending data + debug_assert_eq!(stream.is_pending_open, false); + } + stream.is_pending_send = val; + } +} + +impl store::Next for NextSendCapacity { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_pending_send_capacity + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_pending_send_capacity = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_pending_send_capacity.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send_capacity + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_send_capacity = val; + } +} + +impl store::Next for NextWindowUpdate { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_window_update + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_window_update = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_window_update.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_window_update + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_window_update = val; + } +} + +impl store::Next for NextOpen { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_open + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_open = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_open.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_open + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + // ensure that stream is not queued for being sent + // if it's being put into queue for opening the stream + debug_assert_eq!(stream.is_pending_send, false); + } + stream.is_pending_open = val; + } +} + +impl store::Next for NextResetExpire { + fn next(stream: &Stream) -> Option<store::Key> { + stream.next_reset_expire + } + + fn set_next(stream: &mut Stream, key: Option<store::Key>) { + stream.next_reset_expire = key; + } + + fn take_next(stream: &mut Stream) -> Option<store::Key> { + stream.next_reset_expire.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.reset_at.is_some() + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + stream.reset_at = Some(Instant::now()); + } else { + stream.reset_at = None; + } + } +} + +// ===== impl ContentLength ===== + +impl ContentLength { + pub fn is_head(&self) -> bool { + match *self { + ContentLength::Head => true, + _ => false, + } + } +} |