summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-common/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-common/src')
-rw-r--r--third_party/rust/neqo-common/src/datagram.rs15
-rw-r--r--third_party/rust/neqo-common/src/lib.rs2
-rw-r--r--third_party/rust/neqo-common/src/log.rs21
-rw-r--r--third_party/rust/neqo-common/src/timer.rs46
-rw-r--r--third_party/rust/neqo-common/src/tos.rs48
-rw-r--r--third_party/rust/neqo-common/src/udp.rs222
6 files changed, 92 insertions, 262 deletions
diff --git a/third_party/rust/neqo-common/src/datagram.rs b/third_party/rust/neqo-common/src/datagram.rs
index 04ba1a45a1..cc2cb7d113 100644
--- a/third_party/rust/neqo-common/src/datagram.rs
+++ b/third_party/rust/neqo-common/src/datagram.rs
@@ -54,10 +54,8 @@ impl Datagram {
self.ttl
}
- #[cfg(feature = "udp")]
- #[must_use]
- pub(crate) fn into_data(self) -> Vec<u8> {
- self.d
+ pub fn set_tos(&mut self, tos: IpTos) {
+ self.tos = tos;
}
}
@@ -83,6 +81,12 @@ impl std::fmt::Debug for Datagram {
}
}
+impl From<Datagram> for Vec<u8> {
+ fn from(datagram: Datagram) -> Self {
+ datagram.d
+ }
+}
+
#[cfg(test)]
use test_fixture::datagram;
@@ -90,8 +94,7 @@ use test_fixture::datagram;
fn fmt_datagram() {
let d = datagram([0; 1].to_vec());
assert_eq!(
- format!("{d:?}"),
+ &format!("{d:?}"),
"Datagram IpTos(Cs0, NotEct) TTL Some(128) [fe80::1]:443->[fe80::1]:443: [1]: 00"
- .to_string()
);
}
diff --git a/third_party/rust/neqo-common/src/lib.rs b/third_party/rust/neqo-common/src/lib.rs
index fe88097983..e988c6071d 100644
--- a/third_party/rust/neqo-common/src/lib.rs
+++ b/third_party/rust/neqo-common/src/lib.rs
@@ -16,8 +16,6 @@ pub mod log;
pub mod qlog;
pub mod timer;
pub mod tos;
-#[cfg(feature = "udp")]
-pub mod udp;
use std::fmt::Write;
diff --git a/third_party/rust/neqo-common/src/log.rs b/third_party/rust/neqo-common/src/log.rs
index c5b89be8a6..04028a26bd 100644
--- a/third_party/rust/neqo-common/src/log.rs
+++ b/third_party/rust/neqo-common/src/log.rs
@@ -50,7 +50,7 @@ fn since_start() -> Duration {
START_TIME.get_or_init(Instant::now).elapsed()
}
-pub fn init() {
+pub fn init(level_filter: Option<log::LevelFilter>) {
static INIT_ONCE: Once = Once::new();
if ::log::STATIC_MAX_LEVEL == ::log::LevelFilter::Off {
@@ -59,6 +59,9 @@ pub fn init() {
INIT_ONCE.call_once(|| {
let mut builder = Builder::from_env("RUST_LOG");
+ if let Some(filter) = level_filter {
+ builder.filter_level(filter);
+ }
builder.format(|buf, record| {
let elapsed = since_start();
writeln!(
@@ -71,9 +74,9 @@ pub fn init() {
)
});
if let Err(e) = builder.try_init() {
- do_log!(::log::Level::Info, "Logging initialization error {:?}", e);
+ do_log!(::log::Level::Warn, "Logging initialization error {:?}", e);
} else {
- do_log!(::log::Level::Info, "Logging initialized");
+ do_log!(::log::Level::Debug, "Logging initialized");
}
});
}
@@ -81,32 +84,32 @@ pub fn init() {
#[macro_export]
macro_rules! log_invoke {
($lvl:expr, $ctx:expr, $($arg:tt)*) => ( {
- ::neqo_common::log::init();
+ ::neqo_common::log::init(None);
::neqo_common::do_log!($lvl, "[{}] {}", $ctx, format!($($arg)*));
} )
}
#[macro_export]
macro_rules! qerror {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Error, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } );
}
#[macro_export]
macro_rules! qwarn {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Warn, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } );
}
#[macro_export]
macro_rules! qinfo {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Info, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } );
}
#[macro_export]
macro_rules! qdebug {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Debug, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } );
}
#[macro_export]
macro_rules! qtrace {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Trace, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } );
}
diff --git a/third_party/rust/neqo-common/src/timer.rs b/third_party/rust/neqo-common/src/timer.rs
index a413252e08..3feddb2226 100644
--- a/third_party/rust/neqo-common/src/timer.rs
+++ b/third_party/rust/neqo-common/src/timer.rs
@@ -5,6 +5,7 @@
// except according to those terms.
use std::{
+ collections::VecDeque,
mem,
time::{Duration, Instant},
};
@@ -27,7 +28,7 @@ impl<T> TimerItem<T> {
/// points). Time is relative, the wheel has an origin time and it is unable to represent times that
/// are more than `granularity * capacity` past that time.
pub struct Timer<T> {
- items: Vec<Vec<TimerItem<T>>>,
+ items: Vec<VecDeque<TimerItem<T>>>,
now: Instant,
granularity: Duration,
cursor: usize,
@@ -55,9 +56,14 @@ impl<T> Timer<T> {
/// Return a reference to the time of the next entry.
#[must_use]
pub fn next_time(&self) -> Option<Instant> {
- for i in 0..self.items.len() {
- let idx = self.bucket(i);
- if let Some(t) = self.items[idx].first() {
+ let idx = self.bucket(0);
+ for i in idx..self.items.len() {
+ if let Some(t) = self.items[i].front() {
+ return Some(t.time);
+ }
+ }
+ for i in 0..idx {
+ if let Some(t) = self.items[i].front() {
return Some(t.time);
}
}
@@ -145,6 +151,9 @@ impl<T> Timer<T> {
/// Given knowledge of the time an item was added, remove it.
/// This requires use of a predicate that identifies matching items.
+ ///
+ /// # Panics
+ /// Impossible, I think.
pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T>
where
F: FnMut(&T) -> bool,
@@ -167,7 +176,7 @@ impl<T> Timer<T> {
break;
}
if selector(&self.items[bucket][i].item) {
- return Some(self.items[bucket].remove(i).item);
+ return Some(self.items[bucket].remove(i).unwrap().item);
}
}
// ... then forwards.
@@ -176,7 +185,7 @@ impl<T> Timer<T> {
break;
}
if selector(&self.items[bucket][i].item) {
- return Some(self.items[bucket].remove(i).item);
+ return Some(self.items[bucket].remove(i).unwrap().item);
}
}
None
@@ -185,10 +194,25 @@ impl<T> Timer<T> {
/// Take the next item, unless there are no items with
/// a timeout in the past relative to `until`.
pub fn take_next(&mut self, until: Instant) -> Option<T> {
- for i in 0..self.items.len() {
- let idx = self.bucket(i);
- if !self.items[idx].is_empty() && self.items[idx][0].time <= until {
- return Some(self.items[idx].remove(0).item);
+ fn maybe_take<T>(v: &mut VecDeque<TimerItem<T>>, until: Instant) -> Option<T> {
+ if !v.is_empty() && v[0].time <= until {
+ Some(v.pop_front().unwrap().item)
+ } else {
+ None
+ }
+ }
+
+ let idx = self.bucket(0);
+ for i in idx..self.items.len() {
+ let res = maybe_take(&mut self.items[i], until);
+ if res.is_some() {
+ return res;
+ }
+ }
+ for i in 0..idx {
+ let res = maybe_take(&mut self.items[i], until);
+ if res.is_some() {
+ return res;
}
}
None
@@ -201,7 +225,7 @@ impl<T> Timer<T> {
if until >= self.now + self.span() {
// Drain everything, so a clean sweep.
let mut empty_items = Vec::with_capacity(self.items.len());
- empty_items.resize_with(self.items.len(), Vec::default);
+ empty_items.resize_with(self.items.len(), VecDeque::default);
let mut items = mem::replace(&mut self.items, empty_items);
self.now = until;
self.cursor = 0;
diff --git a/third_party/rust/neqo-common/src/tos.rs b/third_party/rust/neqo-common/src/tos.rs
index 3610f72750..533c5447e2 100644
--- a/third_party/rust/neqo-common/src/tos.rs
+++ b/third_party/rust/neqo-common/src/tos.rs
@@ -36,7 +36,7 @@ impl From<IpTosEcn> for u8 {
impl From<u8> for IpTosEcn {
fn from(v: u8) -> Self {
- match v & 0b11 {
+ match v & 0b0000_0011 {
0b00 => IpTosEcn::NotEct,
0b01 => IpTosEcn::Ect1,
0b10 => IpTosEcn::Ect0,
@@ -47,8 +47,8 @@ impl From<u8> for IpTosEcn {
}
impl From<IpTos> for IpTosEcn {
- fn from(value: IpTos) -> Self {
- IpTosEcn::from(value.0 & 0x3)
+ fn from(v: IpTos) -> Self {
+ IpTosEcn::from(u8::from(v))
}
}
@@ -166,14 +166,13 @@ impl From<u8> for IpTosDscp {
}
impl From<IpTos> for IpTosDscp {
- fn from(value: IpTos) -> Self {
- IpTosDscp::from(value.0 & 0xfc)
+ fn from(v: IpTos) -> Self {
+ IpTosDscp::from(u8::from(v))
}
}
/// The type-of-service field in an IP packet.
-#[allow(clippy::module_name_repetitions)]
-#[derive(Copy, Clone, PartialEq, Eq)]
+#[derive(Copy, Clone, PartialEq, Eq, Default)]
pub struct IpTos(u8);
impl From<IpTosEcn> for IpTos {
@@ -215,15 +214,19 @@ impl From<u8> for IpTos {
impl Debug for IpTos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("IpTos")
- .field(&IpTosDscp::from(self.0 & 0xfc))
- .field(&IpTosEcn::from(self.0 & 0x3))
+ .field(&IpTosDscp::from(*self))
+ .field(&IpTosEcn::from(*self))
.finish()
}
}
-impl Default for IpTos {
- fn default() -> Self {
- (IpTosDscp::default(), IpTosEcn::default()).into()
+impl IpTos {
+ pub fn set_ecn(&mut self, ecn: IpTosEcn) {
+ self.0 = u8::from(IpTosDscp::from(*self)) | u8::from(ecn);
+ }
+
+ pub fn set_dscp(&mut self, dscp: IpTosDscp) {
+ self.0 = u8::from(IpTosEcn::from(*self)) | u8::from(dscp);
}
}
@@ -322,4 +325,25 @@ mod tests {
assert_eq!(tos, u8::from(iptos));
assert_eq!(IpTos::from(tos), iptos);
}
+
+ #[test]
+ fn iptos_to_iptosdscp() {
+ let tos = IpTos::from((IpTosDscp::Af41, IpTosEcn::NotEct));
+ let dscp = IpTosDscp::from(tos);
+ assert_eq!(dscp, IpTosDscp::Af41);
+ }
+
+ #[test]
+ fn tos_modify_ecn() {
+ let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::NotEct).into();
+ iptos.set_ecn(IpTosEcn::Ce);
+ assert_eq!(u8::from(iptos), 0b1000_1011);
+ }
+
+ #[test]
+ fn tos_modify_dscp() {
+ let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::Ect1).into();
+ iptos.set_dscp(IpTosDscp::Le);
+ assert_eq!(u8::from(iptos), 0b0000_0101);
+ }
}
diff --git a/third_party/rust/neqo-common/src/udp.rs b/third_party/rust/neqo-common/src/udp.rs
deleted file mode 100644
index c27b0632ff..0000000000
--- a/third_party/rust/neqo-common/src/udp.rs
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-#![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp.
-#![allow(clippy::missing_panics_doc)] // Functions simply delegate to tokio and quinn-udp.
-
-use std::{
- io::{self, IoSliceMut},
- net::{SocketAddr, ToSocketAddrs},
- slice,
-};
-
-use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
-use tokio::io::Interest;
-
-use crate::{Datagram, IpTos};
-
-/// Socket receive buffer size.
-///
-/// Allows reading multiple datagrams in a single [`Socket::recv`] call.
-const RECV_BUF_SIZE: usize = u16::MAX as usize;
-
-pub struct Socket {
- socket: tokio::net::UdpSocket,
- state: UdpSocketState,
- recv_buf: Vec<u8>,
-}
-
-impl Socket {
- /// Calls [`std::net::UdpSocket::bind`] and instantiates [`quinn_udp::UdpSocketState`].
- pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
- let socket = std::net::UdpSocket::bind(addr)?;
-
- Ok(Self {
- state: quinn_udp::UdpSocketState::new((&socket).into())?,
- socket: tokio::net::UdpSocket::from_std(socket)?,
- recv_buf: vec![0; RECV_BUF_SIZE],
- })
- }
-
- /// See [`tokio::net::UdpSocket::local_addr`].
- pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.socket.local_addr()
- }
-
- /// See [`tokio::net::UdpSocket::writable`].
- pub async fn writable(&self) -> Result<(), io::Error> {
- self.socket.writable().await
- }
-
- /// See [`tokio::net::UdpSocket::readable`].
- pub async fn readable(&self) -> Result<(), io::Error> {
- self.socket.readable().await
- }
-
- /// Send the UDP datagram on the specified socket.
- pub fn send(&self, d: Datagram) -> io::Result<usize> {
- let transmit = Transmit {
- destination: d.destination(),
- ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())),
- contents: d.into_data().into(),
- segment_size: None,
- src_ip: None,
- };
-
- let n = self.socket.try_io(Interest::WRITABLE, || {
- self.state
- .send((&self.socket).into(), slice::from_ref(&transmit))
- })?;
-
- assert_eq!(n, 1, "only passed one slice");
-
- Ok(n)
- }
-
- /// Receive a UDP datagram on the specified socket.
- pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
- let mut meta = RecvMeta::default();
-
- match self.socket.try_io(Interest::READABLE, || {
- self.state.recv(
- (&self.socket).into(),
- &mut [IoSliceMut::new(&mut self.recv_buf)],
- slice::from_mut(&mut meta),
- )
- }) {
- Ok(n) => {
- assert_eq!(n, 1, "only passed one slice");
- }
- Err(ref err)
- if err.kind() == io::ErrorKind::WouldBlock
- || err.kind() == io::ErrorKind::Interrupted =>
- {
- return Ok(vec![])
- }
- Err(err) => {
- return Err(err);
- }
- };
-
- if meta.len == 0 {
- eprintln!("zero length datagram received?");
- return Ok(vec![]);
- }
- if meta.len == self.recv_buf.len() {
- eprintln!(
- "Might have received more than {} bytes",
- self.recv_buf.len()
- );
- }
-
- Ok(self.recv_buf[0..meta.len]
- .chunks(meta.stride.min(self.recv_buf.len()))
- .map(|d| {
- Datagram::new(
- meta.addr,
- *local_address,
- meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
- None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
- d,
- )
- })
- .collect())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::{IpTosDscp, IpTosEcn};
-
- #[tokio::test]
- async fn datagram_tos() -> Result<(), io::Error> {
- let sender = Socket::bind("127.0.0.1:0")?;
- let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
- let mut receiver = Socket::bind(receiver_addr)?;
-
- let datagram = Datagram::new(
- sender.local_addr()?,
- receiver.local_addr()?,
- IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)),
- None,
- "Hello, world!".as_bytes().to_vec(),
- );
-
- sender.writable().await?;
- sender.send(datagram.clone())?;
-
- receiver.readable().await?;
- let received_datagram = receiver
- .recv(&receiver_addr)
- .expect("receive to succeed")
- .into_iter()
- .next()
- .expect("receive to yield datagram");
-
- // Assert that the ECN is correct.
- assert_eq!(
- IpTosEcn::from(datagram.tos()),
- IpTosEcn::from(received_datagram.tos())
- );
-
- Ok(())
- }
-
- /// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
- #[tokio::test]
- #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
- async fn many_datagrams_through_gro() -> Result<(), io::Error> {
- const SEGMENT_SIZE: usize = 128;
-
- let sender = Socket::bind("127.0.0.1:0")?;
- let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
- let mut receiver = Socket::bind(receiver_addr)?;
-
- // `neqo_common::udp::Socket::send` does not yet
- // (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
- // `quinn_udp` directly.
- let max_gso_segments = sender.state.max_gso_segments();
- let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
- let transmit = Transmit {
- destination: receiver.local_addr()?,
- ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
- IpTosDscp::Le,
- IpTosEcn::Ect1,
- )))),
- contents: msg.clone().into(),
- segment_size: Some(SEGMENT_SIZE),
- src_ip: None,
- };
- sender.writable().await?;
- let n = sender.socket.try_io(Interest::WRITABLE, || {
- sender
- .state
- .send((&sender.socket).into(), slice::from_ref(&transmit))
- })?;
- assert_eq!(n, 1, "only passed one slice");
-
- // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
- let mut num_received = 0;
- while num_received < max_gso_segments {
- receiver.readable().await?;
- receiver
- .recv(&receiver_addr)
- .expect("receive to succeed")
- .into_iter()
- .for_each(|d| {
- assert_eq!(
- SEGMENT_SIZE,
- d.len(),
- "Expect received datagrams to have same length as sent datagrams."
- );
- num_received += 1;
- });
- }
-
- Ok(())
- }
-}