diff options
Diffstat (limited to 'third_party/rust/neqo-common/src')
-rw-r--r-- | third_party/rust/neqo-common/src/datagram.rs | 15 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/lib.rs | 2 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/log.rs | 21 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/timer.rs | 46 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/tos.rs | 48 | ||||
-rw-r--r-- | third_party/rust/neqo-common/src/udp.rs | 222 |
6 files changed, 92 insertions, 262 deletions
diff --git a/third_party/rust/neqo-common/src/datagram.rs b/third_party/rust/neqo-common/src/datagram.rs index 04ba1a45a1..cc2cb7d113 100644 --- a/third_party/rust/neqo-common/src/datagram.rs +++ b/third_party/rust/neqo-common/src/datagram.rs @@ -54,10 +54,8 @@ impl Datagram { self.ttl } - #[cfg(feature = "udp")] - #[must_use] - pub(crate) fn into_data(self) -> Vec<u8> { - self.d + pub fn set_tos(&mut self, tos: IpTos) { + self.tos = tos; } } @@ -83,6 +81,12 @@ impl std::fmt::Debug for Datagram { } } +impl From<Datagram> for Vec<u8> { + fn from(datagram: Datagram) -> Self { + datagram.d + } +} + #[cfg(test)] use test_fixture::datagram; @@ -90,8 +94,7 @@ use test_fixture::datagram; fn fmt_datagram() { let d = datagram([0; 1].to_vec()); assert_eq!( - format!("{d:?}"), + &format!("{d:?}"), "Datagram IpTos(Cs0, NotEct) TTL Some(128) [fe80::1]:443->[fe80::1]:443: [1]: 00" - .to_string() ); } diff --git a/third_party/rust/neqo-common/src/lib.rs b/third_party/rust/neqo-common/src/lib.rs index fe88097983..e988c6071d 100644 --- a/third_party/rust/neqo-common/src/lib.rs +++ b/third_party/rust/neqo-common/src/lib.rs @@ -16,8 +16,6 @@ pub mod log; pub mod qlog; pub mod timer; pub mod tos; -#[cfg(feature = "udp")] -pub mod udp; use std::fmt::Write; diff --git a/third_party/rust/neqo-common/src/log.rs b/third_party/rust/neqo-common/src/log.rs index c5b89be8a6..04028a26bd 100644 --- a/third_party/rust/neqo-common/src/log.rs +++ b/third_party/rust/neqo-common/src/log.rs @@ -50,7 +50,7 @@ fn since_start() -> Duration { START_TIME.get_or_init(Instant::now).elapsed() } -pub fn init() { +pub fn init(level_filter: Option<log::LevelFilter>) { static INIT_ONCE: Once = Once::new(); if ::log::STATIC_MAX_LEVEL == ::log::LevelFilter::Off { @@ -59,6 +59,9 @@ pub fn init() { INIT_ONCE.call_once(|| { let mut builder = Builder::from_env("RUST_LOG"); + if let Some(filter) = level_filter { + builder.filter_level(filter); + } builder.format(|buf, record| { let elapsed = since_start(); writeln!( @@ -71,9 +74,9 @@ pub fn init() { ) }); if let Err(e) = builder.try_init() { - do_log!(::log::Level::Info, "Logging initialization error {:?}", e); + do_log!(::log::Level::Warn, "Logging initialization error {:?}", e); } else { - do_log!(::log::Level::Info, "Logging initialized"); + do_log!(::log::Level::Debug, "Logging initialized"); } }); } @@ -81,32 +84,32 @@ pub fn init() { #[macro_export] macro_rules! log_invoke { ($lvl:expr, $ctx:expr, $($arg:tt)*) => ( { - ::neqo_common::log::init(); + ::neqo_common::log::init(None); ::neqo_common::do_log!($lvl, "[{}] {}", $ctx, format!($($arg)*)); } ) } #[macro_export] macro_rules! qerror { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Error, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } ); } #[macro_export] macro_rules! qwarn { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Warn, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } ); } #[macro_export] macro_rules! qinfo { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Info, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } ); } #[macro_export] macro_rules! qdebug { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Debug, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } ); } #[macro_export] macro_rules! qtrace { ([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Trace, $ctx, $($arg)*);); - ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } ); + ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } ); } diff --git a/third_party/rust/neqo-common/src/timer.rs b/third_party/rust/neqo-common/src/timer.rs index a413252e08..3feddb2226 100644 --- a/third_party/rust/neqo-common/src/timer.rs +++ b/third_party/rust/neqo-common/src/timer.rs @@ -5,6 +5,7 @@ // except according to those terms. use std::{ + collections::VecDeque, mem, time::{Duration, Instant}, }; @@ -27,7 +28,7 @@ impl<T> TimerItem<T> { /// points). Time is relative, the wheel has an origin time and it is unable to represent times that /// are more than `granularity * capacity` past that time. pub struct Timer<T> { - items: Vec<Vec<TimerItem<T>>>, + items: Vec<VecDeque<TimerItem<T>>>, now: Instant, granularity: Duration, cursor: usize, @@ -55,9 +56,14 @@ impl<T> Timer<T> { /// Return a reference to the time of the next entry. #[must_use] pub fn next_time(&self) -> Option<Instant> { - for i in 0..self.items.len() { - let idx = self.bucket(i); - if let Some(t) = self.items[idx].first() { + let idx = self.bucket(0); + for i in idx..self.items.len() { + if let Some(t) = self.items[i].front() { + return Some(t.time); + } + } + for i in 0..idx { + if let Some(t) = self.items[i].front() { return Some(t.time); } } @@ -145,6 +151,9 @@ impl<T> Timer<T> { /// Given knowledge of the time an item was added, remove it. /// This requires use of a predicate that identifies matching items. + /// + /// # Panics + /// Impossible, I think. pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T> where F: FnMut(&T) -> bool, @@ -167,7 +176,7 @@ impl<T> Timer<T> { break; } if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).item); + return Some(self.items[bucket].remove(i).unwrap().item); } } // ... then forwards. @@ -176,7 +185,7 @@ impl<T> Timer<T> { break; } if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).item); + return Some(self.items[bucket].remove(i).unwrap().item); } } None @@ -185,10 +194,25 @@ impl<T> Timer<T> { /// Take the next item, unless there are no items with /// a timeout in the past relative to `until`. pub fn take_next(&mut self, until: Instant) -> Option<T> { - for i in 0..self.items.len() { - let idx = self.bucket(i); - if !self.items[idx].is_empty() && self.items[idx][0].time <= until { - return Some(self.items[idx].remove(0).item); + fn maybe_take<T>(v: &mut VecDeque<TimerItem<T>>, until: Instant) -> Option<T> { + if !v.is_empty() && v[0].time <= until { + Some(v.pop_front().unwrap().item) + } else { + None + } + } + + let idx = self.bucket(0); + for i in idx..self.items.len() { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; + } + } + for i in 0..idx { + let res = maybe_take(&mut self.items[i], until); + if res.is_some() { + return res; } } None @@ -201,7 +225,7 @@ impl<T> Timer<T> { if until >= self.now + self.span() { // Drain everything, so a clean sweep. let mut empty_items = Vec::with_capacity(self.items.len()); - empty_items.resize_with(self.items.len(), Vec::default); + empty_items.resize_with(self.items.len(), VecDeque::default); let mut items = mem::replace(&mut self.items, empty_items); self.now = until; self.cursor = 0; diff --git a/third_party/rust/neqo-common/src/tos.rs b/third_party/rust/neqo-common/src/tos.rs index 3610f72750..533c5447e2 100644 --- a/third_party/rust/neqo-common/src/tos.rs +++ b/third_party/rust/neqo-common/src/tos.rs @@ -36,7 +36,7 @@ impl From<IpTosEcn> for u8 { impl From<u8> for IpTosEcn { fn from(v: u8) -> Self { - match v & 0b11 { + match v & 0b0000_0011 { 0b00 => IpTosEcn::NotEct, 0b01 => IpTosEcn::Ect1, 0b10 => IpTosEcn::Ect0, @@ -47,8 +47,8 @@ impl From<u8> for IpTosEcn { } impl From<IpTos> for IpTosEcn { - fn from(value: IpTos) -> Self { - IpTosEcn::from(value.0 & 0x3) + fn from(v: IpTos) -> Self { + IpTosEcn::from(u8::from(v)) } } @@ -166,14 +166,13 @@ impl From<u8> for IpTosDscp { } impl From<IpTos> for IpTosDscp { - fn from(value: IpTos) -> Self { - IpTosDscp::from(value.0 & 0xfc) + fn from(v: IpTos) -> Self { + IpTosDscp::from(u8::from(v)) } } /// The type-of-service field in an IP packet. -#[allow(clippy::module_name_repetitions)] -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq, Eq, Default)] pub struct IpTos(u8); impl From<IpTosEcn> for IpTos { @@ -215,15 +214,19 @@ impl From<u8> for IpTos { impl Debug for IpTos { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("IpTos") - .field(&IpTosDscp::from(self.0 & 0xfc)) - .field(&IpTosEcn::from(self.0 & 0x3)) + .field(&IpTosDscp::from(*self)) + .field(&IpTosEcn::from(*self)) .finish() } } -impl Default for IpTos { - fn default() -> Self { - (IpTosDscp::default(), IpTosEcn::default()).into() +impl IpTos { + pub fn set_ecn(&mut self, ecn: IpTosEcn) { + self.0 = u8::from(IpTosDscp::from(*self)) | u8::from(ecn); + } + + pub fn set_dscp(&mut self, dscp: IpTosDscp) { + self.0 = u8::from(IpTosEcn::from(*self)) | u8::from(dscp); } } @@ -322,4 +325,25 @@ mod tests { assert_eq!(tos, u8::from(iptos)); assert_eq!(IpTos::from(tos), iptos); } + + #[test] + fn iptos_to_iptosdscp() { + let tos = IpTos::from((IpTosDscp::Af41, IpTosEcn::NotEct)); + let dscp = IpTosDscp::from(tos); + assert_eq!(dscp, IpTosDscp::Af41); + } + + #[test] + fn tos_modify_ecn() { + let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::NotEct).into(); + iptos.set_ecn(IpTosEcn::Ce); + assert_eq!(u8::from(iptos), 0b1000_1011); + } + + #[test] + fn tos_modify_dscp() { + let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::Ect1).into(); + iptos.set_dscp(IpTosDscp::Le); + assert_eq!(u8::from(iptos), 0b0000_0101); + } } diff --git a/third_party/rust/neqo-common/src/udp.rs b/third_party/rust/neqo-common/src/udp.rs deleted file mode 100644 index c27b0632ff..0000000000 --- a/third_party/rust/neqo-common/src/udp.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -#![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp. -#![allow(clippy::missing_panics_doc)] // Functions simply delegate to tokio and quinn-udp. - -use std::{ - io::{self, IoSliceMut}, - net::{SocketAddr, ToSocketAddrs}, - slice, -}; - -use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; -use tokio::io::Interest; - -use crate::{Datagram, IpTos}; - -/// Socket receive buffer size. -/// -/// Allows reading multiple datagrams in a single [`Socket::recv`] call. -const RECV_BUF_SIZE: usize = u16::MAX as usize; - -pub struct Socket { - socket: tokio::net::UdpSocket, - state: UdpSocketState, - recv_buf: Vec<u8>, -} - -impl Socket { - /// Calls [`std::net::UdpSocket::bind`] and instantiates [`quinn_udp::UdpSocketState`]. - pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> { - let socket = std::net::UdpSocket::bind(addr)?; - - Ok(Self { - state: quinn_udp::UdpSocketState::new((&socket).into())?, - socket: tokio::net::UdpSocket::from_std(socket)?, - recv_buf: vec![0; RECV_BUF_SIZE], - }) - } - - /// See [`tokio::net::UdpSocket::local_addr`]. - pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.socket.local_addr() - } - - /// See [`tokio::net::UdpSocket::writable`]. - pub async fn writable(&self) -> Result<(), io::Error> { - self.socket.writable().await - } - - /// See [`tokio::net::UdpSocket::readable`]. - pub async fn readable(&self) -> Result<(), io::Error> { - self.socket.readable().await - } - - /// Send the UDP datagram on the specified socket. - pub fn send(&self, d: Datagram) -> io::Result<usize> { - let transmit = Transmit { - destination: d.destination(), - ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())), - contents: d.into_data().into(), - segment_size: None, - src_ip: None, - }; - - let n = self.socket.try_io(Interest::WRITABLE, || { - self.state - .send((&self.socket).into(), slice::from_ref(&transmit)) - })?; - - assert_eq!(n, 1, "only passed one slice"); - - Ok(n) - } - - /// Receive a UDP datagram on the specified socket. - pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> { - let mut meta = RecvMeta::default(); - - match self.socket.try_io(Interest::READABLE, || { - self.state.recv( - (&self.socket).into(), - &mut [IoSliceMut::new(&mut self.recv_buf)], - slice::from_mut(&mut meta), - ) - }) { - Ok(n) => { - assert_eq!(n, 1, "only passed one slice"); - } - Err(ref err) - if err.kind() == io::ErrorKind::WouldBlock - || err.kind() == io::ErrorKind::Interrupted => - { - return Ok(vec![]) - } - Err(err) => { - return Err(err); - } - }; - - if meta.len == 0 { - eprintln!("zero length datagram received?"); - return Ok(vec![]); - } - if meta.len == self.recv_buf.len() { - eprintln!( - "Might have received more than {} bytes", - self.recv_buf.len() - ); - } - - Ok(self.recv_buf[0..meta.len] - .chunks(meta.stride.min(self.recv_buf.len())) - .map(|d| { - Datagram::new( - meta.addr, - *local_address, - meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), - None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749 - d, - ) - }) - .collect()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{IpTosDscp, IpTosEcn}; - - #[tokio::test] - async fn datagram_tos() -> Result<(), io::Error> { - let sender = Socket::bind("127.0.0.1:0")?; - let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let mut receiver = Socket::bind(receiver_addr)?; - - let datagram = Datagram::new( - sender.local_addr()?, - receiver.local_addr()?, - IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)), - None, - "Hello, world!".as_bytes().to_vec(), - ); - - sender.writable().await?; - sender.send(datagram.clone())?; - - receiver.readable().await?; - let received_datagram = receiver - .recv(&receiver_addr) - .expect("receive to succeed") - .into_iter() - .next() - .expect("receive to yield datagram"); - - // Assert that the ECN is correct. - assert_eq!( - IpTosEcn::from(datagram.tos()), - IpTosEcn::from(received_datagram.tos()) - ); - - Ok(()) - } - - /// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read. - #[tokio::test] - #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)] - async fn many_datagrams_through_gro() -> Result<(), io::Error> { - const SEGMENT_SIZE: usize = 128; - - let sender = Socket::bind("127.0.0.1:0")?; - let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let mut receiver = Socket::bind(receiver_addr)?; - - // `neqo_common::udp::Socket::send` does not yet - // (https://github.com/mozilla/neqo/issues/1693) support GSO. Use - // `quinn_udp` directly. - let max_gso_segments = sender.state.max_gso_segments(); - let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments]; - let transmit = Transmit { - destination: receiver.local_addr()?, - ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from(( - IpTosDscp::Le, - IpTosEcn::Ect1, - )))), - contents: msg.clone().into(), - segment_size: Some(SEGMENT_SIZE), - src_ip: None, - }; - sender.writable().await?; - let n = sender.socket.try_io(Interest::WRITABLE, || { - sender - .state - .send((&sender.socket).into(), slice::from_ref(&transmit)) - })?; - assert_eq!(n, 1, "only passed one slice"); - - // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg. - let mut num_received = 0; - while num_received < max_gso_segments { - receiver.readable().await?; - receiver - .recv(&receiver_addr) - .expect("receive to succeed") - .into_iter() - .for_each(|d| { - assert_eq!( - SEGMENT_SIZE, - d.len(), - "Expect received datagrams to have same length as sent datagrams." - ); - num_received += 1; - }); - } - - Ok(()) - } -} |