From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../rust/neqo-http3/src/buffered_send_stream.rs | 118 +++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 third_party/rust/neqo-http3/src/buffered_send_stream.rs (limited to 'third_party/rust/neqo-http3/src/buffered_send_stream.rs') diff --git a/third_party/rust/neqo-http3/src/buffered_send_stream.rs b/third_party/rust/neqo-http3/src/buffered_send_stream.rs new file mode 100644 index 0000000000..4f6761fa80 --- /dev/null +++ b/third_party/rust/neqo-http3/src/buffered_send_stream.rs @@ -0,0 +1,118 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use neqo_common::qtrace; +use neqo_transport::{Connection, StreamId}; + +use crate::Res; + +#[derive(Debug, PartialEq, Eq)] +pub enum BufferedStream { + Uninitialized, + Initialized { stream_id: StreamId, buf: Vec }, +} + +impl Default for BufferedStream { + fn default() -> Self { + Self::Uninitialized + } +} + +impl ::std::fmt::Display for BufferedStream { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "BufferedStream {:?}", Option::::from(self)) + } +} + +impl BufferedStream { + #[must_use] + pub fn new(stream_id: StreamId) -> Self { + Self::Initialized { + stream_id, + buf: Vec::new(), + } + } + + /// # Panics + /// + /// If the `BufferedStream` is initialized more than one it will panic. + pub fn init(&mut self, stream_id: StreamId) { + debug_assert!(&Self::Uninitialized == self); + *self = Self::Initialized { + stream_id, + buf: Vec::new(), + }; + } + + /// # Panics + /// + /// This functon cannot be called before the `BufferedStream` is initialized. + pub fn buffer(&mut self, to_buf: &[u8]) { + if let Self::Initialized { buf, .. } = self { + buf.extend_from_slice(to_buf); + } else { + debug_assert!(false, "Do not buffer date before the stream is initialized"); + } + } + + /// # Errors + /// + /// Returns `neqo_transport` errors. + pub fn send_buffer(&mut self, conn: &mut Connection) -> Res { + let label = ::neqo_common::log_subject!(::log::Level::Debug, self); + let mut sent = 0; + if let Self::Initialized { stream_id, buf } = self { + if !buf.is_empty() { + qtrace!([label], "sending data."); + sent = conn.stream_send(*stream_id, &buf[..])?; + if sent == buf.len() { + buf.clear(); + } else { + let b = buf.split_off(sent); + *buf = b; + } + } + } + Ok(sent) + } + + /// # Errors + /// + /// Returns `neqo_transport` errors. + pub fn send_atomic(&mut self, conn: &mut Connection, to_send: &[u8]) -> Res { + // First try to send anything that is in the buffer. + self.send_buffer(conn)?; + if let Self::Initialized { stream_id, buf } = self { + if buf.is_empty() { + let res = conn.stream_send_atomic(*stream_id, to_send)?; + Ok(res) + } else { + Ok(false) + } + } else { + Ok(false) + } + } + + #[must_use] + pub fn has_buffered_data(&self) -> bool { + if let Self::Initialized { buf, .. } = self { + !buf.is_empty() + } else { + false + } + } +} + +impl From<&BufferedStream> for Option { + fn from(stream: &BufferedStream) -> Option { + if let BufferedStream::Initialized { stream_id, .. } = stream { + Some(*stream_id) + } else { + None + } + } +} -- cgit v1.2.3