diff options
Diffstat (limited to 'third_party/rust/neqo-transport/src/events.rs')
-rw-r--r-- | third_party/rust/neqo-transport/src/events.rs | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/third_party/rust/neqo-transport/src/events.rs b/third_party/rust/neqo-transport/src/events.rs new file mode 100644 index 0000000000..65b376eb0b --- /dev/null +++ b/third_party/rust/neqo-transport/src/events.rs @@ -0,0 +1,320 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// Collecting a list of events relevant to whoever is using the Connection. + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::rc::Rc; + +use crate::connection::State; +use crate::quic_datagrams::DatagramTracking; +use crate::stream_id::{StreamId, StreamType}; +use crate::{AppError, Stats}; +use neqo_common::event::Provider as EventProvider; +use neqo_crypto::ResumptionToken; + +#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] +pub enum OutgoingDatagramOutcome { + DroppedTooBig, + DroppedQueueFull, + Lost, + Acked, +} + +#[derive(Debug, PartialOrd, Ord, PartialEq, Eq)] +pub enum ConnectionEvent { + /// Cert authentication needed + AuthenticationNeeded, + /// Encrypted client hello fallback occurred. The certificate for the + /// public name needs to be authenticated. + EchFallbackAuthenticationNeeded { + public_name: String, + }, + /// A new uni (read) or bidi stream has been opened by the peer. + NewStream { + stream_id: StreamId, + }, + /// Space available in the buffer for an application write to succeed. + SendStreamWritable { + stream_id: StreamId, + }, + /// New bytes available for reading. + RecvStreamReadable { + stream_id: StreamId, + }, + /// Peer reset the stream. + RecvStreamReset { + stream_id: StreamId, + app_error: AppError, + }, + /// Peer has sent STOP_SENDING + SendStreamStopSending { + stream_id: StreamId, + app_error: AppError, + }, + /// Peer has acked everything sent on the stream. + SendStreamComplete { + stream_id: StreamId, + }, + /// Peer increased MAX_STREAMS + SendStreamCreatable { + stream_type: StreamType, + }, + /// Connection state change. + StateChange(State), + /// The server rejected 0-RTT. + /// This event invalidates all state in streams that has been created. + /// Any data written to streams needs to be written again. + ZeroRttRejected, + ResumptionToken(ResumptionToken), + Datagram(Vec<u8>), + OutgoingDatagramOutcome { + id: u64, + outcome: OutgoingDatagramOutcome, + }, + IncomingDatagramDropped, +} + +#[derive(Debug, Default, Clone)] +#[allow(clippy::module_name_repetitions)] +pub struct ConnectionEvents { + events: Rc<RefCell<VecDeque<ConnectionEvent>>>, +} + +impl ConnectionEvents { + pub fn authentication_needed(&self) { + self.insert(ConnectionEvent::AuthenticationNeeded); + } + + pub fn ech_fallback_authentication_needed(&self, public_name: String) { + self.insert(ConnectionEvent::EchFallbackAuthenticationNeeded { public_name }); + } + + pub fn new_stream(&self, stream_id: StreamId) { + self.insert(ConnectionEvent::NewStream { stream_id }); + } + + pub fn recv_stream_readable(&self, stream_id: StreamId) { + self.insert(ConnectionEvent::RecvStreamReadable { stream_id }); + } + + pub fn recv_stream_reset(&self, stream_id: StreamId, app_error: AppError) { + // If reset, no longer readable. + self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); + + self.insert(ConnectionEvent::RecvStreamReset { + stream_id, + app_error, + }); + } + + pub fn send_stream_writable(&self, stream_id: StreamId) { + self.insert(ConnectionEvent::SendStreamWritable { stream_id }); + } + + pub fn send_stream_stop_sending(&self, stream_id: StreamId, app_error: AppError) { + // If stopped, no longer writable. + self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); + + self.insert(ConnectionEvent::SendStreamStopSending { + stream_id, + app_error, + }); + } + + pub fn send_stream_complete(&self, stream_id: StreamId) { + self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamWritable { stream_id: x } if *x == stream_id)); + + self.remove(|evt| matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } if *x == stream_id.as_u64())); + + self.insert(ConnectionEvent::SendStreamComplete { stream_id }); + } + + pub fn send_stream_creatable(&self, stream_type: StreamType) { + self.insert(ConnectionEvent::SendStreamCreatable { stream_type }); + } + + pub fn connection_state_change(&self, state: State) { + // If closing, existing events no longer relevant. + match state { + State::Closing { .. } | State::Closed(_) => self.events.borrow_mut().clear(), + _ => (), + } + self.insert(ConnectionEvent::StateChange(state)); + } + + pub fn client_resumption_token(&self, token: ResumptionToken) { + self.insert(ConnectionEvent::ResumptionToken(token)); + } + + pub fn client_0rtt_rejected(&self) { + // If 0rtt rejected, must start over and existing events are no longer + // relevant. + self.events.borrow_mut().clear(); + self.insert(ConnectionEvent::ZeroRttRejected); + } + + pub fn recv_stream_complete(&self, stream_id: StreamId) { + // If stopped, no longer readable. + self.remove(|evt| matches!(evt, ConnectionEvent::RecvStreamReadable { stream_id: x } if *x == stream_id.as_u64())); + } + + // The number of datagrams in the events queue is limited to max_queued_datagrams. + // This function ensure this and deletes the oldest datagrams if needed. + fn check_datagram_queued(&self, max_queued_datagrams: usize, stats: &mut Stats) { + let mut q = self.events.borrow_mut(); + let mut remove = None; + if q.iter() + .filter(|evt| matches!(evt, ConnectionEvent::Datagram(_))) + .count() + == max_queued_datagrams + { + if let Some(d) = q + .iter() + .rev() + .enumerate() + .filter(|(_, evt)| matches!(evt, ConnectionEvent::Datagram(_))) + .take(1) + .next() + { + remove = Some(d.0); + } + } + if let Some(r) = remove { + q.remove(r); + q.push_back(ConnectionEvent::IncomingDatagramDropped); + stats.incoming_datagram_dropped += 1; + } + } + + pub fn add_datagram(&self, max_queued_datagrams: usize, data: &[u8], stats: &mut Stats) { + self.check_datagram_queued(max_queued_datagrams, stats); + self.events + .borrow_mut() + .push_back(ConnectionEvent::Datagram(data.to_vec())); + } + + pub fn datagram_outcome( + &self, + dgram_tracker: &DatagramTracking, + outcome: OutgoingDatagramOutcome, + ) { + if let DatagramTracking::Id(id) = dgram_tracker { + self.events + .borrow_mut() + .push_back(ConnectionEvent::OutgoingDatagramOutcome { id: *id, outcome }); + } + } + + fn insert(&self, event: ConnectionEvent) { + let mut q = self.events.borrow_mut(); + + // Special-case two enums that are not strictly PartialEq equal but that + // we wish to avoid inserting duplicates. + let already_present = match &event { + ConnectionEvent::SendStreamStopSending { stream_id, .. } => q.iter().any(|evt| { + matches!(evt, ConnectionEvent::SendStreamStopSending { stream_id: x, .. } + if *x == *stream_id) + }), + ConnectionEvent::RecvStreamReset { stream_id, .. } => q.iter().any(|evt| { + matches!(evt, ConnectionEvent::RecvStreamReset { stream_id: x, .. } + if *x == *stream_id) + }), + _ => q.contains(&event), + }; + if !already_present { + q.push_back(event); + } + } + + fn remove<F>(&self, f: F) + where + F: Fn(&ConnectionEvent) -> bool, + { + self.events.borrow_mut().retain(|evt| !f(evt)) + } +} + +impl EventProvider for ConnectionEvents { + type Event = ConnectionEvent; + + fn has_events(&self) -> bool { + !self.events.borrow().is_empty() + } + + fn next_event(&mut self) -> Option<Self::Event> { + self.events.borrow_mut().pop_front() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ConnectionError, Error}; + + #[test] + fn event_culling() { + let mut evts = ConnectionEvents::default(); + + evts.client_0rtt_rejected(); + evts.client_0rtt_rejected(); + assert_eq!(evts.events().count(), 1); + assert_eq!(evts.events().count(), 0); + + evts.new_stream(4.into()); + evts.new_stream(4.into()); + assert_eq!(evts.events().count(), 1); + + evts.recv_stream_readable(6.into()); + evts.recv_stream_reset(6.into(), 66); + evts.recv_stream_reset(6.into(), 65); + assert_eq!(evts.events().count(), 1); + + evts.send_stream_writable(8.into()); + evts.send_stream_writable(8.into()); + evts.send_stream_stop_sending(8.into(), 55); + evts.send_stream_stop_sending(8.into(), 56); + let events = evts.events().collect::<Vec<_>>(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0], + ConnectionEvent::SendStreamStopSending { + stream_id: StreamId::new(8), + app_error: 55 + } + ); + + evts.send_stream_writable(8.into()); + evts.send_stream_writable(8.into()); + evts.send_stream_stop_sending(8.into(), 55); + evts.send_stream_stop_sending(8.into(), 56); + evts.send_stream_complete(8.into()); + assert_eq!(evts.events().count(), 1); + + evts.send_stream_writable(8.into()); + evts.send_stream_writable(9.into()); + evts.send_stream_stop_sending(10.into(), 55); + evts.send_stream_stop_sending(11.into(), 56); + evts.send_stream_complete(12.into()); + assert_eq!(evts.events().count(), 5); + + evts.send_stream_writable(8.into()); + evts.send_stream_writable(9.into()); + evts.send_stream_stop_sending(10.into(), 55); + evts.send_stream_stop_sending(11.into(), 56); + evts.send_stream_complete(12.into()); + evts.client_0rtt_rejected(); + assert_eq!(evts.events().count(), 1); + + evts.send_stream_writable(9.into()); + evts.send_stream_stop_sending(10.into(), 55); + evts.connection_state_change(State::Closed(ConnectionError::Transport( + Error::StreamStateError, + ))); + assert_eq!(evts.events().count(), 1); + } +} |