diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:59:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:59:24 +0000 |
commit | 023939b627b7dc93b01471f7d41fb8553ddb4ffa (patch) | |
tree | 60fc59477c605c72b0a1051409062ddecc43f877 /vendor/tokio-util/src/udp | |
parent | Adding debian version 1.72.1+dfsg1-1. (diff) | |
download | rustc-023939b627b7dc93b01471f7d41fb8553ddb4ffa.tar.xz rustc-023939b627b7dc93b01471f7d41fb8553ddb4ffa.zip |
Merging upstream version 1.73.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio-util/src/udp')
-rw-r--r-- | vendor/tokio-util/src/udp/frame.rs | 245 | ||||
-rw-r--r-- | vendor/tokio-util/src/udp/mod.rs | 4 |
2 files changed, 249 insertions, 0 deletions
diff --git a/vendor/tokio-util/src/udp/frame.rs b/vendor/tokio-util/src/udp/frame.rs new file mode 100644 index 000000000..d900fd769 --- /dev/null +++ b/vendor/tokio-util/src/udp/frame.rs @@ -0,0 +1,245 @@ +use crate::codec::{Decoder, Encoder}; + +use futures_core::Stream; +use tokio::{io::ReadBuf, net::UdpSocket}; + +use bytes::{BufMut, BytesMut}; +use futures_core::ready; +use futures_sink::Sink; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{ + borrow::Borrow, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, +}; +use std::{io, mem::MaybeUninit}; + +/// A unified [`Stream`] and [`Sink`] interface to an underlying `UdpSocket`, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// Raw UDP sockets work with datagrams, but higher-level code usually wants to +/// batch these into meaningful chunks, called "frames". This method layers +/// framing on top of this socket by using the `Encoder` and `Decoder` traits to +/// handle encoding and decoding of messages frames. Note that the incoming and +/// outgoing frame types may be distinct. +/// +/// This function returns a *single* object that is both [`Stream`] and [`Sink`]; +/// grouping this into a single object is often useful for layering things which +/// require both read and write access to the underlying object. +/// +/// If you want to work more directly with the streams and sink, consider +/// calling [`split`] on the `UdpFramed` returned by this method, which will break +/// them into separate objects, allowing them to interact more easily. +/// +/// [`Stream`]: futures_core::Stream +/// [`Sink`]: futures_sink::Sink +/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split +#[must_use = "sinks do nothing unless polled"] +#[derive(Debug)] +pub struct UdpFramed<C, T = UdpSocket> { + socket: T, + codec: C, + rd: BytesMut, + wr: BytesMut, + out_addr: SocketAddr, + flushed: bool, + is_readable: bool, + current_addr: Option<SocketAddr>, +} + +const INITIAL_RD_CAPACITY: usize = 64 * 1024; +const INITIAL_WR_CAPACITY: usize = 8 * 1024; + +impl<C, T> Unpin for UdpFramed<C, T> {} + +impl<C, T> Stream for UdpFramed<C, T> +where + T: Borrow<UdpSocket>, + C: Decoder, +{ + type Item = Result<(C::Item, SocketAddr), C::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let pin = self.get_mut(); + + pin.rd.reserve(INITIAL_RD_CAPACITY); + + loop { + // Are there still bytes left in the read buffer to decode? + if pin.is_readable { + if let Some(frame) = pin.codec.decode_eof(&mut pin.rd)? { + let current_addr = pin + .current_addr + .expect("will always be set before this line is called"); + + return Poll::Ready(Some(Ok((frame, current_addr)))); + } + + // if this line has been reached then decode has returned `None`. + pin.is_readable = false; + pin.rd.clear(); + } + + // We're out of data. Try and fetch more data to decode + let addr = unsafe { + // Convert `&mut [MaybeUnit<u8>]` to `&mut [u8]` because we will be + // writing to it via `poll_recv_from` and therefore initializing the memory. + let buf = &mut *(pin.rd.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]); + let mut read = ReadBuf::uninit(buf); + let ptr = read.filled().as_ptr(); + let res = ready!(pin.socket.borrow().poll_recv_from(cx, &mut read)); + + assert_eq!(ptr, read.filled().as_ptr()); + let addr = res?; + pin.rd.advance_mut(read.filled().len()); + addr + }; + + pin.current_addr = Some(addr); + pin.is_readable = true; + } + } +} + +impl<I, C, T> Sink<(I, SocketAddr)> for UdpFramed<C, T> +where + T: Borrow<UdpSocket>, + C: Encoder<I>, +{ + type Error = C::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if !self.flushed { + match self.poll_flush(cx)? { + Poll::Ready(()) => {} + Poll::Pending => return Poll::Pending, + } + } + + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: (I, SocketAddr)) -> Result<(), Self::Error> { + let (frame, out_addr) = item; + + let pin = self.get_mut(); + + pin.codec.encode(frame, &mut pin.wr)?; + pin.out_addr = out_addr; + pin.flushed = false; + + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.flushed { + return Poll::Ready(Ok(())); + } + + let Self { + ref socket, + ref mut out_addr, + ref mut wr, + .. + } = *self; + + let n = ready!(socket.borrow().poll_send_to(cx, wr, *out_addr))?; + + let wrote_all = n == self.wr.len(); + self.wr.clear(); + self.flushed = true; + + let res = if wrote_all { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "failed to write entire datagram to socket", + ) + .into()) + }; + + Poll::Ready(res) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + ready!(self.poll_flush(cx))?; + Poll::Ready(Ok(())) + } +} + +impl<C, T> UdpFramed<C, T> +where + T: Borrow<UdpSocket>, +{ + /// Create a new `UdpFramed` backed by the given socket and codec. + /// + /// See struct level documentation for more details. + pub fn new(socket: T, codec: C) -> UdpFramed<C, T> { + Self { + socket, + codec, + out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), + rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), + wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), + flushed: true, + is_readable: false, + current_addr: None, + } + } + + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_ref(&self) -> &T { + &self.socket + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.socket + } + + /// Returns a reference to the underlying codec wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec(&self) -> &C { + &self.codec + } + + /// Returns a mutable reference to the underlying codec wrapped by + /// `UdpFramed`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec_mut(&mut self) -> &mut C { + &mut self.codec + } + + /// Returns a reference to the read buffer. + pub fn read_buffer(&self) -> &BytesMut { + &self.rd + } + + /// Returns a mutable reference to the read buffer. + pub fn read_buffer_mut(&mut self) -> &mut BytesMut { + &mut self.rd + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + pub fn into_inner(self) -> T { + self.socket + } +} diff --git a/vendor/tokio-util/src/udp/mod.rs b/vendor/tokio-util/src/udp/mod.rs new file mode 100644 index 000000000..f88ea030a --- /dev/null +++ b/vendor/tokio-util/src/udp/mod.rs @@ -0,0 +1,4 @@ +//! UDP framing + +mod frame; +pub use frame::UdpFramed; |