diff options
Diffstat (limited to 'third_party/rust/tokio/src/net/udp')
-rw-r--r-- | third_party/rust/tokio/src/net/udp/mod.rs | 7 | ||||
-rw-r--r-- | third_party/rust/tokio/src/net/udp/socket.rs | 425 | ||||
-rw-r--r-- | third_party/rust/tokio/src/net/udp/split.rs | 148 |
3 files changed, 580 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/net/udp/mod.rs b/third_party/rust/tokio/src/net/udp/mod.rs new file mode 100644 index 0000000000..d43121a1ca --- /dev/null +++ b/third_party/rust/tokio/src/net/udp/mod.rs @@ -0,0 +1,7 @@ +//! UDP utility types. + +pub(crate) mod socket; +pub(crate) use socket::UdpSocket; + +mod split; +pub use split::{RecvHalf, ReuniteError, SendHalf}; diff --git a/third_party/rust/tokio/src/net/udp/socket.rs b/third_party/rust/tokio/src/net/udp/socket.rs new file mode 100644 index 0000000000..faf1dca615 --- /dev/null +++ b/third_party/rust/tokio/src/net/udp/socket.rs @@ -0,0 +1,425 @@ +use crate::future::poll_fn; +use crate::io::PollEvented; +use crate::net::udp::split::{split, RecvHalf, SendHalf}; +use crate::net::ToSocketAddrs; + +use std::convert::TryFrom; +use std::fmt; +use std::io; +use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::task::{Context, Poll}; + +cfg_udp! { + /// A UDP socket + pub struct UdpSocket { + io: PollEvented<mio::net::UdpSocket>, + } +} + +impl UdpSocket { + /// This function will create a new UDP socket and attempt to bind it to + /// the `addr` provided. + pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { + let addrs = addr.to_socket_addrs().await?; + let mut last_err = None; + + for addr in addrs { + match UdpSocket::bind_addr(addr) { + Ok(socket) => return Ok(socket), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any address", + ) + })) + } + + fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> { + let sys = mio::net::UdpSocket::bind(&addr)?; + UdpSocket::new(sys) + } + + fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> { + let io = PollEvented::new(socket)?; + Ok(UdpSocket { io }) + } + + /// Creates a new `UdpSocket` from the previously bound socket provided. + /// + /// The socket given will be registered with the event loop that `handle` + /// is associated with. This function requires that `socket` has previously + /// been bound to an address to work correctly. + /// + /// This can be used in conjunction with net2's `UdpBuilder` interface to + /// configure a socket before it's handed off, such as setting options like + /// `reuse_address` or binding to multiple addresses. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { + let io = mio::net::UdpSocket::from_socket(socket)?; + let io = PollEvented::new(io)?; + Ok(UdpSocket { io }) + } + + /// Splits the `UdpSocket` into a receive half and a send half. The two parts + /// can be used to receive and send datagrams concurrently, even from two + /// different tasks. + pub fn split(self) -> (RecvHalf, SendHalf) { + split(self) + } + + /// Returns the local address that this socket is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Connects the UDP socket setting the default destination for send() and + /// limiting packets that are read via recv from the address specified in + /// `addr`. + pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> { + let addrs = addr.to_socket_addrs().await?; + let mut last_err = None; + + for addr in addrs { + match self.io.get_ref().connect(addr) { + Ok(_) => return Ok(()), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any address", + ) + })) + } + + /// Returns a future that sends data on the socket to the remote address to which it is connected. + /// On success, the future will resolve to the number of bytes written. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will resolve to an error if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.poll_send(cx, buf)).await + } + + // Poll IO functions that takes `&self` are provided for the split API. + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. + #[doc(hidden)] + pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that receives a single datagram message on the socket from + /// the remote address to which it is connected. On success, the future will resolve + /// to the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.poll_recv(cx, buf)).await + } + + #[doc(hidden)] + pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that sends data on the socket to the given address. + /// On success, the future will resolve to the number of bytes written. + /// + /// The future will resolve to an error if the IP version of the socket does + /// not match that of `target`. + pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> { + let mut addrs = target.to_socket_addrs().await?; + + match addrs.next() { + Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, + None => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no addresses to send data to", + )), + } + } + + // TODO: Public or not? + #[doc(hidden)] + pub fn poll_send_to( + &self, + cx: &mut Context<'_>, + buf: &[u8], + target: &SocketAddr, + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send_to(buf, target) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns a future that receives a single datagram on the socket. On success, + /// the future resolves to the number of bytes read and the origin. + /// + /// The function must be called with valid byte array `buf` of sufficient size + /// to hold the message bytes. If a message is too long to fit in the supplied + /// buffer, excess bytes may be discarded. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.poll_recv_from(cx, buf)).await + } + + #[doc(hidden)] + pub fn poll_recv_from( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<(usize, SocketAddr), io::Error>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv_from(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + /// + /// For more information about this option, see [`set_broadcast`]. + /// + /// [`set_broadcast`]: #method.set_broadcast + pub fn broadcast(&self) -> io::Result<bool> { + self.io.get_ref().broadcast() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + /// + /// When enabled, this socket is allowed to send packets to a broadcast + /// address. + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_broadcast(on) + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see [`set_multicast_loop_v4`]. + /// + /// [`set_multicast_loop_v4`]: #method.set_multicast_loop_v4 + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v4() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If enabled, multicast packets will be looped back to the local socket. + /// + /// # Note + /// + /// This may not have any affect on IPv6 sockets. + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v4(on) + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// For more information about this option, see [`set_multicast_ttl_v4`]. + /// + /// [`set_multicast_ttl_v4`]: #method.set_multicast_ttl_v4 + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.io.get_ref().multicast_ttl_v4() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for + /// this socket. The default value is 1 which means that multicast packets + /// don't leave the local network unless explicitly requested. + /// + /// # Note + /// + /// This may not have any affect on IPv6 sockets. + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_multicast_ttl_v4(ttl) + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see [`set_multicast_loop_v6`]. + /// + /// [`set_multicast_loop_v6`]: #method.set_multicast_loop_v6 + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v6() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// # Note + /// + /// This may not have any affect on IPv4 sockets. + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v6(on) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: #method.set_ttl + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// address of the local interface with which the system should join the + /// multicast group. If it's equal to `INADDR_ANY` then an appropriate + /// interface is chosen by the system. + pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.io.get_ref().join_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// index of the interface to join/leave (or 0 to indicate any interface). + pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.io.get_ref().join_multicast_v6(multiaddr, interface) + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see [`join_multicast_v4`]. + /// + /// [`join_multicast_v4`]: #method.join_multicast_v4 + pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.io.get_ref().leave_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see [`join_multicast_v6`]. + /// + /// [`join_multicast_v6`]: #method.join_multicast_v6 + pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.io.get_ref().leave_multicast_v6(multiaddr, interface) + } +} + +impl TryFrom<UdpSocket> for mio::net::UdpSocket { + type Error = io::Error; + + /// Consumes value, returning the mio I/O object. + /// + /// See [`PollEvented::into_inner`] for more details about + /// resource deregistration that happens during the call. + /// + /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner + fn try_from(value: UdpSocket) -> Result<Self, Self::Error> { + value.io.into_inner() + } +} + +impl TryFrom<net::UdpSocket> for UdpSocket { + type Error = io::Error; + + /// Consumes stream, returning the tokio I/O object. + /// + /// This is equivalent to + /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). + fn try_from(stream: net::UdpSocket) -> Result<Self, Self::Error> { + Self::from_std(stream) + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(all(unix))] +mod sys { + use super::UdpSocket; + use std::os::unix::prelude::*; + + impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::UdpSocket; + // + // impl AsRawHandle for UdpSocket { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +} diff --git a/third_party/rust/tokio/src/net/udp/split.rs b/third_party/rust/tokio/src/net/udp/split.rs new file mode 100644 index 0000000000..55542cb631 --- /dev/null +++ b/third_party/rust/tokio/src/net/udp/split.rs @@ -0,0 +1,148 @@ +//! [`UdpSocket`](../struct.UdpSocket.html) split support. +//! +//! The [`split`](../struct.UdpSocket.html#method.split) method splits a +//! `UdpSocket` into a receive half and a send half, which can be used to +//! receive and send datagrams concurrently, even from two different tasks. +//! +//! The halves provide access to the underlying socket, implementing +//! `AsRef<UdpSocket>`. This allows you to call `UdpSocket` methods that takes +//! `&self`, e.g., to get local address, to get and set socket options, to join +//! or leave multicast groups, etc. +//! +//! The halves can be reunited to the original socket with their `reunite` +//! methods. + +use crate::future::poll_fn; +use crate::net::udp::UdpSocket; + +use std::error::Error; +use std::fmt; +use std::io; +use std::net::SocketAddr; +use std::sync::Arc; + +/// The send half after [`split`](super::UdpSocket::split). +/// +/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send +/// datagrams. +#[derive(Debug)] +pub struct SendHalf(Arc<UdpSocket>); + +/// The recv half after [`split`](super::UdpSocket::split). +/// +/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive +/// datagrams. +#[derive(Debug)] +pub struct RecvHalf(Arc<UdpSocket>); + +pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) { + let shared = Arc::new(socket); + let send = shared.clone(); + let recv = shared; + (RecvHalf(recv), SendHalf(send)) +} + +/// Error indicating two halves were not from the same socket, and thus could +/// not be `reunite`d. +#[derive(Debug)] +pub struct ReuniteError(pub SendHalf, pub RecvHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +fn reunite(s: SendHalf, r: RecvHalf) -> Result<UdpSocket, ReuniteError> { + if Arc::ptr_eq(&s.0, &r.0) { + drop(r); + // Only two instances of the `Arc` are ever created, one for the + // receiver and one for the sender, and those `Arc`s are never exposed + // externally. And so when we drop one here, the other one must be the + // only remaining one. + Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(s, r)) + } +} + +impl RecvHalf { + /// Attempts to put the two "halves" of a `UdpSocket` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to `UdpSocket::split`. + pub fn reunite(self, other: SendHalf) -> Result<UdpSocket, ReuniteError> { + reunite(other, self) + } + + /// Returns a future that receives a single datagram on the socket. On success, + /// the future resolves to the number of bytes read and the origin. + /// + /// The function must be called with valid byte array `buf` of sufficient size + /// to hold the message bytes. If a message is too long to fit in the supplied + /// buffer, excess bytes may be discarded. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await + } + + /// Returns a future that receives a single datagram message on the socket from + /// the remote address to which it is connected. On success, the future will resolve + /// to the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will fail if the socket is not connected. + /// + /// [`connect`]: super::UdpSocket::connect + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_recv(cx, buf)).await + } +} + +impl SendHalf { + /// Attempts to put the two "halves" of a `UdpSocket` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to `UdpSocket::split`. + pub fn reunite(self, other: RecvHalf) -> Result<UdpSocket, ReuniteError> { + reunite(self, other) + } + + /// Returns a future that sends data on the socket to the given address. + /// On success, the future will resolve to the number of bytes written. + /// + /// The future will resolve to an error if the IP version of the socket does + /// not match that of `target`. + pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await + } + + /// Returns a future that sends data on the socket to the remote address to which it is connected. + /// On success, the future will resolve to the number of bytes written. + /// + /// The [`connect`] method will connect this socket to a remote address. The future + /// will resolve to an error if the socket is not connected. + /// + /// [`connect`]: super::UdpSocket::connect + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_send(cx, buf)).await + } +} + +impl AsRef<UdpSocket> for SendHalf { + fn as_ref(&self) -> &UdpSocket { + &self.0 + } +} + +impl AsRef<UdpSocket> for RecvHalf { + fn as_ref(&self) -> &UdpSocket { + &self.0 + } +} |