summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-common
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-common')
-rw-r--r--third_party/rust/neqo-common/.cargo-checksum.json2
-rw-r--r--third_party/rust/neqo-common/Cargo.toml28
-rw-r--r--third_party/rust/neqo-common/benches/timer.rs39
-rw-r--r--third_party/rust/neqo-common/src/datagram.rs15
-rw-r--r--third_party/rust/neqo-common/src/lib.rs2
-rw-r--r--third_party/rust/neqo-common/src/log.rs21
-rw-r--r--third_party/rust/neqo-common/src/timer.rs46
-rw-r--r--third_party/rust/neqo-common/src/tos.rs48
-rw-r--r--third_party/rust/neqo-common/src/udp.rs222
9 files changed, 140 insertions, 283 deletions
diff --git a/third_party/rust/neqo-common/.cargo-checksum.json b/third_party/rust/neqo-common/.cargo-checksum.json
index e7daca1191..64d5739014 100644
--- a/third_party/rust/neqo-common/.cargo-checksum.json
+++ b/third_party/rust/neqo-common/.cargo-checksum.json
@@ -1 +1 @@
-{"files":{"Cargo.toml":"b49758e5e8f0a6955d761e689be39530f193f7089de07f2295a7a3aef4df5898","build.rs":"306b2f909a25ae38daf5404a4e128d2a94e8975b70870864c2a71cafec9717c7","src/codec.rs":"fd239f75d374db6ff744211344c82bcd19ecf753e07410e1fe37732bbb81dfe9","src/datagram.rs":"f2ff56faa0e513edbf4331b6ee2c9e6d6111483bda7aff08d16b9f05bce5c320","src/event.rs":"106ca6c4afb107fa49a1bc72f5eb4ae95f4baa1ba19736aa38c8ba973774c160","src/header.rs":"467b947f78bfe354d8bb51e8df0c2be69e75a45e2be688d81f0d268aa77c89ef","src/hrtime.rs":"112dc758e65301b8a7a508b125d3d61063180d432bffaec566a050d4f907ab18","src/incrdecoder.rs":"577c32b9ace51f2daaf940be6d0c391c4f55cd42ef6848c68c1ffc970d8c57b5","src/lib.rs":"a86aae69900933bf83044fa96166ee51216277415eafcdb15c04a907bb2dd10e","src/log.rs":"7246053bffd704b264d42fc82f986b9d62079472a76a9fc3749c25cfc7698532","src/qlog.rs":"9b081f32bf158fd340300693acc97fe0554b617ae664eba86e4d3572e2b1e16e","src/timer.rs":"350a730cc5a159dfdac5d78ec8e8a34c5172a476d827a566703edec24c791842","src/tos.rs":"440616cb0aee9082abe00623b33e68dbe80eda47aec889ac5f4145b1566bf692","src/udp.rs":"2b92132e078791e35b66f68d99d79ff5df55efd03e788474f7781a00403a5533","tests/log.rs":"a11e21fb570258ca93bb40e3923817d381e1e605accbc3aed1df5a0a9918b41d"},"package":null} \ No newline at end of file
+{"files":{"Cargo.toml":"28a963b1a9067fef18e945a938a6ae5ea14a3a29a937f3be2ec4c0e3ae33854f","benches/timer.rs":"52d35abe1e06b92e913f43d95295b4eded0f19809677a7a63857fe92dad2c6fa","build.rs":"306b2f909a25ae38daf5404a4e128d2a94e8975b70870864c2a71cafec9717c7","src/codec.rs":"fd239f75d374db6ff744211344c82bcd19ecf753e07410e1fe37732bbb81dfe9","src/datagram.rs":"691ad94a3618d6bf5202a7911419b5e75e318d09c8cc57a9a542a864dcc764ec","src/event.rs":"106ca6c4afb107fa49a1bc72f5eb4ae95f4baa1ba19736aa38c8ba973774c160","src/header.rs":"467b947f78bfe354d8bb51e8df0c2be69e75a45e2be688d81f0d268aa77c89ef","src/hrtime.rs":"112dc758e65301b8a7a508b125d3d61063180d432bffaec566a050d4f907ab18","src/incrdecoder.rs":"577c32b9ace51f2daaf940be6d0c391c4f55cd42ef6848c68c1ffc970d8c57b5","src/lib.rs":"c917282134f43d0ddfbd67bbceea9f615a7db8a23608f809b4746808c08a9b3f","src/log.rs":"6ed99e15707c4256ae793011ed2f4b33aa81fed70205aaf5f8d3cd11ad451cf0","src/qlog.rs":"9b081f32bf158fd340300693acc97fe0554b617ae664eba86e4d3572e2b1e16e","src/timer.rs":"f6da86baf3b5d91c1230d5296ef886fb7233cdefa8c8e2b4197fcf82425a54fa","src/tos.rs":"baec87b4f8a6253b88cd257730bd1e3147c046ef993288b08235d54a24f88fbe","tests/log.rs":"a11e21fb570258ca93bb40e3923817d381e1e605accbc3aed1df5a0a9918b41d"},"package":null} \ No newline at end of file
diff --git a/third_party/rust/neqo-common/Cargo.toml b/third_party/rust/neqo-common/Cargo.toml
index dc5bed385f..90b254c888 100644
--- a/third_party/rust/neqo-common/Cargo.toml
+++ b/third_party/rust/neqo-common/Cargo.toml
@@ -13,7 +13,7 @@
edition = "2021"
rust-version = "1.74.0"
name = "neqo-common"
-version = "0.7.2"
+version = "0.7.5"
authors = ["The Neqo Authors <necko@mozilla.com>"]
build = "build.rs"
homepage = "https://github.com/mozilla/neqo/"
@@ -23,6 +23,10 @@ repository = "https://github.com/mozilla/neqo/"
[lib]
bench = false
+[[bench]]
+name = "timer"
+harness = false
+
[dependencies.enum-map]
version = "2.7"
default-features = false
@@ -39,26 +43,14 @@ default-features = false
version = "0.12"
default-features = false
-[dependencies.quinn-udp]
-git = "https://github.com/quinn-rs/quinn/"
-rev = "a947962131aba8a6521253d03cc948b20098a2d6"
-optional = true
-
[dependencies.time]
version = "0.3"
features = ["formatting"]
default-features = false
-[dependencies.tokio]
-version = "1"
-features = [
- "net",
- "time",
- "macros",
- "rt",
- "rt-multi-thread",
-]
-optional = true
+[dev-dependencies.criterion]
+version = "0.5"
+features = ["html_reports"]
default-features = false
[dev-dependencies.test-fixture]
@@ -66,10 +58,6 @@ path = "../test-fixture"
[features]
ci = []
-udp = [
- "dep:quinn-udp",
- "dep:tokio",
-]
[target."cfg(windows)".dependencies.winapi]
version = "0.3"
diff --git a/third_party/rust/neqo-common/benches/timer.rs b/third_party/rust/neqo-common/benches/timer.rs
new file mode 100644
index 0000000000..5ac8019db4
--- /dev/null
+++ b/third_party/rust/neqo-common/benches/timer.rs
@@ -0,0 +1,39 @@
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use std::time::{Duration, Instant};
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use neqo_common::timer::Timer;
+use test_fixture::now;
+
+fn benchmark_timer(c: &mut Criterion) {
+ c.bench_function("drain a timer quickly", |b| {
+ b.iter_batched_ref(
+ make_timer,
+ |(_now, timer)| {
+ while let Some(t) = timer.next_time() {
+ assert!(timer.take_next(t).is_some());
+ }
+ },
+ criterion::BatchSize::SmallInput,
+ );
+ });
+}
+
+fn make_timer() -> (Instant, Timer<()>) {
+ const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34];
+
+ let now = now();
+ let mut timer = Timer::new(now, Duration::from_millis(777), 100);
+ for &t in TIMES {
+ timer.add(now + Duration::from_secs(t), ());
+ }
+ (now, timer)
+}
+
+criterion_group!(benches, benchmark_timer);
+criterion_main!(benches);
diff --git a/third_party/rust/neqo-common/src/datagram.rs b/third_party/rust/neqo-common/src/datagram.rs
index 04ba1a45a1..cc2cb7d113 100644
--- a/third_party/rust/neqo-common/src/datagram.rs
+++ b/third_party/rust/neqo-common/src/datagram.rs
@@ -54,10 +54,8 @@ impl Datagram {
self.ttl
}
- #[cfg(feature = "udp")]
- #[must_use]
- pub(crate) fn into_data(self) -> Vec<u8> {
- self.d
+ pub fn set_tos(&mut self, tos: IpTos) {
+ self.tos = tos;
}
}
@@ -83,6 +81,12 @@ impl std::fmt::Debug for Datagram {
}
}
+impl From<Datagram> for Vec<u8> {
+ fn from(datagram: Datagram) -> Self {
+ datagram.d
+ }
+}
+
#[cfg(test)]
use test_fixture::datagram;
@@ -90,8 +94,7 @@ use test_fixture::datagram;
fn fmt_datagram() {
let d = datagram([0; 1].to_vec());
assert_eq!(
- format!("{d:?}"),
+ &format!("{d:?}"),
"Datagram IpTos(Cs0, NotEct) TTL Some(128) [fe80::1]:443->[fe80::1]:443: [1]: 00"
- .to_string()
);
}
diff --git a/third_party/rust/neqo-common/src/lib.rs b/third_party/rust/neqo-common/src/lib.rs
index fe88097983..e988c6071d 100644
--- a/third_party/rust/neqo-common/src/lib.rs
+++ b/third_party/rust/neqo-common/src/lib.rs
@@ -16,8 +16,6 @@ pub mod log;
pub mod qlog;
pub mod timer;
pub mod tos;
-#[cfg(feature = "udp")]
-pub mod udp;
use std::fmt::Write;
diff --git a/third_party/rust/neqo-common/src/log.rs b/third_party/rust/neqo-common/src/log.rs
index c5b89be8a6..04028a26bd 100644
--- a/third_party/rust/neqo-common/src/log.rs
+++ b/third_party/rust/neqo-common/src/log.rs
@@ -50,7 +50,7 @@ fn since_start() -> Duration {
START_TIME.get_or_init(Instant::now).elapsed()
}
-pub fn init() {
+pub fn init(level_filter: Option<log::LevelFilter>) {
static INIT_ONCE: Once = Once::new();
if ::log::STATIC_MAX_LEVEL == ::log::LevelFilter::Off {
@@ -59,6 +59,9 @@ pub fn init() {
INIT_ONCE.call_once(|| {
let mut builder = Builder::from_env("RUST_LOG");
+ if let Some(filter) = level_filter {
+ builder.filter_level(filter);
+ }
builder.format(|buf, record| {
let elapsed = since_start();
writeln!(
@@ -71,9 +74,9 @@ pub fn init() {
)
});
if let Err(e) = builder.try_init() {
- do_log!(::log::Level::Info, "Logging initialization error {:?}", e);
+ do_log!(::log::Level::Warn, "Logging initialization error {:?}", e);
} else {
- do_log!(::log::Level::Info, "Logging initialized");
+ do_log!(::log::Level::Debug, "Logging initialized");
}
});
}
@@ -81,32 +84,32 @@ pub fn init() {
#[macro_export]
macro_rules! log_invoke {
($lvl:expr, $ctx:expr, $($arg:tt)*) => ( {
- ::neqo_common::log::init();
+ ::neqo_common::log::init(None);
::neqo_common::do_log!($lvl, "[{}] {}", $ctx, format!($($arg)*));
} )
}
#[macro_export]
macro_rules! qerror {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Error, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Error, $($arg)*); } );
}
#[macro_export]
macro_rules! qwarn {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Warn, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Warn, $($arg)*); } );
}
#[macro_export]
macro_rules! qinfo {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Info, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Info, $($arg)*); } );
}
#[macro_export]
macro_rules! qdebug {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Debug, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Debug, $($arg)*); } );
}
#[macro_export]
macro_rules! qtrace {
([$ctx:expr], $($arg:tt)*) => (::neqo_common::log_invoke!(::log::Level::Trace, $ctx, $($arg)*););
- ($($arg:tt)*) => ( { ::neqo_common::log::init(); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } );
+ ($($arg:tt)*) => ( { ::neqo_common::log::init(None); ::neqo_common::do_log!(::log::Level::Trace, $($arg)*); } );
}
diff --git a/third_party/rust/neqo-common/src/timer.rs b/third_party/rust/neqo-common/src/timer.rs
index a413252e08..3feddb2226 100644
--- a/third_party/rust/neqo-common/src/timer.rs
+++ b/third_party/rust/neqo-common/src/timer.rs
@@ -5,6 +5,7 @@
// except according to those terms.
use std::{
+ collections::VecDeque,
mem,
time::{Duration, Instant},
};
@@ -27,7 +28,7 @@ impl<T> TimerItem<T> {
/// points). Time is relative, the wheel has an origin time and it is unable to represent times that
/// are more than `granularity * capacity` past that time.
pub struct Timer<T> {
- items: Vec<Vec<TimerItem<T>>>,
+ items: Vec<VecDeque<TimerItem<T>>>,
now: Instant,
granularity: Duration,
cursor: usize,
@@ -55,9 +56,14 @@ impl<T> Timer<T> {
/// Return a reference to the time of the next entry.
#[must_use]
pub fn next_time(&self) -> Option<Instant> {
- for i in 0..self.items.len() {
- let idx = self.bucket(i);
- if let Some(t) = self.items[idx].first() {
+ let idx = self.bucket(0);
+ for i in idx..self.items.len() {
+ if let Some(t) = self.items[i].front() {
+ return Some(t.time);
+ }
+ }
+ for i in 0..idx {
+ if let Some(t) = self.items[i].front() {
return Some(t.time);
}
}
@@ -145,6 +151,9 @@ impl<T> Timer<T> {
/// Given knowledge of the time an item was added, remove it.
/// This requires use of a predicate that identifies matching items.
+ ///
+ /// # Panics
+ /// Impossible, I think.
pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T>
where
F: FnMut(&T) -> bool,
@@ -167,7 +176,7 @@ impl<T> Timer<T> {
break;
}
if selector(&self.items[bucket][i].item) {
- return Some(self.items[bucket].remove(i).item);
+ return Some(self.items[bucket].remove(i).unwrap().item);
}
}
// ... then forwards.
@@ -176,7 +185,7 @@ impl<T> Timer<T> {
break;
}
if selector(&self.items[bucket][i].item) {
- return Some(self.items[bucket].remove(i).item);
+ return Some(self.items[bucket].remove(i).unwrap().item);
}
}
None
@@ -185,10 +194,25 @@ impl<T> Timer<T> {
/// Take the next item, unless there are no items with
/// a timeout in the past relative to `until`.
pub fn take_next(&mut self, until: Instant) -> Option<T> {
- for i in 0..self.items.len() {
- let idx = self.bucket(i);
- if !self.items[idx].is_empty() && self.items[idx][0].time <= until {
- return Some(self.items[idx].remove(0).item);
+ fn maybe_take<T>(v: &mut VecDeque<TimerItem<T>>, until: Instant) -> Option<T> {
+ if !v.is_empty() && v[0].time <= until {
+ Some(v.pop_front().unwrap().item)
+ } else {
+ None
+ }
+ }
+
+ let idx = self.bucket(0);
+ for i in idx..self.items.len() {
+ let res = maybe_take(&mut self.items[i], until);
+ if res.is_some() {
+ return res;
+ }
+ }
+ for i in 0..idx {
+ let res = maybe_take(&mut self.items[i], until);
+ if res.is_some() {
+ return res;
}
}
None
@@ -201,7 +225,7 @@ impl<T> Timer<T> {
if until >= self.now + self.span() {
// Drain everything, so a clean sweep.
let mut empty_items = Vec::with_capacity(self.items.len());
- empty_items.resize_with(self.items.len(), Vec::default);
+ empty_items.resize_with(self.items.len(), VecDeque::default);
let mut items = mem::replace(&mut self.items, empty_items);
self.now = until;
self.cursor = 0;
diff --git a/third_party/rust/neqo-common/src/tos.rs b/third_party/rust/neqo-common/src/tos.rs
index 3610f72750..533c5447e2 100644
--- a/third_party/rust/neqo-common/src/tos.rs
+++ b/third_party/rust/neqo-common/src/tos.rs
@@ -36,7 +36,7 @@ impl From<IpTosEcn> for u8 {
impl From<u8> for IpTosEcn {
fn from(v: u8) -> Self {
- match v & 0b11 {
+ match v & 0b0000_0011 {
0b00 => IpTosEcn::NotEct,
0b01 => IpTosEcn::Ect1,
0b10 => IpTosEcn::Ect0,
@@ -47,8 +47,8 @@ impl From<u8> for IpTosEcn {
}
impl From<IpTos> for IpTosEcn {
- fn from(value: IpTos) -> Self {
- IpTosEcn::from(value.0 & 0x3)
+ fn from(v: IpTos) -> Self {
+ IpTosEcn::from(u8::from(v))
}
}
@@ -166,14 +166,13 @@ impl From<u8> for IpTosDscp {
}
impl From<IpTos> for IpTosDscp {
- fn from(value: IpTos) -> Self {
- IpTosDscp::from(value.0 & 0xfc)
+ fn from(v: IpTos) -> Self {
+ IpTosDscp::from(u8::from(v))
}
}
/// The type-of-service field in an IP packet.
-#[allow(clippy::module_name_repetitions)]
-#[derive(Copy, Clone, PartialEq, Eq)]
+#[derive(Copy, Clone, PartialEq, Eq, Default)]
pub struct IpTos(u8);
impl From<IpTosEcn> for IpTos {
@@ -215,15 +214,19 @@ impl From<u8> for IpTos {
impl Debug for IpTos {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("IpTos")
- .field(&IpTosDscp::from(self.0 & 0xfc))
- .field(&IpTosEcn::from(self.0 & 0x3))
+ .field(&IpTosDscp::from(*self))
+ .field(&IpTosEcn::from(*self))
.finish()
}
}
-impl Default for IpTos {
- fn default() -> Self {
- (IpTosDscp::default(), IpTosEcn::default()).into()
+impl IpTos {
+ pub fn set_ecn(&mut self, ecn: IpTosEcn) {
+ self.0 = u8::from(IpTosDscp::from(*self)) | u8::from(ecn);
+ }
+
+ pub fn set_dscp(&mut self, dscp: IpTosDscp) {
+ self.0 = u8::from(IpTosEcn::from(*self)) | u8::from(dscp);
}
}
@@ -322,4 +325,25 @@ mod tests {
assert_eq!(tos, u8::from(iptos));
assert_eq!(IpTos::from(tos), iptos);
}
+
+ #[test]
+ fn iptos_to_iptosdscp() {
+ let tos = IpTos::from((IpTosDscp::Af41, IpTosEcn::NotEct));
+ let dscp = IpTosDscp::from(tos);
+ assert_eq!(dscp, IpTosDscp::Af41);
+ }
+
+ #[test]
+ fn tos_modify_ecn() {
+ let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::NotEct).into();
+ iptos.set_ecn(IpTosEcn::Ce);
+ assert_eq!(u8::from(iptos), 0b1000_1011);
+ }
+
+ #[test]
+ fn tos_modify_dscp() {
+ let mut iptos: IpTos = (IpTosDscp::Af41, IpTosEcn::Ect1).into();
+ iptos.set_dscp(IpTosDscp::Le);
+ assert_eq!(u8::from(iptos), 0b0000_0101);
+ }
}
diff --git a/third_party/rust/neqo-common/src/udp.rs b/third_party/rust/neqo-common/src/udp.rs
deleted file mode 100644
index c27b0632ff..0000000000
--- a/third_party/rust/neqo-common/src/udp.rs
+++ /dev/null
@@ -1,222 +0,0 @@
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-#![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp.
-#![allow(clippy::missing_panics_doc)] // Functions simply delegate to tokio and quinn-udp.
-
-use std::{
- io::{self, IoSliceMut},
- net::{SocketAddr, ToSocketAddrs},
- slice,
-};
-
-use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
-use tokio::io::Interest;
-
-use crate::{Datagram, IpTos};
-
-/// Socket receive buffer size.
-///
-/// Allows reading multiple datagrams in a single [`Socket::recv`] call.
-const RECV_BUF_SIZE: usize = u16::MAX as usize;
-
-pub struct Socket {
- socket: tokio::net::UdpSocket,
- state: UdpSocketState,
- recv_buf: Vec<u8>,
-}
-
-impl Socket {
- /// Calls [`std::net::UdpSocket::bind`] and instantiates [`quinn_udp::UdpSocketState`].
- pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
- let socket = std::net::UdpSocket::bind(addr)?;
-
- Ok(Self {
- state: quinn_udp::UdpSocketState::new((&socket).into())?,
- socket: tokio::net::UdpSocket::from_std(socket)?,
- recv_buf: vec![0; RECV_BUF_SIZE],
- })
- }
-
- /// See [`tokio::net::UdpSocket::local_addr`].
- pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.socket.local_addr()
- }
-
- /// See [`tokio::net::UdpSocket::writable`].
- pub async fn writable(&self) -> Result<(), io::Error> {
- self.socket.writable().await
- }
-
- /// See [`tokio::net::UdpSocket::readable`].
- pub async fn readable(&self) -> Result<(), io::Error> {
- self.socket.readable().await
- }
-
- /// Send the UDP datagram on the specified socket.
- pub fn send(&self, d: Datagram) -> io::Result<usize> {
- let transmit = Transmit {
- destination: d.destination(),
- ecn: EcnCodepoint::from_bits(Into::<u8>::into(d.tos())),
- contents: d.into_data().into(),
- segment_size: None,
- src_ip: None,
- };
-
- let n = self.socket.try_io(Interest::WRITABLE, || {
- self.state
- .send((&self.socket).into(), slice::from_ref(&transmit))
- })?;
-
- assert_eq!(n, 1, "only passed one slice");
-
- Ok(n)
- }
-
- /// Receive a UDP datagram on the specified socket.
- pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
- let mut meta = RecvMeta::default();
-
- match self.socket.try_io(Interest::READABLE, || {
- self.state.recv(
- (&self.socket).into(),
- &mut [IoSliceMut::new(&mut self.recv_buf)],
- slice::from_mut(&mut meta),
- )
- }) {
- Ok(n) => {
- assert_eq!(n, 1, "only passed one slice");
- }
- Err(ref err)
- if err.kind() == io::ErrorKind::WouldBlock
- || err.kind() == io::ErrorKind::Interrupted =>
- {
- return Ok(vec![])
- }
- Err(err) => {
- return Err(err);
- }
- };
-
- if meta.len == 0 {
- eprintln!("zero length datagram received?");
- return Ok(vec![]);
- }
- if meta.len == self.recv_buf.len() {
- eprintln!(
- "Might have received more than {} bytes",
- self.recv_buf.len()
- );
- }
-
- Ok(self.recv_buf[0..meta.len]
- .chunks(meta.stride.min(self.recv_buf.len()))
- .map(|d| {
- Datagram::new(
- meta.addr,
- *local_address,
- meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
- None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
- d,
- )
- })
- .collect())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::{IpTosDscp, IpTosEcn};
-
- #[tokio::test]
- async fn datagram_tos() -> Result<(), io::Error> {
- let sender = Socket::bind("127.0.0.1:0")?;
- let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
- let mut receiver = Socket::bind(receiver_addr)?;
-
- let datagram = Datagram::new(
- sender.local_addr()?,
- receiver.local_addr()?,
- IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)),
- None,
- "Hello, world!".as_bytes().to_vec(),
- );
-
- sender.writable().await?;
- sender.send(datagram.clone())?;
-
- receiver.readable().await?;
- let received_datagram = receiver
- .recv(&receiver_addr)
- .expect("receive to succeed")
- .into_iter()
- .next()
- .expect("receive to yield datagram");
-
- // Assert that the ECN is correct.
- assert_eq!(
- IpTosEcn::from(datagram.tos()),
- IpTosEcn::from(received_datagram.tos())
- );
-
- Ok(())
- }
-
- /// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
- #[tokio::test]
- #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
- async fn many_datagrams_through_gro() -> Result<(), io::Error> {
- const SEGMENT_SIZE: usize = 128;
-
- let sender = Socket::bind("127.0.0.1:0")?;
- let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
- let mut receiver = Socket::bind(receiver_addr)?;
-
- // `neqo_common::udp::Socket::send` does not yet
- // (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
- // `quinn_udp` directly.
- let max_gso_segments = sender.state.max_gso_segments();
- let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
- let transmit = Transmit {
- destination: receiver.local_addr()?,
- ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
- IpTosDscp::Le,
- IpTosEcn::Ect1,
- )))),
- contents: msg.clone().into(),
- segment_size: Some(SEGMENT_SIZE),
- src_ip: None,
- };
- sender.writable().await?;
- let n = sender.socket.try_io(Interest::WRITABLE, || {
- sender
- .state
- .send((&sender.socket).into(), slice::from_ref(&transmit))
- })?;
- assert_eq!(n, 1, "only passed one slice");
-
- // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
- let mut num_received = 0;
- while num_received < max_gso_segments {
- receiver.readable().await?;
- receiver
- .recv(&receiver_addr)
- .expect("receive to succeed")
- .into_iter()
- .for_each(|d| {
- assert_eq!(
- SEGMENT_SIZE,
- d.len(),
- "Expect received datagrams to have same length as sent datagrams."
- );
- num_received += 1;
- });
- }
-
- Ok(())
- }
-}