summaryrefslogtreecommitdiffstats
path: root/third_party/rust/neqo-transport/src/connection/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/neqo-transport/src/connection/mod.rs')
-rw-r--r--third_party/rust/neqo-transport/src/connection/mod.rs285
1 files changed, 185 insertions, 100 deletions
diff --git a/third_party/rust/neqo-transport/src/connection/mod.rs b/third_party/rust/neqo-transport/src/connection/mod.rs
index 8522507a69..f955381414 100644
--- a/third_party/rust/neqo-transport/src/connection/mod.rs
+++ b/third_party/rust/neqo-transport/src/connection/mod.rs
@@ -19,7 +19,7 @@ use std::{
use neqo_common::{
event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
- qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, IpTos, Role,
+ qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
};
use neqo_crypto::{
agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group,
@@ -35,6 +35,7 @@ use crate::{
ConnectionIdRef, ConnectionIdStore, LOCAL_ACTIVE_CID_LIMIT,
},
crypto::{Crypto, CryptoDxState, CryptoSpace},
+ ecn::EcnCount,
events::{ConnectionEvent, ConnectionEvents, OutgoingDatagramOutcome},
frame::{
CloseError, Frame, FrameType, FRAME_TYPE_CONNECTION_CLOSE_APPLICATION,
@@ -46,7 +47,7 @@ use crate::{
quic_datagrams::{DatagramTracking, QuicDatagrams},
recovery::{LossRecovery, RecoveryToken, SendProfile},
recv_stream::RecvStreamStats,
- rtt::GRANULARITY,
+ rtt::{RttEstimate, GRANULARITY},
send_stream::SendStream,
stats::{Stats, StatsCell},
stream_id::StreamType,
@@ -55,9 +56,9 @@ use crate::{
self, TransportParameter, TransportParameterId, TransportParameters,
TransportParametersHandler,
},
- tracking::{AckTracker, PacketNumberSpace, SentPacket},
+ tracking::{AckTracker, PacketNumberSpace, RecvdPackets, SentPacket},
version::{Version, WireVersion},
- AppError, ConnectionError, Error, Res, StreamId,
+ AppError, CloseReason, Error, Res, StreamId,
};
mod dump;
@@ -291,7 +292,7 @@ impl Debug for Connection {
"{:?} Connection: {:?} {:?}",
self.role,
self.state,
- self.paths.primary_fallible()
+ self.paths.primary()
)
}
}
@@ -591,7 +592,11 @@ impl Connection {
fn make_resumption_token(&mut self) -> ResumptionToken {
debug_assert_eq!(self.role, Role::Client);
debug_assert!(self.crypto.has_resumption_token());
- let rtt = self.paths.primary().borrow().rtt().estimate();
+ let rtt = self.paths.primary().map_or_else(
+ || RttEstimate::default().estimate(),
+ |p| p.borrow().rtt().estimate(),
+ );
+
self.crypto
.create_resumption_token(
self.new_token.take_token(),
@@ -610,11 +615,10 @@ impl Connection {
/// a value of this approximate order. Don't use this for loss recovery,
/// only use it where a more precise value is not important.
fn pto(&self) -> Duration {
- self.paths
- .primary()
- .borrow()
- .rtt()
- .pto(PacketNumberSpace::ApplicationData)
+ self.paths.primary().map_or_else(
+ || RttEstimate::default().pto(PacketNumberSpace::ApplicationData),
+ |p| p.borrow().rtt().pto(PacketNumberSpace::ApplicationData),
+ )
}
fn create_resumption_token(&mut self, now: Instant) {
@@ -746,7 +750,12 @@ impl Connection {
if !init_token.is_empty() {
self.address_validation = AddressValidationInfo::NewToken(init_token.to_vec());
}
- self.paths.primary().borrow_mut().rtt_mut().set_initial(rtt);
+ self.paths
+ .primary()
+ .ok_or(Error::InternalError)?
+ .borrow_mut()
+ .rtt_mut()
+ .set_initial(rtt);
self.set_initial_limits();
// Start up TLS, which has the effect of setting up all the necessary
// state for 0-RTT. This only stages the CRYPTO frames.
@@ -786,7 +795,7 @@ impl Connection {
// If we are able, also send a NEW_TOKEN frame.
// This should be recording all remote addresses that are valid,
// but there are just 0 or 1 in the current implementation.
- if let Some(path) = self.paths.primary_fallible() {
+ if let Some(path) = self.paths.primary() {
if let Some(token) = self
.address_validation
.generate_new_token(path.borrow().remote_address(), now)
@@ -858,7 +867,7 @@ impl Connection {
#[must_use]
pub fn stats(&self) -> Stats {
let mut v = self.stats.borrow().clone();
- if let Some(p) = self.paths.primary_fallible() {
+ if let Some(p) = self.paths.primary() {
let p = p.borrow();
v.rtt = p.rtt().estimate();
v.rttvar = p.rtt().rttvar();
@@ -880,7 +889,7 @@ impl Connection {
let msg = format!("{v:?}");
#[cfg(not(debug_assertions))]
let msg = "";
- let error = ConnectionError::Transport(v.clone());
+ let error = CloseReason::Transport(v.clone());
match &self.state {
State::Closing { error: err, .. }
| State::Draining { error: err, .. }
@@ -895,14 +904,14 @@ impl Connection {
State::WaitInitial => {
// We don't have any state yet, so don't bother with
// the closing state, just send one CONNECTION_CLOSE.
- if let Some(path) = path.or_else(|| self.paths.primary_fallible()) {
+ if let Some(path) = path.or_else(|| self.paths.primary()) {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
}
self.set_state(State::Closed(error));
}
_ => {
- if let Some(path) = path.or_else(|| self.paths.primary_fallible()) {
+ if let Some(path) = path.or_else(|| self.paths.primary()) {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
if matches!(v, Error::KeysExhausted) {
@@ -951,9 +960,7 @@ impl Connection {
let pto = self.pto();
if self.idle_timeout.expired(now, pto) {
qinfo!([self], "idle timeout expired");
- self.set_state(State::Closed(ConnectionError::Transport(
- Error::IdleTimeout,
- )));
+ self.set_state(State::Closed(CloseReason::Transport(Error::IdleTimeout)));
return;
}
@@ -962,9 +969,11 @@ impl Connection {
let res = self.crypto.states.check_key_update(now);
self.absorb_error(now, res);
- let lost = self.loss_recovery.timeout(&self.paths.primary(), now);
- self.handle_lost_packets(&lost);
- qlog::packets_lost(&mut self.qlog, &lost);
+ if let Some(path) = self.paths.primary() {
+ let lost = self.loss_recovery.timeout(&path, now);
+ self.handle_lost_packets(&lost);
+ qlog::packets_lost(&mut self.qlog, &lost);
+ }
if self.release_resumption_token_timer.is_some() {
self.create_resumption_token(now);
@@ -1014,7 +1023,7 @@ impl Connection {
delays.push(ack_time);
}
- if let Some(p) = self.paths.primary_fallible() {
+ if let Some(p) = self.paths.primary() {
let path = p.borrow();
let rtt = path.rtt();
let pto = rtt.pto(PacketNumberSpace::ApplicationData);
@@ -1102,7 +1111,15 @@ impl Connection {
self.input(d, now, now);
self.process_saved(now);
}
- self.process_output(now)
+ #[allow(clippy::let_and_return)]
+ let output = self.process_output(now);
+ #[cfg(all(feature = "build-fuzzing-corpus", test))]
+ if self.test_frame_writer.is_none() {
+ if let Some(d) = output.clone().dgram() {
+ neqo_common::write_item_to_fuzzing_corpus("packet", &d);
+ }
+ }
+ output
}
fn handle_retry(&mut self, packet: &PublicPacket, now: Instant) {
@@ -1123,7 +1140,13 @@ impl Connection {
}
// At this point, we should only have the connection ID that we generated.
// Update to the one that the server prefers.
- let path = self.paths.primary();
+ let Some(path) = self.paths.primary() else {
+ self.stats
+ .borrow_mut()
+ .pkt_dropped("Retry without an existing path");
+ return;
+ };
+
path.borrow_mut().set_remote_cid(packet.scid());
let retry_scid = ConnectionId::from(packet.scid());
@@ -1151,8 +1174,9 @@ impl Connection {
fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) {
if self.crypto.discard(space) {
qdebug!([self], "Drop packet number space {}", space);
- let primary = self.paths.primary();
- self.loss_recovery.discard(&primary, space, now);
+ if let Some(path) = self.paths.primary() {
+ self.loss_recovery.discard(&path, space, now);
+ }
self.acks.drop_space(space);
}
}
@@ -1180,7 +1204,7 @@ impl Connection {
qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..]));
self.state_signaling.reset();
self.set_state(State::Draining {
- error: ConnectionError::Transport(Error::StatelessReset),
+ error: CloseReason::Transport(Error::StatelessReset),
timeout: self.get_closing_period_time(now),
});
Err(Error::StatelessReset)
@@ -1227,8 +1251,9 @@ impl Connection {
assert_ne!(self.version, version);
qinfo!([self], "Version negotiation: trying {:?}", version);
- let local_addr = self.paths.primary().borrow().local_address();
- let remote_addr = self.paths.primary().borrow().remote_address();
+ let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
+ let local_addr = path.borrow().local_address();
+ let remote_addr = path.borrow().remote_address();
let conn_params = self
.conn_params
.clone()
@@ -1256,7 +1281,7 @@ impl Connection {
} else {
qinfo!([self], "Version negotiation: failed with {:?}", supported);
// This error goes straight to closed.
- self.set_state(State::Closed(ConnectionError::Transport(
+ self.set_state(State::Closed(CloseReason::Transport(
Error::VersionNegotiation,
)));
Err(Error::VersionNegotiation)
@@ -1417,6 +1442,13 @@ impl Connection {
migrate: bool,
now: Instant,
) {
+ let space = PacketNumberSpace::from(packet.packet_type());
+ if let Some(space) = self.acks.get_mut(space) {
+ *space.ecn_marks() += d.tos().into();
+ } else {
+ qtrace!("Not tracking ECN for dropped packet number space");
+ }
+
if self.state == State::WaitInitial {
self.start_handshake(path, packet, now);
}
@@ -1491,6 +1523,16 @@ impl Connection {
d.tos(),
);
+ #[cfg(feature = "build-fuzzing-corpus")]
+ if packet.packet_type() == PacketType::Initial {
+ let target = if self.role == Role::Client {
+ "server_initial"
+ } else {
+ "client_initial"
+ };
+ neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]);
+ }
+
qlog::packet_received(&mut self.qlog, &packet, &payload);
let space = PacketNumberSpace::from(payload.packet_type());
if self.acks.get_mut(space).unwrap().is_duplicate(payload.pn()) {
@@ -1562,7 +1604,11 @@ impl Connection {
let mut probing = true;
let mut d = Decoder::from(&packet[..]);
while d.remaining() > 0 {
+ #[cfg(feature = "build-fuzzing-corpus")]
+ let pos = d.offset();
let f = Frame::decode(&mut d)?;
+ #[cfg(feature = "build-fuzzing-corpus")]
+ neqo_common::write_item_to_fuzzing_corpus("frame", &packet[pos..d.offset()]);
ack_eliciting |= f.ack_eliciting();
probing &= f.path_probing();
let t = f.get_type();
@@ -1623,10 +1669,15 @@ impl Connection {
if let Some(cid) = self.connection_ids.next() {
self.paths.make_permanent(path, None, cid);
Ok(())
- } else if self.paths.primary().borrow().remote_cid().is_empty() {
- self.paths
- .make_permanent(path, None, ConnectionIdEntry::empty_remote());
- Ok(())
+ } else if let Some(primary) = self.paths.primary() {
+ if primary.borrow().remote_cid().is_empty() {
+ self.paths
+ .make_permanent(path, None, ConnectionIdEntry::empty_remote());
+ Ok(())
+ } else {
+ qtrace!([self], "Unable to make path permanent: {}", path.borrow());
+ Err(Error::InvalidMigration)
+ }
} else {
qtrace!([self], "Unable to make path permanent: {}", path.borrow());
Err(Error::InvalidMigration)
@@ -1719,8 +1770,10 @@ impl Connection {
// Pointless migration is pointless.
return Err(Error::InvalidMigration);
}
- let local = local.unwrap_or_else(|| self.paths.primary().borrow().local_address());
- let remote = remote.unwrap_or_else(|| self.paths.primary().borrow().remote_address());
+
+ let path = self.paths.primary().ok_or(Error::InvalidMigration)?;
+ let local = local.unwrap_or_else(|| path.borrow().local_address());
+ let remote = remote.unwrap_or_else(|| path.borrow().remote_address());
if mem::discriminant(&local.ip()) != mem::discriminant(&remote.ip()) {
// Can't mix address families.
@@ -1773,7 +1826,12 @@ impl Connection {
// has to use the existing address. So only pay attention to a preferred
// address from the same family as is currently in use. More thought will
// be needed to work out how to get addresses from a different family.
- let prev = self.paths.primary().borrow().remote_address();
+ let prev = self
+ .paths
+ .primary()
+ .ok_or(Error::NoAvailablePath)?
+ .borrow()
+ .remote_address();
let remote = match prev.ip() {
IpAddr::V4(_) => addr.ipv4().map(SocketAddr::V4),
IpAddr::V6(_) => addr.ipv6().map(SocketAddr::V6),
@@ -1937,20 +1995,15 @@ impl Connection {
}
}
- self.streams
- .write_frames(TransmissionPriority::Critical, builder, tokens, frame_stats);
- if builder.is_full() {
- return;
- }
-
- self.streams.write_frames(
+ for prio in [
+ TransmissionPriority::Critical,
TransmissionPriority::Important,
- builder,
- tokens,
- frame_stats,
- );
- if builder.is_full() {
- return;
+ ] {
+ self.streams
+ .write_frames(prio, builder, tokens, frame_stats);
+ if builder.is_full() {
+ return;
+ }
}
// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, and ACK_FREQUENCY.
@@ -1958,21 +2011,18 @@ impl Connection {
if builder.is_full() {
return;
}
- self.paths.write_frames(builder, tokens, frame_stats);
- if builder.is_full() {
- return;
- }
- self.streams
- .write_frames(TransmissionPriority::High, builder, tokens, frame_stats);
+ self.paths.write_frames(builder, tokens, frame_stats);
if builder.is_full() {
return;
}
- self.streams
- .write_frames(TransmissionPriority::Normal, builder, tokens, frame_stats);
- if builder.is_full() {
- return;
+ for prio in [TransmissionPriority::High, TransmissionPriority::Normal] {
+ self.streams
+ .write_frames(prio, builder, tokens, &mut stats.frame_tx);
+ if builder.is_full() {
+ return;
+ }
}
// Datagrams are best-effort and unreliable. Let streams starve them for now.
@@ -1981,9 +2031,9 @@ impl Connection {
return;
}
- let frame_stats = &mut stats.frame_tx;
// CRYPTO here only includes NewSessionTicket, plus NEW_TOKEN.
// Both of these are only used for resumption and so can be relatively low priority.
+ let frame_stats = &mut stats.frame_tx;
self.crypto.write_frame(
PacketNumberSpace::ApplicationData,
builder,
@@ -1993,6 +2043,7 @@ impl Connection {
if builder.is_full() {
return;
}
+
self.new_token.write_frames(builder, tokens, frame_stats);
if builder.is_full() {
return;
@@ -2002,10 +2053,8 @@ impl Connection {
.write_frames(TransmissionPriority::Low, builder, tokens, frame_stats);
#[cfg(test)]
- {
- if let Some(w) = &mut self.test_frame_writer {
- w.write_frames(builder);
- }
+ if let Some(w) = &mut self.test_frame_writer {
+ w.write_frames(builder);
}
}
@@ -2138,6 +2187,40 @@ impl Connection {
(tokens, ack_eliciting, padded)
}
+ fn write_closing_frames(
+ &mut self,
+ close: &ClosingFrame,
+ builder: &mut PacketBuilder,
+ space: PacketNumberSpace,
+ now: Instant,
+ path: &PathRef,
+ tokens: &mut Vec<RecoveryToken>,
+ ) {
+ if builder.remaining() > ClosingFrame::MIN_LENGTH + RecvdPackets::USEFUL_ACK_LEN {
+ // Include an ACK frame with the CONNECTION_CLOSE.
+ let limit = builder.limit();
+ builder.set_limit(limit - ClosingFrame::MIN_LENGTH);
+ self.acks.immediate_ack(now);
+ self.acks.write_frame(
+ space,
+ now,
+ path.borrow().rtt().estimate(),
+ builder,
+ tokens,
+ &mut self.stats.borrow_mut().frame_tx,
+ );
+ builder.set_limit(limit);
+ }
+ // CloseReason::Application is only allowed at 1RTT.
+ let sanitized = if space == PacketNumberSpace::ApplicationData {
+ None
+ } else {
+ close.sanitize()
+ };
+ sanitized.as_ref().unwrap_or(close).write_frame(builder);
+ self.stats.borrow_mut().frame_tx.connection_close += 1;
+ }
+
/// Build a datagram, possibly from multiple packets (for different PN
/// spaces) and each containing 1+ frames.
#[allow(clippy::too_many_lines)] // Yeah, that's just the way it is.
@@ -2201,17 +2284,7 @@ impl Connection {
let payload_start = builder.len();
let (mut tokens, mut ack_eliciting, mut padded) = (Vec::new(), false, false);
if let Some(ref close) = closing_frame {
- // ConnectionError::Application is only allowed at 1RTT.
- let sanitized = if *space == PacketNumberSpace::ApplicationData {
- None
- } else {
- close.sanitize()
- };
- sanitized
- .as_ref()
- .unwrap_or(close)
- .write_frame(&mut builder);
- self.stats.borrow_mut().frame_tx.connection_close += 1;
+ self.write_closing_frames(close, &mut builder, *space, now, path, &mut tokens);
} else {
(tokens, ack_eliciting, padded) =
self.write_frames(path, *space, &profile, &mut builder, now);
@@ -2229,7 +2302,7 @@ impl Connection {
pt,
pn,
&builder.as_ref()[payload_start..],
- IpTos::default(), // TODO: set from path
+ path.borrow().tos(),
);
qlog::packet_sent(
&mut self.qlog,
@@ -2251,6 +2324,7 @@ impl Connection {
let sent = SentPacket::new(
pt,
pn,
+ path.borrow().tos().into(),
now,
ack_eliciting,
tokens,
@@ -2303,7 +2377,7 @@ impl Connection {
self.loss_recovery.on_packet_sent(path, initial);
}
path.borrow_mut().add_sent(packets.len());
- Ok(SendOption::Yes(path.borrow().datagram(packets)))
+ Ok(SendOption::Yes(path.borrow_mut().datagram(packets)))
}
}
@@ -2330,7 +2404,9 @@ impl Connection {
fn client_start(&mut self, now: Instant) -> Res<()> {
qdebug!([self], "client_start");
debug_assert_eq!(self.role, Role::Client);
- qlog::client_connection_started(&mut self.qlog, &self.paths.primary());
+ if let Some(path) = self.paths.primary() {
+ qlog::client_connection_started(&mut self.qlog, &path);
+ }
qlog::client_version_information_initiated(&mut self.qlog, self.conn_params.get_versions());
self.handshake(now, self.version, PacketNumberSpace::Initial, None)?;
@@ -2351,9 +2427,9 @@ impl Connection {
/// Close the connection.
pub fn close(&mut self, now: Instant, app_error: AppError, msg: impl AsRef<str>) {
- let error = ConnectionError::Application(app_error);
+ let error = CloseReason::Application(app_error);
let timeout = self.get_closing_period_time(now);
- if let Some(path) = self.paths.primary_fallible() {
+ if let Some(path) = self.paths.primary() {
self.state_signaling.close(path, error.clone(), 0, msg);
self.set_state(State::Closing { error, timeout });
} else {
@@ -2411,10 +2487,8 @@ impl Connection {
// That's OK, they can try guessing this.
ConnectionIdEntry::random_srt()
};
- self.paths
- .primary()
- .borrow_mut()
- .set_reset_token(reset_token);
+ let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
+ path.borrow_mut().set_reset_token(reset_token);
let max_ad = Duration::from_millis(remote.get_integer(tparams::MAX_ACK_DELAY));
let min_ad = if remote.has_value(tparams::MIN_ACK_DELAY) {
@@ -2426,11 +2500,8 @@ impl Connection {
} else {
None
};
- self.paths.primary().borrow_mut().set_ack_delay(
- max_ad,
- min_ad,
- self.conn_params.get_ack_ratio(),
- );
+ path.borrow_mut()
+ .set_ack_delay(max_ad, min_ad, self.conn_params.get_ack_ratio());
let max_active_cids = remote.get_integer(tparams::ACTIVE_CONNECTION_ID_LIMIT);
self.cid_manager.set_limit(max_active_cids);
@@ -2673,10 +2744,18 @@ impl Connection {
ack_delay,
first_ack_range,
ack_ranges,
+ ecn_count,
} => {
let ranges =
Frame::decode_ack_frame(largest_acknowledged, first_ack_range, &ack_ranges)?;
- self.handle_ack(space, largest_acknowledged, ranges, ack_delay, now);
+ self.handle_ack(
+ space,
+ largest_acknowledged,
+ ranges,
+ ecn_count,
+ ack_delay,
+ now,
+ );
}
Frame::Crypto { offset, data } => {
qtrace!(
@@ -2747,7 +2826,6 @@ impl Connection {
reason_phrase,
} => {
self.stats.borrow_mut().frame_rx.connection_close += 1;
- let reason_phrase = String::from_utf8_lossy(&reason_phrase);
qinfo!(
[self],
"ConnectionClose received. Error code: {:?} frame type {:x} reason {}",
@@ -2768,7 +2846,7 @@ impl Connection {
FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT,
)
};
- let error = ConnectionError::Transport(detail);
+ let error = CloseReason::Transport(detail);
self.state_signaling
.drain(Rc::clone(path), error.clone(), frame_type, "");
self.set_state(State::Draining {
@@ -2853,6 +2931,7 @@ impl Connection {
space: PacketNumberSpace,
largest_acknowledged: u64,
ack_ranges: R,
+ ack_ecn: Option<EcnCount>,
ack_delay: u64,
now: Instant,
) where
@@ -2861,11 +2940,15 @@ impl Connection {
{
qdebug!([self], "Rx ACK space={}, ranges={:?}", space, ack_ranges);
+ let Some(path) = self.paths.primary() else {
+ return;
+ };
let (acked_packets, lost_packets) = self.loss_recovery.on_ack_received(
- &self.paths.primary(),
+ &path,
space,
largest_acknowledged,
ack_ranges,
+ ack_ecn,
self.decode_ack_delay(ack_delay),
now,
);
@@ -2903,8 +2986,10 @@ impl Connection {
qdebug!([self], "0-RTT rejected");
// Tell 0-RTT packets that they were "lost".
- let dropped = self.loss_recovery.drop_0rtt(&self.paths.primary(), now);
- self.handle_lost_packets(&dropped);
+ if let Some(path) = self.paths.primary() {
+ let dropped = self.loss_recovery.drop_0rtt(&path, now);
+ self.handle_lost_packets(&dropped);
+ }
self.streams.zero_rtt_rejected();
@@ -2923,7 +3008,7 @@ impl Connection {
// Remove the randomized client CID from the list of acceptable CIDs.
self.cid_manager.remove_odcid();
// Mark the path as validated, if it isn't already.
- let path = self.paths.primary();
+ let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
path.borrow_mut().set_valid(now);
// Generate a qlog event that the server connection started.
qlog::server_connection_started(&mut self.qlog, &path);
@@ -3191,7 +3276,7 @@ impl Connection {
else {
return Err(Error::NotAvailable);
};
- let path = self.paths.primary_fallible().ok_or(Error::NotAvailable)?;
+ let path = self.paths.primary().ok_or(Error::NotAvailable)?;
let mtu = path.borrow().mtu();
let encoder = Encoder::with_capacity(mtu);