use crate::io::{Interest, PollEvented, ReadBuf, Ready}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::task::{Context, Poll}; cfg_io_util! { use bytes::BufMut; } cfg_net! { /// A UDP socket. /// /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: /// /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`) /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address /// /// This type does not provide a `split` method, because this functionality /// can be achieved by instead wrapping the socket in an [`Arc`]. Note that /// you do not need a `Mutex` to share the `UdpSocket` — an `Arc` /// is enough. This is because all of the methods take `&self` instead of /// `&mut self`. Once you have wrapped it in an `Arc`, you can call /// `.clone()` on the `Arc` to get multiple shared handles to the /// same socket. An example of such usage can be found further down. /// /// [`Arc`]: std::sync::Arc /// /// # Streams /// /// If you need to listen over UDP and produce a [`Stream`], you can look /// at [`UdpFramed`]. /// /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html /// /// # Example: one to many (bind) /// /// Using `bind` we can create a simple echo server that sends and recv's with many different clients: /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; /// let mut buf = [0; 1024]; /// loop { /// let (len, addr) = sock.recv_from(&mut buf).await?; /// println!("{:?} bytes received from {:?}", len, addr); /// /// let len = sock.send_to(&buf[..len], addr).await?; /// println!("{:?} bytes sent", len); /// } /// } /// ``` /// /// # Example: one to one (connect) /// /// Or using `connect` we can echo with a single remote address using `send` and `recv`: /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; /// /// let remote_addr = "127.0.0.1:59611"; /// sock.connect(remote_addr).await?; /// let mut buf = [0; 1024]; /// loop { /// let len = sock.recv(&mut buf).await?; /// println!("{:?} bytes received from {:?}", len, remote_addr); /// /// let len = sock.send(&buf[..len]).await?; /// println!("{:?} bytes sent", len); /// } /// } /// ``` /// /// # Example: Splitting with `Arc` /// /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright /// to use an `Arc` and share the references to multiple tasks. /// Here is a similar "echo" example that supports concurrent /// sending/receiving: /// /// ```no_run /// use tokio::{net::UdpSocket, sync::mpsc}; /// use std::{io, net::SocketAddr, sync::Arc}; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; /// let r = Arc::new(sock); /// let s = r.clone(); /// let (tx, mut rx) = mpsc::channel::<(Vec, SocketAddr)>(1_000); /// /// tokio::spawn(async move { /// while let Some((bytes, addr)) = rx.recv().await { /// let len = s.send_to(&bytes, &addr).await.unwrap(); /// println!("{:?} bytes sent", len); /// } /// }); /// /// let mut buf = [0; 1024]; /// loop { /// let (len, addr) = r.recv_from(&mut buf).await?; /// println!("{:?} bytes received from {:?}", len, addr); /// tx.send((buf[..len].to_vec(), addr)).await.unwrap(); /// } /// } /// ``` /// pub struct UdpSocket { io: PollEvented, } } impl UdpSocket { /// This function will create a new UDP socket and attempt to bind it to /// the `addr` provided. /// /// Binding with a port number of 0 will request that the OS assigns a port /// to this listener. The port allocated can be queried via the `local_addr` /// method. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; /// // use `sock` /// # let _ = sock; /// Ok(()) /// } /// ``` pub async fn bind(addr: A) -> io::Result { let addrs = to_socket_addrs(addr).await?; let mut last_err = None; for addr in addrs { match UdpSocket::bind_addr(addr) { Ok(socket) => return Ok(socket), Err(e) => last_err = Some(e), } } Err(last_err.unwrap_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "could not resolve to any address", ) })) } fn bind_addr(addr: SocketAddr) -> io::Result { let sys = mio::net::UdpSocket::bind(addr)?; UdpSocket::new(sys) } fn new(socket: mio::net::UdpSocket) -> io::Result { let io = PollEvented::new(socket)?; Ok(UdpSocket { io }) } /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. /// /// This function is intended to be used to wrap a UDP socket from the /// standard library in the Tokio equivalent. The conversion assumes nothing /// about the underlying socket; it is left up to the user to set it in /// non-blocking mode. /// /// This can be used in conjunction with socket2's `Socket` interface to /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// # use std::{io, net::SocketAddr}; /// /// # #[tokio::main] /// # async fn main() -> io::Result<()> { /// let addr = "0.0.0.0:8080".parse::().unwrap(); /// let std_sock = std::net::UdpSocket::bind(addr)?; /// std_sock.set_nonblocking(true)?; /// let sock = UdpSocket::from_std(std_sock)?; /// // use `sock` /// # Ok(()) /// # } /// ``` pub fn from_std(socket: net::UdpSocket) -> io::Result { let io = mio::net::UdpSocket::from_std(socket); UdpSocket::new(io) } /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`]. /// /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed. /// /// # Examples /// /// ```rust,no_run /// use std::error::Error; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let tokio_socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; /// let std_socket = tokio_socket.into_std()?; /// std_socket.set_nonblocking(false)?; /// Ok(()) /// } /// ``` /// /// [`tokio::net::UdpSocket`]: UdpSocket /// [`std::net::UdpSocket`]: std::net::UdpSocket /// [`set_nonblocking`]: fn@std::net::UdpSocket::set_nonblocking pub fn into_std(self) -> io::Result { #[cfg(unix)] { use std::os::unix::io::{FromRawFd, IntoRawFd}; self.io .into_inner() .map(|io| io.into_raw_fd()) .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) }) } #[cfg(windows)] { use std::os::windows::io::{FromRawSocket, IntoRawSocket}; self.io .into_inner() .map(|io| io.into_raw_socket()) .map(|raw_socket| unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) }) } } /// Returns the local address that this socket is bound to. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// # use std::{io, net::SocketAddr}; /// /// # #[tokio::main] /// # async fn main() -> io::Result<()> { /// let addr = "0.0.0.0:8080".parse::().unwrap(); /// let sock = UdpSocket::bind(addr).await?; /// // the address the socket is bound to /// let local_addr = sock.local_addr()?; /// # Ok(()) /// # } /// ``` pub fn local_addr(&self) -> io::Result { self.io.local_addr() } /// Connects the UDP socket setting the default destination for send() and /// limiting packets that are read via recv from the address specified in /// `addr`. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// # use std::{io, net::SocketAddr}; /// /// # #[tokio::main] /// # async fn main() -> io::Result<()> { /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; /// /// let remote_addr = "127.0.0.1:59600".parse::().unwrap(); /// sock.connect(remote_addr).await?; /// let mut buf = [0u8; 32]; /// // recv from remote_addr /// let len = sock.recv(&mut buf).await?; /// // send to remote_addr /// let _len = sock.send(&buf[..len]).await?; /// # Ok(()) /// # } /// ``` pub async fn connect(&self, addr: A) -> io::Result<()> { let addrs = to_socket_addrs(addr).await?; let mut last_err = None; for addr in addrs { match self.io.connect(addr) { Ok(_) => return Ok(()), Err(e) => last_err = Some(e), } } Err(last_err.unwrap_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "could not resolve to any address", ) })) } /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_recv()` or `try_send()`. It /// can be used to concurrently recv / send to the same socket on a single /// task without splitting the socket. /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with /// `io::ErrorKind::WouldBlock`. /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method /// will continue to return immediately until the readiness event is /// consumed by an attempt to read or write that fails with `WouldBlock` or /// `Poll::Pending`. /// /// # Examples /// /// Concurrently receive from and send to the socket on the same task /// without splitting. /// /// ```no_run /// use tokio::io::{self, Interest}; /// use tokio::net::UdpSocket; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; /// /// if ready.is_readable() { /// // The buffer is **not** included in the async task and will only exist /// // on the stack. /// let mut data = [0; 1024]; /// match socket.try_recv(&mut data[..]) { /// Ok(n) => { /// println!("received {:?}", &data[..n]); /// } /// // False-positive, continue /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// if ready.is_writable() { /// // Write some data /// match socket.try_send(b"hello world") { /// Ok(n) => { /// println!("sent {} bytes", n); /// } /// // False-positive, continue /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} /// Err(e) => { /// return Err(e); /// } /// } /// } /// } /// } /// ``` pub async fn ready(&self, interest: Interest) -> io::Result { let event = self.io.registration().readiness(interest).await?; Ok(event.ready) } /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is /// usually paired with `try_send()` or `try_send_to()`. /// /// The function may complete without the socket being writable. This is a /// false-positive and attempting a `try_send()` will return with /// `io::ErrorKind::WouldBlock`. /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method /// will continue to return immediately until the readiness event is /// consumed by an attempt to write that fails with `WouldBlock` or /// `Poll::Pending`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Bind socket /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be writable /// socket.writable().await?; /// /// // Try to send data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_send(b"hello world") { /// Ok(n) => { /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub async fn writable(&self) -> io::Result<()> { self.ready(Interest::WRITABLE).await?; Ok(()) } /// Polls for write/send readiness. /// /// If the udp stream is not currently ready for sending, this method will /// store a clone of the `Waker` from the provided `Context`. When the udp /// stream becomes ready for sending, `Waker::wake` will be called on the /// waker. /// /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only /// the `Waker` from the `Context` passed to the most recent call is /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a /// second, independent waker.) /// /// This function is intended for cases where creating and pinning a future /// via [`writable`] is not feasible. Where possible, using [`writable`] is /// preferred, as this supports polling from multiple tasks at once. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the udp stream is not ready for writing. /// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing. /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. /// /// [`writable`]: method@Self::writable pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) } /// Sends data on the socket to the remote address that the socket is /// connected to. /// /// The [`connect`] method will connect this socket to a remote address. /// This method will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect /// /// # Return /// /// On success, the number of bytes sent is returned, otherwise, the /// encountered error is returned. /// /// # Cancel safety /// /// This method is cancel safe. If `send` is used as the event in a /// [`tokio::select!`](crate::select) statement and some other branch /// completes first, then it is guaranteed that the message was not sent. /// /// # Examples /// /// ```no_run /// use tokio::io; /// use tokio::net::UdpSocket; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Bind socket /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// // Send a message /// socket.send(b"hello world").await?; /// /// Ok(()) /// } /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { self.io .registration() .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } /// Attempts to send data on the socket to the remote address to which it /// was previously `connect`ed. /// /// The [`connect`] method will connect this socket to a remote address. /// This method will fail if the socket is not connected. /// /// Note that on multiple calls to a `poll_*` method in the send direction, /// only the `Waker` from the `Context` passed to the most recent call will /// be scheduled to receive a wakeup. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the socket is not available to write /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. /// /// [`connect`]: method@Self::connect pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { self.io .registration() .poll_write_io(cx, || self.io.send(buf)) } /// Tries to send data on the socket to the remote address to which it is /// connected. /// /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `writable()`. /// /// # Returns /// /// If successful, `Ok(n)` is returned, where `n` is the number of bytes /// sent. If the socket is not ready to send data, /// `Err(ErrorKind::WouldBlock)` is returned. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Bind a UDP socket /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// // Connect to a peer /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be writable /// socket.writable().await?; /// /// // Try to send data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_send(b"hello world") { /// Ok(n) => { /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io .registration() .try_io(Interest::WRITABLE, || self.io.send(buf)) } /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_recv()`. /// /// The function may complete without the socket being readable. This is a /// false-positive and attempting a `try_recv()` will return with /// `io::ErrorKind::WouldBlock`. /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method /// will continue to return immediately until the readiness event is /// consumed by an attempt to read that fails with `WouldBlock` or /// `Poll::Pending`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be readable /// socket.readable().await?; /// /// // The buffer is **not** included in the async task and will /// // only exist on the stack. /// let mut buf = [0; 1024]; /// /// // Try to recv data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_recv(&mut buf) { /// Ok(n) => { /// println!("GOT {:?}", &buf[..n]); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub async fn readable(&self) -> io::Result<()> { self.ready(Interest::READABLE).await?; Ok(()) } /// Polls for read/receive readiness. /// /// If the udp stream is not currently ready for receiving, this method will /// store a clone of the `Waker` from the provided `Context`. When the udp /// socket becomes ready for reading, `Waker::wake` will be called on the /// waker. /// /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or /// `poll_peek`, only the `Waker` from the `Context` passed to the most /// recent call is scheduled to receive a wakeup. (However, /// `poll_send_ready` retains a second, independent waker.) /// /// This function is intended for cases where creating and pinning a future /// via [`readable`] is not feasible. Where possible, using [`readable`] is /// preferred, as this supports polling from multiple tasks at once. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the udp stream is not ready for reading. /// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading. /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. /// /// [`readable`]: method@Self::readable pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll> { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } /// Receives a single datagram message on the socket from the remote address /// to which it is connected. On success, returns the number of bytes read. /// /// The function must be called with valid byte array `buf` of sufficient /// size to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// The [`connect`] method will connect this socket to a remote address. /// This method will fail if the socket is not connected. /// /// # Cancel safety /// /// This method is cancel safe. If `recv_from` is used as the event in a /// [`tokio::select!`](crate::select) statement and some other branch /// completes first, it is guaranteed that no messages were received on this /// socket. /// /// [`connect`]: method@Self::connect /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Bind socket /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// let mut buf = vec![0; 10]; /// let n = socket.recv(&mut buf).await?; /// /// println!("received {} bytes {:?}", n, &buf[..n]); /// /// Ok(()) /// } /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io .registration() .async_io(Interest::READABLE, || self.io.recv(buf)) .await } /// Attempts to receive a single datagram message on the socket from the remote /// address to which it is `connect`ed. /// /// The [`connect`] method will connect this socket to a remote address. This method /// resolves to an error if the socket is not connected. /// /// Note that on multiple calls to a `poll_*` method in the recv direction, only the /// `Waker` from the `Context` passed to the most recent call will be scheduled to /// receive a wakeup. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the socket is not ready to read /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. /// /// [`connect`]: method@Self::connect pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { let n = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uninitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; self.io.recv(b) }))?; // Safety: We trust `recv` to have filled up `n` bytes in the buffer. unsafe { buf.assume_init(n); } buf.advance(n); Poll::Ready(Ok(())) } /// Tries to receive a single datagram message on the socket from the remote /// address to which it is connected. On success, returns the number of /// bytes read. /// /// The function must be called with valid byte array buf of sufficient size /// to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be readable /// socket.readable().await?; /// /// // The buffer is **not** included in the async task and will /// // only exist on the stack. /// let mut buf = [0; 1024]; /// /// // Try to recv data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_recv(&mut buf) { /// Ok(n) => { /// println!("GOT {:?}", &buf[..n]); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { self.io .registration() .try_io(Interest::READABLE, || self.io.recv(buf)) } cfg_io_util! { /// Tries to receive data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// The function must be called with valid byte array buf of sufficient size /// to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// socket.connect("127.0.0.1:8081").await?; /// /// loop { /// // Wait for the socket to be readable /// socket.readable().await?; /// /// let mut buf = Vec::with_capacity(1024); /// /// // Try to recv data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_recv_buf(&mut buf) { /// Ok(n) => { /// println!("GOT {:?}", &buf[..n]); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_recv_buf(&self, buf: &mut B) -> io::Result { self.io.registration().try_io(Interest::READABLE, || { let dst = buf.chunk_mut(); let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit] as *mut [u8]) }; // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the // buffer. let n = (&*self.io).recv(dst)?; unsafe { buf.advance_mut(n); } Ok(n) }) } /// Tries to receive a single datagram message on the socket. On success, /// returns the number of bytes read and the origin. /// /// The function must be called with valid byte array buf of sufficient size /// to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// loop { /// // Wait for the socket to be readable /// socket.readable().await?; /// /// let mut buf = Vec::with_capacity(1024); /// /// // Try to recv data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_recv_buf_from(&mut buf) { /// Ok((n, _addr)) => { /// println!("GOT {:?}", &buf[..n]); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_recv_buf_from(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { self.io.registration().try_io(Interest::READABLE, || { let dst = buf.chunk_mut(); let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit] as *mut [u8]) }; // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the // buffer. let (n, addr) = (&*self.io).recv_from(dst)?; unsafe { buf.advance_mut(n); } Ok((n, addr)) }) } } /// Sends data on the socket to the given address. On success, returns the /// number of bytes written. /// /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its /// documentation for concrete examples. /// /// It is possible for `addr` to yield multiple addresses, but `send_to` /// will only send data to the first address yielded by `addr`. /// /// This will return an error when the IP version of the local socket does /// not match that returned from [`ToSocketAddrs`]. /// /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs /// /// # Cancel safety /// /// This method is cancel safe. If `send_to` is used as the event in a /// [`tokio::select!`](crate::select) statement and some other branch /// completes first, then it is guaranteed that the message was not sent. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?; /// /// println!("Sent {} bytes", len); /// /// Ok(()) /// } /// ``` pub async fn send_to(&self, buf: &[u8], target: A) -> io::Result { let mut addrs = to_socket_addrs(target).await?; match addrs.next() { Some(target) => self.send_to_addr(buf, target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", )), } } /// Attempts to send data on the socket to a given address. /// /// Note that on multiple calls to a `poll_*` method in the send direction, only the /// `Waker` from the `Context` passed to the most recent call will be scheduled to /// receive a wakeup. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the socket is not ready to write /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. pub fn poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr, ) -> Poll> { self.io .registration() .poll_write_io(cx, || self.io.send_to(buf, target)) } /// Tries to send data on the socket to the given address, but if the send is /// blocked this will return right away. /// /// This function is usually paired with `writable()`. /// /// # Returns /// /// If successful, returns the number of bytes sent /// /// Users should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::error::Error; /// use std::io; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// let dst = "127.0.0.1:8081".parse()?; /// /// loop { /// socket.writable().await?; /// /// match socket.try_send_to(&b"hello world"[..], dst) { /// Ok(sent) => { /// println!("sent {} bytes", sent); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// // Writable false positive. /// continue; /// } /// Err(e) => return Err(e.into()), /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io .registration() .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) } async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io .registration() .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) .await } /// Receives a single datagram message on the socket. On success, returns /// the number of bytes read and the origin. /// /// The function must be called with valid byte array `buf` of sufficient /// size to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// # Cancel safety /// /// This method is cancel safe. If `recv_from` is used as the event in a /// [`tokio::select!`](crate::select) statement and some other branch /// completes first, it is guaranteed that no messages were received on this /// socket. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// let mut buf = vec![0u8; 32]; /// let (len, addr) = socket.recv_from(&mut buf).await?; /// /// println!("received {:?} bytes from {:?}", len, addr); /// /// Ok(()) /// } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await } /// Attempts to receive a single datagram on the socket. /// /// Note that on multiple calls to a `poll_*` method in the recv direction, only the /// `Waker` from the `Context` passed to the most recent call will be scheduled to /// receive a wakeup. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the socket is not ready to read /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. pub fn poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uninitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; self.io.recv_from(b) }))?; // Safety: We trust `recv` to have filled up `n` bytes in the buffer. unsafe { buf.assume_init(n); } buf.advance(n); Poll::Ready(Ok(addr)) } /// Tries to receive a single datagram message on the socket. On success, /// returns the number of bytes read and the origin. /// /// The function must be called with valid byte array buf of sufficient size /// to hold the message bytes. If a message is too long to fit in the /// supplied buffer, excess bytes may be discarded. /// /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is /// returned. This function is usually paired with `readable()`. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Connect to a peer /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// loop { /// // Wait for the socket to be readable /// socket.readable().await?; /// /// // The buffer is **not** included in the async task and will /// // only exist on the stack. /// let mut buf = [0; 1024]; /// /// // Try to recv data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. /// match socket.try_recv_from(&mut buf) { /// Ok((n, _addr)) => { /// println!("GOT {:?}", &buf[..n]); /// break; /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; /// } /// Err(e) => { /// return Err(e); /// } /// } /// } /// /// Ok(()) /// } /// ``` pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() .try_io(Interest::READABLE, || self.io.recv_from(buf)) } /// Tries to read or write from the socket using a user-provided IO operation. /// /// If the socket is ready, the provided closure is called. The closure /// should attempt to perform IO operation from the socket by manually /// calling the appropriate syscall. If the operation fails because the /// socket is not actually ready, then the closure should return a /// `WouldBlock` error and the readiness flag is cleared. The return value /// of the closure is then returned by `try_io`. /// /// If the socket is not ready, then the closure is not called /// and a `WouldBlock` error is returned. /// /// The closure should only return a `WouldBlock` error if it has performed /// an IO operation on the socket that failed due to the socket not being /// ready. Returning a `WouldBlock` error in any other situation will /// incorrectly clear the readiness flag, which can cause the socket to /// behave incorrectly. /// /// The closure should not perform the IO operation using any of the methods /// defined on the Tokio `UdpSocket` type, as this will mess with the /// readiness flag and can cause the socket to behave incorrectly. /// /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. /// /// [`readable()`]: UdpSocket::readable() /// [`writable()`]: UdpSocket::writable() /// [`ready()`]: UdpSocket::ready() pub fn try_io( &self, interest: Interest, f: impl FnOnce() -> io::Result, ) -> io::Result { self.io.registration().try_io(interest, f) } /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read and the address from whence /// the data came. /// /// # Notes /// /// On Windows, if the data is larger than the buffer specified, the buffer /// is filled with the first part of the data, and peek_from returns the error /// WSAEMSGSIZE(10040). The excess data is lost. /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// /// let mut buf = vec![0u8; 32]; /// let (len, addr) = socket.peek_from(&mut buf).await?; /// /// println!("peeked {:?} bytes from {:?}", len, addr); /// /// Ok(()) /// } /// ``` pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io .registration() .async_io(Interest::READABLE, || self.io.peek_from(buf)) .await } /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read. /// /// # Notes /// /// Note that on multiple calls to a `poll_*` method in the recv direction, only the /// `Waker` from the `Context` passed to the most recent call will be scheduled to /// receive a wakeup /// /// On Windows, if the data is larger than the buffer specified, the buffer /// is filled with the first part of the data, and peek returns the error /// WSAEMSGSIZE(10040). The excess data is lost. /// Make sure to always use a sufficiently large buffer to hold the /// maximum UDP packet size, which can be up to 65536 bytes in size. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if the socket is not ready to read /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. pub fn poll_peek_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uninitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; self.io.peek_from(b) }))?; // Safety: We trust `recv` to have filled up `n` bytes in the buffer. unsafe { buf.assume_init(n); } buf.advance(n); Poll::Ready(Ok(addr)) } /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see [`set_broadcast`]. /// /// [`set_broadcast`]: method@Self::set_broadcast pub fn broadcast(&self) -> io::Result { self.io.broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. /// /// When enabled, this socket is allowed to send packets to a broadcast /// address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { self.io.set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. /// /// For more information about this option, see [`set_multicast_loop_v4`]. /// /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result { self.io.multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. /// /// If enabled, multicast packets will be looped back to the local socket. /// /// # Note /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { self.io.set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. /// /// For more information about this option, see [`set_multicast_ttl_v4`]. /// /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result { self.io.multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. /// /// Indicates the time-to-live value of outgoing multicast packets for /// this socket. The default value is 1 which means that multicast packets /// don't leave the local network unless explicitly requested. /// /// # Note /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { self.io.set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. /// /// For more information about this option, see [`set_multicast_loop_v6`]. /// /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result { self.io.multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. /// /// Controls whether this socket sees the multicast packets it sends itself. /// /// # Note /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { self.io.set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. /// /// [`set_ttl`]: method@Self::set_ttl /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// # use std::io; /// /// # async fn dox() -> io::Result<()> { /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; /// /// println!("{:?}", sock.ttl()?); /// # Ok(()) /// # } /// ``` pub fn ttl(&self) -> io::Result { self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. /// /// # Examples /// /// ```no_run /// use tokio::net::UdpSocket; /// # use std::io; /// /// # async fn dox() -> io::Result<()> { /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; /// sock.set_ttl(60)?; /// /// # Ok(()) /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. /// /// This function specifies a new multicast group for this socket to join. /// The address must be a valid multicast address, and `interface` is the /// address of the local interface with which the system should join the /// multicast group. If it's equal to `INADDR_ANY` then an appropriate /// interface is chosen by the system. pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { self.io.join_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. /// /// This function specifies a new multicast group for this socket to join. /// The address must be a valid multicast address, and `interface` is the /// index of the interface to join/leave (or 0 to indicate any interface). pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { self.io.join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. /// /// For more information about this option, see [`join_multicast_v4`]. /// /// [`join_multicast_v4`]: method@Self::join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { self.io.leave_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. /// /// For more information about this option, see [`join_multicast_v6`]. /// /// [`join_multicast_v6`]: method@Self::join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { self.io.leave_multicast_v6(multiaddr, interface) } /// Returns the value of the `SO_ERROR` option. /// /// # Examples /// ``` /// use tokio::net::UdpSocket; /// use std::io; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// // Create a socket /// let socket = UdpSocket::bind("0.0.0.0:8080").await?; /// /// if let Ok(Some(err)) = socket.take_error() { /// println!("Got error: {:?}", err); /// } /// /// Ok(()) /// } /// ``` pub fn take_error(&self) -> io::Result> { self.io.take_error() } } impl TryFrom for UdpSocket { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). fn try_from(stream: std::net::UdpSocket) -> Result { Self::from_std(stream) } } impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.io.fmt(f) } } #[cfg(all(unix))] mod sys { use super::UdpSocket; use std::os::unix::prelude::*; impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { self.io.as_raw_fd() } } } #[cfg(windows)] mod sys { use super::UdpSocket; use std::os::windows::prelude::*; impl AsRawSocket for UdpSocket { fn as_raw_socket(&self) -> RawSocket { self.io.as_raw_socket() } } }