summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/tcp_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/tcp_stream.rs')
-rw-r--r--third_party/rust/tokio/tests/tcp_stream.rs359
1 files changed, 359 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/tcp_stream.rs b/third_party/rust/tokio/tests/tcp_stream.rs
new file mode 100644
index 0000000000..0b5d12ae8e
--- /dev/null
+++ b/third_party/rust/tokio/tests/tcp_stream.rs
@@ -0,0 +1,359 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
+use tokio::net::{TcpListener, TcpStream};
+use tokio::try_join;
+use tokio_test::task;
+use tokio_test::{assert_ok, assert_pending, assert_ready_ok};
+
+use std::io;
+use std::task::Poll;
+use std::time::Duration;
+
+use futures::future::poll_fn;
+
+#[tokio::test]
+async fn set_linger() {
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ let stream = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+
+ assert_ok!(stream.set_linger(Some(Duration::from_secs(1))));
+ assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1);
+
+ assert_ok!(stream.set_linger(None));
+ assert!(stream.linger().unwrap().is_none());
+}
+
+#[tokio::test]
+async fn try_read_write() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer using non-vectored I/O
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end using non-vectored I/O
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ match server.try_read(&mut read[i..]) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ written.clear();
+ client.writable().await.unwrap();
+
+ // Fill the write buffer using vectored I/O
+ let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write_vectored(&data_bufs) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end using vectored I/O
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ let mut bufs: Vec<_> = read[i..]
+ .chunks_mut(0x10000)
+ .map(io::IoSliceMut::new)
+ .collect();
+ match server.try_read_vectored(&mut bufs) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}
+
+#[test]
+fn buffer_not_included_in_future() {
+ use std::mem;
+
+ const N: usize = 4096;
+
+ let fut = async {
+ let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
+
+ loop {
+ stream.readable().await.unwrap();
+
+ let mut buf = [0; N];
+ let n = stream.try_read(&mut buf[..]).unwrap();
+
+ if n == 0 {
+ break;
+ }
+ }
+ };
+
+ let n = mem::size_of_val(&fut);
+ assert!(n < 1000);
+}
+
+macro_rules! assert_readable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_readable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_read_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+macro_rules! assert_writable_by_polling {
+ ($stream:expr) => {
+ assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await);
+ };
+}
+
+macro_rules! assert_not_writable_by_polling {
+ ($stream:expr) => {
+ poll_fn(|cx| {
+ assert_pending!($stream.poll_write_ready(cx));
+ Poll::Ready(())
+ })
+ .await;
+ };
+}
+
+#[tokio::test]
+async fn poll_read_ready() {
+ let (mut client, mut server) = create_pair().await;
+
+ // Initial state - not readable.
+ assert_not_readable_by_polling!(server);
+
+ // There is data in the buffer - readable.
+ assert_ok!(client.write_all(b"ping").await);
+ assert_readable_by_polling!(server);
+
+ // Readable until calls to `poll_read` return `Poll::Pending`.
+ let mut buf = [0u8; 4];
+ assert_ok!(server.read_exact(&mut buf).await);
+ assert_readable_by_polling!(server);
+ read_until_pending(&mut server);
+ assert_not_readable_by_polling!(server);
+
+ // Detect the client disconnect.
+ drop(client);
+ assert_readable_by_polling!(server);
+}
+
+#[tokio::test]
+async fn poll_write_ready() {
+ let (mut client, server) = create_pair().await;
+
+ // Initial state - writable.
+ assert_writable_by_polling!(client);
+
+ // No space to write - not writable.
+ write_until_pending(&mut client);
+ assert_not_writable_by_polling!(client);
+
+ // Detect the server disconnect.
+ drop(server);
+ assert_writable_by_polling!(client);
+}
+
+async fn create_pair() -> (TcpStream, TcpStream) {
+ let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = assert_ok!(listener.local_addr());
+ let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept()));
+ (client, server)
+}
+
+fn read_until_pending(stream: &mut TcpStream) {
+ let mut buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_read(&mut buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}
+
+fn write_until_pending(stream: &mut TcpStream) {
+ let buf = vec![0u8; 1024 * 1024];
+ loop {
+ match stream.try_write(&buf) {
+ Ok(_) => (),
+ Err(err) => {
+ assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
+ break;
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_read_buf() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = Vec::with_capacity(written.len());
+ let mut i = 0;
+
+ while i < read.capacity() {
+ server.readable().await.unwrap();
+
+ match server.try_read_buf(&mut read) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}