From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- .../src/features/extended_connect/mod.rs | 118 ++ .../src/features/extended_connect/tests/mod.rs | 7 + .../tests/webtransport/datagrams.rs | 146 +++ .../extended_connect/tests/webtransport/mod.rs | 661 ++++++++++++ .../tests/webtransport/negotiation.rs | 280 +++++ .../tests/webtransport/sessions.rs | 456 ++++++++ .../extended_connect/tests/webtransport/streams.rs | 1131 ++++++++++++++++++++ .../extended_connect/webtransport_session.rs | 555 ++++++++++ .../extended_connect/webtransport_streams.rs | 271 +++++ third_party/rust/neqo-http3/src/features/mod.rs | 92 ++ 10 files changed, 3717 insertions(+) create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/mod.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs create mode 100644 third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs create mode 100644 third_party/rust/neqo-http3/src/features/mod.rs (limited to 'third_party/rust/neqo-http3/src/features') diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs new file mode 100644 index 0000000000..77655833f7 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs @@ -0,0 +1,118 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +pub(crate) mod webtransport_session; +pub(crate) mod webtransport_streams; + +use std::fmt::Debug; + +use neqo_common::Header; +use neqo_transport::{AppError, StreamId}; +pub(crate) use webtransport_session::WebTransportSession; + +use crate::{ + client_events::Http3ClientEvents, + features::NegotiationState, + settings::{HSettingType, HSettings}, + CloseType, Http3StreamInfo, Http3StreamType, +}; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum SessionCloseReason { + Error(AppError), + Status(u16), + Clean { error: u32, message: String }, +} + +impl From for SessionCloseReason { + fn from(close_type: CloseType) -> SessionCloseReason { + match close_type { + CloseType::ResetApp(e) | CloseType::ResetRemote(e) | CloseType::LocalError(e) => { + SessionCloseReason::Error(e) + } + CloseType::Done => SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + } + } +} + +pub(crate) trait ExtendedConnectEvents: Debug { + fn session_start( + &self, + connect_type: ExtendedConnectType, + stream_id: StreamId, + status: u16, + headers: Vec
, + ); + fn session_end( + &self, + connect_type: ExtendedConnectType, + stream_id: StreamId, + reason: SessionCloseReason, + headers: Option>, + ); + fn extended_connect_new_stream(&self, stream_info: Http3StreamInfo); + fn new_datagram(&self, session_id: StreamId, datagram: Vec); +} + +#[derive(Debug, PartialEq, Copy, Clone, Eq)] +pub(crate) enum ExtendedConnectType { + WebTransport, +} + +impl ExtendedConnectType { + #[must_use] + #[allow(clippy::unused_self)] // This will change when we have more features using ExtendedConnectType. + pub fn string(&self) -> &str { + "webtransport" + } + + #[allow(clippy::unused_self)] // This will change when we have more features using ExtendedConnectType. + #[must_use] + pub fn get_stream_type(self, session_id: StreamId) -> Http3StreamType { + Http3StreamType::WebTransport(session_id) + } +} + +impl From for HSettingType { + fn from(_type: ExtendedConnectType) -> Self { + // This will change when we have more features using ExtendedConnectType. + HSettingType::EnableWebTransport + } +} + +#[derive(Debug)] +pub(crate) struct ExtendedConnectFeature { + feature_negotiation: NegotiationState, +} + +impl ExtendedConnectFeature { + #[must_use] + pub fn new(connect_type: ExtendedConnectType, enable: bool) -> Self { + Self { + feature_negotiation: NegotiationState::new(enable, HSettingType::from(connect_type)), + } + } + + pub fn set_listener(&mut self, new_listener: Http3ClientEvents) { + self.feature_negotiation.set_listener(new_listener); + } + + pub fn handle_settings(&mut self, settings: &HSettings) { + self.feature_negotiation.handle_settings(settings); + } + + #[must_use] + pub fn enabled(&self) -> bool { + self.feature_negotiation.enabled() + } +} +#[cfg(test)] +mod tests; diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs new file mode 100644 index 0000000000..a21f63dbf8 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs @@ -0,0 +1,7 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +mod webtransport; diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs new file mode 100644 index 0000000000..1c58596dd3 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs @@ -0,0 +1,146 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::convert::TryFrom; + +use neqo_common::Encoder; +use neqo_transport::Error as TransportError; + +use crate::{ + features::extended_connect::tests::webtransport::{ + wt_default_parameters, WtTest, DATAGRAM_SIZE, + }, + Error, Http3Parameters, WebTransportRequest, +}; + +const DGRAM: &[u8] = &[0, 100]; + +#[test] +fn no_datagrams() { + let mut wt = WtTest::new_with_params( + Http3Parameters::default().webtransport(true), + Http3Parameters::default().webtransport(true), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + + assert_eq!( + wt_session.send_datagram(DGRAM, None), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + assert_eq!( + wt.send_datagram(wt_session.stream_id(), DGRAM), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + + wt.exchange_packets(); + wt.check_no_datagram_received_client(); + wt.check_no_datagram_received_server(); +} + +fn do_datagram_test(wt: &mut WtTest, wt_session: &mut WebTransportRequest) { + assert_eq!( + wt_session.max_datagram_size(), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + + assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(())); + assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(())); + + wt.exchange_packets(); + wt.check_datagram_received_client(wt_session.stream_id(), DGRAM); + wt.check_datagram_received_server(wt_session, DGRAM); +} + +#[test] +fn datagrams() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session); +} + +#[test] +fn datagrams_server_only() { + let mut wt = WtTest::new_with_params( + Http3Parameters::default().webtransport(true), + wt_default_parameters(), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + + assert_eq!( + wt_session.send_datagram(DGRAM, None), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(())); + + wt.exchange_packets(); + wt.check_datagram_received_server(&wt_session, DGRAM); + wt.check_no_datagram_received_client(); +} + +#[test] +fn datagrams_client_only() { + let mut wt = WtTest::new_with_params( + wt_default_parameters(), + Http3Parameters::default().webtransport(true), + ); + let mut wt_session = wt.create_wt_session(); + + assert_eq!( + wt_session.max_datagram_size(), + Ok(DATAGRAM_SIZE + - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap()) + ); + assert_eq!( + wt.max_datagram_size(wt_session.stream_id()), + Err(Error::TransportError(TransportError::NotAvailable)) + ); + + assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(())); + assert_eq!( + wt.send_datagram(wt_session.stream_id(), DGRAM), + Err(Error::TransportError(TransportError::TooMuchData)) + ); + + wt.exchange_packets(); + wt.check_datagram_received_client(wt_session.stream_id(), DGRAM); + wt.check_no_datagram_received_server(); +} + +#[test] +fn datagrams_multiple_session() { + let mut wt = WtTest::new(); + + let mut wt_session1 = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session1); + + let mut wt_session_2 = wt.create_wt_session(); + do_datagram_test(&mut wt, &mut wt_session_2); +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs new file mode 100644 index 0000000000..51dc47e4c1 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs @@ -0,0 +1,661 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +mod datagrams; +mod negotiation; +mod sessions; +mod streams; +use std::{cell::RefCell, rc::Rc, time::Duration}; + +use neqo_common::event::Provider; +use neqo_crypto::AuthenticationStatus; +use neqo_transport::{ConnectionParameters, StreamId, StreamType}; +use test_fixture::{ + addr, anti_replay, fixture_init, now, CountingConnectionIdGenerator, DEFAULT_ALPN_H3, + DEFAULT_KEYS, DEFAULT_SERVER_NAME, +}; + +use crate::{ + features::extended_connect::SessionCloseReason, Error, Header, Http3Client, Http3ClientEvent, + Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, Http3State, + RecvStreamStats, SendStreamStats, WebTransportEvent, WebTransportRequest, + WebTransportServerEvent, WebTransportSessionAcceptAction, +}; + +const DATAGRAM_SIZE: u64 = 1200; + +pub fn wt_default_parameters() -> Http3Parameters { + Http3Parameters::default() + .webtransport(true) + .connection_parameters(ConnectionParameters::default().datagram_size(DATAGRAM_SIZE)) +} + +pub fn default_http3_client(client_params: Http3Parameters) -> Http3Client { + fixture_init(); + Http3Client::new( + DEFAULT_SERVER_NAME, + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + addr(), + addr(), + client_params, + now(), + ) + .expect("create a default client") +} + +pub fn default_http3_server(server_params: Http3Parameters) -> Http3Server { + Http3Server::new( + now(), + DEFAULT_KEYS, + DEFAULT_ALPN_H3, + anti_replay(), + Rc::new(RefCell::new(CountingConnectionIdGenerator::default())), + server_params, + None, + ) + .expect("create a server") +} + +fn exchange_packets(client: &mut Http3Client, server: &mut Http3Server) { + let mut out = None; + loop { + out = client.process(out.as_ref(), now()).dgram(); + out = server.process(out.as_ref(), now()).dgram(); + if out.is_none() { + break; + } + } +} + +// Perform only Quic transport handshake. +fn connect_with(client: &mut Http3Client, server: &mut Http3Server) { + assert_eq!(client.state(), Http3State::Initializing); + let out = client.process(None, now()); + assert_eq!(client.state(), Http3State::Initializing); + + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + assert!(out.as_dgram_ref().is_none()); + + let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded); + assert!(client.events().any(authentication_needed)); + client.authenticated(AuthenticationStatus::Ok, now()); + + let out = client.process(out.as_dgram_ref(), now()); + let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected)); + assert!(client.events().any(connected)); + + assert_eq!(client.state(), Http3State::Connected); + + // Exchange H3 setttings + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + let out = client.process(out.as_dgram_ref(), now()); + let out = server.process(out.as_dgram_ref(), now()); + std::mem::drop(client.process(out.as_dgram_ref(), now())); +} + +fn connect( + client_params: Http3Parameters, + server_params: Http3Parameters, +) -> (Http3Client, Http3Server) { + let mut client = default_http3_client(client_params); + let mut server = default_http3_server(server_params); + connect_with(&mut client, &mut server); + (client, server) +} + +struct WtTest { + client: Http3Client, + server: Http3Server, +} + +impl WtTest { + pub fn new() -> Self { + let (client, server) = connect(wt_default_parameters(), wt_default_parameters()); + Self { client, server } + } + + pub fn new_with_params(client_params: Http3Parameters, server_params: Http3Parameters) -> Self { + let (client, server) = connect(client_params, server_params); + Self { client, server } + } + + pub fn new_with(mut client: Http3Client, mut server: Http3Server) -> Self { + connect_with(&mut client, &mut server); + Self { client, server } + } + fn negotiate_wt_session( + &mut self, + accept: &WebTransportSessionAcceptAction, + ) -> (StreamId, Option) { + let wt_session_id = self + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + self.exchange_packets(); + + let mut wt_server_session = None; + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + mut session, + headers, + }) => { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + session.response(accept).unwrap(); + wt_server_session = Some(session); + } + Http3ServerEvent::Data { .. } => { + panic!("There should not be any data events!"); + } + _ => {} + } + } + + self.exchange_packets(); + (wt_session_id, wt_server_session) + } + + fn create_wt_session(&mut self) -> WebTransportRequest { + let (wt_session_id, wt_server_session) = + self.negotiate_wt_session(&WebTransportSessionAcceptAction::Accept); + let wt_session_negotiated_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Session{ + stream_id, + status, + headers, + }) if ( + stream_id == wt_session_id && + status == 200 && + headers.contains(&Header::new(":status", "200")) + ) + ) + }; + assert!(self.client.events().any(wt_session_negotiated_event)); + + let wt_server_session = wt_server_session.unwrap(); + assert_eq!(wt_session_id, wt_server_session.stream_id()); + wt_server_session + } + + fn exchange_packets(&mut self) { + const RTT: Duration = Duration::from_millis(10); + let mut out = None; + let mut now = now(); + loop { + now += RTT / 2; + out = self.client.process(out.as_ref(), now).dgram(); + let client_none = out.is_none(); + now += RTT / 2; + out = self.server.process(out.as_ref(), now).dgram(); + if client_none && out.is_none() { + break; + } + } + } + + pub fn cancel_session_client(&mut self, wt_stream_id: StreamId) { + self.client + .cancel_fetch(wt_stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn session_closed_client( + e: &Http3ClientEvent, + id: StreamId, + expected_reason: &SessionCloseReason, + expected_headers: &Option>, + ) -> bool { + if let Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed { + stream_id, + reason, + headers, + }) = e + { + *stream_id == id && reason == expected_reason && headers == expected_headers + } else { + false + } + } + + pub fn check_session_closed_event_client( + &mut self, + wt_session_id: StreamId, + expected_reason: &SessionCloseReason, + expected_headers: &Option>, + ) { + let mut event_found = false; + + while let Some(event) = self.client.next_event() { + event_found = WtTest::session_closed_client( + &event, + wt_session_id, + expected_reason, + expected_headers, + ); + if event_found { + break; + } + } + assert!(event_found); + } + + pub fn cancel_session_server(&mut self, wt_session: &mut WebTransportRequest) { + wt_session.cancel_fetch(Error::HttpNoError.code()).unwrap(); + self.exchange_packets(); + } + + fn session_closed_server( + e: &Http3ServerEvent, + id: StreamId, + expected_reason: &SessionCloseReason, + ) -> bool { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { + session, + reason, + headers, + }) = e + { + session.stream_id() == id && reason == expected_reason && headers.is_none() + } else { + false + } + } + + pub fn check_session_closed_event_server( + &mut self, + wt_session: &mut WebTransportRequest, + expected_reeason: &SessionCloseReason, + ) { + let event = self.server.next_event().unwrap(); + assert!(WtTest::session_closed_server( + &event, + wt_session.stream_id(), + expected_reeason + )); + } + + fn create_wt_stream_client( + &mut self, + wt_session_id: StreamId, + stream_type: StreamType, + ) -> StreamId { + self.client + .webtransport_create_stream(wt_session_id, stream_type) + .unwrap() + } + + fn send_data_client(&mut self, wt_stream_id: StreamId, data: &[u8]) { + assert_eq!( + self.client.send_data(wt_stream_id, data).unwrap(), + data.len() + ); + self.exchange_packets(); + } + + fn send_stream_stats(&mut self, wt_stream_id: StreamId) -> Result { + self.client.webtransport_send_stream_stats(wt_stream_id) + } + + fn recv_stream_stats(&mut self, wt_stream_id: StreamId) -> Result { + self.client.webtransport_recv_stream_stats(wt_stream_id) + } + + fn receive_data_client( + &mut self, + expected_stream_id: StreamId, + new_stream: bool, + expected_data: &[u8], + expected_fin: bool, + ) { + let mut new_stream_received = false; + let mut data_received = false; + while let Some(event) = self.client.next_event() { + match event { + Http3ClientEvent::WebTransport(WebTransportEvent::NewStream { + stream_id, .. + }) => { + assert_eq!(stream_id, expected_stream_id); + new_stream_received = true; + } + Http3ClientEvent::DataReadable { stream_id } => { + assert_eq!(stream_id, expected_stream_id); + let mut buf = [0; 100]; + let (amount, fin) = self.client.read_data(now(), stream_id, &mut buf).unwrap(); + assert_eq!(fin, expected_fin); + assert_eq!(amount, expected_data.len()); + assert_eq!(&buf[..amount], expected_data); + data_received = true; + } + _ => {} + } + } + assert!(data_received); + assert_eq!(new_stream, new_stream_received); + } + + fn close_stream_sending_client(&mut self, wt_stream_id: StreamId) { + self.client.stream_close_send(wt_stream_id).unwrap(); + self.exchange_packets(); + } + + fn reset_stream_client(&mut self, wt_stream_id: StreamId) { + self.client + .stream_reset_send(wt_stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_reset_client(&mut self, expected_stream_id: StreamId) { + let wt_reset_event = |e| { + matches!( + e, + Http3ClientEvent::Reset { + stream_id, + error, + local + } if stream_id == expected_stream_id && error == Error::HttpNoError.code() && !local + ) + }; + assert!(self.client.events().any(wt_reset_event)); + } + + fn stream_stop_sending_client(&mut self, stream_id: StreamId) { + self.client + .stream_stop_sending(stream_id, Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_stop_sending_client(&mut self, expected_stream_id: StreamId) { + let wt_stop_sending_event = |e| { + matches!( + e, + Http3ClientEvent::StopSending { + stream_id, + error + } if stream_id == expected_stream_id && error == Error::HttpNoError.code() + ) + }; + assert!(self.client.events().any(wt_stop_sending_event)); + } + + fn check_events_after_closing_session_client( + &mut self, + expected_reset_ids: &[StreamId], + expected_error_stream_reset: Option, + expected_stop_sending_ids: &[StreamId], + expected_error_stream_stop_sending: Option, + expected_local: bool, + expected_session_close: &Option<(StreamId, SessionCloseReason)>, + ) { + let mut reset_ids_count = 0; + let mut stop_sending_ids_count = 0; + let mut close_event = false; + while let Some(event) = self.client.next_event() { + match event { + Http3ClientEvent::Reset { + stream_id, + error, + local, + } => { + assert!(expected_reset_ids.contains(&stream_id)); + assert_eq!(expected_error_stream_reset.unwrap(), error); + assert_eq!(expected_local, local); + reset_ids_count += 1; + } + Http3ClientEvent::StopSending { stream_id, error } => { + assert!(expected_stop_sending_ids.contains(&stream_id)); + assert_eq!(expected_error_stream_stop_sending.unwrap(), error); + stop_sending_ids_count += 1; + } + Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed { + stream_id, + reason, + headers, + }) => { + close_event = true; + assert_eq!(stream_id, expected_session_close.as_ref().unwrap().0); + assert_eq!(expected_session_close.as_ref().unwrap().1, reason); + assert!(headers.is_none()); + } + _ => {} + } + } + assert_eq!(reset_ids_count, expected_reset_ids.len()); + assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len()); + assert_eq!(close_event, expected_session_close.is_some()); + } + + fn create_wt_stream_server( + wt_server_session: &mut WebTransportRequest, + stream_type: StreamType, + ) -> Http3OrWebTransportStream { + wt_server_session.create_stream(stream_type).unwrap() + } + + fn send_data_server(&mut self, wt_stream: &mut Http3OrWebTransportStream, data: &[u8]) { + assert_eq!(wt_stream.send_data(data).unwrap(), data.len()); + self.exchange_packets(); + } + + fn receive_data_server( + &mut self, + stream_id: StreamId, + new_stream: bool, + expected_data: &[u8], + expected_fin: bool, + ) -> Http3OrWebTransportStream { + self.exchange_packets(); + let mut new_stream_received = false; + let mut data_received = false; + let mut wt_stream = None; + let mut stream_closed = false; + let mut recv_data = Vec::new(); + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(request)) => { + assert_eq!(stream_id, request.stream_id()); + new_stream_received = true; + } + Http3ServerEvent::Data { + mut data, + fin, + stream, + } => { + recv_data.append(&mut data); + stream_closed = fin; + data_received = true; + wt_stream = Some(stream); + } + _ => {} + } + } + assert_eq!(&recv_data[..], expected_data); + assert!(data_received); + assert_eq!(new_stream, new_stream_received); + assert_eq!(stream_closed, expected_fin); + wt_stream.unwrap() + } + + fn close_stream_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream.stream_close_send().unwrap(); + self.exchange_packets(); + } + + fn reset_stream_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream + .stream_reset_send(Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn stream_stop_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) { + wt_stream + .stream_stop_sending(Error::HttpNoError.code()) + .unwrap(); + self.exchange_packets(); + } + + fn receive_reset_server(&mut self, expected_stream_id: StreamId, expected_error: u64) { + let stream_reset = |e| { + matches!( + e, + Http3ServerEvent::StreamReset { + stream, + error + } if stream.stream_id() == expected_stream_id && error == expected_error + ) + }; + assert!(self.server.events().any(stream_reset)); + } + + fn receive_stop_sending_server(&mut self, expected_stream_id: StreamId, expected_error: u64) { + let stop_sending = |e| { + matches!( + e, + Http3ServerEvent::StreamStopSending { + stream, + error + } if stream.stream_id() == expected_stream_id && error == expected_error + ) + }; + assert!(self.server.events().any(stop_sending)); + } + + fn check_events_after_closing_session_server( + &mut self, + expected_reset_ids: &[StreamId], + expected_error_stream_reset: Option, + expected_stop_sending_ids: &[StreamId], + expected_error_stream_stop_sending: Option, + expected_session_close: &Option<(StreamId, SessionCloseReason)>, + ) { + let mut reset_ids_count = 0; + let mut stop_sending_ids_count = 0; + let mut close_event = false; + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::StreamReset { stream, error } => { + assert!(expected_reset_ids.contains(&stream.stream_id())); + assert_eq!(expected_error_stream_reset.unwrap(), error); + reset_ids_count += 1; + } + Http3ServerEvent::StreamStopSending { stream, error } => { + assert!(expected_stop_sending_ids.contains(&stream.stream_id())); + assert_eq!(expected_error_stream_stop_sending.unwrap(), error); + stop_sending_ids_count += 1; + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { + session, + reason, + headers, + }) => { + close_event = true; + assert_eq!( + session.stream_id(), + expected_session_close.as_ref().unwrap().0 + ); + assert_eq!(expected_session_close.as_ref().unwrap().1, reason); + assert!(headers.is_none()); + } + _ => {} + } + } + assert_eq!(reset_ids_count, expected_reset_ids.len()); + assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len()); + assert_eq!(close_event, expected_session_close.is_some()); + } + + pub fn session_close_frame_client(&mut self, session_id: StreamId, error: u32, message: &str) { + self.client + .webtransport_close_session(session_id, error, message) + .unwrap(); + } + + pub fn session_close_frame_server( + wt_session: &mut WebTransportRequest, + error: u32, + message: &str, + ) { + wt_session.close_session(error, message).unwrap(); + } + + fn max_datagram_size(&self, stream_id: StreamId) -> Result { + self.client.webtransport_max_datagram_size(stream_id) + } + + fn send_datagram(&mut self, stream_id: StreamId, buf: &[u8]) -> Result<(), Error> { + self.client.webtransport_send_datagram(stream_id, buf, None) + } + + fn check_datagram_received_client( + &mut self, + expected_stream_id: StreamId, + expected_dgram: &[u8], + ) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Datagram { + session_id, + datagram + }) if session_id == expected_stream_id && datagram == expected_dgram + ) + }; + assert!(self.client.events().any(wt_datagram_event)); + } + + fn check_datagram_received_server( + &mut self, + expected_session: &WebTransportRequest, + expected_dgram: &[u8], + ) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { + session, + datagram + }) if session.stream_id() == expected_session.stream_id() && datagram == expected_dgram + ) + }; + assert!(self.server.events().any(wt_datagram_event)); + } + + fn check_no_datagram_received_client(&mut self) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Datagram { .. }) + ) + }; + assert!(!self.client.events().any(wt_datagram_event)); + } + + fn check_no_datagram_received_server(&mut self) { + let wt_datagram_event = |e| { + matches!( + e, + Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { .. }) + ) + }; + assert!(!self.server.events().any(wt_datagram_event)); + } +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs new file mode 100644 index 0000000000..27f669861d --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs @@ -0,0 +1,280 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::Duration; + +use neqo_common::{event::Provider, Encoder}; +use neqo_crypto::AuthenticationStatus; +use neqo_transport::{Connection, ConnectionError, StreamType}; +use test_fixture::{default_server_h3, now}; + +use super::{connect, default_http3_client, default_http3_server, exchange_packets}; +use crate::{ + settings::{HSetting, HSettingType, HSettings}, + Error, HFrame, Http3Client, Http3ClientEvent, Http3Parameters, Http3Server, Http3State, + WebTransportEvent, +}; + +fn check_wt_event(client: &mut Http3Client, wt_enable_client: bool, wt_enable_server: bool) { + let wt_event = client.events().find_map(|e| { + if let Http3ClientEvent::WebTransport(WebTransportEvent::Negotiated(neg)) = e { + Some(neg) + } else { + None + } + }); + + assert_eq!(wt_event.is_some(), wt_enable_client); + if let Some(wt) = wt_event { + assert_eq!(wt, wt_enable_client && wt_enable_server); + } +} + +fn connect_wt(wt_enabled_client: bool, wt_enabled_server: bool) -> (Http3Client, Http3Server) { + connect( + Http3Parameters::default().webtransport(wt_enabled_client), + Http3Parameters::default().webtransport(wt_enabled_server), + ) +} + +#[test] +fn negotiate_wt() { + let (mut client, _server) = connect_wt(true, true); + assert!(client.webtransport_enabled()); + check_wt_event(&mut client, true, true); + + let (mut client, _server) = connect_wt(true, false); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, true, false); + + let (mut client, _server) = connect_wt(false, true); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, false, true); + + let (mut client, _server) = connect_wt(false, false); + assert!(!client.webtransport_enabled()); + check_wt_event(&mut client, false, false); +} + +#[derive(PartialEq, Eq)] +enum ClientState { + ClientEnabled, + ClientDisabled, +} + +#[derive(PartialEq, Eq)] +enum ServerState { + ServerEnabled, + ServerDisabled, +} + +fn zero_rtt( + client_state: &ClientState, + server_state: &ServerState, + client_resumed_state: &ClientState, + server_resumed_state: &ServerState, +) { + let client_org = ClientState::ClientEnabled.eq(client_state); + let server_org = ServerState::ServerEnabled.eq(server_state); + let client_resumed = ClientState::ClientEnabled.eq(client_resumed_state); + let server_resumed = ServerState::ServerEnabled.eq(server_resumed_state); + + let (mut client, mut server) = connect_wt(client_org, server_org); + assert_eq!(client.webtransport_enabled(), client_org && server_org); + + // exchange token + let out = server.process(None, now()); + // We do not have a token so we need to wait for a resumption token timer to trigger. + std::mem::drop(client.process(out.as_dgram_ref(), now() + Duration::from_millis(250))); + assert_eq!(client.state(), Http3State::Connected); + let token = client + .events() + .find_map(|e| { + if let Http3ClientEvent::ResumptionToken(token) = e { + Some(token) + } else { + None + } + }) + .unwrap(); + + let mut client = default_http3_client(Http3Parameters::default().webtransport(client_resumed)); + let mut server = default_http3_server(Http3Parameters::default().webtransport(server_resumed)); + client + .enable_resumption(now(), &token) + .expect("Set resumption token."); + assert_eq!(client.state(), Http3State::ZeroRtt); + + exchange_packets(&mut client, &mut server); + + assert_eq!(&client.state(), &Http3State::Connected); + assert_eq!( + client.webtransport_enabled(), + client_resumed && server_resumed + ); + + let mut early_data_accepted = true; + // The only case we should not do 0-RTT is when webtransport was enabled + // originally and is disabled afterwards. + if server_org && !server_resumed { + early_data_accepted = false; + } + assert_eq!( + client.tls_info().unwrap().early_data_accepted(), + early_data_accepted + ); + + check_wt_event(&mut client, client_resumed, server_resumed); +} + +#[test] +fn zero_rtt_wt_settings() { + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); + + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientDisabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerDisabled, + ); + zero_rtt( + &ClientState::ClientDisabled, + &ServerState::ServerEnabled, + &ClientState::ClientEnabled, + &ServerState::ServerEnabled, + ); +} + +fn exchange_packets2(client: &mut Http3Client, server: &mut Connection) { + let mut out = None; + loop { + out = client.process(out.as_ref(), now()).dgram(); + out = server.process(out.as_ref(), now()).dgram(); + if out.is_none() { + break; + } + } +} + +#[test] +fn wrong_setting_value() { + const CONTROL_STREAM_TYPE: &[u8] = &[0x0]; + let mut client = default_http3_client(Http3Parameters::default()); + let mut server = default_server_h3(); + + exchange_packets2(&mut client, &mut server); + client.authenticated(AuthenticationStatus::Ok, now()); + exchange_packets2(&mut client, &mut server); + + let control = server.stream_create(StreamType::UniDi).unwrap(); + server.stream_send(control, CONTROL_STREAM_TYPE).unwrap(); + // Encode a settings frame and send it. + let mut enc = Encoder::default(); + let settings = HFrame::Settings { + settings: HSettings::new(&[HSetting::new(HSettingType::EnableWebTransport, 2)]), + }; + settings.encode(&mut enc); + assert_eq!( + server.stream_send(control, enc.as_ref()).unwrap(), + enc.as_ref().len() + ); + + exchange_packets2(&mut client, &mut server); + match client.state() { + Http3State::Closing(err) | Http3State::Closed(err) => { + assert_eq!( + err, + ConnectionError::Application(Error::HttpSettings.code()) + ); + } + _ => panic!("Wrong state {:?}", client.state()), + }; +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs new file mode 100644 index 0000000000..5f929d0e4b --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs @@ -0,0 +1,456 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::mem; + +use neqo_common::{event::Provider, Encoder}; +use neqo_transport::StreamType; +use test_fixture::now; + +use crate::{ + features::extended_connect::{ + tests::webtransport::{ + default_http3_client, default_http3_server, wt_default_parameters, WtTest, + }, + SessionCloseReason, + }, + frames::WebTransportFrame, + Error, Header, Http3ClientEvent, Http3OrWebTransportStream, Http3Server, Http3ServerEvent, + Http3State, Priority, WebTransportEvent, WebTransportServerEvent, + WebTransportSessionAcceptAction, +}; + +#[test] +fn wt_session() { + let mut wt = WtTest::new(); + mem::drop(wt.create_wt_session()); +} + +#[test] +fn wt_session_reject() { + let mut wt = WtTest::new(); + let headers = vec![Header::new(":status", "404")]; + let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone()); + let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res); + + wt.check_session_closed_event_client( + wt_session_id, + &SessionCloseReason::Status(404), + &Some(headers), + ); +} + +#[test] +fn wt_session_close_client() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.cancel_session_client(wt_session.stream_id()); + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Error(Error::HttpNoError.code()), + ); +} + +#[test] +fn wt_session_close_server() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.cancel_session_server(&mut wt_session); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_close_server_close_send() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session.stream_close_send().unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + &None, + ); +} + +#[test] +fn wt_session_close_server_stop_sending() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session + .stream_stop_sending(Error::HttpNoError.code()) + .unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_close_server_reset() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt_session + .stream_reset_send(Error::HttpNoError.code()) + .unwrap(); + wt.exchange_packets(); + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpNoError.code()), + &None, + ); +} + +#[test] +fn wt_session_response_with_1xx() { + let mut wt = WtTest::new(); + + let wt_session_id = wt + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + wt.exchange_packets(); + + let mut wt_server_session = None; + while let Some(event) = wt.server.next_event() { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + session, + headers, + }) = event + { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + wt_server_session = Some(session); + } + } + + let mut wt_server_session = wt_server_session.unwrap(); + + // Send interim response. + wt_server_session + .send_headers(&[Header::new(":status", "111")]) + .unwrap(); + wt_server_session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + + wt.exchange_packets(); + + let wt_session_negotiated_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::Session{ + stream_id, + status, + headers, + }) if ( + stream_id == wt_session_id && + status == 200 && + headers.contains(&Header::new(":status", "200")) + ) + ) + }; + assert!(wt.client.events().any(wt_session_negotiated_event)); + + assert_eq!(wt_session_id, wt_server_session.stream_id()); +} + +#[test] +fn wt_session_response_with_redirect() { + let headers = [Header::new(":status", "302"), Header::new("location", "/")].to_vec(); + let mut wt = WtTest::new(); + + let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone()); + + let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res); + + wt.check_session_closed_event_client( + wt_session_id, + &SessionCloseReason::Status(302), + &Some(headers), + ); +} + +#[test] +fn wt_session_respone_200_with_fin() { + let mut wt = WtTest::new(); + + let wt_session_id = wt + .client + .webtransport_create_session(now(), &("https", "something.com", "/"), &[]) + .unwrap(); + wt.exchange_packets(); + let mut wt_server_session = None; + while let Some(event) = wt.server.next_event() { + if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + session, + headers, + }) = event + { + assert!( + headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "CONNECT") + && headers + .iter() + .any(|h| h.name() == ":protocol" && h.value() == "webtransport") + ); + wt_server_session = Some(session); + } + } + + let mut wt_server_session = wt_server_session.unwrap(); + wt_server_session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + wt_server_session.stream_close_send().unwrap(); + + wt.exchange_packets(); + + let wt_session_close_event = |e| { + matches!( + e, + Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed{ + stream_id, + reason, + headers, + .. + }) if ( + stream_id == wt_session_id && + reason == SessionCloseReason::Clean{ error: 0, message: String::new()} && + headers.is_none() + ) + ) + }; + assert!(wt.client.events().any(wt_session_close_event)); + + assert_eq!(wt_session_id, wt_server_session.stream_id()); +} + +#[test] +fn wt_session_close_frame_client() { + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + ); +} + +#[test] +fn wt_session_close_frame_server() { + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + &None, + ); +} + +#[test] +fn wt_unknown_session_frame_client() { + const UNKNOWN_FRAME_LEN: usize = 832; + const BUF: &[u8] = &[0; 10]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + // Send an unknown frame. + let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4); + enc.encode_varint(1028_u64); // Arbitrary type. + enc.encode_varint(UNKNOWN_FRAME_LEN as u64); + let mut buf: Vec<_> = enc.into(); + buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0); + wt.client.send_data(wt_session.stream_id(), &buf).unwrap(); + wt.exchange_packets(); + + // The session is still active + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, false); + + // Now close the session. + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.exchange_packets(); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); +} + +#[test] +fn wt_close_session_frame_broken_client() { + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + // Send a incorrect CloseSession frame. + let mut enc = Encoder::default(); + WebTransportFrame::CloseSession { + error: 5, + message: "Hello".to_string(), + } + .encode(&mut enc); + let mut buf: Vec<_> = enc.into(); + // Corrupt the string. + buf[9] = 0xff; + wt.client.send_data(wt_session.stream_id(), &buf).unwrap(); + wt.exchange_packets(); + + // check that the webtransport session is closed. + wt.check_session_closed_event_client( + wt_session.stream_id(), + &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()), + &None, + ); + wt.check_session_closed_event_server( + &mut wt_session, + &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()), + ); + + // The Http3 session is still working. + assert_eq!(wt.client.state(), Http3State::Connected); + assert_eq!(wt_session.state(), Http3State::Connected); +} + +fn receive_request(server: &mut Http3Server) -> Option { + while let Some(event) = server.next_event() { + if let Http3ServerEvent::Headers { stream, .. } = event { + return Some(stream); + } + } + None +} + +#[test] +// Ignoring this test as it is panicking at wt.create_wt_stream_client +// Issue # 1386 is created to track this +#[ignore] +fn wt_close_session_cannot_be_sent_at_once() { + const BUF: &[u8] = &[0; 443]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + + let client = default_http3_client(wt_default_parameters()); + let server = default_http3_server(wt_default_parameters()); + let mut wt = WtTest::new_with(client, server); + + let mut wt_session = wt.create_wt_session(); + + // Fill the flow control window using an unrelated http stream. + let req_id = wt + .client + .fetch( + now(), + "GET", + &("https", "something.com", "/"), + &[], + Priority::default(), + ) + .unwrap(); + assert_eq!(req_id, 4); + wt.exchange_packets(); + let mut req = receive_request(&mut wt.server).unwrap(); + req.send_headers(&[ + Header::new(":status", "200"), + Header::new("content-length", BUF.len().to_string()), + ]) + .unwrap(); + req.send_data(BUF).unwrap(); + + // Now close the session. + WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE); + // server cannot create new streams. + assert_eq!( + wt_session.create_stream(StreamType::UniDi), + Err(Error::InvalidStreamId) + ); + + let out = wt.server.process(None, now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + + // Client has not received the full CloseSession frame and it can create more streams. + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + let out = wt.server.process(out.as_dgram_ref(), now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + let out = wt.server.process(out.as_dgram_ref(), now()); + let out = wt.client.process(out.as_dgram_ref(), now()); + let out = wt.server.process(out.as_dgram_ref(), now()); + let _out = wt.client.process(out.as_dgram_ref(), now()); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs new file mode 100644 index 0000000000..b898dbb31e --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs @@ -0,0 +1,1131 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::mem; + +use neqo_transport::StreamType; + +use crate::{ + features::extended_connect::{tests::webtransport::WtTest, SessionCloseReason}, + Error, +}; + +#[test] +fn wt_client_stream_uni() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), 0); + assert_eq!(send_stats.bytes_sent(), 0); + assert_eq!(send_stats.bytes_acked(), 0); + + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.receive_data_server(wt_stream, true, BUF_CLIENT, false); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64); + + // Send data again to test if the stats has the expected values. + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.receive_data_server(wt_stream, false, BUF_CLIENT, false); + let send_stats = wt.send_stream_stats(wt_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), (BUF_CLIENT.len() * 2) as u64); + assert_eq!(send_stats.bytes_sent(), (BUF_CLIENT.len() * 2) as u64); + assert_eq!(send_stats.bytes_acked(), (BUF_CLIENT.len() * 2) as u64); + + let recv_stats = wt.recv_stream_stats(wt_stream); + assert_eq!(recv_stats.unwrap_err(), Error::InvalidStreamId); +} + +#[test] +fn wt_client_stream_bidi() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(wt_client_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_client_stream, false, BUF_SERVER, false); + let send_stats = wt.send_stream_stats(wt_client_stream).unwrap(); + assert_eq!(send_stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(send_stats.bytes_acked(), BUF_CLIENT.len() as u64); + + let recv_stats = wt.recv_stream_stats(wt_client_stream).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_server_stream_uni() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + let send_stats = wt.send_stream_stats(wt_server_stream.stream_id()); + assert_eq!(send_stats.unwrap_err(), Error::InvalidStreamId); + + let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_server_stream_bidi() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT); + mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, false)); + let stats = wt.send_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(stats.bytes_written(), BUF_CLIENT.len() as u64); + assert_eq!(stats.bytes_sent(), BUF_CLIENT.len() as u64); + assert_eq!(stats.bytes_acked(), BUF_CLIENT.len() as u64); + + let recv_stats = wt.recv_stream_stats(wt_server_stream.stream_id()).unwrap(); + assert_eq!(recv_stats.bytes_received(), BUF_SERVER.len() as u64); + assert_eq!(recv_stats.bytes_read(), BUF_SERVER.len() as u64); +} + +#[test] +fn wt_client_stream_uni_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + wt.close_stream_sending_client(wt_stream); + wt.receive_data_server(wt_stream, true, BUF_CLIENT, true); +} + +#[test] +fn wt_client_stream_bidi_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + wt.close_stream_sending_client(wt_client_stream); + + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, true); + + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_client_stream, false, BUF_SERVER, true); +} + +#[test] +fn wt_server_stream_uni_closed() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true); +} + +#[test] +fn wt_server_stream_bidi_close() { + const BUF_CLIENT: &[u8] = &[0; 10]; + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.close_stream_sending_server(&mut wt_server_stream); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true); + wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT); + wt.close_stream_sending_client(wt_server_stream.stream_id()); + mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, true)); +} + +#[test] +fn wt_client_stream_uni_reset() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + mem::drop(wt.receive_data_server(wt_stream, true, BUF_CLIENT, false)); + wt.reset_stream_client(wt_stream); + wt.receive_reset_server(wt_stream, Error::HttpNoError.code()); +} + +#[test] +fn wt_server_stream_uni_reset() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_client_stream_bidi_reset() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + + wt.reset_stream_client(wt_client_stream); + wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_client_stream); +} + +#[test] +fn wt_server_stream_bidi_reset() { + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + + wt.reset_stream_client(wt_server_stream.stream_id()); + wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + + wt.reset_stream_server(&mut wt_server_stream); + wt.receive_reset_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_client_stream_uni_stop_sending() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(wt_stream, BUF_CLIENT); + let mut wt_server_stream = wt.receive_data_server(wt_stream, true, BUF_CLIENT, false); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_stream); +} + +#[test] +fn wt_server_stream_uni_stop_sending() { + const BUF_SERVER: &[u8] = &[2; 30]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.stream_stop_sending_client(wt_server_stream.stream_id()); + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); +} + +#[test] +fn wt_client_stream_bidi_stop_sending() { + const BUF_CLIENT: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + + wt.send_data_client(wt_client_stream, BUF_CLIENT); + + let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false); + + wt.stream_stop_sending_client(wt_client_stream); + + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_server_stream.stream_id()); +} + +#[test] +fn wt_server_stream_bidi_stop_sending() { + const BUF_SERVER: &[u8] = &[1; 20]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + + wt.send_data_server(&mut wt_server_stream, BUF_SERVER); + wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false); + wt.stream_stop_sending_client(wt_server_stream.stream_id()); + wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code()); + wt.stream_stop_sending_server(&mut wt_server_stream); + wt.receive_stop_sending_client(wt_server_stream.stream_id()); +} + +// For the following tests the client cancels a session. The streams are in different states: +// 1) Both sides of a bidirectional client stream are opened. +// 2) A client unidirectional stream is opened. +// 3) A client unidirectional stream has been closed and both sides consumed the closing info. +// 4) A client unidirectional stream has been closed, but only the server has consumed the closing +// info. +// 5) A client unidirectional stream has been closed, but only the client has consum the closing +// info. +// 6) Both sides of a bidirectional server stream are opened. +// 7) A server unidirectional stream is opened. +// 8) A server unidirectional stream has been closed and both sides consumed the closing info. +// 9) A server unidirectional stream has been closed, but only the server has consumed the closing +// info. +// 10) A server unidirectional stream has been closed, but only the client has consumed the closing +// info. +// 11) Both sides of a bidirectional stream have been closed and consumed by both sides. +// 12) Both sides of a bidirectional stream have been closed, but not consumed by both sides. +// 13) Multiples open streams + +#[test] +fn wt_client_session_close_1() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let bidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(bidi_from_client, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_client], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_2() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[unidi_from_client], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_from_client], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_3() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + wt.close_stream_sending_client(unidi_from_client); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_4() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + let mut unidi_from_client_s = wt.receive_data_server(unidi_from_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_from_client_s); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_from_client], + Some(Error::HttpNoError.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_5() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + + wt.send_data_client(unidi_from_client, BUF); + mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false)); + wt.reset_stream_client(unidi_from_client); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[unidi_from_client], + Some(Error::HttpNoError.code()), + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_6() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_from_server, BUF); + wt.receive_data_client(bidi_from_server.stream_id(), true, BUF, false); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_7() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_from_server, BUF); + wt.receive_data_client(unidi_from_server.stream_id(), true, BUF, false); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[unidi_from_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_8() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, true); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_9() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.stream_stop_sending_client(unidi_server.stream_id()); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpNoError.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_10() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_11() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, true); + wt.stream_stop_sending_server(&mut bidi_server); + wt.receive_stop_sending_client(bidi_server.stream_id()); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None); +} + +#[test] +fn wt_client_session_close_12() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.stream_stop_sending_server(&mut bidi_server); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[], + None, + &[], + None, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpNoError.code()), + false, + &None, + ); +} + +#[test] +fn wt_client_session_close_13() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let wt_session = wt.create_wt_session(); + + let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_1, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false)); + let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_2, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false)); + + wt.cancel_session_client(wt_session.stream_id()); + + wt.check_events_after_closing_session_server( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_client( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + false, + &None, + ); +} + +// For the following tests the server cancels a session. The streams are in different states: +// 1) Both sides of a bidirectional client stream are opened. +// 2) A client unidirectional stream is opened. +// 3) A client unidirectional stream has been closed and consumed by both sides. +// 4) A client unidirectional stream has been closed, but not consumed by the client. +// 5) Both sides of a bidirectional server stream are opened. +// 6) A server unidirectional stream is opened. +// 7) A server unidirectional stream has been closed and consumed by both sides. +// 8) A server unidirectional stream has been closed, but not consumed by the client. +// 9) Both sides of a bidirectional stream have been closed and consumed by both sides. +// 10) Both sides of a bidirectional stream have been closed, but not consumed by the client. +// 12) Multiples open streams + +#[test] +fn wt_client_session_server_close_1() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let bidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client, BUF); + std::mem::drop(wt.receive_data_server(bidi_client, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_2() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + std::mem::drop(wt.receive_data_server(unidi_client, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[unidi_client], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + &None, + ); +} + +#[test] +fn wt_client_session_server_close_3() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_client_s); + wt.receive_stop_sending_client(unidi_client); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_4() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi); + wt.send_data_client(unidi_client, BUF); + let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false); + wt.stream_stop_sending_server(&mut unidi_client_s); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[], + None, + &[unidi_client], + Some(Error::HttpNoError.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_5() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, false); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_6() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, false); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_client_session_server_close_7() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + wt.receive_data_client(unidi_server.stream_id(), true, BUF, true); + + wt.cancel_session_server(&mut wt_session); + + // Already close stream will not have a reset event. + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_8() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.close_stream_sending_server(&mut unidi_server); + + wt.cancel_session_server(&mut wt_session); + + // The stream was only closed on the server side therefore it is cancelled on the client side. + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_9() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.receive_data_client(bidi_server.stream_id(), true, BUF, true); + wt.stream_stop_sending_server(&mut bidi_server); + wt.receive_stop_sending_client(bidi_server.stream_id()); + + wt.cancel_session_server(&mut wt_session); + + // Already close stream will not have a reset event. + wt.check_events_after_closing_session_client( + &[], + None, + &[], + None, + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_10() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi); + wt.send_data_server(&mut bidi_server, BUF); + wt.close_stream_sending_server(&mut bidi_server); + wt.stream_stop_sending_server(&mut bidi_server); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[bidi_server.stream_id()], + Some(Error::HttpNoError.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server(&[], None, &[], None, &None); +} + +#[test] +fn wt_client_session_server_close_11() { + const BUF: &[u8] = &[0; 10]; + + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_1, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false)); + let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi); + wt.send_data_client(bidi_client_2, BUF); + std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false)); + + wt.cancel_session_server(&mut wt_session); + + wt.check_events_after_closing_session_client( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + false, + &Some(( + wt_session.stream_id(), + SessionCloseReason::Error(Error::HttpNoError.code()), + )), + ); + + wt.check_events_after_closing_session_server( + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &[bidi_client_1, bidi_client_2], + Some(Error::HttpRequestCancelled.code()), + &None, + ); +} + +#[test] +fn wt_session_close_frame_and_streams_client() { + const BUF: &[u8] = &[0; 10]; + const ERROR_NUM: u32 = 23; + const ERROR_MESSAGE: &str = "Something went wrong"; + let mut wt = WtTest::new(); + let mut wt_session = wt.create_wt_session(); + + let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi); + wt.send_data_server(&mut unidi_server, BUF); + wt.exchange_packets(); + + wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE); + wt.check_events_after_closing_session_client( + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &[], + None, + false, + &None, + ); + wt.exchange_packets(); + + wt.check_events_after_closing_session_server( + &[], + None, + &[unidi_server.stream_id()], + Some(Error::HttpRequestCancelled.code()), + &Some(( + wt_session.stream_id(), + SessionCloseReason::Clean { + error: ERROR_NUM, + message: ERROR_MESSAGE.to_string(), + }, + )), + ); +} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs new file mode 100644 index 0000000000..adbdf07e11 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs @@ -0,0 +1,555 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::module_name_repetitions)] + +use std::{any::Any, cell::RefCell, collections::BTreeSet, mem, rc::Rc}; + +use neqo_common::{qtrace, Encoder, Header, MessageType, Role}; +use neqo_qpack::{QPackDecoder, QPackEncoder}; +use neqo_transport::{streams::SendOrder, Connection, DatagramTracking, StreamId}; + +use super::{ExtendedConnectEvents, ExtendedConnectType, SessionCloseReason}; +use crate::{ + frames::{FrameReader, StreamReaderRecvStreamWrapper, WebTransportFrame}, + recv_message::{RecvMessage, RecvMessageInfo}, + send_message::SendMessage, + CloseType, Error, HFrame, Http3StreamInfo, Http3StreamType, HttpRecvStream, + HttpRecvStreamEvents, Priority, PriorityHandler, ReceiveOutput, RecvStream, RecvStreamEvents, + Res, SendStream, SendStreamEvents, Stream, +}; + +#[derive(Debug, PartialEq)] +enum SessionState { + Negotiating, + Active, + FinPending, + Done, +} + +impl SessionState { + pub fn closing_state(&self) -> bool { + matches!(self, Self::FinPending | Self::Done) + } +} + +#[derive(Debug)] +pub(crate) struct WebTransportSession { + control_stream_recv: Box, + control_stream_send: Box, + stream_event_listener: Rc>, + session_id: StreamId, + state: SessionState, + frame_reader: FrameReader, + events: Box, + send_streams: BTreeSet, + recv_streams: BTreeSet, + role: Role, +} + +impl ::std::fmt::Display for WebTransportSession { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "WebTransportSession session={}", self.session_id,) + } +} + +impl WebTransportSession { + #[must_use] + pub fn new( + session_id: StreamId, + events: Box, + role: Role, + qpack_encoder: Rc>, + qpack_decoder: Rc>, + ) -> Self { + let stream_event_listener = Rc::new(RefCell::new(WebTransportSessionListener::default())); + Self { + control_stream_recv: Box::new(RecvMessage::new( + &RecvMessageInfo { + message_type: MessageType::Response, + stream_type: Http3StreamType::ExtendedConnect, + stream_id: session_id, + header_frame_type_read: false, + }, + qpack_decoder, + Box::new(stream_event_listener.clone()), + None, + PriorityHandler::new(false, Priority::default()), + )), + control_stream_send: Box::new(SendMessage::new( + MessageType::Request, + Http3StreamType::ExtendedConnect, + session_id, + qpack_encoder, + Box::new(stream_event_listener.clone()), + )), + stream_event_listener, + session_id, + state: SessionState::Negotiating, + frame_reader: FrameReader::new(), + events, + send_streams: BTreeSet::new(), + recv_streams: BTreeSet::new(), + role, + } + } + + /// # Panics + /// + /// This function is only called with `RecvStream` and `SendStream` that also implement + /// the http specific functions and `http_stream()` will never return `None`. + #[must_use] + pub fn new_with_http_streams( + session_id: StreamId, + events: Box, + role: Role, + mut control_stream_recv: Box, + mut control_stream_send: Box, + ) -> Self { + let stream_event_listener = Rc::new(RefCell::new(WebTransportSessionListener::default())); + control_stream_recv + .http_stream() + .unwrap() + .set_new_listener(Box::new(stream_event_listener.clone())); + control_stream_send + .http_stream() + .unwrap() + .set_new_listener(Box::new(stream_event_listener.clone())); + Self { + control_stream_recv, + control_stream_send, + stream_event_listener, + session_id, + state: SessionState::Active, + frame_reader: FrameReader::new(), + events, + send_streams: BTreeSet::new(), + recv_streams: BTreeSet::new(), + role, + } + } + + /// # Errors + /// + /// The function can only fail if supplied headers are not valid http headers. + /// + /// # Panics + /// + /// `control_stream_send` implements the http specific functions and `http_stream()` + /// will never return `None`. + pub fn send_request(&mut self, headers: &[Header], conn: &mut Connection) -> Res<()> { + self.control_stream_send + .http_stream() + .unwrap() + .send_headers(headers, conn) + } + + fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { + qtrace!([self], "receive control data"); + let (out, _) = self.control_stream_recv.receive(conn)?; + debug_assert!(out == ReceiveOutput::NoOutput); + self.maybe_check_headers(); + self.read_control_stream(conn)?; + Ok((ReceiveOutput::NoOutput, self.state == SessionState::Done)) + } + + fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { + let (out, _) = self + .control_stream_recv + .http_stream() + .unwrap() + .header_unblocked(conn)?; + debug_assert!(out == ReceiveOutput::NoOutput); + self.maybe_check_headers(); + self.read_control_stream(conn)?; + Ok((ReceiveOutput::NoOutput, self.state == SessionState::Done)) + } + + fn maybe_update_priority(&mut self, priority: Priority) -> bool { + self.control_stream_recv + .http_stream() + .unwrap() + .maybe_update_priority(priority) + } + + fn priority_update_frame(&mut self) -> Option { + self.control_stream_recv + .http_stream() + .unwrap() + .priority_update_frame() + } + + fn priority_update_sent(&mut self) { + self.control_stream_recv + .http_stream() + .unwrap() + .priority_update_sent(); + } + + fn send(&mut self, conn: &mut Connection) -> Res<()> { + self.control_stream_send.send(conn)?; + if self.control_stream_send.done() { + self.state = SessionState::Done; + } + Ok(()) + } + + fn has_data_to_send(&self) -> bool { + self.control_stream_send.has_data_to_send() + } + + fn done(&self) -> bool { + self.state == SessionState::Done + } + + fn close(&mut self, close_type: CloseType) { + if self.state.closing_state() { + return; + } + qtrace!("ExtendedConnect close the session"); + self.state = SessionState::Done; + if !close_type.locally_initiated() { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::from(close_type), + None, + ); + } + } + + /// # Panics + /// + /// This cannot panic because headers are checked before this function called. + pub fn maybe_check_headers(&mut self) { + if SessionState::Negotiating != self.state { + return; + } + + if let Some((headers, interim, fin)) = self.stream_event_listener.borrow_mut().get_headers() + { + qtrace!( + "ExtendedConnect response headers {:?}, fin={}", + headers, + fin + ); + + if interim { + if fin { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + Some(headers), + ); + self.state = SessionState::Done; + } + } else { + let status = headers + .iter() + .find_map(|h| { + if h.name() == ":status" { + h.value().parse::().ok() + } else { + None + } + }) + .unwrap(); + + self.state = if (200..300).contains(&status) { + if fin { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + Some(headers), + ); + SessionState::Done + } else { + self.events.session_start( + ExtendedConnectType::WebTransport, + self.session_id, + status, + headers, + ); + SessionState::Active + } + } else { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::Status(status), + Some(headers), + ); + SessionState::Done + }; + } + } + } + + pub fn add_stream(&mut self, stream_id: StreamId) { + if let SessionState::Active = self.state { + if stream_id.is_bidi() { + self.send_streams.insert(stream_id); + self.recv_streams.insert(stream_id); + } else if stream_id.is_self_initiated(self.role) { + self.send_streams.insert(stream_id); + } else { + self.recv_streams.insert(stream_id); + } + + if !stream_id.is_self_initiated(self.role) { + self.events + .extended_connect_new_stream(Http3StreamInfo::new( + stream_id, + ExtendedConnectType::WebTransport.get_stream_type(self.session_id), + )); + } + } + } + + pub fn remove_recv_stream(&mut self, stream_id: StreamId) { + self.recv_streams.remove(&stream_id); + } + + pub fn remove_send_stream(&mut self, stream_id: StreamId) { + self.send_streams.remove(&stream_id); + } + + #[must_use] + pub fn is_active(&self) -> bool { + matches!(self.state, SessionState::Active) + } + + pub fn take_sub_streams(&mut self) -> (BTreeSet, BTreeSet) { + ( + mem::take(&mut self.recv_streams), + mem::take(&mut self.send_streams), + ) + } + + /// # Errors + /// + /// It may return an error if the frame is not correctly decoded. + pub fn read_control_stream(&mut self, conn: &mut Connection) -> Res<()> { + let (f, fin) = self + .frame_reader + .receive::(&mut StreamReaderRecvStreamWrapper::new( + conn, + &mut self.control_stream_recv, + )) + .map_err(|_| Error::HttpGeneralProtocolStream)?; + qtrace!([self], "Received frame: {:?} fin={}", f, fin); + if let Some(WebTransportFrame::CloseSession { error, message }) = f { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::Clean { error, message }, + None, + ); + self.state = if fin { + SessionState::Done + } else { + SessionState::FinPending + }; + } else if fin { + self.events.session_end( + ExtendedConnectType::WebTransport, + self.session_id, + SessionCloseReason::Clean { + error: 0, + message: String::new(), + }, + None, + ); + self.state = SessionState::Done; + } + Ok(()) + } + + /// # Errors + /// + /// Return an error if the stream was closed on the transport layer, but that information is not + /// yet consumed on the http/3 layer. + pub fn close_session(&mut self, conn: &mut Connection, error: u32, message: &str) -> Res<()> { + self.state = SessionState::Done; + let close_frame = WebTransportFrame::CloseSession { + error, + message: message.to_string(), + }; + let mut encoder = Encoder::default(); + close_frame.encode(&mut encoder); + self.control_stream_send + .send_data_atomic(conn, encoder.as_ref())?; + self.control_stream_send.close(conn)?; + self.state = if self.control_stream_send.done() { + SessionState::Done + } else { + SessionState::FinPending + }; + Ok(()) + } + + fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res { + self.control_stream_send.send_data(conn, buf) + } + + /// # Errors + /// + /// Returns an error if the datagram exceeds the remote datagram size limit. + pub fn send_datagram( + &self, + conn: &mut Connection, + buf: &[u8], + id: impl Into, + ) -> Res<()> { + qtrace!([self], "send_datagram state={:?}", self.state); + if let SessionState::Active = self.state { + let mut dgram_data = Encoder::default(); + dgram_data.encode_varint(self.session_id.as_u64() / 4); + dgram_data.encode(buf); + conn.send_datagram(dgram_data.as_ref(), id)?; + } else { + debug_assert!(false); + return Err(Error::Unavailable); + } + Ok(()) + } + + pub fn datagram(&mut self, datagram: Vec) { + if let SessionState::Active = self.state { + self.events.new_datagram(self.session_id, datagram); + } + } +} + +impl Stream for Rc> { + fn stream_type(&self) -> Http3StreamType { + Http3StreamType::ExtendedConnect + } +} + +impl RecvStream for Rc> { + fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { + self.borrow_mut().receive(conn) + } + + fn reset(&mut self, close_type: CloseType) -> Res<()> { + self.borrow_mut().close(close_type); + Ok(()) + } + + fn http_stream(&mut self) -> Option<&mut dyn HttpRecvStream> { + Some(self) + } + + fn webtransport(&self) -> Option>> { + Some(self.clone()) + } +} + +impl HttpRecvStream for Rc> { + fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { + self.borrow_mut().header_unblocked(conn) + } + + fn maybe_update_priority(&mut self, priority: Priority) -> bool { + self.borrow_mut().maybe_update_priority(priority) + } + + fn priority_update_frame(&mut self) -> Option { + self.borrow_mut().priority_update_frame() + } + + fn priority_update_sent(&mut self) { + self.borrow_mut().priority_update_sent(); + } + + fn any(&self) -> &dyn Any { + self + } +} + +impl SendStream for Rc> { + fn send(&mut self, conn: &mut Connection) -> Res<()> { + self.borrow_mut().send(conn) + } + + fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res { + self.borrow_mut().send_data(conn, buf) + } + + fn has_data_to_send(&self) -> bool { + self.borrow_mut().has_data_to_send() + } + + fn set_sendorder(&mut self, _conn: &mut Connection, _sendorder: Option) -> Res<()> { + // Not relevant on session + Ok(()) + } + + fn set_fairness(&mut self, _conn: &mut Connection, _fairness: bool) -> Res<()> { + // Not relevant on session + Ok(()) + } + + fn stream_writable(&self) {} + + fn done(&self) -> bool { + self.borrow_mut().done() + } + + fn close(&mut self, conn: &mut Connection) -> Res<()> { + self.borrow_mut().close_session(conn, 0, "") + } + + fn close_with_message(&mut self, conn: &mut Connection, error: u32, message: &str) -> Res<()> { + self.borrow_mut().close_session(conn, error, message) + } + + fn handle_stop_sending(&mut self, close_type: CloseType) { + self.borrow_mut().close(close_type); + } +} + +#[derive(Debug, Default)] +struct WebTransportSessionListener { + headers: Option<(Vec
, bool, bool)>, +} + +impl WebTransportSessionListener { + fn set_headers(&mut self, headers: Vec
, interim: bool, fin: bool) { + self.headers = Some((headers, interim, fin)); + } + + pub fn get_headers(&mut self) -> Option<(Vec
, bool, bool)> { + mem::take(&mut self.headers) + } +} + +impl RecvStreamEvents for Rc> {} + +impl HttpRecvStreamEvents for Rc> { + fn header_ready( + &self, + _stream_info: Http3StreamInfo, + headers: Vec
, + interim: bool, + fin: bool, + ) { + if !interim || fin { + self.borrow_mut().set_headers(headers, interim, fin); + } + } +} + +impl SendStreamEvents for Rc> {} diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs new file mode 100644 index 0000000000..84dcd20618 --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs @@ -0,0 +1,271 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{cell::RefCell, rc::Rc}; + +use neqo_common::Encoder; +use neqo_transport::{Connection, RecvStreamStats, SendStreamStats, StreamId}; + +use super::WebTransportSession; +use crate::{ + CloseType, Http3StreamInfo, Http3StreamType, ReceiveOutput, RecvStream, RecvStreamEvents, Res, + SendStream, SendStreamEvents, Stream, +}; + +pub const WEBTRANSPORT_UNI_STREAM: u64 = 0x54; +pub const WEBTRANSPORT_STREAM: u64 = 0x41; + +#[derive(Debug)] +pub(crate) struct WebTransportRecvStream { + stream_id: StreamId, + events: Box, + session: Rc>, + session_id: StreamId, + fin: bool, +} + +impl WebTransportRecvStream { + pub fn new( + stream_id: StreamId, + session_id: StreamId, + events: Box, + session: Rc>, + ) -> Self { + Self { + stream_id, + events, + session_id, + session, + fin: false, + } + } + + fn get_info(&self) -> Http3StreamInfo { + Http3StreamInfo::new(self.stream_id, self.stream_type()) + } +} + +impl Stream for WebTransportRecvStream { + fn stream_type(&self) -> Http3StreamType { + Http3StreamType::WebTransport(self.session_id) + } +} + +impl RecvStream for WebTransportRecvStream { + fn receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)> { + self.events.data_readable(self.get_info()); + Ok((ReceiveOutput::NoOutput, false)) + } + + fn reset(&mut self, close_type: CloseType) -> Res<()> { + if !matches!(close_type, CloseType::ResetApp(_)) { + self.events.recv_closed(self.get_info(), close_type); + } + self.session.borrow_mut().remove_recv_stream(self.stream_id); + Ok(()) + } + + fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> { + let (amount, fin) = conn.stream_recv(self.stream_id, buf)?; + self.fin = fin; + if fin { + self.session.borrow_mut().remove_recv_stream(self.stream_id); + } + Ok((amount, fin)) + } + + fn stats(&mut self, conn: &mut Connection) -> Res { + const TYPE_LEN_UNI: usize = Encoder::varint_len(WEBTRANSPORT_UNI_STREAM); + const TYPE_LEN_BIDI: usize = Encoder::varint_len(WEBTRANSPORT_STREAM); + + let stream_header_size = if self.stream_id.is_server_initiated() { + let id_len = if self.stream_id.is_uni() { + TYPE_LEN_UNI + } else { + TYPE_LEN_BIDI + }; + (id_len + Encoder::varint_len(self.session_id.as_u64())) as u64 + } else { + 0 + }; + + let stats = conn.recv_stream_stats(self.stream_id)?; + if stream_header_size == 0 { + return Ok(stats); + } + + let subtract_non_app_bytes = + |count: u64| -> u64 { count.saturating_sub(stream_header_size) }; + + let bytes_received = subtract_non_app_bytes(stats.bytes_received()); + let bytes_read = subtract_non_app_bytes(stats.bytes_read()); + + Ok(RecvStreamStats::new(bytes_received, bytes_read)) + } +} + +#[derive(Debug, PartialEq)] +enum WebTransportSenderStreamState { + SendingInit { buf: Vec, fin: bool }, + SendingData, + Done, +} + +#[derive(Debug)] +pub(crate) struct WebTransportSendStream { + stream_id: StreamId, + state: WebTransportSenderStreamState, + events: Box, + session: Rc>, + session_id: StreamId, +} + +impl WebTransportSendStream { + pub fn new( + stream_id: StreamId, + session_id: StreamId, + events: Box, + session: Rc>, + local: bool, + ) -> Self { + Self { + stream_id, + state: if local { + let mut d = Encoder::default(); + if stream_id.is_uni() { + d.encode_varint(WEBTRANSPORT_UNI_STREAM); + } else { + d.encode_varint(WEBTRANSPORT_STREAM); + } + d.encode_varint(session_id.as_u64()); + WebTransportSenderStreamState::SendingInit { + buf: d.into(), + fin: false, + } + } else { + WebTransportSenderStreamState::SendingData + }, + events, + session_id, + session, + } + } + + fn set_done(&mut self, close_type: CloseType) { + self.state = WebTransportSenderStreamState::Done; + self.events.send_closed(self.get_info(), close_type); + self.session.borrow_mut().remove_send_stream(self.stream_id); + } + + fn get_info(&self) -> Http3StreamInfo { + Http3StreamInfo::new(self.stream_id, self.stream_type()) + } +} + +impl Stream for WebTransportSendStream { + fn stream_type(&self) -> Http3StreamType { + Http3StreamType::WebTransport(self.session_id) + } +} + +impl SendStream for WebTransportSendStream { + fn send(&mut self, conn: &mut Connection) -> Res<()> { + if let WebTransportSenderStreamState::SendingInit { ref mut buf, fin } = self.state { + let sent = conn.stream_send(self.stream_id, &buf[..])?; + if sent == buf.len() { + if fin { + conn.stream_close_send(self.stream_id)?; + self.set_done(CloseType::Done); + } else { + self.state = WebTransportSenderStreamState::SendingData; + } + } else { + let b = buf.split_off(sent); + *buf = b; + } + } + Ok(()) + } + + fn has_data_to_send(&self) -> bool { + matches!( + self.state, + WebTransportSenderStreamState::SendingInit { .. } + ) + } + + fn stream_writable(&self) { + self.events.data_writable(self.get_info()); + } + + fn done(&self) -> bool { + self.state == WebTransportSenderStreamState::Done + } + + fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res { + self.send(conn)?; + if self.state == WebTransportSenderStreamState::SendingData { + let sent = conn.stream_send(self.stream_id, buf)?; + Ok(sent) + } else { + Ok(0) + } + } + + fn set_sendorder(&mut self, conn: &mut Connection, sendorder: Option) -> Res<()> { + conn.stream_sendorder(self.stream_id, sendorder) + .map_err(|_| crate::Error::InvalidStreamId) + } + + fn set_fairness(&mut self, conn: &mut Connection, fairness: bool) -> Res<()> { + conn.stream_fairness(self.stream_id, fairness) + .map_err(|_| crate::Error::InvalidStreamId) + } + + fn handle_stop_sending(&mut self, close_type: CloseType) { + self.set_done(close_type); + } + + fn close(&mut self, conn: &mut Connection) -> Res<()> { + if let WebTransportSenderStreamState::SendingInit { ref mut fin, .. } = self.state { + *fin = true; + } else { + self.state = WebTransportSenderStreamState::Done; + conn.stream_close_send(self.stream_id)?; + self.set_done(CloseType::Done); + } + Ok(()) + } + + fn stats(&mut self, conn: &mut Connection) -> Res { + const TYPE_LEN_UNI: usize = Encoder::varint_len(WEBTRANSPORT_UNI_STREAM); + const TYPE_LEN_BIDI: usize = Encoder::varint_len(WEBTRANSPORT_STREAM); + + let stream_header_size = if self.stream_id.is_client_initiated() { + let id_len = if self.stream_id.is_uni() { + TYPE_LEN_UNI + } else { + TYPE_LEN_BIDI + }; + (id_len + Encoder::varint_len(self.session_id.as_u64())) as u64 + } else { + 0 + }; + + let stats = conn.send_stream_stats(self.stream_id)?; + if stream_header_size == 0 { + return Ok(stats); + } + + let subtract_non_app_bytes = + |count: u64| -> u64 { count.saturating_sub(stream_header_size) }; + + let bytes_written = subtract_non_app_bytes(stats.bytes_written()); + let bytes_sent = subtract_non_app_bytes(stats.bytes_sent()); + let bytes_acked = subtract_non_app_bytes(stats.bytes_acked()); + Ok(SendStreamStats::new(bytes_written, bytes_sent, bytes_acked)) + } +} diff --git a/third_party/rust/neqo-http3/src/features/mod.rs b/third_party/rust/neqo-http3/src/features/mod.rs new file mode 100644 index 0000000000..34e21f50ac --- /dev/null +++ b/third_party/rust/neqo-http3/src/features/mod.rs @@ -0,0 +1,92 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{fmt::Debug, mem}; + +use neqo_common::qtrace; + +use crate::{ + client_events::Http3ClientEvents, + settings::{HSettingType, HSettings}, +}; + +pub mod extended_connect; + +/// States: +/// - `Disable` - it is not turned on for this connection. +/// - `Negotiating` - the feature is enabled locally, but settings from the peer have not been +/// received yet. +/// - `Negotiated` - the settings have been received and both sides support the feature. +/// - `NegotiationFailed` - the settings have been received and the peer does not support the +/// feature. +#[derive(Debug)] +pub enum NegotiationState { + Disabled, + Negotiating { + feature_type: HSettingType, + listener: Option, + }, + Negotiated, + NegotiationFailed, +} + +impl NegotiationState { + #[must_use] + pub fn new(enable: bool, feature_type: HSettingType) -> Self { + if enable { + Self::Negotiating { + feature_type, + listener: None, + } + } else { + Self::Disabled + } + } + + pub fn set_listener(&mut self, new_listener: Http3ClientEvents) { + if let Self::Negotiating { listener, .. } = self { + *listener = Some(new_listener); + } + } + + pub fn handle_settings(&mut self, settings: &HSettings) { + if !self.locally_enabled() { + return; + } + + if let Self::Negotiating { + feature_type, + listener, + } = self + { + qtrace!( + "set_negotiated {:?} to {}", + feature_type, + settings.get(*feature_type) + ); + let cb = mem::take(listener); + let ft = *feature_type; + *self = if settings.get(ft) == 1 { + Self::Negotiated + } else { + Self::NegotiationFailed + }; + if let Some(l) = cb { + l.negotiation_done(ft, self.enabled()); + } + } + } + + #[must_use] + pub fn enabled(&self) -> bool { + matches!(self, &Self::Negotiated) + } + + #[must_use] + pub fn locally_enabled(&self) -> bool { + !matches!(self, &Self::Disabled) + } +} -- cgit v1.2.3