summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-http3/src/features/extended_connect
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/neqo-http3/src/features/extended_connect
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/neqo-http3/src/features/extended_connect')
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/mod.rs114
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs7
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs142
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs648
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs278
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs450
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs1081
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs539
-rw-r--r--third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs202
9 files changed, 3461 insertions, 0 deletions
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs
new file mode 100644
index 0000000000..6be92dabba
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/mod.rs
@@ -0,0 +1,114 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![allow(clippy::module_name_repetitions)]
+
+pub(crate) mod webtransport_session;
+pub(crate) mod webtransport_streams;
+
+use crate::client_events::Http3ClientEvents;
+use crate::features::NegotiationState;
+use crate::settings::{HSettingType, HSettings};
+use crate::{CloseType, Http3StreamInfo, Http3StreamType};
+use neqo_common::Header;
+use neqo_transport::{AppError, StreamId};
+use std::fmt::Debug;
+pub(crate) use webtransport_session::WebTransportSession;
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum SessionCloseReason {
+ Error(AppError),
+ Status(u16),
+ Clean { error: u32, message: String },
+}
+
+impl From<CloseType> for SessionCloseReason {
+ fn from(close_type: CloseType) -> SessionCloseReason {
+ match close_type {
+ CloseType::ResetApp(e) | CloseType::ResetRemote(e) | CloseType::LocalError(e) => {
+ SessionCloseReason::Error(e)
+ }
+ CloseType::Done => SessionCloseReason::Clean {
+ error: 0,
+ message: String::new(),
+ },
+ }
+ }
+}
+
+pub(crate) trait ExtendedConnectEvents: Debug {
+ fn session_start(
+ &self,
+ connect_type: ExtendedConnectType,
+ stream_id: StreamId,
+ status: u16,
+ headers: Vec<Header>,
+ );
+ fn session_end(
+ &self,
+ connect_type: ExtendedConnectType,
+ stream_id: StreamId,
+ reason: SessionCloseReason,
+ headers: Option<Vec<Header>>,
+ );
+ fn extended_connect_new_stream(&self, stream_info: Http3StreamInfo);
+ fn new_datagram(&self, session_id: StreamId, datagram: Vec<u8>);
+}
+
+#[derive(Debug, PartialEq, Copy, Clone, Eq)]
+pub(crate) enum ExtendedConnectType {
+ WebTransport,
+}
+
+impl ExtendedConnectType {
+ #[must_use]
+ #[allow(clippy::unused_self)] // This will change when we have more features using ExtendedConnectType.
+ pub fn string(&self) -> &str {
+ "webtransport"
+ }
+
+ #[allow(clippy::unused_self)] // This will change when we have more features using ExtendedConnectType.
+ #[must_use]
+ pub fn get_stream_type(self, session_id: StreamId) -> Http3StreamType {
+ Http3StreamType::WebTransport(session_id)
+ }
+}
+
+impl From<ExtendedConnectType> for HSettingType {
+ fn from(_type: ExtendedConnectType) -> Self {
+ // This will change when we have more features using ExtendedConnectType.
+ HSettingType::EnableWebTransport
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct ExtendedConnectFeature {
+ feature_negotiation: NegotiationState,
+}
+
+impl ExtendedConnectFeature {
+ #[must_use]
+ pub fn new(connect_type: ExtendedConnectType, enable: bool) -> Self {
+ Self {
+ feature_negotiation: NegotiationState::new(enable, HSettingType::from(connect_type)),
+ }
+ }
+
+ pub fn set_listener(&mut self, new_listener: Http3ClientEvents) {
+ self.feature_negotiation.set_listener(new_listener);
+ }
+
+ pub fn handle_settings(&mut self, settings: &HSettings) {
+ self.feature_negotiation.handle_settings(settings);
+ }
+
+ #[must_use]
+ pub fn enabled(&self) -> bool {
+ self.feature_negotiation.enabled()
+ }
+}
+#[cfg(test)]
+mod tests;
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs
new file mode 100644
index 0000000000..a21f63dbf8
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/mod.rs
@@ -0,0 +1,7 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+mod webtransport;
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs
new file mode 100644
index 0000000000..1b9511b255
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/datagrams.rs
@@ -0,0 +1,142 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::features::extended_connect::tests::webtransport::{
+ wt_default_parameters, WtTest, DATAGRAM_SIZE,
+};
+use crate::{Error, Http3Parameters, WebTransportRequest};
+use neqo_common::Encoder;
+use neqo_transport::Error as TransportError;
+use std::convert::TryFrom;
+
+const DGRAM: &[u8] = &[0, 100];
+
+#[test]
+fn no_datagrams() {
+ let mut wt = WtTest::new_with_params(
+ Http3Parameters::default().webtransport(true),
+ Http3Parameters::default().webtransport(true),
+ );
+ let mut wt_session = wt.create_wt_session();
+
+ assert_eq!(
+ wt_session.max_datagram_size(),
+ Err(Error::TransportError(TransportError::NotAvailable))
+ );
+ assert_eq!(
+ wt.max_datagram_size(wt_session.stream_id()),
+ Err(Error::TransportError(TransportError::NotAvailable))
+ );
+
+ assert_eq!(
+ wt_session.send_datagram(DGRAM, None),
+ Err(Error::TransportError(TransportError::TooMuchData))
+ );
+ assert_eq!(
+ wt.send_datagram(wt_session.stream_id(), DGRAM),
+ Err(Error::TransportError(TransportError::TooMuchData))
+ );
+
+ wt.exchange_packets();
+ wt.check_no_datagram_received_client();
+ wt.check_no_datagram_received_server();
+}
+
+fn do_datagram_test(wt: &mut WtTest, wt_session: &mut WebTransportRequest) {
+ assert_eq!(
+ wt_session.max_datagram_size(),
+ Ok(DATAGRAM_SIZE
+ - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap())
+ );
+ assert_eq!(
+ wt.max_datagram_size(wt_session.stream_id()),
+ Ok(DATAGRAM_SIZE
+ - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap())
+ );
+
+ assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(()));
+ assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(()));
+
+ wt.exchange_packets();
+ wt.check_datagram_received_client(wt_session.stream_id(), DGRAM);
+ wt.check_datagram_received_server(wt_session, DGRAM);
+}
+
+#[test]
+fn datagrams() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ do_datagram_test(&mut wt, &mut wt_session);
+}
+
+#[test]
+fn datagrams_server_only() {
+ let mut wt = WtTest::new_with_params(
+ Http3Parameters::default().webtransport(true),
+ wt_default_parameters(),
+ );
+ let mut wt_session = wt.create_wt_session();
+
+ assert_eq!(
+ wt_session.max_datagram_size(),
+ Err(Error::TransportError(TransportError::NotAvailable))
+ );
+ assert_eq!(
+ wt.max_datagram_size(wt_session.stream_id()),
+ Ok(DATAGRAM_SIZE
+ - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap())
+ );
+
+ assert_eq!(
+ wt_session.send_datagram(DGRAM, None),
+ Err(Error::TransportError(TransportError::TooMuchData))
+ );
+ assert_eq!(wt.send_datagram(wt_session.stream_id(), DGRAM), Ok(()));
+
+ wt.exchange_packets();
+ wt.check_datagram_received_server(&wt_session, DGRAM);
+ wt.check_no_datagram_received_client();
+}
+
+#[test]
+fn datagrams_client_only() {
+ let mut wt = WtTest::new_with_params(
+ wt_default_parameters(),
+ Http3Parameters::default().webtransport(true),
+ );
+ let mut wt_session = wt.create_wt_session();
+
+ assert_eq!(
+ wt_session.max_datagram_size(),
+ Ok(DATAGRAM_SIZE
+ - u64::try_from(Encoder::varint_len(wt_session.stream_id().as_u64())).unwrap())
+ );
+ assert_eq!(
+ wt.max_datagram_size(wt_session.stream_id()),
+ Err(Error::TransportError(TransportError::NotAvailable))
+ );
+
+ assert_eq!(wt_session.send_datagram(DGRAM, None), Ok(()));
+ assert_eq!(
+ wt.send_datagram(wt_session.stream_id(), DGRAM),
+ Err(Error::TransportError(TransportError::TooMuchData))
+ );
+
+ wt.exchange_packets();
+ wt.check_datagram_received_client(wt_session.stream_id(), DGRAM);
+ wt.check_no_datagram_received_server();
+}
+
+#[test]
+fn datagrams_multiple_session() {
+ let mut wt = WtTest::new();
+
+ let mut wt_session1 = wt.create_wt_session();
+ do_datagram_test(&mut wt, &mut wt_session1);
+
+ let mut wt_session_2 = wt.create_wt_session();
+ do_datagram_test(&mut wt, &mut wt_session_2);
+}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs
new file mode 100644
index 0000000000..58ebfd8a6c
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/mod.rs
@@ -0,0 +1,648 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+mod datagrams;
+mod negotiation;
+mod sessions;
+mod streams;
+use neqo_common::event::Provider;
+
+use crate::{
+ features::extended_connect::SessionCloseReason, Error, Header, Http3Client, Http3ClientEvent,
+ Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, Http3State,
+ WebTransportEvent, WebTransportRequest, WebTransportServerEvent,
+ WebTransportSessionAcceptAction,
+};
+use neqo_crypto::AuthenticationStatus;
+use neqo_transport::{ConnectionParameters, StreamId, StreamType};
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use test_fixture::{
+ addr, anti_replay, fixture_init, now, CountingConnectionIdGenerator, DEFAULT_ALPN_H3,
+ DEFAULT_KEYS, DEFAULT_SERVER_NAME,
+};
+
+const DATAGRAM_SIZE: u64 = 1200;
+
+pub fn wt_default_parameters() -> Http3Parameters {
+ Http3Parameters::default()
+ .webtransport(true)
+ .connection_parameters(ConnectionParameters::default().datagram_size(DATAGRAM_SIZE))
+}
+
+pub fn default_http3_client(client_params: Http3Parameters) -> Http3Client {
+ fixture_init();
+ Http3Client::new(
+ DEFAULT_SERVER_NAME,
+ Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
+ addr(),
+ addr(),
+ client_params,
+ now(),
+ )
+ .expect("create a default client")
+}
+
+pub fn default_http3_server(server_params: Http3Parameters) -> Http3Server {
+ Http3Server::new(
+ now(),
+ DEFAULT_KEYS,
+ DEFAULT_ALPN_H3,
+ anti_replay(),
+ Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
+ server_params,
+ None,
+ )
+ .expect("create a server")
+}
+
+fn exchange_packets(client: &mut Http3Client, server: &mut Http3Server) {
+ let mut out = None;
+ loop {
+ out = client.process(out, now()).dgram();
+ out = server.process(out, now()).dgram();
+ if out.is_none() {
+ break;
+ }
+ }
+}
+
+// Perform only Quic transport handshake.
+fn connect_with(client: &mut Http3Client, server: &mut Http3Server) {
+ assert_eq!(client.state(), Http3State::Initializing);
+ let out = client.process(None, now());
+ assert_eq!(client.state(), Http3State::Initializing);
+
+ let out = server.process(out.dgram(), now());
+ let out = client.process(out.dgram(), now());
+ let out = server.process(out.dgram(), now());
+ assert!(out.as_dgram_ref().is_none());
+
+ let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded);
+ assert!(client.events().any(authentication_needed));
+ client.authenticated(AuthenticationStatus::Ok, now());
+
+ let out = client.process(out.dgram(), now());
+ let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected));
+ assert!(client.events().any(connected));
+
+ assert_eq!(client.state(), Http3State::Connected);
+
+ // Exchange H3 setttings
+ let out = server.process(out.dgram(), now());
+ let out = client.process(out.dgram(), now());
+ let out = server.process(out.dgram(), now());
+ let out = client.process(out.dgram(), now());
+ std::mem::drop(server.process(out.dgram(), now()));
+}
+
+fn connect(
+ client_params: Http3Parameters,
+ server_params: Http3Parameters,
+) -> (Http3Client, Http3Server) {
+ let mut client = default_http3_client(client_params);
+ let mut server = default_http3_server(server_params);
+ connect_with(&mut client, &mut server);
+ (client, server)
+}
+
+struct WtTest {
+ client: Http3Client,
+ server: Http3Server,
+}
+
+impl WtTest {
+ pub fn new() -> Self {
+ let (client, server) = connect(wt_default_parameters(), wt_default_parameters());
+ Self { client, server }
+ }
+
+ pub fn new_with_params(client_params: Http3Parameters, server_params: Http3Parameters) -> Self {
+ let (client, server) = connect(client_params, server_params);
+ Self { client, server }
+ }
+
+ pub fn new_with(mut client: Http3Client, mut server: Http3Server) -> Self {
+ connect_with(&mut client, &mut server);
+ Self { client, server }
+ }
+ fn negotiate_wt_session(
+ &mut self,
+ accept: &WebTransportSessionAcceptAction,
+ ) -> (StreamId, Option<WebTransportRequest>) {
+ let wt_session_id = self
+ .client
+ .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
+ .unwrap();
+ self.exchange_packets();
+
+ let mut wt_server_session = None;
+ while let Some(event) = self.server.next_event() {
+ match event {
+ Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession {
+ mut session,
+ headers,
+ }) => {
+ assert!(
+ headers
+ .iter()
+ .any(|h| h.name() == ":method" && h.value() == "CONNECT")
+ && headers
+ .iter()
+ .any(|h| h.name() == ":protocol" && h.value() == "webtransport")
+ );
+ session.response(accept).unwrap();
+ wt_server_session = Some(session);
+ }
+ Http3ServerEvent::Data { .. } => {
+ panic!("There should not be any data events!");
+ }
+ _ => {}
+ }
+ }
+
+ self.exchange_packets();
+ (wt_session_id, wt_server_session)
+ }
+
+ fn create_wt_session(&mut self) -> WebTransportRequest {
+ let (wt_session_id, wt_server_session) =
+ self.negotiate_wt_session(&WebTransportSessionAcceptAction::Accept);
+ let wt_session_negotiated_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::WebTransport(WebTransportEvent::Session{
+ stream_id,
+ status,
+ headers,
+ }) if (
+ stream_id == wt_session_id &&
+ status == 200 &&
+ headers.contains(&Header::new(":status", "200"))
+ )
+ )
+ };
+ assert!(self.client.events().any(wt_session_negotiated_event));
+
+ let wt_server_session = wt_server_session.unwrap();
+ assert_eq!(wt_session_id, wt_server_session.stream_id());
+ wt_server_session
+ }
+
+ fn exchange_packets(&mut self) {
+ let mut out = None;
+ loop {
+ out = self.client.process(out, now()).dgram();
+ out = self.server.process(out, now()).dgram();
+ if out.is_none() {
+ break;
+ }
+ }
+ }
+
+ pub fn cancel_session_client(&mut self, wt_stream_id: StreamId) {
+ self.client
+ .cancel_fetch(wt_stream_id, Error::HttpNoError.code())
+ .unwrap();
+ self.exchange_packets();
+ }
+
+ fn session_closed_client(
+ e: &Http3ClientEvent,
+ id: StreamId,
+ expected_reason: &SessionCloseReason,
+ expected_headers: &Option<Vec<Header>>,
+ ) -> bool {
+ if let Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed {
+ stream_id,
+ reason,
+ headers,
+ }) = e
+ {
+ *stream_id == id && reason == expected_reason && headers == expected_headers
+ } else {
+ false
+ }
+ }
+
+ pub fn check_session_closed_event_client(
+ &mut self,
+ wt_session_id: StreamId,
+ expected_reason: &SessionCloseReason,
+ expected_headers: &Option<Vec<Header>>,
+ ) {
+ let mut event_found = false;
+
+ while let Some(event) = self.client.next_event() {
+ event_found = WtTest::session_closed_client(
+ &event,
+ wt_session_id,
+ expected_reason,
+ expected_headers,
+ );
+ if event_found {
+ break;
+ }
+ }
+ assert!(event_found);
+ }
+
+ pub fn cancel_session_server(&mut self, wt_session: &mut WebTransportRequest) {
+ wt_session.cancel_fetch(Error::HttpNoError.code()).unwrap();
+ self.exchange_packets();
+ }
+
+ fn session_closed_server(
+ e: &Http3ServerEvent,
+ id: StreamId,
+ expected_reason: &SessionCloseReason,
+ ) -> bool {
+ if let Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed {
+ session,
+ reason,
+ headers,
+ }) = e
+ {
+ session.stream_id() == id && reason == expected_reason && headers.is_none()
+ } else {
+ false
+ }
+ }
+
+ pub fn check_session_closed_event_server(
+ &mut self,
+ wt_session: &mut WebTransportRequest,
+ expected_reeason: &SessionCloseReason,
+ ) {
+ let event = self.server.next_event().unwrap();
+ assert!(WtTest::session_closed_server(
+ &event,
+ wt_session.stream_id(),
+ expected_reeason
+ ));
+ }
+
+ fn create_wt_stream_client(
+ &mut self,
+ wt_session_id: StreamId,
+ stream_type: StreamType,
+ ) -> StreamId {
+ self.client
+ .webtransport_create_stream(wt_session_id, stream_type)
+ .unwrap()
+ }
+
+ fn send_data_client(&mut self, wt_stream_id: StreamId, data: &[u8]) {
+ assert_eq!(
+ self.client.send_data(wt_stream_id, data).unwrap(),
+ data.len()
+ );
+ self.exchange_packets();
+ }
+
+ fn receive_data_client(
+ &mut self,
+ expected_stream_id: StreamId,
+ new_stream: bool,
+ expected_data: &[u8],
+ expected_fin: bool,
+ ) {
+ let mut new_stream_received = false;
+ let mut data_received = false;
+ while let Some(event) = self.client.next_event() {
+ match event {
+ Http3ClientEvent::WebTransport(WebTransportEvent::NewStream {
+ stream_id, ..
+ }) => {
+ assert_eq!(stream_id, expected_stream_id);
+ new_stream_received = true;
+ }
+ Http3ClientEvent::DataReadable { stream_id } => {
+ assert_eq!(stream_id, expected_stream_id);
+ let mut buf = [0; 100];
+ let (amount, fin) = self.client.read_data(now(), stream_id, &mut buf).unwrap();
+ assert_eq!(fin, expected_fin);
+ assert_eq!(amount, expected_data.len());
+ assert_eq!(&buf[..amount], expected_data);
+ data_received = true;
+ }
+ _ => {}
+ }
+ }
+ assert!(data_received);
+ assert_eq!(new_stream, new_stream_received);
+ }
+
+ fn close_stream_sending_client(&mut self, wt_stream_id: StreamId) {
+ self.client.stream_close_send(wt_stream_id).unwrap();
+ self.exchange_packets();
+ }
+
+ fn reset_stream_client(&mut self, wt_stream_id: StreamId) {
+ self.client
+ .stream_reset_send(wt_stream_id, Error::HttpNoError.code())
+ .unwrap();
+ self.exchange_packets();
+ }
+
+ fn receive_reset_client(&mut self, expected_stream_id: StreamId) {
+ let wt_reset_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::Reset {
+ stream_id,
+ error,
+ local
+ } if stream_id == expected_stream_id && error == Error::HttpNoError.code() && !local
+ )
+ };
+ assert!(self.client.events().any(wt_reset_event));
+ }
+
+ fn stream_stop_sending_client(&mut self, stream_id: StreamId) {
+ self.client
+ .stream_stop_sending(stream_id, Error::HttpNoError.code())
+ .unwrap();
+ self.exchange_packets();
+ }
+
+ fn receive_stop_sending_client(&mut self, expected_stream_id: StreamId) {
+ let wt_stop_sending_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::StopSending {
+ stream_id,
+ error
+ } if stream_id == expected_stream_id && error == Error::HttpNoError.code()
+ )
+ };
+ assert!(self.client.events().any(wt_stop_sending_event));
+ }
+
+ fn check_events_after_closing_session_client(
+ &mut self,
+ expected_reset_ids: &[StreamId],
+ expected_error_stream_reset: Option<u64>,
+ expected_stop_sending_ids: &[StreamId],
+ expected_error_stream_stop_sending: Option<u64>,
+ expected_local: bool,
+ expected_session_close: &Option<(StreamId, SessionCloseReason)>,
+ ) {
+ let mut reset_ids_count = 0;
+ let mut stop_sending_ids_count = 0;
+ let mut close_event = false;
+ while let Some(event) = self.client.next_event() {
+ match event {
+ Http3ClientEvent::Reset {
+ stream_id,
+ error,
+ local,
+ } => {
+ assert!(expected_reset_ids.contains(&stream_id));
+ assert_eq!(expected_error_stream_reset.unwrap(), error);
+ assert_eq!(expected_local, local);
+ reset_ids_count += 1;
+ }
+ Http3ClientEvent::StopSending { stream_id, error } => {
+ assert!(expected_stop_sending_ids.contains(&stream_id));
+ assert_eq!(expected_error_stream_stop_sending.unwrap(), error);
+ stop_sending_ids_count += 1;
+ }
+ Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed {
+ stream_id,
+ reason,
+ headers,
+ }) => {
+ close_event = true;
+ assert_eq!(stream_id, expected_session_close.as_ref().unwrap().0);
+ assert_eq!(expected_session_close.as_ref().unwrap().1, reason);
+ assert!(headers.is_none());
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(reset_ids_count, expected_reset_ids.len());
+ assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len());
+ assert_eq!(close_event, expected_session_close.is_some());
+ }
+
+ fn create_wt_stream_server(
+ wt_server_session: &mut WebTransportRequest,
+ stream_type: StreamType,
+ ) -> Http3OrWebTransportStream {
+ wt_server_session.create_stream(stream_type).unwrap()
+ }
+
+ fn send_data_server(&mut self, wt_stream: &mut Http3OrWebTransportStream, data: &[u8]) {
+ assert_eq!(wt_stream.send_data(data).unwrap(), data.len());
+ self.exchange_packets();
+ }
+
+ fn receive_data_server(
+ &mut self,
+ stream_id: StreamId,
+ new_stream: bool,
+ expected_data: &[u8],
+ expected_fin: bool,
+ ) -> Http3OrWebTransportStream {
+ self.exchange_packets();
+ let mut new_stream_received = false;
+ let mut data_received = false;
+ let mut wt_stream = None;
+ let mut stream_closed = false;
+ let mut recv_data = Vec::new();
+ while let Some(event) = self.server.next_event() {
+ match event {
+ Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(request)) => {
+ assert_eq!(stream_id, request.stream_id());
+ new_stream_received = true;
+ }
+ Http3ServerEvent::Data {
+ mut data,
+ fin,
+ stream,
+ } => {
+ recv_data.append(&mut data);
+ stream_closed = fin;
+ data_received = true;
+ wt_stream = Some(stream);
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(&recv_data[..], expected_data);
+ assert!(data_received);
+ assert_eq!(new_stream, new_stream_received);
+ assert_eq!(stream_closed, expected_fin);
+ wt_stream.unwrap()
+ }
+
+ fn close_stream_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) {
+ wt_stream.stream_close_send().unwrap();
+ self.exchange_packets();
+ }
+
+ fn reset_stream_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) {
+ wt_stream
+ .stream_reset_send(Error::HttpNoError.code())
+ .unwrap();
+ self.exchange_packets();
+ }
+
+ fn stream_stop_sending_server(&mut self, wt_stream: &mut Http3OrWebTransportStream) {
+ wt_stream
+ .stream_stop_sending(Error::HttpNoError.code())
+ .unwrap();
+ self.exchange_packets();
+ }
+
+ fn receive_reset_server(&mut self, expected_stream_id: StreamId, expected_error: u64) {
+ let stream_reset = |e| {
+ matches!(
+ e,
+ Http3ServerEvent::StreamReset {
+ stream,
+ error
+ } if stream.stream_id() == expected_stream_id && error == expected_error
+ )
+ };
+ assert!(self.server.events().any(stream_reset));
+ }
+
+ fn receive_stop_sending_server(&mut self, expected_stream_id: StreamId, expected_error: u64) {
+ let stop_sending = |e| {
+ matches!(
+ e,
+ Http3ServerEvent::StreamStopSending {
+ stream,
+ error
+ } if stream.stream_id() == expected_stream_id && error == expected_error
+ )
+ };
+ assert!(self.server.events().any(stop_sending));
+ }
+
+ fn check_events_after_closing_session_server(
+ &mut self,
+ expected_reset_ids: &[StreamId],
+ expected_error_stream_reset: Option<u64>,
+ expected_stop_sending_ids: &[StreamId],
+ expected_error_stream_stop_sending: Option<u64>,
+ expected_session_close: &Option<(StreamId, SessionCloseReason)>,
+ ) {
+ let mut reset_ids_count = 0;
+ let mut stop_sending_ids_count = 0;
+ let mut close_event = false;
+ while let Some(event) = self.server.next_event() {
+ match event {
+ Http3ServerEvent::StreamReset { stream, error } => {
+ assert!(expected_reset_ids.contains(&stream.stream_id()));
+ assert_eq!(expected_error_stream_reset.unwrap(), error);
+ reset_ids_count += 1;
+ }
+ Http3ServerEvent::StreamStopSending { stream, error } => {
+ assert!(expected_stop_sending_ids.contains(&stream.stream_id()));
+ assert_eq!(expected_error_stream_stop_sending.unwrap(), error);
+ stop_sending_ids_count += 1;
+ }
+ Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed {
+ session,
+ reason,
+ headers,
+ }) => {
+ close_event = true;
+ assert_eq!(
+ session.stream_id(),
+ expected_session_close.as_ref().unwrap().0
+ );
+ assert_eq!(expected_session_close.as_ref().unwrap().1, reason);
+ assert!(headers.is_none());
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(reset_ids_count, expected_reset_ids.len());
+ assert_eq!(stop_sending_ids_count, expected_stop_sending_ids.len());
+ assert_eq!(close_event, expected_session_close.is_some());
+ }
+
+ pub fn session_close_frame_client(&mut self, session_id: StreamId, error: u32, message: &str) {
+ self.client
+ .webtransport_close_session(session_id, error, message)
+ .unwrap();
+ }
+
+ pub fn session_close_frame_server(
+ wt_session: &mut WebTransportRequest,
+ error: u32,
+ message: &str,
+ ) {
+ wt_session.close_session(error, message).unwrap();
+ }
+
+ fn max_datagram_size(&self, stream_id: StreamId) -> Result<u64, Error> {
+ self.client.webtransport_max_datagram_size(stream_id)
+ }
+
+ fn send_datagram(&mut self, stream_id: StreamId, buf: &[u8]) -> Result<(), Error> {
+ self.client.webtransport_send_datagram(stream_id, buf, None)
+ }
+
+ fn check_datagram_received_client(
+ &mut self,
+ expected_stream_id: StreamId,
+ expected_dgram: &[u8],
+ ) {
+ let wt_datagram_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::WebTransport(WebTransportEvent::Datagram {
+ session_id,
+ datagram
+ }) if session_id == expected_stream_id && datagram == expected_dgram
+ )
+ };
+ assert!(self.client.events().any(wt_datagram_event));
+ }
+
+ fn check_datagram_received_server(
+ &mut self,
+ expected_session: &WebTransportRequest,
+ expected_dgram: &[u8],
+ ) {
+ let wt_datagram_event = |e| {
+ matches!(
+ e,
+ Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram {
+ session,
+ datagram
+ }) if session.stream_id() == expected_session.stream_id() && datagram == expected_dgram
+ )
+ };
+ assert!(self.server.events().any(wt_datagram_event));
+ }
+
+ fn check_no_datagram_received_client(&mut self) {
+ let wt_datagram_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::WebTransport(WebTransportEvent::Datagram { .. })
+ )
+ };
+ assert!(!self.client.events().any(wt_datagram_event));
+ }
+
+ fn check_no_datagram_received_server(&mut self) {
+ let wt_datagram_event = |e| {
+ matches!(
+ e,
+ Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { .. })
+ )
+ };
+ assert!(!self.server.events().any(wt_datagram_event));
+ }
+}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs
new file mode 100644
index 0000000000..23784e5609
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/negotiation.rs
@@ -0,0 +1,278 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use super::{connect, default_http3_client, default_http3_server, exchange_packets};
+use crate::{
+ settings::{HSetting, HSettingType, HSettings},
+ Error, HFrame, Http3Client, Http3ClientEvent, Http3Parameters, Http3Server, Http3State,
+ WebTransportEvent,
+};
+use neqo_common::{event::Provider, Encoder};
+use neqo_crypto::AuthenticationStatus;
+use neqo_transport::{Connection, ConnectionError, StreamType};
+use std::time::Duration;
+use test_fixture::{default_server_h3, now};
+
+fn check_wt_event(client: &mut Http3Client, wt_enable_client: bool, wt_enable_server: bool) {
+ let wt_event = client.events().find_map(|e| {
+ if let Http3ClientEvent::WebTransport(WebTransportEvent::Negotiated(neg)) = e {
+ Some(neg)
+ } else {
+ None
+ }
+ });
+
+ assert_eq!(wt_event.is_some(), wt_enable_client);
+ if let Some(wt) = wt_event {
+ assert_eq!(wt, wt_enable_client && wt_enable_server);
+ }
+}
+
+fn connect_wt(wt_enabled_client: bool, wt_enabled_server: bool) -> (Http3Client, Http3Server) {
+ connect(
+ Http3Parameters::default().webtransport(wt_enabled_client),
+ Http3Parameters::default().webtransport(wt_enabled_server),
+ )
+}
+
+#[test]
+fn negotiate_wt() {
+ let (mut client, _server) = connect_wt(true, true);
+ assert!(client.webtransport_enabled());
+ check_wt_event(&mut client, true, true);
+
+ let (mut client, _server) = connect_wt(true, false);
+ assert!(!client.webtransport_enabled());
+ check_wt_event(&mut client, true, false);
+
+ let (mut client, _server) = connect_wt(false, true);
+ assert!(!client.webtransport_enabled());
+ check_wt_event(&mut client, false, true);
+
+ let (mut client, _server) = connect_wt(false, false);
+ assert!(!client.webtransport_enabled());
+ check_wt_event(&mut client, false, false);
+}
+
+#[derive(PartialEq, Eq)]
+enum ClientState {
+ ClientEnabled,
+ ClientDisabled,
+}
+
+#[derive(PartialEq, Eq)]
+enum ServerState {
+ ServerEnabled,
+ ServerDisabled,
+}
+
+fn zero_rtt(
+ client_state: &ClientState,
+ server_state: &ServerState,
+ client_resumed_state: &ClientState,
+ server_resumed_state: &ServerState,
+) {
+ let client_org = ClientState::ClientEnabled.eq(client_state);
+ let server_org = ServerState::ServerEnabled.eq(server_state);
+ let client_resumed = ClientState::ClientEnabled.eq(client_resumed_state);
+ let server_resumed = ServerState::ServerEnabled.eq(server_resumed_state);
+
+ let (mut client, mut server) = connect_wt(client_org, server_org);
+ assert_eq!(client.webtransport_enabled(), client_org && server_org);
+
+ // exchange token
+ let out = server.process(None, now());
+ // We do not have a token so we need to wait for a resumption token timer to trigger.
+ std::mem::drop(client.process(out.dgram(), now() + Duration::from_millis(250)));
+ assert_eq!(client.state(), Http3State::Connected);
+ let token = client
+ .events()
+ .find_map(|e| {
+ if let Http3ClientEvent::ResumptionToken(token) = e {
+ Some(token)
+ } else {
+ None
+ }
+ })
+ .unwrap();
+
+ let mut client = default_http3_client(Http3Parameters::default().webtransport(client_resumed));
+ let mut server = default_http3_server(Http3Parameters::default().webtransport(server_resumed));
+ client
+ .enable_resumption(now(), &token)
+ .expect("Set resumption token.");
+ assert_eq!(client.state(), Http3State::ZeroRtt);
+
+ exchange_packets(&mut client, &mut server);
+
+ assert_eq!(&client.state(), &Http3State::Connected);
+ assert_eq!(
+ client.webtransport_enabled(),
+ client_resumed && server_resumed
+ );
+
+ let mut early_data_accepted = true;
+ // The only case we should not do 0-RTT is when webtransport was enabled
+ // originally and is disabled afterwards.
+ if server_org && !server_resumed {
+ early_data_accepted = false;
+ }
+ assert_eq!(
+ client.tls_info().unwrap().early_data_accepted(),
+ early_data_accepted
+ );
+
+ check_wt_event(&mut client, client_resumed, server_resumed);
+}
+
+#[test]
+fn zero_rtt_wt_settings() {
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ );
+
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ );
+
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ );
+
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientDisabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerDisabled,
+ );
+ zero_rtt(
+ &ClientState::ClientDisabled,
+ &ServerState::ServerEnabled,
+ &ClientState::ClientEnabled,
+ &ServerState::ServerEnabled,
+ );
+}
+
+fn exchange_packets2(client: &mut Http3Client, server: &mut Connection) {
+ let mut out = None;
+ loop {
+ out = client.process(out, now()).dgram();
+ out = server.process(out, now()).dgram();
+ if out.is_none() {
+ break;
+ }
+ }
+}
+
+#[test]
+fn wrong_setting_value() {
+ const CONTROL_STREAM_TYPE: &[u8] = &[0x0];
+ let mut client = default_http3_client(Http3Parameters::default());
+ let mut server = default_server_h3();
+
+ exchange_packets2(&mut client, &mut server);
+ client.authenticated(AuthenticationStatus::Ok, now());
+ exchange_packets2(&mut client, &mut server);
+
+ let control = server.stream_create(StreamType::UniDi).unwrap();
+ server.stream_send(control, CONTROL_STREAM_TYPE).unwrap();
+ // Encode a settings frame and send it.
+ let mut enc = Encoder::default();
+ let settings = HFrame::Settings {
+ settings: HSettings::new(&[HSetting::new(HSettingType::EnableWebTransport, 2)]),
+ };
+ settings.encode(&mut enc);
+ assert_eq!(
+ server.stream_send(control, enc.as_ref()).unwrap(),
+ enc.as_ref().len()
+ );
+
+ exchange_packets2(&mut client, &mut server);
+ match client.state() {
+ Http3State::Closing(err) | Http3State::Closed(err) => {
+ assert_eq!(
+ err,
+ ConnectionError::Application(Error::HttpSettings.code())
+ );
+ }
+ _ => panic!("Wrong state {:?}", client.state()),
+ };
+}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs
new file mode 100644
index 0000000000..65572a1c2a
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs
@@ -0,0 +1,450 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::features::extended_connect::tests::webtransport::{
+ default_http3_client, default_http3_server, wt_default_parameters, WtTest,
+};
+use crate::{
+ features::extended_connect::SessionCloseReason, frames::WebTransportFrame, Error, Header,
+ Http3ClientEvent, Http3OrWebTransportStream, Http3Server, Http3ServerEvent, Http3State,
+ Priority, WebTransportEvent, WebTransportServerEvent, WebTransportSessionAcceptAction,
+};
+use neqo_common::{event::Provider, Encoder};
+use neqo_transport::StreamType;
+use std::mem;
+use test_fixture::now;
+
+#[test]
+fn wt_session() {
+ let mut wt = WtTest::new();
+ mem::drop(wt.create_wt_session());
+}
+
+#[test]
+fn wt_session_reject() {
+ let mut wt = WtTest::new();
+ let headers = vec![Header::new(":status", "404")];
+ let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone());
+ let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res);
+
+ wt.check_session_closed_event_client(
+ wt_session_id,
+ &SessionCloseReason::Status(404),
+ &Some(headers),
+ );
+}
+
+#[test]
+fn wt_session_close_client() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt.cancel_session_client(wt_session.stream_id());
+ wt.check_session_closed_event_server(
+ &mut wt_session,
+ &SessionCloseReason::Error(Error::HttpNoError.code()),
+ );
+}
+
+#[test]
+fn wt_session_close_server() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt.cancel_session_server(&mut wt_session);
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Error(Error::HttpNoError.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_session_close_server_close_send() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt_session.stream_close_send().unwrap();
+ wt.exchange_packets();
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Clean {
+ error: 0,
+ message: String::new(),
+ },
+ &None,
+ );
+}
+
+#[test]
+fn wt_session_close_server_stop_sending() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt_session
+ .stream_stop_sending(Error::HttpNoError.code())
+ .unwrap();
+ wt.exchange_packets();
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Error(Error::HttpNoError.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_session_close_server_reset() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt_session
+ .stream_reset_send(Error::HttpNoError.code())
+ .unwrap();
+ wt.exchange_packets();
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Error(Error::HttpNoError.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_session_response_with_1xx() {
+ let mut wt = WtTest::new();
+
+ let wt_session_id = wt
+ .client
+ .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
+ .unwrap();
+ wt.exchange_packets();
+
+ let mut wt_server_session = None;
+ while let Some(event) = wt.server.next_event() {
+ if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession {
+ session,
+ headers,
+ }) = event
+ {
+ assert!(
+ headers
+ .iter()
+ .any(|h| h.name() == ":method" && h.value() == "CONNECT")
+ && headers
+ .iter()
+ .any(|h| h.name() == ":protocol" && h.value() == "webtransport")
+ );
+ wt_server_session = Some(session);
+ }
+ }
+
+ let mut wt_server_session = wt_server_session.unwrap();
+
+ // Send interim response.
+ wt_server_session
+ .send_headers(&[Header::new(":status", "111")])
+ .unwrap();
+ wt_server_session
+ .response(&WebTransportSessionAcceptAction::Accept)
+ .unwrap();
+
+ wt.exchange_packets();
+
+ let wt_session_negotiated_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::WebTransport(WebTransportEvent::Session{
+ stream_id,
+ status,
+ headers,
+ }) if (
+ stream_id == wt_session_id &&
+ status == 200 &&
+ headers.contains(&Header::new(":status", "200"))
+ )
+ )
+ };
+ assert!(wt.client.events().any(wt_session_negotiated_event));
+
+ assert_eq!(wt_session_id, wt_server_session.stream_id());
+}
+
+#[test]
+fn wt_session_response_with_redirect() {
+ let headers = [Header::new(":status", "302"), Header::new("location", "/")].to_vec();
+ let mut wt = WtTest::new();
+
+ let accept_res = WebTransportSessionAcceptAction::Reject(headers.clone());
+
+ let (wt_session_id, _wt_session) = wt.negotiate_wt_session(&accept_res);
+
+ wt.check_session_closed_event_client(
+ wt_session_id,
+ &SessionCloseReason::Status(302),
+ &Some(headers),
+ );
+}
+
+#[test]
+fn wt_session_respone_200_with_fin() {
+ let mut wt = WtTest::new();
+
+ let wt_session_id = wt
+ .client
+ .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
+ .unwrap();
+ wt.exchange_packets();
+ let mut wt_server_session = None;
+ while let Some(event) = wt.server.next_event() {
+ if let Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession {
+ session,
+ headers,
+ }) = event
+ {
+ assert!(
+ headers
+ .iter()
+ .any(|h| h.name() == ":method" && h.value() == "CONNECT")
+ && headers
+ .iter()
+ .any(|h| h.name() == ":protocol" && h.value() == "webtransport")
+ );
+ wt_server_session = Some(session);
+ }
+ }
+
+ let mut wt_server_session = wt_server_session.unwrap();
+ wt_server_session
+ .response(&WebTransportSessionAcceptAction::Accept)
+ .unwrap();
+ wt_server_session.stream_close_send().unwrap();
+
+ wt.exchange_packets();
+
+ let wt_session_close_event = |e| {
+ matches!(
+ e,
+ Http3ClientEvent::WebTransport(WebTransportEvent::SessionClosed{
+ stream_id,
+ reason,
+ headers,
+ ..
+ }) if (
+ stream_id == wt_session_id &&
+ reason == SessionCloseReason::Clean{ error: 0, message: String::new()} &&
+ headers.is_none()
+ )
+ )
+ };
+ assert!(wt.client.events().any(wt_session_close_event));
+
+ assert_eq!(wt_session_id, wt_server_session.stream_id());
+}
+
+#[test]
+fn wt_session_close_frame_client() {
+ const ERROR_NUM: u32 = 23;
+ const ERROR_MESSAGE: &str = "Something went wrong";
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE);
+ wt.exchange_packets();
+
+ wt.check_session_closed_event_server(
+ &mut wt_session,
+ &SessionCloseReason::Clean {
+ error: ERROR_NUM,
+ message: ERROR_MESSAGE.to_string(),
+ },
+ );
+}
+
+#[test]
+fn wt_session_close_frame_server() {
+ const ERROR_NUM: u32 = 23;
+ const ERROR_MESSAGE: &str = "Something went wrong";
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE);
+ wt.exchange_packets();
+
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Clean {
+ error: ERROR_NUM,
+ message: ERROR_MESSAGE.to_string(),
+ },
+ &None,
+ );
+}
+
+#[test]
+fn wt_unknown_session_frame_client() {
+ const UNKNOWN_FRAME_LEN: usize = 832;
+ const BUF: &[u8] = &[0; 10];
+ const ERROR_NUM: u32 = 23;
+ const ERROR_MESSAGE: &str = "Something went wrong";
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ // Send an unknown frame.
+ let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4);
+ enc.encode_varint(1028_u64); // Arbitrary type.
+ enc.encode_varint(UNKNOWN_FRAME_LEN as u64);
+ let mut buf: Vec<_> = enc.into();
+ buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0);
+ wt.client.send_data(wt_session.stream_id(), &buf).unwrap();
+ wt.exchange_packets();
+
+ // The session is still active
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.receive_data_client(unidi_server.stream_id(), true, BUF, false);
+
+ // Now close the session.
+ wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE);
+ wt.exchange_packets();
+
+ wt.check_events_after_closing_session_client(
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &None,
+ );
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Clean {
+ error: ERROR_NUM,
+ message: ERROR_MESSAGE.to_string(),
+ },
+ )),
+ );
+}
+
+#[test]
+fn wt_close_session_frame_broken_client() {
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ // Send a incorrect CloseSession frame.
+ let mut enc = Encoder::default();
+ WebTransportFrame::CloseSession {
+ error: 5,
+ message: "Hello".to_string(),
+ }
+ .encode(&mut enc);
+ let mut buf: Vec<_> = enc.into();
+ // Corrupt the string.
+ buf[9] = 0xff;
+ wt.client.send_data(wt_session.stream_id(), &buf).unwrap();
+ wt.exchange_packets();
+
+ // check that the webtransport session is closed.
+ wt.check_session_closed_event_client(
+ wt_session.stream_id(),
+ &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()),
+ &None,
+ );
+ wt.check_session_closed_event_server(
+ &mut wt_session,
+ &SessionCloseReason::Error(Error::HttpGeneralProtocolStream.code()),
+ );
+
+ // The Http3 session is still working.
+ assert_eq!(wt.client.state(), Http3State::Connected);
+ assert_eq!(wt_session.state(), Http3State::Connected);
+}
+
+fn receive_request(server: &mut Http3Server) -> Option<Http3OrWebTransportStream> {
+ while let Some(event) = server.next_event() {
+ if let Http3ServerEvent::Headers { stream, .. } = event {
+ return Some(stream);
+ }
+ }
+ None
+}
+
+#[test]
+// Ignoring this test as it is panicking at wt.create_wt_stream_client
+// Issue # 1386 is created to track this
+#[ignore]
+fn wt_close_session_cannot_be_sent_at_once() {
+ const BUF: &[u8] = &[0; 443];
+ const ERROR_NUM: u32 = 23;
+ const ERROR_MESSAGE: &str = "Something went wrong";
+
+ let client = default_http3_client(wt_default_parameters());
+ let server = default_http3_server(wt_default_parameters());
+ let mut wt = WtTest::new_with(client, server);
+
+ let mut wt_session = wt.create_wt_session();
+
+ // Fill the flow control window using an unrelated http stream.
+ let req_id = wt
+ .client
+ .fetch(
+ now(),
+ "GET",
+ &("https", "something.com", "/"),
+ &[],
+ Priority::default(),
+ )
+ .unwrap();
+ assert_eq!(req_id, 4);
+ wt.exchange_packets();
+ let mut req = receive_request(&mut wt.server).unwrap();
+ req.send_headers(&[
+ Header::new(":status", "200"),
+ Header::new("content-length", BUF.len().to_string()),
+ ])
+ .unwrap();
+ req.send_data(BUF).unwrap();
+
+ // Now close the session.
+ WtTest::session_close_frame_server(&mut wt_session, ERROR_NUM, ERROR_MESSAGE);
+ // server cannot create new streams.
+ assert_eq!(
+ wt_session.create_stream(StreamType::UniDi),
+ Err(Error::InvalidStreamId)
+ );
+
+ let out = wt.server.process(None, now()).dgram();
+ let out = wt.client.process(out, now()).dgram();
+
+ // Client has not received the full CloseSession frame and it can create more streams.
+ let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+
+ let out = wt.server.process(out, now()).dgram();
+ let out = wt.client.process(out, now()).dgram();
+ let out = wt.server.process(out, now()).dgram();
+ let out = wt.client.process(out, now()).dgram();
+ let out = wt.server.process(out, now()).dgram();
+ let _out = wt.client.process(out, now()).dgram();
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[unidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Clean {
+ error: ERROR_NUM,
+ message: ERROR_MESSAGE.to_string(),
+ },
+ )),
+ );
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs
new file mode 100644
index 0000000000..0eaf3ce928
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/tests/webtransport/streams.rs
@@ -0,0 +1,1081 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use crate::features::extended_connect::tests::webtransport::WtTest;
+use crate::{features::extended_connect::SessionCloseReason, Error};
+use neqo_transport::StreamType;
+use std::mem;
+
+#[test]
+fn wt_client_stream_uni() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(wt_stream, BUF_CLIENT);
+ wt.receive_data_server(wt_stream, true, BUF_CLIENT, false);
+}
+
+#[test]
+fn wt_client_stream_bidi() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(wt_client_stream, BUF_CLIENT);
+ let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_client_stream, false, BUF_SERVER, false);
+}
+
+#[test]
+fn wt_server_stream_uni() {
+ const BUF_SERVER: &[u8] = &[2; 30];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+}
+
+#[test]
+fn wt_server_stream_bidi() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+ wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT);
+ mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, false));
+}
+
+#[test]
+fn wt_client_stream_uni_close() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(wt_stream, BUF_CLIENT);
+ wt.close_stream_sending_client(wt_stream);
+ wt.receive_data_server(wt_stream, true, BUF_CLIENT, true);
+}
+
+#[test]
+fn wt_client_stream_bidi_close() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+
+ wt.send_data_client(wt_client_stream, BUF_CLIENT);
+ wt.close_stream_sending_client(wt_client_stream);
+
+ let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, true);
+
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.close_stream_sending_server(&mut wt_server_stream);
+ wt.receive_data_client(wt_client_stream, false, BUF_SERVER, true);
+}
+
+#[test]
+fn wt_server_stream_uni_closed() {
+ const BUF_SERVER: &[u8] = &[2; 30];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.close_stream_sending_server(&mut wt_server_stream);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true);
+}
+
+#[test]
+fn wt_server_stream_bidi_close() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.close_stream_sending_server(&mut wt_server_stream);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, true);
+ wt.send_data_client(wt_server_stream.stream_id(), BUF_CLIENT);
+ wt.close_stream_sending_client(wt_server_stream.stream_id());
+ mem::drop(wt.receive_data_server(wt_server_stream.stream_id(), false, BUF_CLIENT, true));
+}
+
+#[test]
+fn wt_client_stream_uni_reset() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(wt_stream, BUF_CLIENT);
+ mem::drop(wt.receive_data_server(wt_stream, true, BUF_CLIENT, false));
+ wt.reset_stream_client(wt_stream);
+ wt.receive_reset_server(wt_stream, Error::HttpNoError.code());
+}
+
+#[test]
+fn wt_server_stream_uni_reset() {
+ const BUF_SERVER: &[u8] = &[2; 30];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+ wt.reset_stream_server(&mut wt_server_stream);
+ wt.receive_reset_client(wt_server_stream.stream_id());
+}
+
+#[test]
+fn wt_client_stream_bidi_reset() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+
+ wt.send_data_client(wt_client_stream, BUF_CLIENT);
+ let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false);
+
+ wt.reset_stream_client(wt_client_stream);
+ wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code());
+
+ wt.reset_stream_server(&mut wt_server_stream);
+ wt.receive_reset_client(wt_client_stream);
+}
+
+#[test]
+fn wt_server_stream_bidi_reset() {
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+
+ wt.reset_stream_client(wt_server_stream.stream_id());
+ wt.receive_reset_server(wt_server_stream.stream_id(), Error::HttpNoError.code());
+
+ wt.reset_stream_server(&mut wt_server_stream);
+ wt.receive_reset_client(wt_server_stream.stream_id());
+}
+
+#[test]
+fn wt_client_stream_uni_stop_sending() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+ let wt_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(wt_stream, BUF_CLIENT);
+ let mut wt_server_stream = wt.receive_data_server(wt_stream, true, BUF_CLIENT, false);
+ wt.stream_stop_sending_server(&mut wt_server_stream);
+ wt.receive_stop_sending_client(wt_stream);
+}
+
+#[test]
+fn wt_server_stream_uni_stop_sending() {
+ const BUF_SERVER: &[u8] = &[2; 30];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+ wt.stream_stop_sending_client(wt_server_stream.stream_id());
+ wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code());
+}
+
+#[test]
+fn wt_client_stream_bidi_stop_sending() {
+ const BUF_CLIENT: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let wt_client_stream = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+
+ wt.send_data_client(wt_client_stream, BUF_CLIENT);
+
+ let mut wt_server_stream = wt.receive_data_server(wt_client_stream, true, BUF_CLIENT, false);
+
+ wt.stream_stop_sending_client(wt_client_stream);
+
+ wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code());
+ wt.stream_stop_sending_server(&mut wt_server_stream);
+ wt.receive_stop_sending_client(wt_server_stream.stream_id());
+}
+
+#[test]
+fn wt_server_stream_bidi_stop_sending() {
+ const BUF_SERVER: &[u8] = &[1; 20];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+ let mut wt_server_stream = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+
+ wt.send_data_server(&mut wt_server_stream, BUF_SERVER);
+ wt.receive_data_client(wt_server_stream.stream_id(), true, BUF_SERVER, false);
+ wt.stream_stop_sending_client(wt_server_stream.stream_id());
+ wt.receive_stop_sending_server(wt_server_stream.stream_id(), Error::HttpNoError.code());
+ wt.stream_stop_sending_server(&mut wt_server_stream);
+ wt.receive_stop_sending_client(wt_server_stream.stream_id());
+}
+
+// For the following tests the client cancels a session. The streams are in different states:
+// 1) Both sides of a bidirectional client stream are opened.
+// 2) A client unidirectional stream is opened.
+// 3) A client unidirectional stream has been closed and both sides consumed the closing info.
+// 4) A client unidirectional stream has been closed, but only the server has consumed the closing info.
+// 5) A client unidirectional stream has been closed, but only the client has consum the closing info.
+// 6) Both sides of a bidirectional server stream are opened.
+// 7) A server unidirectional stream is opened.
+// 8) A server unidirectional stream has been closed and both sides consumed the closing info.
+// 9) A server unidirectional stream has been closed, but only the server has consumed the closing info.
+// 10) A server unidirectional stream has been closed, but only the client has consumed the closing info.
+// 11) Both sides of a bidirectional stream have been closed and consumed by both sides.
+// 12) Both sides of a bidirectional stream have been closed, but not consumed by both sides.
+// 13) Multiples open streams
+
+#[test]
+fn wt_client_session_close_1() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let bidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_from_client, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_from_client, true, BUF, false));
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_2() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+
+ wt.send_data_client(unidi_from_client, BUF);
+ std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false));
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[unidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[unidi_from_client],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_3() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+
+ wt.send_data_client(unidi_from_client, BUF);
+ std::mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false));
+ wt.close_stream_sending_client(unidi_from_client);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None);
+}
+
+#[test]
+fn wt_client_session_close_4() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+
+ wt.send_data_client(unidi_from_client, BUF);
+ let mut unidi_from_client_s = wt.receive_data_server(unidi_from_client, true, BUF, false);
+ wt.stream_stop_sending_server(&mut unidi_from_client_s);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[unidi_from_client],
+ Some(Error::HttpNoError.code()),
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_5() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let unidi_from_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+
+ wt.send_data_client(unidi_from_client, BUF);
+ mem::drop(wt.receive_data_server(unidi_from_client, true, BUF, false));
+ wt.reset_stream_client(unidi_from_client);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[unidi_from_client],
+ Some(Error::HttpNoError.code()),
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None);
+}
+
+#[test]
+fn wt_client_session_close_6() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_from_server, BUF);
+ wt.receive_data_client(bidi_from_server.stream_id(), true, BUF, false);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_7() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_from_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_from_server, BUF);
+ wt.receive_data_client(unidi_from_server.stream_id(), true, BUF, false);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[unidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[unidi_from_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_8() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.close_stream_sending_server(&mut unidi_server);
+ wt.receive_data_client(unidi_server.stream_id(), true, BUF, true);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None);
+}
+
+#[test]
+fn wt_client_session_close_9() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.stream_stop_sending_client(unidi_server.stream_id());
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[unidi_server.stream_id()],
+ Some(Error::HttpNoError.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None);
+}
+
+#[test]
+fn wt_client_session_close_10() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.close_stream_sending_server(&mut unidi_server);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_11() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_server, BUF);
+ wt.close_stream_sending_server(&mut bidi_server);
+ wt.receive_data_client(bidi_server.stream_id(), true, BUF, true);
+ wt.stream_stop_sending_server(&mut bidi_server);
+ wt.receive_stop_sending_client(bidi_server.stream_id());
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(&[], None, &[], None, false, &None);
+}
+
+#[test]
+fn wt_client_session_close_12() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_server, BUF);
+ wt.close_stream_sending_server(&mut bidi_server);
+ wt.stream_stop_sending_server(&mut bidi_server);
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[],
+ None,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_server.stream_id()],
+ Some(Error::HttpNoError.code()),
+ false,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_close_13() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let wt_session = wt.create_wt_session();
+
+ let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_client_1, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false));
+ let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_client_2, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false));
+
+ wt.cancel_session_client(wt_session.stream_id());
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &None,
+ );
+}
+
+// For the following tests the server cancels a session. The streams are in different states:
+// 1) Both sides of a bidirectional client stream are opened.
+// 2) A client unidirectional stream is opened.
+// 3) A client unidirectional stream has been closed and consumed by both sides.
+// 4) A client unidirectional stream has been closed, but not consumed by the client.
+// 5) Both sides of a bidirectional server stream are opened.
+// 6) A server unidirectional stream is opened.
+// 7) A server unidirectional stream has been closed and consumed by both sides.
+// 8) A server unidirectional stream has been closed, but not consumed by the client.
+// 9) Both sides of a bidirectional stream have been closed and consumed by both sides.
+// 10) Both sides of a bidirectional stream have been closed, but not consumed by the client.
+// 12) Multiples open streams
+
+#[test]
+fn wt_client_session_server_close_1() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let bidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_client, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_client, true, BUF, false));
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_server_close_2() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(unidi_client, BUF);
+ std::mem::drop(wt.receive_data_server(unidi_client, true, BUF, false));
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[unidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(
+ &[unidi_client],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_server_close_3() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(unidi_client, BUF);
+ let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false);
+ wt.stream_stop_sending_server(&mut unidi_client_s);
+ wt.receive_stop_sending_client(unidi_client);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[],
+ None,
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_4() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let unidi_client = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::UniDi);
+ wt.send_data_client(unidi_client, BUF);
+ let mut unidi_client_s = wt.receive_data_server(unidi_client, true, BUF, false);
+ wt.stream_stop_sending_server(&mut unidi_client_s);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[unidi_client],
+ Some(Error::HttpNoError.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_5() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_server, BUF);
+ wt.receive_data_client(bidi_server.stream_id(), true, BUF, false);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_server_close_6() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.receive_data_client(unidi_server.stream_id(), true, BUF, false);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_client_session_server_close_7() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.close_stream_sending_server(&mut unidi_server);
+ wt.receive_data_client(unidi_server.stream_id(), true, BUF, true);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ // Already close stream will not have a reset event.
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[],
+ None,
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_8() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.close_stream_sending_server(&mut unidi_server);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ // The stream was only closed on the server side therefore it is cancelled on the client side.
+ wt.check_events_after_closing_session_client(
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_9() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_server, BUF);
+ wt.close_stream_sending_server(&mut bidi_server);
+ wt.receive_data_client(bidi_server.stream_id(), true, BUF, true);
+ wt.stream_stop_sending_server(&mut bidi_server);
+ wt.receive_stop_sending_client(bidi_server.stream_id());
+
+ wt.cancel_session_server(&mut wt_session);
+
+ // Already close stream will not have a reset event.
+ wt.check_events_after_closing_session_client(
+ &[],
+ None,
+ &[],
+ None,
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_10() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut bidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::BiDi);
+ wt.send_data_server(&mut bidi_server, BUF);
+ wt.close_stream_sending_server(&mut bidi_server);
+ wt.stream_stop_sending_server(&mut bidi_server);
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_server.stream_id()],
+ Some(Error::HttpNoError.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(&[], None, &[], None, &None);
+}
+
+#[test]
+fn wt_client_session_server_close_11() {
+ const BUF: &[u8] = &[0; 10];
+
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let bidi_client_1 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_client_1, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_client_1, true, BUF, false));
+ let bidi_client_2 = wt.create_wt_stream_client(wt_session.stream_id(), StreamType::BiDi);
+ wt.send_data_client(bidi_client_2, BUF);
+ std::mem::drop(wt.receive_data_server(bidi_client_2, true, BUF, false));
+
+ wt.cancel_session_server(&mut wt_session);
+
+ wt.check_events_after_closing_session_client(
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ false,
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Error(Error::HttpNoError.code()),
+ )),
+ );
+
+ wt.check_events_after_closing_session_server(
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &[bidi_client_1, bidi_client_2],
+ Some(Error::HttpRequestCancelled.code()),
+ &None,
+ );
+}
+
+#[test]
+fn wt_session_close_frame_and_streams_client() {
+ const BUF: &[u8] = &[0; 10];
+ const ERROR_NUM: u32 = 23;
+ const ERROR_MESSAGE: &str = "Something went wrong";
+ let mut wt = WtTest::new();
+ let mut wt_session = wt.create_wt_session();
+
+ let mut unidi_server = WtTest::create_wt_stream_server(&mut wt_session, StreamType::UniDi);
+ wt.send_data_server(&mut unidi_server, BUF);
+ wt.exchange_packets();
+
+ wt.session_close_frame_client(wt_session.stream_id(), ERROR_NUM, ERROR_MESSAGE);
+ wt.check_events_after_closing_session_client(
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &[],
+ None,
+ false,
+ &None,
+ );
+ wt.exchange_packets();
+
+ wt.check_events_after_closing_session_server(
+ &[],
+ None,
+ &[unidi_server.stream_id()],
+ Some(Error::HttpRequestCancelled.code()),
+ &Some((
+ wt_session.stream_id(),
+ SessionCloseReason::Clean {
+ error: ERROR_NUM,
+ message: ERROR_MESSAGE.to_string(),
+ },
+ )),
+ );
+}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs
new file mode 100644
index 0000000000..4a412dd27e
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_session.rs
@@ -0,0 +1,539 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![allow(clippy::module_name_repetitions)]
+
+use super::{ExtendedConnectEvents, ExtendedConnectType, SessionCloseReason};
+use crate::{
+ frames::{FrameReader, StreamReaderRecvStreamWrapper, WebTransportFrame},
+ recv_message::{RecvMessage, RecvMessageInfo},
+ send_message::SendMessage,
+ CloseType, Error, HFrame, Http3StreamInfo, Http3StreamType, HttpRecvStream,
+ HttpRecvStreamEvents, Priority, PriorityHandler, ReceiveOutput, RecvStream, RecvStreamEvents,
+ Res, SendStream, SendStreamEvents, Stream,
+};
+use neqo_common::{qtrace, Encoder, Header, MessageType, Role};
+use neqo_qpack::{QPackDecoder, QPackEncoder};
+use neqo_transport::{Connection, DatagramTracking, StreamId};
+use std::any::Any;
+use std::cell::RefCell;
+use std::collections::BTreeSet;
+use std::mem;
+use std::rc::Rc;
+
+#[derive(Debug, PartialEq)]
+enum SessionState {
+ Negotiating,
+ Active,
+ FinPending,
+ Done,
+}
+
+impl SessionState {
+ pub fn closing_state(&self) -> bool {
+ matches!(self, Self::FinPending | Self::Done)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct WebTransportSession {
+ control_stream_recv: Box<dyn RecvStream>,
+ control_stream_send: Box<dyn SendStream>,
+ stream_event_listener: Rc<RefCell<WebTransportSessionListener>>,
+ session_id: StreamId,
+ state: SessionState,
+ frame_reader: FrameReader,
+ events: Box<dyn ExtendedConnectEvents>,
+ send_streams: BTreeSet<StreamId>,
+ recv_streams: BTreeSet<StreamId>,
+ role: Role,
+}
+
+impl ::std::fmt::Display for WebTransportSession {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
+ write!(f, "WebTransportSession session={}", self.session_id,)
+ }
+}
+
+impl WebTransportSession {
+ #[must_use]
+ pub fn new(
+ session_id: StreamId,
+ events: Box<dyn ExtendedConnectEvents>,
+ role: Role,
+ qpack_encoder: Rc<RefCell<QPackEncoder>>,
+ qpack_decoder: Rc<RefCell<QPackDecoder>>,
+ ) -> Self {
+ let stream_event_listener = Rc::new(RefCell::new(WebTransportSessionListener::default()));
+ Self {
+ control_stream_recv: Box::new(RecvMessage::new(
+ &RecvMessageInfo {
+ message_type: MessageType::Response,
+ stream_type: Http3StreamType::ExtendedConnect,
+ stream_id: session_id,
+ header_frame_type_read: false,
+ },
+ qpack_decoder,
+ Box::new(stream_event_listener.clone()),
+ None,
+ PriorityHandler::new(false, Priority::default()),
+ )),
+ control_stream_send: Box::new(SendMessage::new(
+ MessageType::Request,
+ Http3StreamType::ExtendedConnect,
+ session_id,
+ qpack_encoder,
+ Box::new(stream_event_listener.clone()),
+ )),
+ stream_event_listener,
+ session_id,
+ state: SessionState::Negotiating,
+ frame_reader: FrameReader::new(),
+ events,
+ send_streams: BTreeSet::new(),
+ recv_streams: BTreeSet::new(),
+ role,
+ }
+ }
+
+ /// # Panics
+ /// This function is only called with `RecvStream` and `SendStream` that also implement
+ /// the http specific functions and `http_stream()` will never return `None`.
+ #[must_use]
+ pub fn new_with_http_streams(
+ session_id: StreamId,
+ events: Box<dyn ExtendedConnectEvents>,
+ role: Role,
+ mut control_stream_recv: Box<dyn RecvStream>,
+ mut control_stream_send: Box<dyn SendStream>,
+ ) -> Self {
+ let stream_event_listener = Rc::new(RefCell::new(WebTransportSessionListener::default()));
+ control_stream_recv
+ .http_stream()
+ .unwrap()
+ .set_new_listener(Box::new(stream_event_listener.clone()));
+ control_stream_send
+ .http_stream()
+ .unwrap()
+ .set_new_listener(Box::new(stream_event_listener.clone()));
+ Self {
+ control_stream_recv,
+ control_stream_send,
+ stream_event_listener,
+ session_id,
+ state: SessionState::Active,
+ frame_reader: FrameReader::new(),
+ events,
+ send_streams: BTreeSet::new(),
+ recv_streams: BTreeSet::new(),
+ role,
+ }
+ }
+
+ /// # Errors
+ /// The function can only fail if supplied headers are not valid http headers.
+ /// # Panics
+ /// `control_stream_send` implements the http specific functions and `http_stream()`
+ /// will never return `None`.
+ pub fn send_request(&mut self, headers: &[Header], conn: &mut Connection) -> Res<()> {
+ self.control_stream_send
+ .http_stream()
+ .unwrap()
+ .send_headers(headers, conn)
+ }
+
+ fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
+ qtrace!([self], "receive control data");
+ let (out, _) = self.control_stream_recv.receive(conn)?;
+ debug_assert!(out == ReceiveOutput::NoOutput);
+ self.maybe_check_headers();
+ self.read_control_stream(conn)?;
+ Ok((ReceiveOutput::NoOutput, self.state == SessionState::Done))
+ }
+
+ fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
+ let (out, _) = self
+ .control_stream_recv
+ .http_stream()
+ .unwrap()
+ .header_unblocked(conn)?;
+ debug_assert!(out == ReceiveOutput::NoOutput);
+ self.maybe_check_headers();
+ self.read_control_stream(conn)?;
+ Ok((ReceiveOutput::NoOutput, self.state == SessionState::Done))
+ }
+
+ fn maybe_update_priority(&mut self, priority: Priority) -> bool {
+ self.control_stream_recv
+ .http_stream()
+ .unwrap()
+ .maybe_update_priority(priority)
+ }
+
+ fn priority_update_frame(&mut self) -> Option<HFrame> {
+ self.control_stream_recv
+ .http_stream()
+ .unwrap()
+ .priority_update_frame()
+ }
+
+ fn priority_update_sent(&mut self) {
+ self.control_stream_recv
+ .http_stream()
+ .unwrap()
+ .priority_update_sent();
+ }
+
+ fn send(&mut self, conn: &mut Connection) -> Res<()> {
+ self.control_stream_send.send(conn)?;
+ if self.control_stream_send.done() {
+ self.state = SessionState::Done;
+ }
+ Ok(())
+ }
+
+ fn has_data_to_send(&self) -> bool {
+ self.control_stream_send.has_data_to_send()
+ }
+
+ fn done(&self) -> bool {
+ self.state == SessionState::Done
+ }
+
+ fn close(&mut self, close_type: CloseType) {
+ if self.state.closing_state() {
+ return;
+ }
+ qtrace!("ExtendedConnect close the session");
+ self.state = SessionState::Done;
+ if !close_type.locally_initiated() {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::from(close_type),
+ None,
+ );
+ }
+ }
+
+ /// # Panics
+ /// This cannot panic because headers are checked before this function called.
+ pub fn maybe_check_headers(&mut self) {
+ if SessionState::Negotiating != self.state {
+ return;
+ }
+
+ if let Some((headers, interim, fin)) = self.stream_event_listener.borrow_mut().get_headers()
+ {
+ qtrace!(
+ "ExtendedConnect response headers {:?}, fin={}",
+ headers,
+ fin
+ );
+
+ if interim {
+ if fin {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::Clean {
+ error: 0,
+ message: String::new(),
+ },
+ Some(headers),
+ );
+ self.state = SessionState::Done;
+ }
+ } else {
+ let status = headers
+ .iter()
+ .find_map(|h| {
+ if h.name() == ":status" {
+ h.value().parse::<u16>().ok()
+ } else {
+ None
+ }
+ })
+ .unwrap();
+
+ self.state = if (200..300).contains(&status) {
+ if fin {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::Clean {
+ error: 0,
+ message: String::new(),
+ },
+ Some(headers),
+ );
+ SessionState::Done
+ } else {
+ self.events.session_start(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ status,
+ headers,
+ );
+ SessionState::Active
+ }
+ } else {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::Status(status),
+ Some(headers),
+ );
+ SessionState::Done
+ };
+ }
+ }
+ }
+
+ pub fn add_stream(&mut self, stream_id: StreamId) {
+ if let SessionState::Active = self.state {
+ if stream_id.is_bidi() {
+ self.send_streams.insert(stream_id);
+ self.recv_streams.insert(stream_id);
+ } else if stream_id.is_self_initiated(self.role) {
+ self.send_streams.insert(stream_id);
+ } else {
+ self.recv_streams.insert(stream_id);
+ }
+
+ if !stream_id.is_self_initiated(self.role) {
+ self.events
+ .extended_connect_new_stream(Http3StreamInfo::new(
+ stream_id,
+ ExtendedConnectType::WebTransport.get_stream_type(self.session_id),
+ ));
+ }
+ }
+ }
+
+ pub fn remove_recv_stream(&mut self, stream_id: StreamId) {
+ self.recv_streams.remove(&stream_id);
+ }
+
+ pub fn remove_send_stream(&mut self, stream_id: StreamId) {
+ self.send_streams.remove(&stream_id);
+ }
+
+ #[must_use]
+ pub fn is_active(&self) -> bool {
+ matches!(self.state, SessionState::Active)
+ }
+
+ pub fn take_sub_streams(&mut self) -> (BTreeSet<StreamId>, BTreeSet<StreamId>) {
+ (
+ mem::take(&mut self.recv_streams),
+ mem::take(&mut self.send_streams),
+ )
+ }
+
+ /// # Errors
+ /// It may return an error if the frame is not correctly decoded.
+ pub fn read_control_stream(&mut self, conn: &mut Connection) -> Res<()> {
+ let (f, fin) = self
+ .frame_reader
+ .receive::<WebTransportFrame>(&mut StreamReaderRecvStreamWrapper::new(
+ conn,
+ &mut self.control_stream_recv,
+ ))
+ .map_err(|_| Error::HttpGeneralProtocolStream)?;
+ qtrace!([self], "Received frame: {:?} fin={}", f, fin);
+ if let Some(WebTransportFrame::CloseSession { error, message }) = f {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::Clean { error, message },
+ None,
+ );
+ self.state = if fin {
+ SessionState::Done
+ } else {
+ SessionState::FinPending
+ };
+ } else if fin {
+ self.events.session_end(
+ ExtendedConnectType::WebTransport,
+ self.session_id,
+ SessionCloseReason::Clean {
+ error: 0,
+ message: String::new(),
+ },
+ None,
+ );
+ self.state = SessionState::Done;
+ }
+ Ok(())
+ }
+
+ /// # Errors
+ /// Return an error if the stream was closed on the transport layer, but that information is not yet
+ /// consumed on the http/3 layer.
+ pub fn close_session(&mut self, conn: &mut Connection, error: u32, message: &str) -> Res<()> {
+ self.state = SessionState::Done;
+ let close_frame = WebTransportFrame::CloseSession {
+ error,
+ message: message.to_string(),
+ };
+ let mut encoder = Encoder::default();
+ close_frame.encode(&mut encoder);
+ self.control_stream_send
+ .send_data_atomic(conn, encoder.as_ref())?;
+ self.control_stream_send.close(conn)?;
+ self.state = if self.control_stream_send.done() {
+ SessionState::Done
+ } else {
+ SessionState::FinPending
+ };
+ Ok(())
+ }
+
+ fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
+ self.control_stream_send.send_data(conn, buf)
+ }
+
+ /// # Errors
+ /// Returns an error if the datagram exceeds the remote datagram size limit.
+ pub fn send_datagram(
+ &self,
+ conn: &mut Connection,
+ buf: &[u8],
+ id: impl Into<DatagramTracking>,
+ ) -> Res<()> {
+ qtrace!([self], "send_datagram state={:?}", self.state);
+ if let SessionState::Active = self.state {
+ let mut dgram_data = Encoder::default();
+ dgram_data.encode_varint(self.session_id.as_u64() / 4);
+ dgram_data.encode(buf);
+ conn.send_datagram(dgram_data.as_ref(), id)?;
+ } else {
+ debug_assert!(false);
+ return Err(Error::Unavailable);
+ }
+ Ok(())
+ }
+
+ pub fn datagram(&mut self, datagram: Vec<u8>) {
+ if let SessionState::Active = self.state {
+ self.events.new_datagram(self.session_id, datagram);
+ }
+ }
+}
+
+impl Stream for Rc<RefCell<WebTransportSession>> {
+ fn stream_type(&self) -> Http3StreamType {
+ Http3StreamType::ExtendedConnect
+ }
+}
+
+impl RecvStream for Rc<RefCell<WebTransportSession>> {
+ fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
+ self.borrow_mut().receive(conn)
+ }
+
+ fn reset(&mut self, close_type: CloseType) -> Res<()> {
+ self.borrow_mut().close(close_type);
+ Ok(())
+ }
+
+ fn http_stream(&mut self) -> Option<&mut dyn HttpRecvStream> {
+ Some(self)
+ }
+
+ fn webtransport(&self) -> Option<Rc<RefCell<WebTransportSession>>> {
+ Some(self.clone())
+ }
+}
+
+impl HttpRecvStream for Rc<RefCell<WebTransportSession>> {
+ fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
+ self.borrow_mut().header_unblocked(conn)
+ }
+
+ fn maybe_update_priority(&mut self, priority: Priority) -> bool {
+ self.borrow_mut().maybe_update_priority(priority)
+ }
+
+ fn priority_update_frame(&mut self) -> Option<HFrame> {
+ self.borrow_mut().priority_update_frame()
+ }
+
+ fn priority_update_sent(&mut self) {
+ self.borrow_mut().priority_update_sent();
+ }
+
+ fn any(&self) -> &dyn Any {
+ self
+ }
+}
+
+impl SendStream for Rc<RefCell<WebTransportSession>> {
+ fn send(&mut self, conn: &mut Connection) -> Res<()> {
+ self.borrow_mut().send(conn)
+ }
+
+ fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
+ self.borrow_mut().send_data(conn, buf)
+ }
+
+ fn has_data_to_send(&self) -> bool {
+ self.borrow_mut().has_data_to_send()
+ }
+
+ fn stream_writable(&self) {}
+
+ fn done(&self) -> bool {
+ self.borrow_mut().done()
+ }
+
+ fn close(&mut self, conn: &mut Connection) -> Res<()> {
+ self.borrow_mut().close_session(conn, 0, "")
+ }
+
+ fn close_with_message(&mut self, conn: &mut Connection, error: u32, message: &str) -> Res<()> {
+ self.borrow_mut().close_session(conn, error, message)
+ }
+
+ fn handle_stop_sending(&mut self, close_type: CloseType) {
+ self.borrow_mut().close(close_type);
+ }
+}
+
+#[derive(Debug, Default)]
+struct WebTransportSessionListener {
+ headers: Option<(Vec<Header>, bool, bool)>,
+}
+
+impl WebTransportSessionListener {
+ fn set_headers(&mut self, headers: Vec<Header>, interim: bool, fin: bool) {
+ self.headers = Some((headers, interim, fin));
+ }
+
+ pub fn get_headers(&mut self) -> Option<(Vec<Header>, bool, bool)> {
+ mem::take(&mut self.headers)
+ }
+}
+
+impl RecvStreamEvents for Rc<RefCell<WebTransportSessionListener>> {}
+
+impl HttpRecvStreamEvents for Rc<RefCell<WebTransportSessionListener>> {
+ fn header_ready(
+ &self,
+ _stream_info: Http3StreamInfo,
+ headers: Vec<Header>,
+ interim: bool,
+ fin: bool,
+ ) {
+ if !interim || fin {
+ self.borrow_mut().set_headers(headers, interim, fin);
+ }
+ }
+}
+
+impl SendStreamEvents for Rc<RefCell<WebTransportSessionListener>> {}
diff --git a/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs
new file mode 100644
index 0000000000..f08f8047d7
--- /dev/null
+++ b/third_party/rust/neqo-http3/src/features/extended_connect/webtransport_streams.rs
@@ -0,0 +1,202 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use super::WebTransportSession;
+use crate::{
+ CloseType, Http3StreamInfo, Http3StreamType, ReceiveOutput, RecvStream, RecvStreamEvents, Res,
+ SendStream, SendStreamEvents, Stream,
+};
+use neqo_common::Encoder;
+use neqo_transport::{Connection, StreamId};
+use std::cell::RefCell;
+use std::rc::Rc;
+
+pub const WEBTRANSPORT_UNI_STREAM: u64 = 0x54;
+pub const WEBTRANSPORT_STREAM: u64 = 0x41;
+
+#[derive(Debug)]
+pub(crate) struct WebTransportRecvStream {
+ stream_id: StreamId,
+ events: Box<dyn RecvStreamEvents>,
+ session: Rc<RefCell<WebTransportSession>>,
+ session_id: StreamId,
+ fin: bool,
+}
+
+impl WebTransportRecvStream {
+ pub fn new(
+ stream_id: StreamId,
+ session_id: StreamId,
+ events: Box<dyn RecvStreamEvents>,
+ session: Rc<RefCell<WebTransportSession>>,
+ ) -> Self {
+ Self {
+ stream_id,
+ events,
+ session_id,
+ session,
+ fin: false,
+ }
+ }
+
+ fn get_info(&self) -> Http3StreamInfo {
+ Http3StreamInfo::new(self.stream_id, self.stream_type())
+ }
+}
+
+impl Stream for WebTransportRecvStream {
+ fn stream_type(&self) -> Http3StreamType {
+ Http3StreamType::WebTransport(self.session_id)
+ }
+}
+
+impl RecvStream for WebTransportRecvStream {
+ fn receive(&mut self, _conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
+ self.events.data_readable(self.get_info());
+ Ok((ReceiveOutput::NoOutput, false))
+ }
+
+ fn reset(&mut self, close_type: CloseType) -> Res<()> {
+ if !matches!(close_type, CloseType::ResetApp(_)) {
+ self.events.recv_closed(self.get_info(), close_type);
+ }
+ self.session.borrow_mut().remove_recv_stream(self.stream_id);
+ Ok(())
+ }
+
+ fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> {
+ let (amount, fin) = conn.stream_recv(self.stream_id, buf)?;
+ self.fin = fin;
+ if fin {
+ self.session.borrow_mut().remove_recv_stream(self.stream_id);
+ }
+ Ok((amount, fin))
+ }
+}
+
+#[derive(Debug, PartialEq)]
+enum WebTransportSenderStreamState {
+ SendingInit { buf: Vec<u8>, fin: bool },
+ SendingData,
+ Done,
+}
+
+#[derive(Debug)]
+pub(crate) struct WebTransportSendStream {
+ stream_id: StreamId,
+ state: WebTransportSenderStreamState,
+ events: Box<dyn SendStreamEvents>,
+ session: Rc<RefCell<WebTransportSession>>,
+ session_id: StreamId,
+}
+
+impl WebTransportSendStream {
+ pub fn new(
+ stream_id: StreamId,
+ session_id: StreamId,
+ events: Box<dyn SendStreamEvents>,
+ session: Rc<RefCell<WebTransportSession>>,
+ local: bool,
+ ) -> Self {
+ Self {
+ stream_id,
+ state: if local {
+ let mut d = Encoder::default();
+ if stream_id.is_uni() {
+ d.encode_varint(WEBTRANSPORT_UNI_STREAM);
+ } else {
+ d.encode_varint(WEBTRANSPORT_STREAM);
+ }
+ d.encode_varint(session_id.as_u64());
+ WebTransportSenderStreamState::SendingInit {
+ buf: d.into(),
+ fin: false,
+ }
+ } else {
+ WebTransportSenderStreamState::SendingData
+ },
+ events,
+ session_id,
+ session,
+ }
+ }
+
+ fn set_done(&mut self, close_type: CloseType) {
+ self.state = WebTransportSenderStreamState::Done;
+ self.events.send_closed(self.get_info(), close_type);
+ self.session.borrow_mut().remove_send_stream(self.stream_id);
+ }
+
+ fn get_info(&self) -> Http3StreamInfo {
+ Http3StreamInfo::new(self.stream_id, self.stream_type())
+ }
+}
+
+impl Stream for WebTransportSendStream {
+ fn stream_type(&self) -> Http3StreamType {
+ Http3StreamType::WebTransport(self.session_id)
+ }
+}
+
+impl SendStream for WebTransportSendStream {
+ fn send(&mut self, conn: &mut Connection) -> Res<()> {
+ if let WebTransportSenderStreamState::SendingInit { ref mut buf, fin } = self.state {
+ let sent = conn.stream_send(self.stream_id, &buf[..])?;
+ if sent == buf.len() {
+ if fin {
+ conn.stream_close_send(self.stream_id)?;
+ self.set_done(CloseType::Done);
+ } else {
+ self.state = WebTransportSenderStreamState::SendingData;
+ }
+ } else {
+ let b = buf.split_off(sent);
+ *buf = b;
+ }
+ }
+ Ok(())
+ }
+
+ fn has_data_to_send(&self) -> bool {
+ matches!(
+ self.state,
+ WebTransportSenderStreamState::SendingInit { .. }
+ )
+ }
+
+ fn stream_writable(&self) {
+ self.events.data_writable(self.get_info());
+ }
+
+ fn done(&self) -> bool {
+ self.state == WebTransportSenderStreamState::Done
+ }
+
+ fn send_data(&mut self, conn: &mut Connection, buf: &[u8]) -> Res<usize> {
+ self.send(conn)?;
+ if self.state == WebTransportSenderStreamState::SendingData {
+ let sent = conn.stream_send(self.stream_id, buf)?;
+ Ok(sent)
+ } else {
+ Ok(0)
+ }
+ }
+
+ fn handle_stop_sending(&mut self, close_type: CloseType) {
+ self.set_done(close_type);
+ }
+
+ fn close(&mut self, conn: &mut Connection) -> Res<()> {
+ if let WebTransportSenderStreamState::SendingInit { ref mut fin, .. } = self.state {
+ *fin = true;
+ } else {
+ self.state = WebTransportSenderStreamState::Done;
+ conn.stream_close_send(self.stream_id)?;
+ self.set_done(CloseType::Done);
+ }
+ Ok(())
+ }
+}