summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/udp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/udp.rs')
-rw-r--r--third_party/rust/tokio/tests/udp.rs486
1 files changed, 486 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/udp.rs b/third_party/rust/tokio/tests/udp.rs
new file mode 100644
index 0000000000..ec2a1e9610
--- /dev/null
+++ b/third_party/rust/tokio/tests/udp.rs
@@ -0,0 +1,486 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use futures::future::poll_fn;
+use std::io;
+use std::sync::Arc;
+use tokio::{io::ReadBuf, net::UdpSocket};
+use tokio_test::assert_ok;
+
+const MSG: &[u8] = b"hello";
+const MSG_LEN: usize = MSG.len();
+
+#[tokio::test]
+async fn send_recv() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ sender.connect(receiver.local_addr()?).await?;
+ receiver.connect(sender.local_addr()?).await?;
+
+ sender.send(MSG).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let len = receiver.recv(&mut recv_buf[..]).await?;
+
+ assert_eq!(&recv_buf[..len], MSG);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_recv_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ sender.connect(receiver.local_addr()?).await?;
+ receiver.connect(sender.local_addr()?).await?;
+
+ poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_recv_from() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ sender.send_to(MSG, &receiver_addr).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
+
+ assert_eq!(&recv_buf[..len], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_recv_from_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ assert_eq!(addr, sender.local_addr()?);
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_peek_from() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
+
+ // peek
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ // peek
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ let mut recv_buf = [0u8; 32];
+ let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..n], MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_to_peek_from_poll() -> std::io::Result<()> {
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let receiver_addr = receiver.local_addr()?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ assert_eq!(addr, sender.local_addr()?);
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), MSG);
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+
+ poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
+#[tokio::test]
+async fn split() -> std::io::Result<()> {
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let s = Arc::new(socket);
+ let r = s.clone();
+
+ let addr = s.local_addr()?;
+ tokio::spawn(async move {
+ s.send_to(MSG, &addr).await.unwrap();
+ });
+ let mut recv_buf = [0u8; 32];
+ let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
+ assert_eq!(&recv_buf[..len], MSG);
+ Ok(())
+}
+
+#[tokio::test]
+async fn split_chan() -> std::io::Result<()> {
+ // setup UdpSocket that will echo all sent items
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let addr = socket.local_addr().unwrap();
+ let s = Arc::new(socket);
+ let r = s.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
+ tokio::spawn(async move {
+ while let Some((bytes, addr)) = rx.recv().await {
+ s.send_to(&bytes, &addr).await.unwrap();
+ }
+ });
+
+ tokio::spawn(async move {
+ let mut buf = [0u8; 32];
+ loop {
+ let (len, addr) = r.recv_from(&mut buf).await.unwrap();
+ tx.send((buf[..len].to_vec(), addr)).await.unwrap();
+ }
+ });
+
+ // test that we can send a value and get back some response
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ sender.send_to(MSG, addr).await?;
+ let mut recv_buf = [0u8; 32];
+ let (len, _) = sender.recv_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..len], MSG);
+ Ok(())
+}
+
+#[tokio::test]
+async fn split_chan_poll() -> std::io::Result<()> {
+ // setup UdpSocket that will echo all sent items
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let addr = socket.local_addr().unwrap();
+ let s = Arc::new(socket);
+ let r = s.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
+ tokio::spawn(async move {
+ while let Some((bytes, addr)) = rx.recv().await {
+ poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
+ .await
+ .unwrap();
+ }
+ });
+
+ tokio::spawn(async move {
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ loop {
+ let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
+ tx.send((read.filled().to_vec(), addr)).await.unwrap();
+ }
+ });
+
+ // test that we can send a value and get back some response
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
+ assert_eq!(read.filled(), MSG);
+ Ok(())
+}
+
+// # Note
+//
+// This test is purposely written such that each time `sender` sends data on
+// the socket, `receiver` awaits the data. On Unix, it would be okay waiting
+// until the end of the test to receive all the data. On Windows, this would
+// **not** be okay because it's resources are completion based (via IOCP).
+// If data is sent and not yet received, attempting to send more data will
+// result in `ErrorKind::WouldBlock` until the first operation completes.
+#[tokio::test]
+async fn try_send_spawn() {
+ const MSG2: &[u8] = b"world!";
+ const MSG2_LEN: usize = MSG2.len();
+
+ let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ receiver
+ .connect(sender.local_addr().unwrap())
+ .await
+ .unwrap();
+
+ sender.writable().await.unwrap();
+
+ let sent = &sender
+ .try_send_to(MSG, receiver.local_addr().unwrap())
+ .unwrap();
+ assert_eq!(sent, &MSG_LEN);
+ let mut buf = [0u8; 32];
+ let mut received = receiver.recv(&mut buf[..]).await.unwrap();
+
+ sender
+ .connect(receiver.local_addr().unwrap())
+ .await
+ .unwrap();
+ let sent = &sender.try_send(MSG2).unwrap();
+ assert_eq!(sent, &MSG2_LEN);
+ received += receiver.recv(&mut buf[..]).await.unwrap();
+
+ std::thread::spawn(move || {
+ let sent = &sender.try_send(MSG).unwrap();
+ assert_eq!(sent, &MSG_LEN);
+ })
+ .join()
+ .unwrap();
+ received += receiver.recv(&mut buf[..]).await.unwrap();
+
+ assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
+}
+
+#[tokio::test]
+async fn try_send_recv() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Connect the two
+ client.connect(server.local_addr().unwrap()).await.unwrap();
+ server.connect(client.local_addr().unwrap()).await.unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send(b"hello world") {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv(&mut buf) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_send_to_recv_from() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_recv_buf() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Connect the two
+ client.connect(server.local_addr().unwrap()).await.unwrap();
+ server.connect(client.local_addr().unwrap()).await.unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send(b"hello world") {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf(&mut buf) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_recv_buf_from() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn poll_ready() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
+
+ let mut buf = Vec::with_capacity(512);
+
+ match server.try_recv_buf_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}