summaryrefslogtreecommitdiffstats
path: root/third_party/rust/h2/src/proto/streams/counts.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/h2/src/proto/streams/counts.rs')
-rw-r--r--third_party/rust/h2/src/proto/streams/counts.rs253
1 files changed, 253 insertions, 0 deletions
diff --git a/third_party/rust/h2/src/proto/streams/counts.rs b/third_party/rust/h2/src/proto/streams/counts.rs
new file mode 100644
index 0000000000..add1312e55
--- /dev/null
+++ b/third_party/rust/h2/src/proto/streams/counts.rs
@@ -0,0 +1,253 @@
+use super::*;
+
+use std::usize;
+
+#[derive(Debug)]
+pub(super) struct Counts {
+ /// Acting as a client or server. This allows us to track which values to
+ /// inc / dec.
+ peer: peer::Dyn,
+
+ /// Maximum number of locally initiated streams
+ max_send_streams: usize,
+
+ /// Current number of remote initiated streams
+ num_send_streams: usize,
+
+ /// Maximum number of remote initiated streams
+ max_recv_streams: usize,
+
+ /// Current number of locally initiated streams
+ num_recv_streams: usize,
+
+ /// Maximum number of pending locally reset streams
+ max_local_reset_streams: usize,
+
+ /// Current number of pending locally reset streams
+ num_local_reset_streams: usize,
+
+ /// Max number of "pending accept" streams that were remotely reset
+ max_remote_reset_streams: usize,
+
+ /// Current number of "pending accept" streams that were remotely reset
+ num_remote_reset_streams: usize,
+}
+
+impl Counts {
+ /// Create a new `Counts` using the provided configuration values.
+ pub fn new(peer: peer::Dyn, config: &Config) -> Self {
+ Counts {
+ peer,
+ max_send_streams: config.initial_max_send_streams,
+ num_send_streams: 0,
+ max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
+ num_recv_streams: 0,
+ max_local_reset_streams: config.local_reset_max,
+ num_local_reset_streams: 0,
+ max_remote_reset_streams: config.remote_reset_max,
+ num_remote_reset_streams: 0,
+ }
+ }
+
+ /// Returns true when the next opened stream will reach capacity of outbound streams
+ ///
+ /// The number of client send streams is incremented in prioritize; send_request has to guess if
+ /// it should wait before allowing another request to be sent.
+ pub fn next_send_stream_will_reach_capacity(&self) -> bool {
+ self.max_send_streams <= (self.num_send_streams + 1)
+ }
+
+ /// Returns the current peer
+ pub fn peer(&self) -> peer::Dyn {
+ self.peer
+ }
+
+ pub fn has_streams(&self) -> bool {
+ self.num_send_streams != 0 || self.num_recv_streams != 0
+ }
+
+ /// Returns true if the receive stream concurrency can be incremented
+ pub fn can_inc_num_recv_streams(&self) -> bool {
+ self.max_recv_streams > self.num_recv_streams
+ }
+
+ /// Increments the number of concurrent receive streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(self.can_inc_num_recv_streams());
+ assert!(!stream.is_counted);
+
+ // Increment the number of remote initiated streams
+ self.num_recv_streams += 1;
+ stream.is_counted = true;
+ }
+
+ /// Returns true if the send stream concurrency can be incremented
+ pub fn can_inc_num_send_streams(&self) -> bool {
+ self.max_send_streams > self.num_send_streams
+ }
+
+ /// Increments the number of concurrent send streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(self.can_inc_num_send_streams());
+ assert!(!stream.is_counted);
+
+ // Increment the number of remote initiated streams
+ self.num_send_streams += 1;
+ stream.is_counted = true;
+ }
+
+ /// Returns true if the number of pending reset streams can be incremented.
+ pub fn can_inc_num_reset_streams(&self) -> bool {
+ self.max_local_reset_streams > self.num_local_reset_streams
+ }
+
+ /// Increments the number of pending reset streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub fn inc_num_reset_streams(&mut self) {
+ assert!(self.can_inc_num_reset_streams());
+
+ self.num_local_reset_streams += 1;
+ }
+
+ pub(crate) fn max_remote_reset_streams(&self) -> usize {
+ self.max_remote_reset_streams
+ }
+
+ /// Returns true if the number of pending REMOTE reset streams can be
+ /// incremented.
+ pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
+ self.max_remote_reset_streams > self.num_remote_reset_streams
+ }
+
+ /// Increments the number of pending REMOTE reset streams.
+ ///
+ /// # Panics
+ ///
+ /// Panics on failure as this should have been validated before hand.
+ pub(crate) fn inc_num_remote_reset_streams(&mut self) {
+ assert!(self.can_inc_num_remote_reset_streams());
+
+ self.num_remote_reset_streams += 1;
+ }
+
+ pub(crate) fn dec_num_remote_reset_streams(&mut self) {
+ assert!(self.num_remote_reset_streams > 0);
+
+ self.num_remote_reset_streams -= 1;
+ }
+
+ pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
+ if let Some(val) = settings.max_concurrent_streams() {
+ self.max_send_streams = val as usize;
+ }
+ }
+
+ /// Run a block of code that could potentially transition a stream's state.
+ ///
+ /// If the stream state transitions to closed, this function will perform
+ /// all necessary cleanup.
+ ///
+ /// TODO: Is this function still needed?
+ pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
+ where
+ F: FnOnce(&mut Self, &mut store::Ptr) -> U,
+ {
+ // TODO: Does this need to be computed before performing the action?
+ let is_pending_reset = stream.is_pending_reset_expiration();
+
+ // Run the action
+ let ret = f(self, &mut stream);
+
+ self.transition_after(stream, is_pending_reset);
+
+ ret
+ }
+
+ // TODO: move this to macro?
+ pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
+ tracing::trace!(
+ "transition_after; stream={:?}; state={:?}; is_closed={:?}; \
+ pending_send_empty={:?}; buffered_send_data={}; \
+ num_recv={}; num_send={}",
+ stream.id,
+ stream.state,
+ stream.is_closed(),
+ stream.pending_send.is_empty(),
+ stream.buffered_send_data,
+ self.num_recv_streams,
+ self.num_send_streams
+ );
+
+ if stream.is_closed() {
+ if !stream.is_pending_reset_expiration() {
+ stream.unlink();
+ if is_reset_counted {
+ self.dec_num_reset_streams();
+ }
+ }
+
+ if stream.is_counted {
+ tracing::trace!("dec_num_streams; stream={:?}", stream.id);
+ // Decrement the number of active streams.
+ self.dec_num_streams(&mut stream);
+ }
+ }
+
+ // Release the stream if it requires releasing
+ if stream.is_released() {
+ stream.remove();
+ }
+ }
+
+ /// Returns the maximum number of streams that can be initiated by this
+ /// peer.
+ pub(crate) fn max_send_streams(&self) -> usize {
+ self.max_send_streams
+ }
+
+ /// Returns the maximum number of streams that can be initiated by the
+ /// remote peer.
+ pub(crate) fn max_recv_streams(&self) -> usize {
+ self.max_recv_streams
+ }
+
+ fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
+ assert!(stream.is_counted);
+
+ if self.peer.is_local_init(stream.id) {
+ assert!(self.num_send_streams > 0);
+ self.num_send_streams -= 1;
+ stream.is_counted = false;
+ } else {
+ assert!(self.num_recv_streams > 0);
+ self.num_recv_streams -= 1;
+ stream.is_counted = false;
+ }
+ }
+
+ fn dec_num_reset_streams(&mut self) {
+ assert!(self.num_local_reset_streams > 0);
+ self.num_local_reset_streams -= 1;
+ }
+}
+
+impl Drop for Counts {
+ fn drop(&mut self) {
+ use std::thread;
+
+ if !thread::panicking() {
+ debug_assert!(!self.has_streams());
+ }
+ }
+}