diff options
Diffstat (limited to 'third_party/rust/tokio-udp')
-rw-r--r-- | third_party/rust/tokio-udp/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/CHANGELOG.md | 7 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/Cargo.toml | 45 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/LICENSE | 25 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/README.md | 15 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/src/frame.rs | 156 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/src/lib.rs | 43 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/src/recv_dgram.rs | 52 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/src/send_dgram.rs | 61 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/src/socket.rs | 425 | ||||
-rw-r--r-- | third_party/rust/tokio-udp/tests/udp.rs | 260 |
11 files changed, 1090 insertions, 0 deletions
diff --git a/third_party/rust/tokio-udp/.cargo-checksum.json b/third_party/rust/tokio-udp/.cargo-checksum.json new file mode 100644 index 0000000000..facc0c3fb8 --- /dev/null +++ b/third_party/rust/tokio-udp/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"12c90f0adbca12bb4093c9ff49e5582b94ceba32cfb1883125cea2d86bc49229","Cargo.toml":"48d6aa4b9cf001ef34a4f2e6b831e7b8a607bc48c86ea4f3df8732111e177a9a","LICENSE":"4899c290472c872cf8a1904a60e73ec58a1bc1db2e20bc143aa3d1498be49c96","README.md":"6e14fb99ef1acf472ca5c3b501066df97de615c961b32fa19d1d4a335950a0a5","src/frame.rs":"739e2625b5b6a61024e61c8f7be02fdfcb7560e1547125c4f1fa4be7ff72e4d6","src/lib.rs":"c1d32d0874a6abbcaaabb40386e8366b1c0beb13738b2f3184b83b817f86dceb","src/recv_dgram.rs":"88a898ccbcfce5bb5c23f0f5485ad87b46fabd5c84e87b6876efbd964caba548","src/send_dgram.rs":"1ba6b5ba70c05e4b36fb209d3c4b46f3ffe7b075792ce209913824ecb04aaef0","src/socket.rs":"fdc5faef8075afa83072d09fa03eb615464926698bdbbf77d27d66c22047391e","tests/udp.rs":"120f35f20b1b134e3ac3c1fb5091cfa82d51e36dcd227905620b036741c2e313"},"package":"43eb534af6e8f37d43ab1b612660df14755c42bd003c5f8d2475ee78cc4600c0"}
\ No newline at end of file diff --git a/third_party/rust/tokio-udp/CHANGELOG.md b/third_party/rust/tokio-udp/CHANGELOG.md new file mode 100644 index 0000000000..dd8dad17a3 --- /dev/null +++ b/third_party/rust/tokio-udp/CHANGELOG.md @@ -0,0 +1,7 @@ +# 0.1.1 (June 13, 2018) + +* Switch to tokio-codec (#360) + +# 0.1.0 (Mar 23, 2018) + +* Initial release diff --git a/third_party/rust/tokio-udp/Cargo.toml b/third_party/rust/tokio-udp/Cargo.toml new file mode 100644 index 0000000000..2292109131 --- /dev/null +++ b/third_party/rust/tokio-udp/Cargo.toml @@ -0,0 +1,45 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g. crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio-udp" +version = "0.1.1" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "UDP bindings for tokio.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-udp/0.1" +categories = ["asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4" + +[dependencies.futures] +version = "0.1.19" + +[dependencies.log] +version = "0.4" + +[dependencies.mio] +version = "0.6.14" + +[dependencies.tokio-codec] +version = "0.1.0" + +[dependencies.tokio-io] +version = "0.1.7" + +[dependencies.tokio-reactor] +version = "0.1.1" +[dev-dependencies.env_logger] +version = "0.4" +default-features = false diff --git a/third_party/rust/tokio-udp/LICENSE b/third_party/rust/tokio-udp/LICENSE new file mode 100644 index 0000000000..38c1e27b8e --- /dev/null +++ b/third_party/rust/tokio-udp/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-udp/README.md b/third_party/rust/tokio-udp/README.md new file mode 100644 index 0000000000..e677c0749f --- /dev/null +++ b/third_party/rust/tokio-udp/README.md @@ -0,0 +1,15 @@ +# tokio-udp + +UDP bindings for `tokio`. + +[Documentation](https://tokio-rs.github.io/tokio/tokio_udp/) + +## License + +This project is licensed under the [MIT license](./LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-udp/src/frame.rs b/third_party/rust/tokio-udp/src/frame.rs new file mode 100644 index 0000000000..37097ca377 --- /dev/null +++ b/third_party/rust/tokio-udp/src/frame.rs @@ -0,0 +1,156 @@ +use std::io; +use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; + +use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; + +use super::UdpSocket; + +use tokio_codec::{Decoder, Encoder}; +use bytes::{BytesMut, BufMut}; + +/// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// Raw UDP sockets work with datagrams, but higher-level code usually wants to +/// batch these into meaningful chunks, called "frames". This method layers +/// framing on top of this socket by using the `Encoder` and `Decoder` traits to +/// handle encoding and decoding of messages frames. Note that the incoming and +/// outgoing frame types may be distinct. +/// +/// This function returns a *single* object that is both `Stream` and `Sink`; +/// grouping this into a single object is often useful for layering things which +/// require both read and write access to the underlying object. +/// +/// If you want to work more directly with the streams and sink, consider +/// calling `split` on the `UdpFramed` returned by this method, which will break +/// them into separate objects, allowing them to interact more easily. +#[must_use = "sinks do nothing unless polled"] +#[derive(Debug)] +pub struct UdpFramed<C> { + socket: UdpSocket, + codec: C, + rd: BytesMut, + wr: BytesMut, + out_addr: SocketAddr, + flushed: bool, +} + +impl<C: Decoder> Stream for UdpFramed<C> { + type Item = (C::Item, SocketAddr); + type Error = C::Error; + + fn poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error> { + self.rd.reserve(INITIAL_RD_CAPACITY); + + let (n, addr) = unsafe { + // Read into the buffer without having to initialize the memory. + let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); + self.rd.advance_mut(n); + (n, addr) + }; + trace!("received {} bytes, decoding", n); + let frame_res = self.codec.decode(&mut self.rd); + self.rd.clear(); + let frame = frame_res?; + let result = frame.map(|frame| (frame, addr)); // frame -> (frame, addr) + trace!("frame decoded from buffer"); + Ok(Async::Ready(result)) + } +} + +impl<C: Encoder> Sink for UdpFramed<C> { + type SinkItem = (C::Item, SocketAddr); + type SinkError = C::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + trace!("sending frame"); + + if !self.flushed { + match try!(self.poll_complete()) { + Async::Ready(()) => {}, + Async::NotReady => return Ok(AsyncSink::NotReady(item)), + } + } + + let (frame, out_addr) = item; + self.codec.encode(frame, &mut self.wr)?; + self.out_addr = out_addr; + self.flushed = false; + trace!("frame encoded; length={}", self.wr.len()); + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), C::Error> { + if self.flushed { + return Ok(Async::Ready(())) + } + + trace!("flushing frame; length={}", self.wr.len()); + let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr)); + trace!("written {}", n); + + let wrote_all = n == self.wr.len(); + self.wr.clear(); + self.flushed = true; + + if wrote_all { + Ok(Async::Ready(())) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket").into()) + } + } + + fn close(&mut self) -> Poll<(), C::Error> { + try_ready!(self.poll_complete()); + Ok(().into()) + } +} + +const INITIAL_RD_CAPACITY: usize = 64 * 1024; +const INITIAL_WR_CAPACITY: usize = 8 * 1024; + +impl<C> UdpFramed<C> { + /// Create a new `UdpFramed` backed by the given socket and codec. + /// + /// See struct level documentation for more details. + pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> { + UdpFramed { + socket: socket, + codec: codec, + out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), + rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), + wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), + flushed: true, + } + } + + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_ref(&self) -> &UdpSocket { + &self.socket + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_mut(&mut self) -> &mut UdpSocket { + &mut self.socket + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + pub fn into_inner(self) -> UdpSocket { + self.socket + } +} diff --git a/third_party/rust/tokio-udp/src/lib.rs b/third_party/rust/tokio-udp/src/lib.rs new file mode 100644 index 0000000000..4a37b697ef --- /dev/null +++ b/third_party/rust/tokio-udp/src/lib.rs @@ -0,0 +1,43 @@ +//! UDP bindings for `tokio`. +//! +//! This module contains the UDP networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. +//! Reading and writing to it can be done using futures, which return the +//! [`RecvDgram`] and [`SendDgram`] structs respectively. +//! +//! For convenience it's also possible to convert raw datagrams into higher-level +//! frames. +//! +//! [`UdpSocket`]: struct.UdpSocket.html +//! [`RecvDgram`]: struct.RecvDgram.html +//! [`SendDgram`]: struct.SendDgram.html +//! [`UdpFramed`]: struct.UdpFramed.html +//! [`framed`]: struct.UdpSocket.html#method.framed + +#![doc(html_root_url = "https://docs.rs/tokio-tcp/0.1.1")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate mio; +#[macro_use] +extern crate log; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_reactor; + +#[cfg(feature = "unstable-futures")] +extern crate futures2; + +mod frame; +mod socket; +mod send_dgram; +mod recv_dgram; + +pub use self::frame::UdpFramed; +pub use self::socket::UdpSocket; +pub use self::send_dgram::SendDgram; +pub use self::recv_dgram::RecvDgram; diff --git a/third_party/rust/tokio-udp/src/recv_dgram.rs b/third_party/rust/tokio-udp/src/recv_dgram.rs new file mode 100644 index 0000000000..2a9e296064 --- /dev/null +++ b/third_party/rust/tokio-udp/src/recv_dgram.rs @@ -0,0 +1,52 @@ +use super::socket::UdpSocket; + +use std::io; +use std::net::SocketAddr; + +use futures::{Async, Future, Poll}; + +/// A future used to receive a datagram from a UDP socket. +/// +/// This is created by the `UdpSocket::recv_dgram` method. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct RecvDgram<T> { + /// None means future was completed + state: Option<RecvDgramInner<T>> +} + +/// A struct is used to represent the full info of RecvDgram. +#[derive(Debug)] +struct RecvDgramInner<T> { + /// Rx socket + socket: UdpSocket, + /// The received data will be put in the buffer + buffer: T +} + +impl<T> RecvDgram<T> { + /// Create a new future to receive UDP Datagram + pub(crate) fn new(socket: UdpSocket, buffer: T) -> RecvDgram<T> { + let inner = RecvDgramInner { socket: socket, buffer: buffer }; + RecvDgram { state: Some(inner) } + } +} + +impl<T> Future for RecvDgram<T> + where T: AsMut<[u8]>, +{ + type Item = (UdpSocket, T, usize, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, io::Error> { + let (n, addr) = { + let ref mut inner = + self.state.as_mut().expect("RecvDgram polled after completion"); + + try_ready!(inner.socket.poll_recv_from(inner.buffer.as_mut())) + }; + + let inner = self.state.take().unwrap(); + Ok(Async::Ready((inner.socket, inner.buffer, n, addr))) + } +} diff --git a/third_party/rust/tokio-udp/src/send_dgram.rs b/third_party/rust/tokio-udp/src/send_dgram.rs new file mode 100644 index 0000000000..50d6503899 --- /dev/null +++ b/third_party/rust/tokio-udp/src/send_dgram.rs @@ -0,0 +1,61 @@ +use super::socket::UdpSocket; + +use std::io; +use std::net::SocketAddr; + +use futures::{Async, Future, Poll}; + +/// A future used to write the entire contents of some data to a UDP socket. +/// +/// This is created by the `UdpSocket::send_dgram` method. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct SendDgram<T> { + /// None means future was completed + state: Option<SendDgramInner<T>> +} + +/// A struct is used to represent the full info of SendDgram. +#[derive(Debug)] +struct SendDgramInner<T> { + /// Tx socket + socket: UdpSocket, + /// The whole buffer will be sent + buffer: T, + /// Destination addr + addr: SocketAddr, +} + +impl<T> SendDgram<T> { + /// Create a new future to send UDP Datagram + pub(crate) fn new(socket: UdpSocket, buffer: T, addr: SocketAddr) -> SendDgram<T> { + let inner = SendDgramInner { socket: socket, buffer: buffer, addr: addr }; + SendDgram { state: Some(inner) } + } +} + +fn incomplete_write(reason: &str) -> io::Error { + io::Error::new(io::ErrorKind::Other, reason) +} + +impl<T> Future for SendDgram<T> + where T: AsRef<[u8]>, +{ + type Item = (UdpSocket, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(UdpSocket, T), io::Error> { + { + let ref mut inner = + self.state.as_mut().expect("SendDgram polled after completion"); + let n = try_ready!(inner.socket.poll_send_to(inner.buffer.as_ref(), &inner.addr)); + if n != inner.buffer.as_ref().len() { + return Err(incomplete_write("failed to send entire message \ + in datagram")) + } + } + + let inner = self.state.take().unwrap(); + Ok(Async::Ready((inner.socket, inner.buffer))) + } +} diff --git a/third_party/rust/tokio-udp/src/socket.rs b/third_party/rust/tokio-udp/src/socket.rs new file mode 100644 index 0000000000..aaf6b8f726 --- /dev/null +++ b/third_party/rust/tokio-udp/src/socket.rs @@ -0,0 +1,425 @@ +use super::{SendDgram, RecvDgram}; + +use std::io; +use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; +use std::fmt; + +use futures::{Async, Poll}; +use mio; + +use tokio_reactor::{Handle, PollEvented}; + +/// An I/O object representing a UDP socket. +pub struct UdpSocket { + io: PollEvented<mio::net::UdpSocket>, +} + +impl UdpSocket { + /// This function will create a new UDP socket and attempt to bind it to + /// the `addr` provided. + pub fn bind(addr: &SocketAddr) -> io::Result<UdpSocket> { + mio::net::UdpSocket::bind(addr) + .map(UdpSocket::new) + } + + fn new(socket: mio::net::UdpSocket) -> UdpSocket { + let io = PollEvented::new(socket); + UdpSocket { io: io } + } + + /// Creates a new `UdpSocket` from the previously bound socket provided. + /// + /// The socket given will be registered with the event loop that `handle` + /// is associated with. This function requires that `socket` has previously + /// been bound to an address to work correctly. + /// + /// This can be used in conjunction with net2's `UdpBuilder` interface to + /// configure a socket before it's handed off, such as setting options like + /// `reuse_address` or binding to multiple addresses. + /// + /// Use `Handle::default()` to lazily bind to an event loop, just like `bind` does. + pub fn from_std(socket: net::UdpSocket, + handle: &Handle) -> io::Result<UdpSocket> { + let io = mio::net::UdpSocket::from_socket(socket)?; + let io = PollEvented::new_with_handle(io, handle)?; + Ok(UdpSocket { io }) + } + + /// Returns the local address that this socket is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Connects the UDP socket setting the default destination for send() and + /// limiting packets that are read via recv from the address specified in + /// `addr`. + pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> { + self.io.get_ref().connect(*addr) + } + + #[deprecated(since = "0.1.2", note = "use poll_send instead")] + #[doc(hidden)] + pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + match self.poll_send(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Sends data on the socket to the remote address to which it is connected. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes writable. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_send(&mut self, buf: &[u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_write_ready()); + + match self.io.get_ref().send(buf) { + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + #[deprecated(since = "0.1.2", note = "use poll_recv instead")] + #[doc(hidden)] + pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.poll_recv(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Receives a single datagram message on the socket from the remote address to + /// which it is connected. On success, returns the number of bytes read. + /// + /// The function must be called with valid byte array `buf` of sufficient size to + /// hold the message bytes. If a message is too long to fit in the supplied buffer, + /// excess bytes may be discarded. + /// + /// The [`connect`] method will connect this socket to a remote address. This + /// method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes receivable or is closed. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_recv(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + + match self.io.get_ref().recv(buf) { + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(mio::Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + #[deprecated(since = "0.1.2", note = "use poll_send_to instead")] + #[doc(hidden)] + pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + match self.poll_send_to(buf, target)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Sends data on the socket to the given address. On success, returns the + /// number of bytes written. + /// + /// This will return an error when the IP version of the local socket + /// does not match that of `target`. + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the socket is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes writable. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_send_to(&mut self, buf: &[u8], target: &SocketAddr) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_write_ready()); + + match self.io.get_ref().send_to(buf, target) { + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Creates a future that will write the entire contents of the buffer + /// `buf` provided as a datagram to this socket. + /// + /// The returned future will return after data has been written to the + /// outbound socket. The future will resolve to the stream as well as the + /// buffer (for reuse if needed). + /// + /// Any error which happens during writing will cause both the stream and + /// the buffer to get destroyed. Note that failure to write the entire + /// buffer is considered an error for the purposes of sending a datagram. + /// + /// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. + pub fn send_dgram<T>(self, buf: T, addr: &SocketAddr) -> SendDgram<T> + where T: AsRef<[u8]>, + { + SendDgram::new(self, buf, *addr) + } + + #[deprecated(since = "0.1.2", note = "use poll_recv_from instead")] + #[doc(hidden)] + pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + match self.poll_recv_from(buf)? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Receives data from the socket. On success, returns the number of bytes + /// read and the address from whence the data came. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + + match self.io.get_ref().recv_from(buf) { + Ok(n) => Ok(n.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(mio::Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Creates a future that receive a datagram to be written to the buffer + /// provided. + /// + /// The returned future will return after a datagram has been received on + /// this socket. The future will resolve to the socket, the buffer, the + /// amount of data read, and the address the data was received from. + /// + /// An error during reading will cause the socket and buffer to get + /// destroyed. + /// + /// The `buf` parameter here only requires the `AsMut<[u8]>` trait, which + /// should be broadly applicable to accepting data which can be converted + /// to a slice. + pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T> + where T: AsMut<[u8]>, + { + RecvDgram::new(self, buf) + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + /// + /// For more information about this option, see [`set_broadcast`]. + /// + /// [`set_broadcast`]: #method.set_broadcast + pub fn broadcast(&self) -> io::Result<bool> { + self.io.get_ref().broadcast() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + /// + /// When enabled, this socket is allowed to send packets to a broadcast + /// address. + pub fn set_broadcast(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_broadcast(on) + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see [`set_multicast_loop_v4`]. + /// + /// [`set_multicast_loop_v4`]: #method.set_multicast_loop_v4 + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v4() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If enabled, multicast packets will be looped back to the local socket. + /// + /// # Note + /// + /// This may not have any affect on IPv6 sockets. + pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v4(on) + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// For more information about this option, see [`set_multicast_ttl_v4`]. + /// + /// [`set_multicast_ttl_v4`]: #method.set_multicast_ttl_v4 + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + self.io.get_ref().multicast_ttl_v4() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for + /// this socket. The default value is 1 which means that multicast packets + /// don't leave the local network unless explicitly requested. + /// + /// # Note + /// + /// This may not have any affect on IPv6 sockets. + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_multicast_ttl_v4(ttl) + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// For more information about this option, see [`set_multicast_loop_v6`]. + /// + /// [`set_multicast_loop_v6`]: #method.set_multicast_loop_v6 + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + self.io.get_ref().multicast_loop_v6() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// # Note + /// + /// This may not have any affect on IPv4 sockets. + pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { + self.io.get_ref().set_multicast_loop_v6(on) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: #method.set_ttl + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// address of the local interface with which the system should join the + /// multicast group. If it's equal to `INADDR_ANY` then an appropriate + /// interface is chosen by the system. + pub fn join_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.get_ref().join_multicast_v4(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + /// + /// This function specifies a new multicast group for this socket to join. + /// The address must be a valid multicast address, and `interface` is the + /// index of the interface to join/leave (or 0 to indicate any interface). + pub fn join_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.get_ref().join_multicast_v6(multiaddr, interface) + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see [`join_multicast_v4`]. + /// + /// [`join_multicast_v4`]: #method.join_multicast_v4 + pub fn leave_multicast_v4(&self, + multiaddr: &Ipv4Addr, + interface: &Ipv4Addr) -> io::Result<()> { + self.io.get_ref().leave_multicast_v4(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + /// + /// For more information about this option, see [`join_multicast_v6`]. + /// + /// [`join_multicast_v6`]: #method.join_multicast_v6 + pub fn leave_multicast_v6(&self, + multiaddr: &Ipv6Addr, + interface: u32) -> io::Result<()> { + self.io.get_ref().leave_multicast_v6(multiaddr, interface) + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(all(unix))] +mod sys { + use std::os::unix::prelude::*; + use super::UdpSocket; + + impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::UdpSocket; + // + // impl AsRawHandle for UdpSocket { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +} diff --git a/third_party/rust/tokio-udp/tests/udp.rs b/third_party/rust/tokio-udp/tests/udp.rs new file mode 100644 index 0000000000..ef70ed23f7 --- /dev/null +++ b/third_party/rust/tokio-udp/tests/udp.rs @@ -0,0 +1,260 @@ +extern crate futures; +extern crate tokio_udp; +extern crate tokio_codec; +#[macro_use] +extern crate tokio_io; +extern crate bytes; +extern crate env_logger; + +use std::io; +use std::net::SocketAddr; + +use futures::{Future, Poll, Stream, Sink}; + +use tokio_udp::{UdpSocket, UdpFramed}; +use tokio_codec::{Encoder, Decoder}; +use bytes::{BytesMut, BufMut}; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) { + let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); + let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); + let a_addr = t!(a.local_addr()); + let b_addr = t!(b.local_addr()); + + { + let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); + let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); + let (sendt, received) = t!(send.join(recv).wait()); + a = sendt; + b = received; + } + + { + let send = SendMessage::new(a, send, b_addr, b""); + let recv = RecvMessage::new(b, recv, a_addr, b""); + t!(send.join(recv).wait()); + } +} + +#[test] +fn send_to_and_recv_from() { + send_messages(SendTo {}, RecvFrom {}); +} + +#[test] +fn send_and_recv() { + send_messages(Send {}, Recv {}); +} + +trait SendFn { + fn send(&self, &mut UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>; +} + +#[derive(Debug, Clone)] +struct SendTo {} + +impl SendFn for SendTo { + fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { + socket.send_to(buf, addr) + } +} + +#[derive(Debug, Clone)] +struct Send {} + +impl SendFn for Send { + fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { + socket.connect(addr).expect("could not connect"); + socket.send(buf) + } +} + +struct SendMessage<S> { + socket: Option<UdpSocket>, + send: S, + addr: SocketAddr, + data: &'static [u8], +} + +impl<S: SendFn> SendMessage<S> { + fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> { + SendMessage { + socket: Some(socket), + send: send, + addr: addr, + data: data, + } + } +} + +impl<S: SendFn> Future for SendMessage<S> { + type Item = UdpSocket; + type Error = io::Error; + + fn poll(&mut self) -> Poll<UdpSocket, io::Error> { + let n = try_nb!(self.send.send(self.socket.as_mut().unwrap(), &self.data[..], &self.addr)); + + assert_eq!(n, self.data.len()); + + Ok(self.socket.take().unwrap().into()) + } +} + +trait RecvFn { + fn recv(&self, &mut UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>; +} + +#[derive(Debug, Clone)] +struct RecvFrom {} + +impl RecvFn for RecvFrom { + fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8], + expected_addr: &SocketAddr) -> Result<usize, io::Error> { + socket.recv_from(buf).map(|(s, addr)| { + assert_eq!(addr, *expected_addr); + s + }) + } +} + +#[derive(Debug, Clone)] +struct Recv {} + +impl RecvFn for Recv { + fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> { + socket.recv(buf) + } +} + +struct RecvMessage<R> { + socket: Option<UdpSocket>, + recv: R, + expected_addr: SocketAddr, + expected_data: &'static [u8], +} + +impl<R: RecvFn> RecvMessage<R> { + fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr, + expected_data: &'static [u8]) -> RecvMessage<R> { + RecvMessage { + socket: Some(socket), + recv: recv, + expected_addr: expected_addr, + expected_data: expected_data, + } + } +} + +impl<R: RecvFn> Future for RecvMessage<R> { + type Item = UdpSocket; + type Error = io::Error; + + fn poll(&mut self) -> Poll<UdpSocket, io::Error> { + let mut buf = vec![0u8; 10 + self.expected_data.len() * 10]; + let n = try_nb!(self.recv.recv(&mut self.socket.as_mut().unwrap(), &mut buf[..], + &self.expected_addr)); + + assert_eq!(n, self.expected_data.len()); + assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]); + + Ok(self.socket.take().unwrap().into()) + } +} + +#[test] +fn send_dgrams() { + let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); + let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); + let mut buf = [0u8; 50]; + let b_addr = t!(b.local_addr()); + + { + let send = a.send_dgram(&b"4321"[..], &b_addr); + let recv = b.recv_dgram(&mut buf[..]); + let (sendt, received) = t!(send.join(recv).wait()); + assert_eq!(received.2, 4); + assert_eq!(&received.1[..4], b"4321"); + a = sendt.0; + b = received.0; + } + + { + let send = a.send_dgram(&b""[..], &b_addr); + let recv = b.recv_dgram(&mut buf[..]); + let received = t!(send.join(recv).wait()).1; + assert_eq!(received.2, 0); + } +} + +pub struct ByteCodec; + +impl Decoder for ByteCodec { + type Item = Vec<u8>; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> { + let len = buf.len(); + Ok(Some(buf.split_to(len).to_vec())) + } +} + +impl Encoder for ByteCodec { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} + +#[test] +fn send_framed() { + drop(env_logger::init()); + + let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); + let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); + let a_addr = t!(a_soc.local_addr()); + let b_addr = t!(b_soc.local_addr()); + + { + let a = UdpFramed::new(a_soc, ByteCodec); + let b = UdpFramed::new(b_soc, ByteCodec); + + let msg = b"4567".to_vec(); + + let send = a.send((msg.clone(), b_addr)); + let recv = b.into_future().map_err(|e| e.0); + let (sendt, received) = t!(send.join(recv).wait()); + + let (data, addr) = received.0.unwrap(); + assert_eq!(msg, data); + assert_eq!(a_addr, addr); + + a_soc = sendt.into_inner(); + b_soc = received.1.into_inner(); + } + + { + let a = UdpFramed::new(a_soc, ByteCodec); + let b = UdpFramed::new(b_soc, ByteCodec); + + let msg = b"".to_vec(); + + let send = a.send((msg.clone(), b_addr)); + let recv = b.into_future().map_err(|e| e.0); + let received = t!(send.join(recv).wait()).1; + + let (data, addr) = received.0.unwrap(); + assert_eq!(msg, data); + assert_eq!(a_addr, addr); + } +} |