diff options
Diffstat (limited to 'third_party/rust/neqo-common/src')
-rw-r--r-- | third_party/rust/neqo-common/src/codec.rs | 6 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/datagram.rs | 6 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/event.rs | 2 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/hrtime.rs | 5 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/lib.rs | 5 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/log.rs | 24 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/qlog.rs | 3 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/timer.rs | 62 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/tos.rs | 37 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/udp.rs | 222 |
10 files changed, 319 insertions, 53 deletions
diff --git a/third_party/rust/neqo-common/src/codec.rs b/third_party/rust/neqo-common/src/codec.rs index 57ff13f39f..7fea2f71ab 100644 --- a/third_party/rust/neqo-common/src/codec.rs +++ b/third_party/rust/neqo-common/src/codec.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{convert::TryFrom, fmt::Debug}; +use std::fmt::Debug; use crate::hex_with_len; @@ -112,9 +112,7 @@ impl<'a> Decoder<'a> { /// Decodes a QUIC varint. pub fn decode_varint(&mut self) -> Option<u64> { - let Some(b1) = self.decode_byte() else { - return None; - }; + let b1 = self.decode_byte()?; match b1 >> 6 { 0 => Some(u64::from(b1 & 0x3f)), 1 => Some((u64::from(b1 & 0x3f) << 8) | self.decode_uint(1)?), diff --git a/third_party/rust/neqo-common/src/datagram.rs b/third_party/rust/neqo-common/src/datagram.rs index 1729c8ed8d..04ba1a45a1 100644 --- a/third_party/rust/neqo-common/src/datagram.rs +++ b/third_party/rust/neqo-common/src/datagram.rs @@ -53,6 +53,12 @@ impl Datagram { pub fn ttl(&self) -> Option<u8> { self.ttl } + + #[cfg(feature = "udp")] + #[must_use] + pub(crate) fn into_data(self) -> Vec<u8> { + self.d + } } impl Deref for Datagram { diff --git a/third_party/rust/neqo-common/src/event.rs b/third_party/rust/neqo-common/src/event.rs index 26052b7571..ea8d491822 100644 --- a/third_party/rust/neqo-common/src/event.rs +++ b/third_party/rust/neqo-common/src/event.rs @@ -4,7 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{iter::Iterator, marker::PhantomData}; +use std::marker::PhantomData; /// An event provider is able to generate a stream of events. pub trait Provider { diff --git a/third_party/rust/neqo-common/src/hrtime.rs b/third_party/rust/neqo-common/src/hrtime.rs index 62d2567d42..e70b5f0ffb 100644 --- a/third_party/rust/neqo-common/src/hrtime.rs +++ b/third_party/rust/neqo-common/src/hrtime.rs @@ -6,7 +6,6 @@ use std::{ cell::RefCell, - convert::TryFrom, rc::{Rc, Weak}, time::Duration, }; @@ -340,9 +339,7 @@ impl Time { /// The handle can also be used to update the resolution. #[must_use] pub fn get(period: Duration) -> Handle { - thread_local! { - static HR_TIME: RefCell<Weak<RefCell<Time>>> = RefCell::default(); - } + thread_local!(static HR_TIME: RefCell<Weak<RefCell<Time>>> = RefCell::default()); HR_TIME.with(|r| { let mut b = r.borrow_mut(); diff --git a/third_party/rust/neqo-common/src/lib.rs b/third_party/rust/neqo-common/src/lib.rs index 853b05705b..fe88097983 100644 --- a/third_party/rust/neqo-common/src/lib.rs +++ b/third_party/rust/neqo-common/src/lib.rs @@ -4,8 +4,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![cfg_attr(feature = "deny-warnings", deny(warnings))] -#![warn(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] // This lint doesn't work here. mod codec; mod datagram; @@ -17,6 +16,8 @@ pub mod log; pub mod qlog; pub mod timer; pub mod tos; +#[cfg(feature = "udp")] +pub mod udp; use std::fmt::Write; diff --git a/third_party/rust/neqo-common/src/log.rs b/third_party/rust/neqo-common/src/log.rs index d9c30b98b1..c5b89be8a6 100644 --- a/third_party/rust/neqo-common/src/log.rs +++ b/third_party/rust/neqo-common/src/log.rs @@ -6,16 +6,19 @@ #![allow(clippy::module_name_repetitions)] -use std::{io::Write, sync::Once, time::Instant}; +use std::{ + io::Write, + sync::{Once, OnceLock}, + time::{Duration, Instant}, +}; use env_logger::Builder; -use lazy_static::lazy_static; #[macro_export] macro_rules! do_log { (target: $target:expr, $lvl:expr, $($arg:tt)+) => ({ let lvl = $lvl; - if lvl <= ::log::max_level() { + if lvl <= ::log::STATIC_MAX_LEVEL && lvl <= ::log::max_level() { ::log::logger().log( &::log::Record::builder() .args(format_args!($($arg)+)) @@ -42,17 +45,22 @@ macro_rules! log_subject { }}; } -static INIT_ONCE: Once = Once::new(); - -lazy_static! { - static ref START_TIME: Instant = Instant::now(); +fn since_start() -> Duration { + static START_TIME: OnceLock<Instant> = OnceLock::new(); + START_TIME.get_or_init(Instant::now).elapsed() } pub fn init() { + static INIT_ONCE: Once = Once::new(); + + if ::log::STATIC_MAX_LEVEL == ::log::LevelFilter::Off { + return; + } + INIT_ONCE.call_once(|| { let mut builder = Builder::from_env("RUST_LOG"); builder.format(|buf, record| { - let elapsed = START_TIME.elapsed(); + let elapsed = since_start(); writeln!( buf, "{}s{:3}ms {} {}", diff --git a/third_party/rust/neqo-common/src/qlog.rs b/third_party/rust/neqo-common/src/qlog.rs index 3da8350990..c67ce62afe 100644 --- a/third_party/rust/neqo-common/src/qlog.rs +++ b/third_party/rust/neqo-common/src/qlog.rs @@ -12,8 +12,7 @@ use std::{ }; use qlog::{ - self, streamer::QlogStreamer, CommonFields, Configuration, TraceSeq, VantagePoint, - VantagePointType, + streamer::QlogStreamer, CommonFields, Configuration, TraceSeq, VantagePoint, VantagePointType, }; use crate::Role; diff --git a/third_party/rust/neqo-common/src/timer.rs b/third_party/rust/neqo-common/src/timer.rs index e8532af442..a413252e08 100644 --- a/third_party/rust/neqo-common/src/timer.rs +++ b/third_party/rust/neqo-common/src/timer.rs @@ -5,7 +5,6 @@ // except according to those terms. use std::{ - convert::TryFrom, mem, time::{Duration, Instant}, }; @@ -247,49 +246,50 @@ impl<T> Timer<T> { #[cfg(test)] mod test { - use lazy_static::lazy_static; + use std::sync::OnceLock; use super::{Duration, Instant, Timer}; - lazy_static! { - static ref NOW: Instant = Instant::now(); + fn now() -> Instant { + static NOW: OnceLock<Instant> = OnceLock::new(); + *NOW.get_or_init(Instant::now) } const GRANULARITY: Duration = Duration::from_millis(10); const CAPACITY: usize = 10; #[test] fn create() { - let t: Timer<()> = Timer::new(*NOW, GRANULARITY, CAPACITY); + let t: Timer<()> = Timer::new(now(), GRANULARITY, CAPACITY); assert_eq!(t.span(), Duration::from_millis(100)); assert_eq!(None, t.next_time()); } #[test] fn immediate_entry() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - t.add(*NOW, 12); - assert_eq!(*NOW, t.next_time().expect("should have an entry")); - let values: Vec<_> = t.take_until(*NOW).collect(); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + t.add(now(), 12); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); assert_eq!(vec![12], values); } #[test] fn same_time() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); let v1 = 12; let v2 = 13; - t.add(*NOW, v1); - t.add(*NOW, v2); - assert_eq!(*NOW, t.next_time().expect("should have an entry")); - let values: Vec<_> = t.take_until(*NOW).collect(); + t.add(now(), v1); + t.add(now(), v2); + assert_eq!(now(), t.next_time().expect("should have an entry")); + let values: Vec<_> = t.take_until(now()).collect(); assert!(values.contains(&v1)); assert!(values.contains(&v2)); } #[test] fn add() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - let near_future = *NOW + Duration::from_millis(17); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let near_future = now() + Duration::from_millis(17); let v = 9; t.add(near_future, v); assert_eq!(near_future, t.next_time().expect("should return a value")); @@ -305,8 +305,8 @@ mod test { #[test] fn add_future() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - let future = *NOW + Duration::from_millis(117); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); let v = 9; t.add(future, v); assert_eq!(future, t.next_time().expect("should return a value")); @@ -315,8 +315,8 @@ mod test { #[test] fn add_far_future() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - let far_future = *NOW + Duration::from_millis(892); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let far_future = now() + Duration::from_millis(892); let v = 9; t.add(far_future, v); assert_eq!(far_future, t.next_time().expect("should return a value")); @@ -333,12 +333,12 @@ mod test { ]; fn with_times() -> Timer<usize> { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); for (i, time) in TIMES.iter().enumerate() { - t.add(*NOW + *time, i); + t.add(now() + *time, i); } assert_eq!( - *NOW + *TIMES.iter().min().unwrap(), + now() + *TIMES.iter().min().unwrap(), t.next_time().expect("should have a time") ); t @@ -348,7 +348,7 @@ mod test { #[allow(clippy::needless_collect)] // false positive fn multiple_values() { let mut t = with_times(); - let values: Vec<_> = t.take_until(*NOW + *TIMES.iter().max().unwrap()).collect(); + let values: Vec<_> = t.take_until(now() + *TIMES.iter().max().unwrap()).collect(); for i in 0..TIMES.len() { assert!(values.contains(&i)); } @@ -358,7 +358,7 @@ mod test { #[allow(clippy::needless_collect)] // false positive fn take_far_future() { let mut t = with_times(); - let values: Vec<_> = t.take_until(*NOW + Duration::from_secs(100)).collect(); + let values: Vec<_> = t.take_until(now() + Duration::from_secs(100)).collect(); for i in 0..TIMES.len() { assert!(values.contains(&i)); } @@ -368,15 +368,15 @@ mod test { fn remove_each() { let mut t = with_times(); for (i, time) in TIMES.iter().enumerate() { - assert_eq!(Some(i), t.remove(*NOW + *time, |&x| x == i)); + assert_eq!(Some(i), t.remove(now() + *time, |&x| x == i)); } assert_eq!(None, t.next_time()); } #[test] fn remove_future() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - let future = *NOW + Duration::from_millis(117); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); let v = 9; t.add(future, v); @@ -385,9 +385,9 @@ mod test { #[test] fn remove_too_far_future() { - let mut t = Timer::new(*NOW, GRANULARITY, CAPACITY); - let future = *NOW + Duration::from_millis(117); - let too_far_future = *NOW + t.span() + Duration::from_millis(117); + let mut t = Timer::new(now(), GRANULARITY, CAPACITY); + let future = now() + Duration::from_millis(117); + let too_far_future = now() + t.span() + Duration::from_millis(117); let v = 9; t.add(future, v); diff --git a/third_party/rust/neqo-common/src/tos.rs b/third_party/rust/neqo-common/src/tos.rs index aa360d1d53..3610f72750 100644 --- a/third_party/rust/neqo-common/src/tos.rs +++ b/third_party/rust/neqo-common/src/tos.rs @@ -46,6 +46,12 @@ impl From<u8> for IpTosEcn { } } +impl From<IpTos> for IpTosEcn { + fn from(value: IpTos) -> Self { + IpTosEcn::from(value.0 & 0x3) + } +} + /// Diffserv Codepoints, mapped to the upper six bits of the TOS field. /// <https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml> #[derive(Copy, Clone, PartialEq, Eq, Enum, Default, Debug)] @@ -159,6 +165,12 @@ impl From<u8> for IpTosDscp { } } +impl From<IpTos> for IpTosDscp { + fn from(value: IpTos) -> Self { + IpTosDscp::from(value.0 & 0xfc) + } +} + /// The type-of-service field in an IP packet. #[allow(clippy::module_name_repetitions)] #[derive(Copy, Clone, PartialEq, Eq)] @@ -169,22 +181,37 @@ impl From<IpTosEcn> for IpTos { Self(u8::from(v)) } } + impl From<IpTosDscp> for IpTos { fn from(v: IpTosDscp) -> Self { Self(u8::from(v)) } } + impl From<(IpTosDscp, IpTosEcn)> for IpTos { fn from(v: (IpTosDscp, IpTosEcn)) -> Self { Self(u8::from(v.0) | u8::from(v.1)) } } + +impl From<(IpTosEcn, IpTosDscp)> for IpTos { + fn from(v: (IpTosEcn, IpTosDscp)) -> Self { + Self(u8::from(v.0) | u8::from(v.1)) + } +} + impl From<IpTos> for u8 { fn from(v: IpTos) -> Self { v.0 } } +impl From<u8> for IpTos { + fn from(v: u8) -> Self { + Self(v) + } +} + impl Debug for IpTos { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("IpTos") @@ -202,7 +229,7 @@ impl Default for IpTos { #[cfg(test)] mod tests { - use super::*; + use crate::{IpTos, IpTosDscp, IpTosEcn}; #[test] fn iptosecn_into_u8() { @@ -287,4 +314,12 @@ mod tests { let iptos_dscp: IpTos = dscp.into(); assert_eq!(u8::from(iptos_dscp), dscp as u8); } + + #[test] + fn u8_to_iptos() { + let tos = 0x8b; + let iptos: IpTos = (IpTosEcn::Ce, IpTosDscp::Af41).into(); + assert_eq!(tos, u8::from(iptos)); + assert_eq!(IpTos::from(tos), iptos); + } } diff --git a/third_party/rust/neqo-common/src/udp.rs b/third_party/rust/neqo-common/src/udp.rs new file mode 100644 index 0000000000..c27b0632ff --- /dev/null +++ b/third_party/rust/neqo-common/src/udp.rs @@ -0,0 +1,222 @@ +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp. +#![allow(clippy::missing_panics_doc)] // Functions simply delegate to tokio and quinn-udp. + +use std::{ + io::{self, IoSliceMut}, + net::{SocketAddr, ToSocketAddrs}, + slice, +}; + +use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; +use tokio::io::Interest; + +use crate::{Datagram, IpTos}; + +/// Socket receive buffer size. +/// +/// Allows reading multiple datagrams in a single [`Socket::recv`] call. +const RECV_BUF_SIZE: usize = u16::MAX as usize; + +pub struct Socket { + socket: tokio::net::UdpSocket, + state: UdpSocketState, + recv_buf: Vec<u8>, +} + +impl Socket { + /// Calls [`std::net::UdpSocket::bind`] and instantiates [`quinn_udp::UdpSocketState`]. + pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> { + let socket = std::net::UdpSocket::bind(addr)?; + + Ok(Self { + state: quinn_udp::UdpSocketState::new((&socket).into())?, + socket: tokio::net::UdpSocket::from_std(socket)?, + recv_buf: vec![0; RECV_BUF_SIZE], + }) + } + + /// See [`tokio::net::UdpSocket::local_addr`]. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.socket.local_addr() + } + + /// See [`tokio::net::UdpSocket::writable`]. + pub async fn writable(&self) -> Result<(), io::Error> { + self.socket.writable().await + } + + /// See [`tokio::net::UdpSocket::readable`]. + pub async fn readable(&self) -> Result<(), io::Error> { + self.socket.readable().await + } + + /// Send the UDP datagram on the specified socket. + pub fn send(&self, d: Datagram) -> io::Result<usize> { + let transmit = Transmit { + destination: d.destination(), + ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())), + contents: d.into_data().into(), + segment_size: None, + src_ip: None, + }; + + let n = self.socket.try_io(Interest::WRITABLE, || { + self.state + .send((&self.socket).into(), slice::from_ref(&transmit)) + })?; + + assert_eq!(n, 1, "only passed one slice"); + + Ok(n) + } + + /// Receive a UDP datagram on the specified socket. + pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> { + let mut meta = RecvMeta::default(); + + match self.socket.try_io(Interest::READABLE, || { + self.state.recv( + (&self.socket).into(), + &mut [IoSliceMut::new(&mut self.recv_buf)], + slice::from_mut(&mut meta), + ) + }) { + Ok(n) => { + assert_eq!(n, 1, "only passed one slice"); + } + Err(ref err) + if err.kind() == io::ErrorKind::WouldBlock + || err.kind() == io::ErrorKind::Interrupted => + { + return Ok(vec![]) + } + Err(err) => { + return Err(err); + } + }; + + if meta.len == 0 { + eprintln!("zero length datagram received?"); + return Ok(vec![]); + } + if meta.len == self.recv_buf.len() { + eprintln!( + "Might have received more than {} bytes", + self.recv_buf.len() + ); + } + + Ok(self.recv_buf[0..meta.len] + .chunks(meta.stride.min(self.recv_buf.len())) + .map(|d| { + Datagram::new( + meta.addr, + *local_address, + meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), + None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749 + d, + ) + }) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{IpTosDscp, IpTosEcn}; + + #[tokio::test] + async fn datagram_tos() -> Result<(), io::Error> { + let sender = Socket::bind("127.0.0.1:0")?; + let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let mut receiver = Socket::bind(receiver_addr)?; + + let datagram = Datagram::new( + sender.local_addr()?, + receiver.local_addr()?, + IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)), + None, + "Hello, world!".as_bytes().to_vec(), + ); + + sender.writable().await?; + sender.send(datagram.clone())?; + + receiver.readable().await?; + let received_datagram = receiver + .recv(&receiver_addr) + .expect("receive to succeed") + .into_iter() + .next() + .expect("receive to yield datagram"); + + // Assert that the ECN is correct. + assert_eq!( + IpTosEcn::from(datagram.tos()), + IpTosEcn::from(received_datagram.tos()) + ); + + Ok(()) + } + + /// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read. + #[tokio::test] + #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)] + async fn many_datagrams_through_gro() -> Result<(), io::Error> { + const SEGMENT_SIZE: usize = 128; + + let sender = Socket::bind("127.0.0.1:0")?; + let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let mut receiver = Socket::bind(receiver_addr)?; + + // `neqo_common::udp::Socket::send` does not yet + // (https://github.com/mozilla/neqo/issues/1693) support GSO. Use + // `quinn_udp` directly. + let max_gso_segments = sender.state.max_gso_segments(); + let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments]; + let transmit = Transmit { + destination: receiver.local_addr()?, + ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from(( + IpTosDscp::Le, + IpTosEcn::Ect1, + )))), + contents: msg.clone().into(), + segment_size: Some(SEGMENT_SIZE), + src_ip: None, + }; + sender.writable().await?; + let n = sender.socket.try_io(Interest::WRITABLE, || { + sender + .state + .send((&sender.socket).into(), slice::from_ref(&transmit)) + })?; + assert_eq!(n, 1, "only passed one slice"); + + // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg. + let mut num_received = 0; + while num_received < max_gso_segments { + receiver.readable().await?; + receiver + .recv(&receiver_addr) + .expect("receive to succeed") + .into_iter() + .for_each(|d| { + assert_eq!( + SEGMENT_SIZE, + d.len(), + "Expect received datagrams to have same length as sent datagrams." + ); + num_received += 1; + }); + } + + Ok(()) + } +} |