summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/server_events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-http3/src/server_events.rs')
-rw-r--r--third_party/rust/neqo-http3/src/server_events.rs601
1 files changed, 601 insertions, 0 deletions
diff --git a/third_party/rust/neqo-http3/src/server_events.rs b/third_party/rust/neqo-http3/src/server_events.rs
new file mode 100644
index 0000000000..4be48363df
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/server_events.rs
@@ -0,0 +1,601 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![allow(clippy::module_name_repetitions)]
+
+use std::{
+ cell::RefCell,
+ collections::VecDeque,
+ convert::TryFrom,
+ ops::{Deref, DerefMut},
+ rc::Rc,
+};
+
+use neqo_common::{qdebug, qinfo, Encoder, Header};
+use neqo_transport::{
+ server::ActiveConnectionRef, AppError, Connection, DatagramTracking, StreamId, StreamType,
+};
+
+use crate::{
+ connection::{Http3State, WebTransportSessionAcceptAction},
+ connection_server::Http3ServerHandler,
+ features::extended_connect::SessionCloseReason,
+ Http3StreamInfo, Http3StreamType, Priority, Res,
+};
+
+#[derive(Debug, Clone)]
+pub struct StreamHandler {
+ pub conn: ActiveConnectionRef,
+ pub handler: Rc<RefCell<Http3ServerHandler>>,
+ pub stream_info: Http3StreamInfo,
+}
+
+impl ::std::fmt::Display for StreamHandler {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ let conn: &Connection = &self.conn.borrow();
+ write!(f, "conn={} stream_info={:?}", conn, self.stream_info)
+ }
+}
+
+impl std::hash::Hash for StreamHandler {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.conn.hash(state);
+ state.write_u64(self.stream_info.stream_id().as_u64());
+ state.finish();
+ }
+}
+
+impl PartialEq for StreamHandler {
+ fn eq(&self, other: &Self) -> bool {
+ self.conn == other.conn && self.stream_info.stream_id() == other.stream_info.stream_id()
+ }
+}
+
+impl Eq for StreamHandler {}
+
+impl StreamHandler {
+ pub fn stream_id(&self) -> StreamId {
+ self.stream_info.stream_id()
+ }
+
+ /// Supply a response header to a request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn send_headers(&mut self, headers: &[Header]) -> Res<()> {
+ self.handler.borrow_mut().send_headers(
+ self.stream_id(),
+ headers,
+ &mut self.conn.borrow_mut(),
+ )
+ }
+
+ /// Supply response data to a request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn send_data(&mut self, buf: &[u8]) -> Res<usize> {
+ self.handler
+ .borrow_mut()
+ .send_data(self.stream_id(), buf, &mut self.conn.borrow_mut())
+ }
+
+ /// Close sending side.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn stream_close_send(&mut self) -> Res<()> {
+ self.handler
+ .borrow_mut()
+ .stream_close_send(self.stream_id(), &mut self.conn.borrow_mut())
+ }
+
+ /// Request a peer to stop sending a stream.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn stream_stop_sending(&mut self, app_error: AppError) -> Res<()> {
+ qdebug!(
+ [self],
+ "stop sending stream_id:{} error:{}.",
+ self.stream_info.stream_id(),
+ app_error
+ );
+ self.handler.borrow_mut().stream_stop_sending(
+ self.stream_info.stream_id(),
+ app_error,
+ &mut self.conn.borrow_mut(),
+ )
+ }
+
+ /// Reset sending side of a stream.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn stream_reset_send(&mut self, app_error: AppError) -> Res<()> {
+ qdebug!(
+ [self],
+ "reset send stream_id:{} error:{}.",
+ self.stream_info.stream_id(),
+ app_error
+ );
+ self.handler.borrow_mut().stream_reset_send(
+ self.stream_info.stream_id(),
+ app_error,
+ &mut self.conn.borrow_mut(),
+ )
+ }
+
+ /// Reset a stream/request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore
+ pub fn cancel_fetch(&mut self, app_error: AppError) -> Res<()> {
+ qdebug!([self], "reset error:{}.", app_error);
+ self.handler.borrow_mut().cancel_fetch(
+ self.stream_info.stream_id(),
+ app_error,
+ &mut self.conn.borrow_mut(),
+ )
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Http3OrWebTransportStream {
+ stream_handler: StreamHandler,
+}
+
+impl ::std::fmt::Display for Http3OrWebTransportStream {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "Stream server {:?}", self.stream_handler)
+ }
+}
+
+impl Http3OrWebTransportStream {
+ pub(crate) fn new(
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_info: Http3StreamInfo,
+ ) -> Self {
+ Self {
+ stream_handler: StreamHandler {
+ conn,
+ handler,
+ stream_info,
+ },
+ }
+ }
+
+ /// Supply a response header to a request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn send_headers(&mut self, headers: &[Header]) -> Res<()> {
+ self.stream_handler.send_headers(headers)
+ }
+
+ /// Supply response data to a request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn send_data(&mut self, data: &[u8]) -> Res<usize> {
+ qinfo!([self], "Set new response.");
+ self.stream_handler.send_data(data)
+ }
+
+ /// Close sending side.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn stream_close_send(&mut self) -> Res<()> {
+ qinfo!([self], "Set new response.");
+ self.stream_handler.stream_close_send()
+ }
+}
+
+impl Deref for Http3OrWebTransportStream {
+ type Target = StreamHandler;
+ #[must_use]
+ fn deref(&self) -> &Self::Target {
+ &self.stream_handler
+ }
+}
+
+impl DerefMut for Http3OrWebTransportStream {
+ fn deref_mut(&mut self) -> &mut StreamHandler {
+ &mut self.stream_handler
+ }
+}
+
+impl std::hash::Hash for Http3OrWebTransportStream {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.stream_handler.hash(state);
+ state.finish();
+ }
+}
+
+impl PartialEq for Http3OrWebTransportStream {
+ fn eq(&self, other: &Self) -> bool {
+ self.stream_handler == other.stream_handler
+ }
+}
+
+impl Eq for Http3OrWebTransportStream {}
+
+#[derive(Debug, Clone)]
+pub struct WebTransportRequest {
+ stream_handler: StreamHandler,
+}
+
+impl ::std::fmt::Display for WebTransportRequest {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "WebTransport session {}", self.stream_handler)
+ }
+}
+
+impl WebTransportRequest {
+ pub(crate) fn new(
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_id: StreamId,
+ ) -> Self {
+ Self {
+ stream_handler: StreamHandler {
+ conn,
+ handler,
+ stream_info: Http3StreamInfo::new(stream_id, Http3StreamType::Http),
+ },
+ }
+ }
+
+ #[must_use]
+ pub fn state(&self) -> Http3State {
+ self.stream_handler.handler.borrow().state()
+ }
+
+ /// Respond to a `WebTransport` session request.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn response(&mut self, accept: &WebTransportSessionAcceptAction) -> Res<()> {
+ qinfo!([self], "Set a response for a WebTransport session.");
+ self.stream_handler
+ .handler
+ .borrow_mut()
+ .webtransport_session_accept(
+ &mut self.stream_handler.conn.borrow_mut(),
+ self.stream_handler.stream_info.stream_id(),
+ accept,
+ )
+ }
+
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ /// Also return an error if the stream was closed on the transport layer,
+ /// but that information is not yet consumed on the http/3 layer.
+ pub fn close_session(&mut self, error: u32, message: &str) -> Res<()> {
+ self.stream_handler
+ .handler
+ .borrow_mut()
+ .webtransport_close_session(
+ &mut self.stream_handler.conn.borrow_mut(),
+ self.stream_handler.stream_info.stream_id(),
+ error,
+ message,
+ )
+ }
+
+ #[must_use]
+ pub fn stream_id(&self) -> StreamId {
+ self.stream_handler.stream_id()
+ }
+
+ /// Close sending side.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ pub fn create_stream(&mut self, stream_type: StreamType) -> Res<Http3OrWebTransportStream> {
+ let session_id = self.stream_handler.stream_id();
+ let id = self
+ .stream_handler
+ .handler
+ .borrow_mut()
+ .webtransport_create_stream(
+ &mut self.stream_handler.conn.borrow_mut(),
+ session_id,
+ stream_type,
+ )?;
+
+ Ok(Http3OrWebTransportStream::new(
+ self.stream_handler.conn.clone(),
+ self.stream_handler.handler.clone(),
+ Http3StreamInfo::new(id, Http3StreamType::WebTransport(session_id)),
+ ))
+ }
+
+ /// Send `WebTransport` datagram.
+ ///
+ /// # Errors
+ ///
+ /// It may return `InvalidStreamId` if a stream does not exist anymore.
+ /// The function returns `TooMuchData` if the supply buffer is bigger than
+ /// the allowed remote datagram size.
+ pub fn send_datagram(&mut self, buf: &[u8], id: impl Into<DatagramTracking>) -> Res<()> {
+ let session_id = self.stream_handler.stream_id();
+ self.stream_handler
+ .handler
+ .borrow_mut()
+ .webtransport_send_datagram(
+ &mut self.stream_handler.conn.borrow_mut(),
+ session_id,
+ buf,
+ id,
+ )
+ }
+
+ #[must_use]
+ pub fn remote_datagram_size(&self) -> u64 {
+ self.stream_handler.conn.borrow().remote_datagram_size()
+ }
+
+ /// Returns the current max size of a datagram that can fit into a packet.
+ /// The value will change over time depending on the encoded size of the
+ /// packet number, ack frames, etc.
+ ///
+ /// # Errors
+ ///
+ /// The function returns `NotAvailable` if datagrams are not enabled.
+ ///
+ /// # Panics
+ ///
+ /// This cannot panic. The max varint length is 8.
+ pub fn max_datagram_size(&self) -> Res<u64> {
+ let max_size = self.stream_handler.conn.borrow().max_datagram_size()?;
+ Ok(max_size
+ - u64::try_from(Encoder::varint_len(
+ self.stream_handler.stream_id().as_u64(),
+ ))
+ .unwrap())
+ }
+}
+
+impl Deref for WebTransportRequest {
+ type Target = StreamHandler;
+ #[must_use]
+ fn deref(&self) -> &Self::Target {
+ &self.stream_handler
+ }
+}
+
+impl DerefMut for WebTransportRequest {
+ fn deref_mut(&mut self) -> &mut StreamHandler {
+ &mut self.stream_handler
+ }
+}
+
+impl std::hash::Hash for WebTransportRequest {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.stream_handler.hash(state);
+ state.finish();
+ }
+}
+
+impl PartialEq for WebTransportRequest {
+ fn eq(&self, other: &Self) -> bool {
+ self.stream_handler == other.stream_handler
+ }
+}
+
+impl Eq for WebTransportRequest {}
+
+#[derive(Debug, Clone)]
+pub enum WebTransportServerEvent {
+ NewSession {
+ session: WebTransportRequest,
+ headers: Vec<Header>,
+ },
+ SessionClosed {
+ session: WebTransportRequest,
+ reason: SessionCloseReason,
+ headers: Option<Vec<Header>>,
+ },
+ NewStream(Http3OrWebTransportStream),
+ Datagram {
+ session: WebTransportRequest,
+ datagram: Vec<u8>,
+ },
+}
+
+#[derive(Debug, Clone)]
+pub enum Http3ServerEvent {
+ /// Headers are ready.
+ Headers {
+ stream: Http3OrWebTransportStream,
+ headers: Vec<Header>,
+ fin: bool,
+ },
+ /// Request data is ready.
+ Data {
+ stream: Http3OrWebTransportStream,
+ data: Vec<u8>,
+ fin: bool,
+ },
+ DataWritable {
+ stream: Http3OrWebTransportStream,
+ },
+ StreamReset {
+ stream: Http3OrWebTransportStream,
+ error: AppError,
+ },
+ StreamStopSending {
+ stream: Http3OrWebTransportStream,
+ error: AppError,
+ },
+ /// When individual connection change state. It is only used for tests.
+ StateChange {
+ conn: ActiveConnectionRef,
+ state: Http3State,
+ },
+ PriorityUpdate {
+ stream_id: StreamId,
+ priority: Priority,
+ },
+ WebTransport(WebTransportServerEvent),
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct Http3ServerEvents {
+ events: Rc<RefCell<VecDeque<Http3ServerEvent>>>,
+}
+
+impl Http3ServerEvents {
+ fn insert(&self, event: Http3ServerEvent) {
+ self.events.borrow_mut().push_back(event);
+ }
+
+ /// Take all events
+ pub fn events(&self) -> impl Iterator<Item = Http3ServerEvent> {
+ self.events.replace(VecDeque::new()).into_iter()
+ }
+
+ /// Whether there is request pending.
+ pub fn has_events(&self) -> bool {
+ !self.events.borrow().is_empty()
+ }
+
+ /// Take the next event if present.
+ pub fn next_event(&self) -> Option<Http3ServerEvent> {
+ self.events.borrow_mut().pop_front()
+ }
+
+ /// Insert a `Headers` event.
+ pub(crate) fn headers(
+ &self,
+ request: Http3OrWebTransportStream,
+ headers: Vec<Header>,
+ fin: bool,
+ ) {
+ self.insert(Http3ServerEvent::Headers {
+ stream: request,
+ headers,
+ fin,
+ });
+ }
+
+ /// Insert a `StateChange` event.
+ pub(crate) fn connection_state_change(&self, conn: ActiveConnectionRef, state: Http3State) {
+ self.insert(Http3ServerEvent::StateChange { conn, state });
+ }
+
+ /// Insert a `Data` event.
+ pub(crate) fn data(
+ &self,
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_info: Http3StreamInfo,
+ data: Vec<u8>,
+ fin: bool,
+ ) {
+ self.insert(Http3ServerEvent::Data {
+ stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
+ data,
+ fin,
+ });
+ }
+
+ pub(crate) fn data_writable(
+ &self,
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_info: Http3StreamInfo,
+ ) {
+ self.insert(Http3ServerEvent::DataWritable {
+ stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
+ });
+ }
+
+ pub(crate) fn stream_reset(
+ &self,
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_info: Http3StreamInfo,
+ error: AppError,
+ ) {
+ self.insert(Http3ServerEvent::StreamReset {
+ stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
+ error,
+ });
+ }
+
+ pub(crate) fn stream_stop_sending(
+ &self,
+ conn: ActiveConnectionRef,
+ handler: Rc<RefCell<Http3ServerHandler>>,
+ stream_info: Http3StreamInfo,
+ error: AppError,
+ ) {
+ self.insert(Http3ServerEvent::StreamStopSending {
+ stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
+ error,
+ });
+ }
+
+ pub(crate) fn priority_update(&self, stream_id: StreamId, priority: Priority) {
+ self.insert(Http3ServerEvent::PriorityUpdate {
+ stream_id,
+ priority,
+ });
+ }
+
+ pub(crate) fn webtransport_new_session(
+ &self,
+ session: WebTransportRequest,
+ headers: Vec<Header>,
+ ) {
+ self.insert(Http3ServerEvent::WebTransport(
+ WebTransportServerEvent::NewSession { session, headers },
+ ));
+ }
+
+ pub(crate) fn webtransport_session_closed(
+ &self,
+ session: WebTransportRequest,
+ reason: SessionCloseReason,
+ headers: Option<Vec<Header>>,
+ ) {
+ self.insert(Http3ServerEvent::WebTransport(
+ WebTransportServerEvent::SessionClosed {
+ session,
+ reason,
+ headers,
+ },
+ ));
+ }
+
+ pub(crate) fn webtransport_new_stream(&self, stream: Http3OrWebTransportStream) {
+ self.insert(Http3ServerEvent::WebTransport(
+ WebTransportServerEvent::NewStream(stream),
+ ));
+ }
+
+ pub(crate) fn webtransport_datagram(&self, session: WebTransportRequest, datagram: Vec<u8>) {
+ self.insert(Http3ServerEvent::WebTransport(
+ WebTransportServerEvent::Datagram { session, datagram },
+ ));
+ }
+}