summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/connection_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-http3/src/connection_server.rs')
-rw-r--r--third_party/rust/neqo-http3/src/connection_server.rs420
1 files changed, 420 insertions, 0 deletions
diff --git a/third_party/rust/neqo-http3/src/connection_server.rs b/third_party/rust/neqo-http3/src/connection_server.rs
new file mode 100644
index 0000000000..097209a226
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/connection_server.rs
@@ -0,0 +1,420 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::{rc::Rc, time::Instant};
+
+use neqo_common::{event::Provider, qdebug, qinfo, qtrace, Header, MessageType, Role};
+use neqo_transport::{
+ AppError, Connection, ConnectionEvent, DatagramTracking, StreamId, StreamType,
+};
+
+use crate::{
+ connection::{Http3Connection, Http3State, WebTransportSessionAcceptAction},
+ frames::HFrame,
+ recv_message::{RecvMessage, RecvMessageInfo},
+ send_message::SendMessage,
+ server_connection_events::{Http3ServerConnEvent, Http3ServerConnEvents},
+ Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler,
+ ReceiveOutput, Res,
+};
+
+#[derive(Debug)]
+pub struct Http3ServerHandler {
+ base_handler: Http3Connection,
+ events: Http3ServerConnEvents,
+ needs_processing: bool,
+}
+
+impl ::std::fmt::Display for Http3ServerHandler {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "Http3 server connection")
+ }
+}
+
+impl Http3ServerHandler {
+ pub(crate) fn new(http3_parameters: Http3Parameters) -> Self {
+ Self {
+ base_handler: Http3Connection::new(http3_parameters, Role::Server),
+ events: Http3ServerConnEvents::default(),
+ needs_processing: false,
+ }
+ }
+
+ #[must_use]
+ pub fn state(&self) -> Http3State {
+ self.base_handler.state()
+ }
+
+ /// Supply a response for a request.
+ ///
+ /// # Errors
+ ///
+ /// `InvalidStreamId` if the stream does not exist,
+ /// `AlreadyClosed` if the stream has already been closed.
+ /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
+ /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
+ /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
+ /// supplied.
+ pub(crate) fn send_data(
+ &mut self,
+ stream_id: StreamId,
+ data: &[u8],
+ conn: &mut Connection,
+ ) -> Res<usize> {
+ self.base_handler.stream_has_pending_data(stream_id);
+ self.needs_processing = true;
+ self.base_handler
+ .send_streams
+ .get_mut(&stream_id)
+ .ok_or(Error::InvalidStreamId)?
+ .send_data(conn, data)
+ }
+
+ /// Supply response heeaders for a request.
+ pub(crate) fn send_headers(
+ &mut self,
+ stream_id: StreamId,
+ headers: &[Header],
+ conn: &mut Connection,
+ ) -> Res<()> {
+ self.base_handler
+ .send_streams
+ .get_mut(&stream_id)
+ .ok_or(Error::InvalidStreamId)?
+ .http_stream()
+ .ok_or(Error::InvalidStreamId)?
+ .send_headers(headers, conn)?;
+ self.base_handler.stream_has_pending_data(stream_id);
+ self.needs_processing = true;
+ Ok(())
+ }
+
+ /// This is called when application is done sending a request.
+ ///
+ /// # Errors
+ ///
+ /// An error will be returned if stream does not exist.
+ pub fn stream_close_send(&mut self, stream_id: StreamId, conn: &mut Connection) -> Res<()> {
+ qinfo!([self], "Close sending side stream={}.", stream_id);
+ self.base_handler.stream_close_send(conn, stream_id)?;
+ self.base_handler.stream_has_pending_data(stream_id);
+ self.needs_processing = true;
+ Ok(())
+ }
+
+ /// An application may reset a stream(request).
+ /// Both sides, sending and receiving side, will be closed.
+ ///
+ /// # Errors
+ ///
+ /// An error will be return if a stream does not exist.
+ pub fn cancel_fetch(
+ &mut self,
+ stream_id: StreamId,
+ error: AppError,
+ conn: &mut Connection,
+ ) -> Res<()> {
+ qinfo!([self], "cancel_fetch {} error={}.", stream_id, error);
+ self.needs_processing = true;
+ self.base_handler.cancel_fetch(stream_id, error, conn)
+ }
+
+ pub fn stream_stop_sending(
+ &mut self,
+ stream_id: StreamId,
+ error: AppError,
+ conn: &mut Connection,
+ ) -> Res<()> {
+ qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error);
+ self.needs_processing = true;
+ self.base_handler
+ .stream_stop_sending(conn, stream_id, error)
+ }
+
+ pub fn stream_reset_send(
+ &mut self,
+ stream_id: StreamId,
+ error: AppError,
+ conn: &mut Connection,
+ ) -> Res<()> {
+ qinfo!([self], "stream_reset_send {} error={}.", stream_id, error);
+ self.needs_processing = true;
+ self.base_handler.stream_reset_send(conn, stream_id, error)
+ }
+
+ /// Accept a `WebTransport` Session request
+ pub(crate) fn webtransport_session_accept(
+ &mut self,
+ conn: &mut Connection,
+ stream_id: StreamId,
+ accept: &WebTransportSessionAcceptAction,
+ ) -> Res<()> {
+ self.needs_processing = true;
+ self.base_handler.webtransport_session_accept(
+ conn,
+ stream_id,
+ Box::new(self.events.clone()),
+ accept,
+ )
+ }
+
+ /// Close `WebTransport` cleanly
+ ///
+ /// # Errors
+ ///
+ /// `InvalidStreamId` if the stream does not exist,
+ /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
+ /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
+ /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
+ /// supplied.
+ pub fn webtransport_close_session(
+ &mut self,
+ conn: &mut Connection,
+ session_id: StreamId,
+ error: u32,
+ message: &str,
+ ) -> Res<()> {
+ self.needs_processing = true;
+ self.base_handler
+ .webtransport_close_session(conn, session_id, error, message)
+ }
+
+ pub fn webtransport_create_stream(
+ &mut self,
+ conn: &mut Connection,
+ session_id: StreamId,
+ stream_type: StreamType,
+ ) -> Res<StreamId> {
+ self.needs_processing = true;
+ self.base_handler.webtransport_create_stream_local(
+ conn,
+ session_id,
+ stream_type,
+ Box::new(self.events.clone()),
+ Box::new(self.events.clone()),
+ )
+ }
+
+ pub fn webtransport_send_datagram(
+ &mut self,
+ conn: &mut Connection,
+ session_id: StreamId,
+ buf: &[u8],
+ id: impl Into<DatagramTracking>,
+ ) -> Res<()> {
+ self.needs_processing = true;
+ self.base_handler
+ .webtransport_send_datagram(session_id, conn, buf, id)
+ }
+
+ /// Process HTTTP3 layer.
+ pub fn process_http3(&mut self, conn: &mut Connection, now: Instant) {
+ qtrace!([self], "Process http3 internal.");
+ if matches!(self.base_handler.state(), Http3State::Closed(..)) {
+ return;
+ }
+
+ let res = self.check_connection_events(conn, now);
+ if !self.check_result(conn, now, &res) && self.base_handler.state().active() {
+ let res = self.base_handler.process_sending(conn);
+ self.check_result(conn, now, &res);
+ }
+ }
+
+ /// Take the next available event.
+ pub(crate) fn next_event(&mut self) -> Option<Http3ServerConnEvent> {
+ self.events.next_event()
+ }
+
+ /// Whether this connection has events to process or data to send.
+ pub(crate) fn should_be_processed(&mut self) -> bool {
+ if self.needs_processing {
+ self.needs_processing = false;
+ return true;
+ }
+ self.base_handler.has_data_to_send() || self.events.has_events()
+ }
+
+ // This function takes the provided result and check for an error.
+ // An error results in closing the connection.
+ fn check_result<ERR>(&mut self, conn: &mut Connection, now: Instant, res: &Res<ERR>) -> bool {
+ match &res {
+ Err(e) => {
+ self.close(conn, now, e);
+ true
+ }
+ _ => false,
+ }
+ }
+
+ fn close(&mut self, conn: &mut Connection, now: Instant, err: &Error) {
+ qinfo!([self], "Connection error: {}.", err);
+ conn.close(now, err.code(), &format!("{err}"));
+ self.base_handler.close(err.code());
+ self.events
+ .connection_state_change(self.base_handler.state());
+ }
+
+ // If this return an error the connection must be closed.
+ fn check_connection_events(&mut self, conn: &mut Connection, now: Instant) -> Res<()> {
+ qtrace!([self], "Check connection events.");
+ while let Some(e) = conn.next_event() {
+ qdebug!([self], "check_connection_events - event {e:?}.");
+ match e {
+ ConnectionEvent::NewStream { stream_id } => {
+ self.base_handler.add_new_stream(stream_id);
+ }
+ ConnectionEvent::RecvStreamReadable { stream_id } => {
+ self.handle_stream_readable(conn, stream_id)?;
+ }
+ ConnectionEvent::RecvStreamReset {
+ stream_id,
+ app_error,
+ } => {
+ self.base_handler
+ .handle_stream_reset(stream_id, app_error, conn)?;
+ }
+ ConnectionEvent::SendStreamStopSending {
+ stream_id,
+ app_error,
+ } => self
+ .base_handler
+ .handle_stream_stop_sending(stream_id, app_error, conn)?,
+ ConnectionEvent::StateChange(state) => {
+ if self.base_handler.handle_state_change(conn, &state)? {
+ if self.base_handler.state() == Http3State::Connected {
+ let settings = self.base_handler.save_settings();
+ conn.send_ticket(now, &settings)?;
+ }
+ self.events
+ .connection_state_change(self.base_handler.state());
+ }
+ }
+ ConnectionEvent::SendStreamWritable { stream_id } => {
+ if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) {
+ s.stream_writable();
+ }
+ }
+ ConnectionEvent::Datagram(dgram) => self.base_handler.handle_datagram(&dgram),
+ ConnectionEvent::AuthenticationNeeded
+ | ConnectionEvent::EchFallbackAuthenticationNeeded { .. }
+ | ConnectionEvent::ZeroRttRejected
+ | ConnectionEvent::ResumptionToken(..) => return Err(Error::HttpInternal(4)),
+ ConnectionEvent::SendStreamComplete { .. }
+ | ConnectionEvent::SendStreamCreatable { .. }
+ | ConnectionEvent::OutgoingDatagramOutcome { .. }
+ | ConnectionEvent::IncomingDatagramDropped => {}
+ }
+ }
+ Ok(())
+ }
+
+ fn handle_stream_readable(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
+ match self.base_handler.handle_stream_readable(conn, stream_id)? {
+ ReceiveOutput::NewStream(NewStreamType::Push(_)) => Err(Error::HttpStreamCreation),
+ ReceiveOutput::NewStream(NewStreamType::Http) => {
+ self.base_handler.add_streams(
+ stream_id,
+ Box::new(SendMessage::new(
+ MessageType::Response,
+ Http3StreamType::Http,
+ stream_id,
+ self.base_handler.qpack_encoder.clone(),
+ Box::new(self.events.clone()),
+ )),
+ Box::new(RecvMessage::new(
+ &RecvMessageInfo {
+ message_type: MessageType::Request,
+ stream_type: Http3StreamType::Http,
+ stream_id,
+ header_frame_type_read: true,
+ },
+ Rc::clone(&self.base_handler.qpack_decoder),
+ Box::new(self.events.clone()),
+ None,
+ PriorityHandler::new(false, Priority::default()),
+ )),
+ );
+ let res = self.base_handler.handle_stream_readable(conn, stream_id)?;
+ assert_eq!(ReceiveOutput::NoOutput, res);
+ Ok(())
+ }
+ ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => {
+ self.base_handler.webtransport_create_stream_remote(
+ StreamId::from(session_id),
+ stream_id,
+ Box::new(self.events.clone()),
+ Box::new(self.events.clone()),
+ )?;
+ let res = self.base_handler.handle_stream_readable(conn, stream_id)?;
+ assert_eq!(ReceiveOutput::NoOutput, res);
+ Ok(())
+ }
+ ReceiveOutput::ControlFrames(control_frames) => {
+ for f in control_frames {
+ match f {
+ HFrame::MaxPushId { .. } => {
+ // TODO implement push
+ Ok(())
+ }
+ HFrame::Goaway { .. } | HFrame::CancelPush { .. } => {
+ Err(Error::HttpFrameUnexpected)
+ }
+ HFrame::PriorityUpdatePush { element_id, priority } => {
+ // TODO: check if the element_id references a promised push stream or
+ // is greater than the maximum Push ID.
+ self.events.priority_update(StreamId::from(element_id), priority);
+ Ok(())
+ }
+ HFrame::PriorityUpdateRequest { element_id, priority } => {
+ // check that the element_id references a request stream
+ // within the client-sided bidirectional stream limit
+ let element_stream_id = StreamId::new(element_id);
+ if !element_stream_id.is_bidi()
+ || !element_stream_id.is_client_initiated()
+ || !conn.is_stream_id_allowed(element_stream_id)
+ {
+ return Err(Error::HttpId)
+ }
+
+ self.events.priority_update(element_stream_id, priority);
+ Ok(())
+ }
+ _ => unreachable!(
+ "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames."
+ ),
+ }?;
+ }
+ Ok(())
+ }
+ _ => Ok(()),
+ }
+ }
+
+ /// Response data are read directly into a buffer supplied as a parameter of this function to
+ /// avoid copying data.
+ ///
+ /// # Errors
+ ///
+ /// It returns an error if a stream does not exist or an error happen while reading a stream,
+ /// e.g. early close, protocol error, etc.
+ pub fn read_data(
+ &mut self,
+ conn: &mut Connection,
+ now: Instant,
+ stream_id: StreamId,
+ buf: &mut [u8],
+ ) -> Res<(usize, bool)> {
+ qinfo!([self], "read_data from stream {}.", stream_id);
+ let res = self.base_handler.read_data(conn, stream_id, buf);
+ if let Err(e) = &res {
+ if e.connection_error() {
+ self.close(conn, now, e);
+ }
+ }
+ res
+ }
+}