diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/tests | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/tests')
94 files changed, 10269 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/_require_full.rs b/third_party/rust/tokio/tests/_require_full.rs new file mode 100644 index 0000000000..98455bedef --- /dev/null +++ b/third_party/rust/tokio/tests/_require_full.rs @@ -0,0 +1,2 @@ +#![cfg(not(feature = "full"))] +compile_error!("run main Tokio tests with `--features full`"); diff --git a/third_party/rust/tokio/tests/async_send_sync.rs b/third_party/rust/tokio/tests/async_send_sync.rs new file mode 100644 index 0000000000..1fea19c2a7 --- /dev/null +++ b/third_party/rust/tokio/tests/async_send_sync.rs @@ -0,0 +1,258 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::cell::Cell; +use std::io::Cursor; +use std::net::SocketAddr; +use std::rc::Rc; +use tokio::net::TcpStream; +use tokio::time::{Duration, Instant}; + +#[allow(dead_code)] +type BoxFutureSync<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + Sync>>; +#[allow(dead_code)] +type BoxFutureSend<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>; +#[allow(dead_code)] +type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T>>>; + +#[allow(dead_code)] +fn require_send<T: Send>(_t: &T) {} +#[allow(dead_code)] +fn require_sync<T: Sync>(_t: &T) {} + +#[allow(dead_code)] +struct Invalid; + +trait AmbiguousIfSend<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSend<()> for T {} +impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {} + +trait AmbiguousIfSync<A> { + fn some_item(&self) {} +} +impl<T: ?Sized> AmbiguousIfSync<()> for T {} +impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {} + +macro_rules! into_todo { + ($typ:ty) => {{ + let x: $typ = todo!(); + x + }}; +} +macro_rules! assert_value { + ($type:ty: Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f: $type = todo!(); + require_send(&f); + require_sync(&f); + }; + }; + ($type:ty: !Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f: $type = todo!(); + AmbiguousIfSend::some_item(&f); + require_sync(&f); + }; + }; + ($type:ty: Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f: $type = todo!(); + require_send(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($type:ty: !Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f: $type = todo!(); + AmbiguousIfSend::some_item(&f); + AmbiguousIfSync::some_item(&f); + }; + }; +} +macro_rules! async_assert_fn { + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + AmbiguousIfSync::some_item(&f); + }; + }; +} + +async_assert_fn!(tokio::io::copy(&mut TcpStream, &mut TcpStream): Send & Sync); +async_assert_fn!(tokio::io::empty(): Send & Sync); +async_assert_fn!(tokio::io::repeat(u8): Send & Sync); +async_assert_fn!(tokio::io::sink(): Send & Sync); +async_assert_fn!(tokio::io::split(TcpStream): Send & Sync); +async_assert_fn!(tokio::io::stderr(): Send & Sync); +async_assert_fn!(tokio::io::stdin(): Send & Sync); +async_assert_fn!(tokio::io::stdout(): Send & Sync); +async_assert_fn!(tokio::io::Split<Cursor<Vec<u8>>>::next_segment(_): Send & Sync); + +async_assert_fn!(tokio::fs::canonicalize(&str): Send & Sync); +async_assert_fn!(tokio::fs::copy(&str, &str): Send & Sync); +async_assert_fn!(tokio::fs::create_dir(&str): Send & Sync); +async_assert_fn!(tokio::fs::create_dir_all(&str): Send & Sync); +async_assert_fn!(tokio::fs::hard_link(&str, &str): Send & Sync); +async_assert_fn!(tokio::fs::metadata(&str): Send & Sync); +async_assert_fn!(tokio::fs::read(&str): Send & Sync); +async_assert_fn!(tokio::fs::read_dir(&str): Send & Sync); +async_assert_fn!(tokio::fs::read_link(&str): Send & Sync); +async_assert_fn!(tokio::fs::read_to_string(&str): Send & Sync); +async_assert_fn!(tokio::fs::remove_dir(&str): Send & Sync); +async_assert_fn!(tokio::fs::remove_dir_all(&str): Send & Sync); +async_assert_fn!(tokio::fs::remove_file(&str): Send & Sync); +async_assert_fn!(tokio::fs::rename(&str, &str): Send & Sync); +async_assert_fn!(tokio::fs::set_permissions(&str, std::fs::Permissions): Send & Sync); +async_assert_fn!(tokio::fs::symlink_metadata(&str): Send & Sync); +async_assert_fn!(tokio::fs::write(&str, Vec<u8>): Send & Sync); +async_assert_fn!(tokio::fs::ReadDir::next_entry(_): Send & Sync); +async_assert_fn!(tokio::fs::OpenOptions::open(_, &str): Send & Sync); +async_assert_fn!(tokio::fs::DirEntry::metadata(_): Send & Sync); +async_assert_fn!(tokio::fs::DirEntry::file_type(_): Send & Sync); + +async_assert_fn!(tokio::fs::File::open(&str): Send & Sync); +async_assert_fn!(tokio::fs::File::create(&str): Send & Sync); +async_assert_fn!(tokio::fs::File::seek(_, std::io::SeekFrom): Send & Sync); +async_assert_fn!(tokio::fs::File::sync_all(_): Send & Sync); +async_assert_fn!(tokio::fs::File::sync_data(_): Send & Sync); +async_assert_fn!(tokio::fs::File::set_len(_, u64): Send & Sync); +async_assert_fn!(tokio::fs::File::metadata(_): Send & Sync); +async_assert_fn!(tokio::fs::File::try_clone(_): Send & Sync); +async_assert_fn!(tokio::fs::File::into_std(_): Send & Sync); +async_assert_fn!(tokio::fs::File::set_permissions(_, std::fs::Permissions): Send & Sync); + +async_assert_fn!(tokio::net::lookup_host(SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::TcpListener::bind(SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::TcpListener::accept(_): Send & Sync); +async_assert_fn!(tokio::net::TcpStream::connect(SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::TcpStream::peek(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::tcp::ReadHalf::peek(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::bind(SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::connect(_, SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync); +async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::udp::RecvHalf::recv(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::udp::RecvHalf::recv_from(_, &mut [u8]): Send & Sync); +async_assert_fn!(tokio::net::udp::SendHalf::send(_, &[u8]): Send & Sync); +async_assert_fn!(tokio::net::udp::SendHalf::send_to(_, &[u8], &SocketAddr): Send & Sync); + +#[cfg(unix)] +mod unix_datagram { + use super::*; + async_assert_fn!(tokio::net::UnixListener::bind(&str): Send & Sync); + async_assert_fn!(tokio::net::UnixListener::accept(_): Send & Sync); + async_assert_fn!(tokio::net::UnixDatagram::send(_, &[u8]): Send & Sync); + async_assert_fn!(tokio::net::UnixDatagram::recv(_, &mut [u8]): Send & Sync); + async_assert_fn!(tokio::net::UnixDatagram::send_to(_, &[u8], &str): Send & Sync); + async_assert_fn!(tokio::net::UnixDatagram::recv_from(_, &mut [u8]): Send & Sync); + async_assert_fn!(tokio::net::UnixStream::connect(&str): Send & Sync); +} + +async_assert_fn!(tokio::process::Child::wait_with_output(_): Send & Sync); +async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync); +#[cfg(unix)] +async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync); + +async_assert_fn!(tokio::stream::empty<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio::stream::pending<Rc<u8>>(): Send & Sync); +async_assert_fn!(tokio::stream::iter(std::vec::IntoIter<u8>): Send & Sync); + +async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync); +async_assert_fn!(tokio::sync::Mutex<u8>::lock(_): Send & Sync); +async_assert_fn!(tokio::sync::Mutex<Cell<u8>>::lock(_): Send & Sync); +async_assert_fn!(tokio::sync::Mutex<Rc<u8>>::lock(_): !Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::RwLock<u8>::read(_): Send & Sync); +async_assert_fn!(tokio::sync::RwLock<u8>::write(_): Send & Sync); +async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::read(_): !Send & !Sync); +async_assert_fn!(tokio::sync::RwLock<Cell<u8>>::write(_): !Send & !Sync); +async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::read(_): !Send & !Sync); +async_assert_fn!(tokio::sync::RwLock<Rc<u8>>::write(_): !Send & !Sync); +async_assert_fn!(tokio::sync::Semaphore::acquire(_): Send & Sync); + +async_assert_fn!(tokio::sync::broadcast::Receiver<u8>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::broadcast::Receiver<Cell<u8>>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::broadcast::Receiver<Rc<u8>>::recv(_): !Send & !Sync); + +async_assert_fn!(tokio::sync::mpsc::Receiver<u8>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::mpsc::Receiver<Cell<u8>>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::mpsc::Receiver<Rc<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::mpsc::Sender<u8>::send(_, u8): Send & Sync); +async_assert_fn!(tokio::sync::mpsc::Sender<Cell<u8>>::send(_, Cell<u8>): Send & !Sync); +async_assert_fn!(tokio::sync::mpsc::Sender<Rc<u8>>::send(_, Rc<u8>): !Send & !Sync); + +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<u8>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Cell<u8>>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver<Rc<u8>>::recv(_): !Send & !Sync); + +async_assert_fn!(tokio::sync::watch::Receiver<u8>::recv(_): Send & Sync); +async_assert_fn!(tokio::sync::watch::Receiver<Cell<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver<Rc<u8>>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Sender<u8>::closed(_): Send & Sync); +async_assert_fn!(tokio::sync::watch::Sender<Cell<u8>>::closed(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Sender<Rc<u8>>::closed(_): !Send & !Sync); + +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSync<()>): Send & Sync); +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFutureSend<()>): Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<u32>::scope(_, u32, BoxFuture<()>): !Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSync<()>): Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFutureSend<()>): Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Cell<u32>>::scope(_, Cell<u32>, BoxFuture<()>): !Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSync<()>): !Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFutureSend<()>): !Send & !Sync); +async_assert_fn!(tokio::task::LocalKey<Rc<u32>>::scope(_, Rc<u32>, BoxFuture<()>): !Send & !Sync); +async_assert_fn!(tokio::task::LocalSet::run_until(_, BoxFutureSync<()>): !Send & !Sync); +assert_value!(tokio::task::LocalSet: !Send & !Sync); + +async_assert_fn!(tokio::time::advance(Duration): Send & Sync); +async_assert_fn!(tokio::time::delay_for(Duration): Send & Sync); +async_assert_fn!(tokio::time::delay_until(Instant): Send & Sync); +async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSync<()>): Send & Sync); +async_assert_fn!(tokio::time::timeout(Duration, BoxFutureSend<()>): Send & !Sync); +async_assert_fn!(tokio::time::timeout(Duration, BoxFuture<()>): !Send & !Sync); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSync<()>): Send & Sync); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sync); +async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync); +async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync); diff --git a/third_party/rust/tokio/tests/buffered.rs b/third_party/rust/tokio/tests/buffered.rs new file mode 100644 index 0000000000..595f855a0f --- /dev/null +++ b/third_party/rust/tokio/tests/buffered.rs @@ -0,0 +1,51 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_test::assert_ok; + +use std::io::prelude::*; +use std::net::TcpStream; +use std::thread; + +#[tokio::test] +async fn echo_server() { + const N: usize = 1024; + + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + + let msg = "foo bar baz"; + + let t = thread::spawn(move || { + let mut s = assert_ok!(TcpStream::connect(&addr)); + + let t2 = thread::spawn(move || { + let mut s = assert_ok!(TcpStream::connect(&addr)); + let mut b = vec![0; msg.len() * N]; + assert_ok!(s.read_exact(&mut b)); + b + }); + + let mut expected = Vec::<u8>::new(); + for _i in 0..N { + expected.extend(msg.as_bytes()); + let res = assert_ok!(s.write(msg.as_bytes())); + assert_eq!(res, msg.len()); + } + + (expected, t2) + }); + + let (mut a, _) = assert_ok!(srv.accept().await); + let (mut b, _) = assert_ok!(srv.accept().await); + + let n = assert_ok!(io::copy(&mut a, &mut b).await); + + let (expected, t2) = t.join().unwrap(); + let actual = t2.join().unwrap(); + + assert!(expected == actual); + assert_eq!(n, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio/tests/fs.rs b/third_party/rust/tokio/tests/fs.rs new file mode 100644 index 0000000000..13c44c08d6 --- /dev/null +++ b/third_party/rust/tokio/tests/fs.rs @@ -0,0 +1,20 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::fs; +use tokio_test::assert_ok; + +#[tokio::test] +async fn path_read_write() { + let temp = tempdir(); + let dir = temp.path(); + + assert_ok!(fs::write(dir.join("bar"), b"bytes").await); + let out = assert_ok!(fs::read(dir.join("bar")).await); + + assert_eq!(out, b"bytes"); +} + +fn tempdir() -> tempfile::TempDir { + tempfile::tempdir().unwrap() +} diff --git a/third_party/rust/tokio/tests/fs_copy.rs b/third_party/rust/tokio/tests/fs_copy.rs new file mode 100644 index 0000000000..8d1632013e --- /dev/null +++ b/third_party/rust/tokio/tests/fs_copy.rs @@ -0,0 +1,39 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tempfile::tempdir; +use tokio::fs; + +#[tokio::test] +async fn copy() { + let dir = tempdir().unwrap(); + + let source_path = dir.path().join("foo.txt"); + let dest_path = dir.path().join("bar.txt"); + + fs::write(&source_path, b"Hello File!").await.unwrap(); + fs::copy(&source_path, &dest_path).await.unwrap(); + + let from = fs::read(&source_path).await.unwrap(); + let to = fs::read(&dest_path).await.unwrap(); + + assert_eq!(from, to); +} + +#[tokio::test] +async fn copy_permissions() { + let dir = tempdir().unwrap(); + let from_path = dir.path().join("foo.txt"); + let to_path = dir.path().join("bar.txt"); + + let from = tokio::fs::File::create(&from_path).await.unwrap(); + let mut from_perms = from.metadata().await.unwrap().permissions(); + from_perms.set_readonly(true); + from.set_permissions(from_perms.clone()).await.unwrap(); + + tokio::fs::copy(from_path, &to_path).await.unwrap(); + + let to_perms = tokio::fs::metadata(to_path).await.unwrap().permissions(); + + assert_eq!(from_perms, to_perms); +} diff --git a/third_party/rust/tokio/tests/fs_dir.rs b/third_party/rust/tokio/tests/fs_dir.rs new file mode 100644 index 0000000000..eaff59da4f --- /dev/null +++ b/third_party/rust/tokio/tests/fs_dir.rs @@ -0,0 +1,102 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::fs; +use tokio_test::assert_ok; + +use std::sync::{Arc, Mutex}; +use tempfile::tempdir; + +#[tokio::test] +async fn create_dir() { + let base_dir = tempdir().unwrap(); + let new_dir = base_dir.path().join("foo"); + let new_dir_2 = new_dir.clone(); + + assert_ok!(fs::create_dir(new_dir).await); + + assert!(new_dir_2.is_dir()); +} + +#[tokio::test] +async fn create_all() { + let base_dir = tempdir().unwrap(); + let new_dir = base_dir.path().join("foo").join("bar"); + let new_dir_2 = new_dir.clone(); + + assert_ok!(fs::create_dir_all(new_dir).await); + assert!(new_dir_2.is_dir()); +} + +#[tokio::test] +async fn remove() { + let base_dir = tempdir().unwrap(); + let new_dir = base_dir.path().join("foo"); + let new_dir_2 = new_dir.clone(); + + std::fs::create_dir(new_dir.clone()).unwrap(); + + assert_ok!(fs::remove_dir(new_dir).await); + assert!(!new_dir_2.exists()); +} + +#[tokio::test] +async fn read_inherent() { + let base_dir = tempdir().unwrap(); + + let p = base_dir.path(); + std::fs::create_dir(p.join("aa")).unwrap(); + std::fs::create_dir(p.join("bb")).unwrap(); + std::fs::create_dir(p.join("cc")).unwrap(); + + let files = Arc::new(Mutex::new(Vec::new())); + + let f = files.clone(); + let p = p.to_path_buf(); + + let mut entries = fs::read_dir(p).await.unwrap(); + + while let Some(e) = assert_ok!(entries.next_entry().await) { + let s = e.file_name().to_str().unwrap().to_string(); + f.lock().unwrap().push(s); + } + + let mut files = files.lock().unwrap(); + files.sort(); // because the order is not guaranteed + assert_eq!( + *files, + vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] + ); +} + +#[tokio::test] +async fn read_stream() { + use tokio::stream::StreamExt; + + let base_dir = tempdir().unwrap(); + + let p = base_dir.path(); + std::fs::create_dir(p.join("aa")).unwrap(); + std::fs::create_dir(p.join("bb")).unwrap(); + std::fs::create_dir(p.join("cc")).unwrap(); + + let files = Arc::new(Mutex::new(Vec::new())); + + let f = files.clone(); + let p = p.to_path_buf(); + + let mut entries = fs::read_dir(p).await.unwrap(); + + while let Some(res) = entries.next().await { + let e = assert_ok!(res); + let s = e.file_name().to_str().unwrap().to_string(); + f.lock().unwrap().push(s); + } + + let mut files = files.lock().unwrap(); + files.sort(); // because the order is not guaranteed + assert_eq!( + *files, + vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] + ); +} diff --git a/third_party/rust/tokio/tests/fs_file.rs b/third_party/rust/tokio/tests/fs_file.rs new file mode 100644 index 0000000000..eee9a5b5c5 --- /dev/null +++ b/third_party/rust/tokio/tests/fs_file.rs @@ -0,0 +1,87 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::fs::File; +use tokio::prelude::*; +use tokio_test::task; + +use std::io::prelude::*; +use tempfile::NamedTempFile; + +const HELLO: &[u8] = b"hello world..."; + +#[tokio::test] +async fn basic_read() { + let mut tempfile = tempfile(); + tempfile.write_all(HELLO).unwrap(); + + let mut file = File::open(tempfile.path()).await.unwrap(); + + let mut buf = [0; 1024]; + let n = file.read(&mut buf).await.unwrap(); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); +} + +#[tokio::test] +async fn basic_write() { + let tempfile = tempfile(); + + let mut file = File::create(tempfile.path()).await.unwrap(); + + file.write_all(HELLO).await.unwrap(); + file.flush().await.unwrap(); + + let file = std::fs::read(tempfile.path()).unwrap(); + assert_eq!(file, HELLO); +} + +#[tokio::test] +async fn coop() { + let mut tempfile = tempfile(); + tempfile.write_all(HELLO).unwrap(); + + let mut task = task::spawn(async { + let mut file = File::open(tempfile.path()).await.unwrap(); + + let mut buf = [0; 1024]; + + loop { + file.read(&mut buf).await.unwrap(); + file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + } + }); + + for _ in 0..1_000 { + if task.poll().is_pending() { + return; + } + } + + panic!("did not yield"); +} + +fn tempfile() -> NamedTempFile { + NamedTempFile::new().unwrap() +} + +#[tokio::test] +#[cfg(unix)] +async fn unix_fd() { + use std::os::unix::io::AsRawFd; + let tempfile = tempfile(); + + let file = File::create(tempfile.path()).await.unwrap(); + assert!(file.as_raw_fd() as u64 > 0); +} + +#[tokio::test] +#[cfg(windows)] +async fn windows_handle() { + use std::os::windows::io::AsRawHandle; + let tempfile = tempfile(); + + let file = File::create(tempfile.path()).await.unwrap(); + assert!(file.as_raw_handle() as u64 > 0); +} diff --git a/third_party/rust/tokio/tests/fs_file_mocked.rs b/third_party/rust/tokio/tests/fs_file_mocked.rs new file mode 100644 index 0000000000..0c5722404e --- /dev/null +++ b/third_party/rust/tokio/tests/fs_file_mocked.rs @@ -0,0 +1,777 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => t, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} + +#[macro_export] +macro_rules! cfg_fs { + ($($item:item)*) => { $($item)* } +} + +#[macro_export] +macro_rules! cfg_io_std { + ($($item:item)*) => { $($item)* } +} + +use futures::future; + +// Load source +#[allow(warnings)] +#[path = "../src/fs/file.rs"] +mod file; +use file::File; + +#[allow(warnings)] +#[path = "../src/io/blocking.rs"] +mod blocking; + +// Load mocked types +mod support { + pub(crate) mod mock_file; + pub(crate) mod mock_pool; +} +pub(crate) use support::mock_pool as pool; + +// Place them where the source expects them +pub(crate) mod io { + pub(crate) use tokio::io::*; + + pub(crate) use crate::blocking; + + pub(crate) mod sys { + pub(crate) use crate::support::mock_pool::{run, Blocking}; + } +} +pub(crate) mod fs { + pub(crate) mod sys { + pub(crate) use crate::support::mock_file::File; + pub(crate) use crate::support::mock_pool::{run, Blocking}; + } + + pub(crate) use crate::support::mock_pool::asyncify; +} +use fs::sys; + +use tokio::prelude::*; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; + +use std::io::SeekFrom; + +const HELLO: &[u8] = b"hello world..."; +const FOO: &[u8] = b"foo bar baz..."; + +#[test] +fn open_read() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO); + + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_eq!(0, pool::len()); + assert_pending!(t.poll()); + + assert_eq!(1, mock.remaining()); + assert_eq!(1, pool::len()); + + pool::run_one(); + + assert_eq!(0, mock.remaining()); + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); +} + +#[test] +fn read_twice_before_dispatch() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO); + + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(&buf[..n], HELLO); +} + +#[test] +fn read_with_smaller_buf() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 4); + assert_eq!(&buf[..], &HELLO[..n]); + } + + // Calling again immediately succeeds with the rest of the buffer + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 10); + assert_eq!(&buf[..n], &HELLO[4..]); + + assert_eq!(0, pool::len()); +} + +#[test] +fn read_with_bigger_buf() { + let (mock, file) = sys::File::mock(); + mock.read(&HELLO[..4]).read(&HELLO[4..]); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 4); + assert_eq!(&buf[..n], &HELLO[..n]); + } + + // Calling again immediately succeeds with the rest of the buffer + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, 10); + assert_eq!(&buf[..n], &HELLO[4..]); + + assert_eq!(0, pool::len()); +} + +#[test] +fn read_err_then_read_success() { + let (mock, file) = sys::File::mock(); + mock.read_err().read(&HELLO); + + let mut file = File::from_std(file); + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + assert_ready_err!(t.poll()); + } + + { + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let n = assert_ready_ok!(t.poll()); + + assert_eq!(n, HELLO.len()); + assert_eq!(&buf[..n], HELLO); + } +} + +#[test] +fn open_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + + assert_eq!(0, pool::len()); + assert_ready_ok!(t.poll()); + + assert_eq!(1, mock.remaining()); + assert_eq!(1, pool::len()); + + pool::run_one(); + + assert_eq!(0, mock.remaining()); + assert!(!t.is_woken()); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn flush_while_idle() { + let (_mock, file) = sys::File::mock(); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn read_with_buffer_larger_than_max() { + // Chunks + let a = 16 * 1024; + let b = a * 2; + let c = a * 3; + let d = a * 4; + + assert_eq!(d / 1024, 64); + + let mut data = vec![]; + for i in 0..(d - 1) { + data.push((i % 151) as u8); + } + + let (mock, file) = sys::File::mock(); + mock.read(&data[0..a]) + .read(&data[a..b]) + .read(&data[b..c]) + .read(&data[c..]); + + let mut file = File::from_std(file); + + let mut actual = vec![0; d]; + let mut pos = 0; + + while pos < data.len() { + let mut t = task::spawn(file.read(&mut actual[pos..])); + + assert_pending!(t.poll()); + pool::run_one(); + assert!(t.is_woken()); + + let n = assert_ready_ok!(t.poll()); + assert!(n <= a); + + pos += n; + } + + assert_eq!(mock.remaining(), 0); + assert_eq!(data, &actual[..data.len()]); +} + +#[test] +fn write_with_buffer_larger_than_max() { + // Chunks + let a = 16 * 1024; + let b = a * 2; + let c = a * 3; + let d = a * 4; + + assert_eq!(d / 1024, 64); + + let mut data = vec![]; + for i in 0..(d - 1) { + data.push((i % 151) as u8); + } + + let (mock, file) = sys::File::mock(); + mock.write(&data[0..a]) + .write(&data[a..b]) + .write(&data[b..c]) + .write(&data[c..]); + + let mut file = File::from_std(file); + + let mut rem = &data[..]; + + let mut first = true; + + while !rem.is_empty() { + let mut t = task::spawn(file.write(rem)); + + if !first { + assert_pending!(t.poll()); + pool::run_one(); + assert!(t.is_woken()); + } + + first = false; + + let n = assert_ready_ok!(t.poll()); + + rem = &rem[n..]; + } + + pool::run_one(); + + assert_eq!(mock.remaining(), 0); +} + +#[test] +fn write_twice_before_dispatch() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).write(FOO); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.flush()); + assert_pending!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_read_followed_by_write() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO) + .seek_current_ok(-(HELLO.len() as i64), 0) + .write(FOO); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_partial_read_followed_by_write() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO).seek_current_ok(-10, 0).write(FOO); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 4]; + let mut t = task::spawn(file.read(&mut buf)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + assert_eq!(pool::len(), 1); + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn incomplete_read_followed_by_flush() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO) + .seek_current_ok(-(HELLO.len() as i64), 0) + .write(FOO); + + let mut file = File::from_std(file); + + let mut buf = [0; 32]; + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); +} + +#[test] +fn incomplete_flush_followed_by_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).write(FOO); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + let n = assert_ready_ok!(t.poll()); + assert_eq!(n, HELLO.len()); + + let mut t = task::spawn(file.flush()); + assert_pending!(t.poll()); + + // TODO: Move under write + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn read_err() { + let (mock, file) = sys::File::mock(); + mock.read_err(); + + let mut file = File::from_std(file); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + assert!(t.is_woken()); + + assert_ready_err!(t.poll()); +} + +#[test] +fn write_write_err() { + let (mock, file) = sys::File::mock(); + mock.write_err(); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_read_write_err() { + let (mock, file) = sys::File::mock(); + mock.write_err().read(HELLO); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_read_flush_err() { + let (mock, file) = sys::File::mock(); + mock.write_err().read(HELLO); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + let mut buf = [0; 1024]; + let mut t = task::spawn(file.read(&mut buf)); + + assert_pending!(t.poll()); + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_seek_write_err() { + let (mock, file) = sys::File::mock(); + mock.write_err().seek_start_ok(0); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + { + let mut t = task::spawn(file.seek(SeekFrom::Start(0))); + assert_pending!(t.poll()); + } + + pool::run_one(); + + let mut t = task::spawn(file.write(FOO)); + assert_ready_err!(t.poll()); +} + +#[test] +fn write_seek_flush_err() { + let (mock, file) = sys::File::mock(); + mock.write_err().seek_start_ok(0); + + let mut file = File::from_std(file); + + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + pool::run_one(); + + { + let mut t = task::spawn(file.seek(SeekFrom::Start(0))); + assert_pending!(t.poll()); + } + + pool::run_one(); + + let mut t = task::spawn(file.flush()); + assert_ready_err!(t.poll()); +} + +#[test] +fn sync_all_ordered_after_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).sync_all(); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_all()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn sync_all_err_ordered_after_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).sync_all_err(); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_all()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn sync_data_ordered_after_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).sync_data(); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_data()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn sync_data_err_ordered_after_write() { + let (mock, file) = sys::File::mock(); + mock.write(HELLO).sync_data_err(); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.write(HELLO)); + assert_ready_ok!(t.poll()); + + let mut t = task::spawn(file.sync_data()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_pending!(t.poll()); + + assert_eq!(1, pool::len()); + pool::run_one(); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn open_set_len_ok() { + let (mock, file) = sys::File::mock(); + mock.set_len(123); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + assert_eq!(1, mock.remaining()); + + pool::run_one(); + assert_eq!(0, mock.remaining()); + + assert!(t.is_woken()); + assert_ready_ok!(t.poll()); +} + +#[test] +fn open_set_len_err() { + let (mock, file) = sys::File::mock(); + mock.set_len_err(123); + + let mut file = File::from_std(file); + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + assert_eq!(1, mock.remaining()); + + pool::run_one(); + assert_eq!(0, mock.remaining()); + + assert!(t.is_woken()); + assert_ready_err!(t.poll()); +} + +#[test] +fn partial_read_set_len_ok() { + let (mock, file) = sys::File::mock(); + mock.read(HELLO) + .seek_current_ok(-14, 0) + .set_len(123) + .read(FOO); + + let mut buf = [0; 32]; + let mut file = File::from_std(file); + + { + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + } + + pool::run_one(); + + { + let mut t = task::spawn(file.set_len(123)); + + assert_pending!(t.poll()); + pool::run_one(); + assert_ready_ok!(t.poll()); + } + + let mut t = task::spawn(file.read(&mut buf)); + assert_pending!(t.poll()); + pool::run_one(); + let n = assert_ready_ok!(t.poll()); + + assert_eq!(n, FOO.len()); + assert_eq!(&buf[..n], FOO); +} diff --git a/third_party/rust/tokio/tests/fs_link.rs b/third_party/rust/tokio/tests/fs_link.rs new file mode 100644 index 0000000000..cbbe27efe4 --- /dev/null +++ b/third_party/rust/tokio/tests/fs_link.rs @@ -0,0 +1,70 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::fs; + +use std::io::prelude::*; +use std::io::BufReader; +use tempfile::tempdir; + +#[tokio::test] +async fn test_hard_link() { + let dir = tempdir().unwrap(); + let src = dir.path().join("src.txt"); + let dst = dir.path().join("dst.txt"); + + { + let mut file = std::fs::File::create(&src).unwrap(); + file.write_all(b"hello").unwrap(); + } + + let dst_2 = dst.clone(); + + assert!(fs::hard_link(src, dst_2.clone()).await.is_ok()); + + let mut content = String::new(); + + { + let file = std::fs::File::open(dst).unwrap(); + let mut reader = BufReader::new(file); + reader.read_to_string(&mut content).unwrap(); + } + + assert!(content == "hello"); +} + +#[cfg(unix)] +#[tokio::test] +async fn test_symlink() { + let dir = tempdir().unwrap(); + let src = dir.path().join("src.txt"); + let dst = dir.path().join("dst.txt"); + + { + let mut file = std::fs::File::create(&src).unwrap(); + file.write_all(b"hello").unwrap(); + } + + let src_2 = src.clone(); + let dst_2 = dst.clone(); + + assert!(fs::os::unix::symlink(src_2.clone(), dst_2.clone()) + .await + .is_ok()); + + let mut content = String::new(); + + { + let file = std::fs::File::open(dst.clone()).unwrap(); + let mut reader = BufReader::new(file); + reader.read_to_string(&mut content).unwrap(); + } + + assert!(content == "hello"); + + let read = fs::read_link(dst.clone()).await.unwrap(); + assert!(read == src); + + let symlink_meta = fs::symlink_metadata(dst.clone()).await.unwrap(); + assert!(symlink_meta.file_type().is_symlink()); +} diff --git a/third_party/rust/tokio/tests/io_async_read.rs b/third_party/rust/tokio/tests/io_async_read.rs new file mode 100644 index 0000000000..20440bbde3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_async_read.rs @@ -0,0 +1,148 @@ +#![allow(clippy::transmute_ptr_to_ptr)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncRead; +use tokio_test::task; +use tokio_test::{assert_ready_err, assert_ready_ok}; + +use bytes::{BufMut, BytesMut}; +use std::io; +use std::mem::MaybeUninit; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn assert_obj_safe() { + fn _assert<T>() {} + _assert::<Box<dyn AsyncRead>>(); +} + +#[test] +fn read_buf_success() { + struct Rd; + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + buf[0..11].copy_from_slice(b"hello world"); + Poll::Ready(Ok(11)) + } + } + + let mut buf = BytesMut::with_capacity(65); + + task::spawn(Rd).enter(|cx, rd| { + let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); + + assert_eq!(11, n); + assert_eq!(buf[..], b"hello world"[..]); + }); +} + +#[test] +fn read_buf_error() { + struct Rd; + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + let err = io::ErrorKind::Other.into(); + Poll::Ready(Err(err)) + } + } + + let mut buf = BytesMut::with_capacity(65); + + task::spawn(Rd).enter(|cx, rd| { + let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf)); + assert_eq!(err.kind(), io::ErrorKind::Other); + }); +} + +#[test] +fn read_buf_no_capacity() { + struct Rd; + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + unimplemented!(); + } + } + + let mut buf = [0u8; 0]; + + task::spawn(Rd).enter(|cx, rd| { + let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut &mut buf[..])); + assert_eq!(0, n); + }); +} + +#[test] +fn read_buf_no_uninitialized() { + struct Rd; + + impl AsyncRead for Rd { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + for b in buf { + assert_eq!(0, *b); + } + + Poll::Ready(Ok(0)) + } + } + + let mut buf = BytesMut::with_capacity(64); + + task::spawn(Rd).enter(|cx, rd| { + let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); + assert_eq!(0, n); + }); +} + +#[test] +fn read_buf_uninitialized_ok() { + struct Rd; + + impl AsyncRead for Rd { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + assert_eq!(buf[0..11], b"hello world"[..]); + Poll::Ready(Ok(0)) + } + } + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + + unsafe { + let b: &mut [u8] = std::mem::transmute(buf.bytes_mut()); + b[0..11].copy_from_slice(b"hello world"); + } + + task::spawn(Rd).enter(|cx, rd| { + let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); + assert_eq!(0, n); + }); +} diff --git a/third_party/rust/tokio/tests/io_chain.rs b/third_party/rust/tokio/tests/io_chain.rs new file mode 100644 index 0000000000..e2d59411a1 --- /dev/null +++ b/third_party/rust/tokio/tests/io_chain.rs @@ -0,0 +1,16 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn chain() { + let mut buf = Vec::new(); + let rd1: &[u8] = b"hello "; + let rd2: &[u8] = b"world"; + + let mut rd = rd1.chain(rd2); + assert_ok!(rd.read_to_end(&mut buf).await); + assert_eq!(buf, b"hello world"); +} diff --git a/third_party/rust/tokio/tests/io_copy.rs b/third_party/rust/tokio/tests/io_copy.rs new file mode 100644 index 0000000000..c1c6df4eb3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_copy.rs @@ -0,0 +1,36 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{self, AsyncRead}; +use tokio_test::assert_ok; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn copy() { + struct Rd(bool); + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if self.0 { + buf[0..11].copy_from_slice(b"hello world"); + self.0 = false; + Poll::Ready(Ok(11)) + } else { + Poll::Ready(Ok(0)) + } + } + } + + let mut rd = Rd(true); + let mut wr = Vec::new(); + + let n = assert_ok!(io::copy(&mut rd, &mut wr).await); + assert_eq!(n, 11); + assert_eq!(wr, b"hello world"); +} diff --git a/third_party/rust/tokio/tests/io_driver.rs b/third_party/rust/tokio/tests/io_driver.rs new file mode 100644 index 0000000000..b85abd8c2a --- /dev/null +++ b/third_party/rust/tokio/tests/io_driver.rs @@ -0,0 +1,88 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::TcpListener; +use tokio::runtime; +use tokio_test::{assert_ok, assert_pending}; + +use futures::task::{waker_ref, ArcWake}; +use std::future::Future; +use std::net::TcpStream; +use std::pin::Pin; +use std::sync::{mpsc, Arc, Mutex}; +use std::task::Context; + +struct Task<T> { + future: Mutex<Pin<Box<T>>>, +} + +impl<T: Send> ArcWake for Task<T> { + fn wake_by_ref(_: &Arc<Self>) { + // Do nothing... + } +} + +impl<T> Task<T> { + fn new(future: T) -> Task<T> { + Task { + future: Mutex::new(Box::pin(future)), + } + } +} + +#[test] +fn test_drop_on_notify() { + // When the reactor receives a kernel notification, it notifies the + // task that holds the associated socket. If this notification results in + // the task being dropped, the socket will also be dropped. + // + // Previously, there was a deadlock scenario where the reactor, while + // notifying, held a lock and the task being dropped attempted to acquire + // that same lock in order to clean up state. + // + // To simulate this case, we create a fake executor that does nothing when + // the task is notified. This simulates an executor in the process of + // shutting down. Then, when the task handle is dropped, the task itself is + // dropped. + + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + + let (addr_tx, addr_rx) = mpsc::channel(); + + // Define a task that just drains the listener + let task = Arc::new(Task::new(async move { + // Create a listener + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + + // Send the address + let addr = listener.local_addr().unwrap(); + addr_tx.send(addr).unwrap(); + + loop { + let _ = listener.accept().await; + } + })); + + { + rt.enter(|| { + let waker = waker_ref(&task); + let mut cx = Context::from_waker(&waker); + assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx)); + }); + } + + // Get the address + let addr = addr_rx.recv().unwrap(); + + drop(task); + + // Establish a connection to the acceptor + let _s = TcpStream::connect(&addr).unwrap(); + + // Force the reactor to turn + rt.block_on(async {}); +} diff --git a/third_party/rust/tokio/tests/io_driver_drop.rs b/third_party/rust/tokio/tests/io_driver_drop.rs new file mode 100644 index 0000000000..0a5ce62513 --- /dev/null +++ b/third_party/rust/tokio/tests/io_driver_drop.rs @@ -0,0 +1,53 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::TcpListener; +use tokio::runtime; +use tokio_test::{assert_err, assert_pending, assert_ready, task}; + +#[test] +fn tcp_doesnt_block() { + let rt = rt(); + + let mut listener = rt.enter(|| { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + TcpListener::from_std(listener).unwrap() + }); + + drop(rt); + + let mut task = task::spawn(async move { + assert_err!(listener.accept().await); + }); + + assert_ready!(task.poll()); +} + +#[test] +fn drop_wakes() { + let rt = rt(); + + let mut listener = rt.enter(|| { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + TcpListener::from_std(listener).unwrap() + }); + + let mut task = task::spawn(async move { + assert_err!(listener.accept().await); + }); + + assert_pending!(task.poll()); + + drop(rt); + + assert!(task.is_woken()); + assert_ready!(task.poll()); +} + +fn rt() -> runtime::Runtime { + runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/tests/io_lines.rs b/third_party/rust/tokio/tests/io_lines.rs new file mode 100644 index 0000000000..2f6b3393b9 --- /dev/null +++ b/third_party/rust/tokio/tests/io_lines.rs @@ -0,0 +1,35 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncBufReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn lines_inherent() { + let rd: &[u8] = b"hello\r\nworld\n\n"; + let mut st = rd.lines(); + + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, "hello"); + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, "world"); + let b = assert_ok!(st.next_line().await).unwrap(); + assert_eq!(b, ""); + assert!(assert_ok!(st.next_line().await).is_none()); +} + +#[tokio::test] +async fn lines_stream() { + use tokio::stream::StreamExt; + + let rd: &[u8] = b"hello\r\nworld\n\n"; + let mut st = rd.lines(); + + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, "hello"); + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, "world"); + let b = assert_ok!(st.next().await.unwrap()); + assert_eq!(b, ""); + assert!(st.next().await.is_none()); +} diff --git a/third_party/rust/tokio/tests/io_read.rs b/third_party/rust/tokio/tests/io_read.rs new file mode 100644 index 0000000000..4791c9a661 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read.rs @@ -0,0 +1,60 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_test::assert_ok; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn read() { + #[derive(Default)] + struct Rd { + poll_cnt: usize, + } + + impl AsyncRead for Rd { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + assert_eq!(0, self.poll_cnt); + self.poll_cnt += 1; + + buf[0..11].copy_from_slice(b"hello world"); + Poll::Ready(Ok(11)) + } + } + + let mut buf = Box::new([0; 11]); + let mut rd = Rd::default(); + + let n = assert_ok!(rd.read(&mut buf[..]).await); + assert_eq!(n, 11); + assert_eq!(buf[..], b"hello world"[..]); +} + +struct BadAsyncRead; + +impl AsyncRead for BadAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + for b in &mut *buf { + *b = b'a'; + } + Poll::Ready(Ok(buf.len() * 2)) + } +} + +#[tokio::test] +#[should_panic] +async fn read_buf_bad_async_read() { + let mut buf = Vec::with_capacity(10); + BadAsyncRead.read_buf(&mut buf).await.unwrap(); +} diff --git a/third_party/rust/tokio/tests/io_read_exact.rs b/third_party/rust/tokio/tests/io_read_exact.rs new file mode 100644 index 0000000000..d0e659bd33 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read_exact.rs @@ -0,0 +1,15 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn read_exact() { + let mut buf = Box::new([0; 8]); + let mut rd: &[u8] = b"hello world"; + + let n = assert_ok!(rd.read_exact(&mut buf[..]).await); + assert_eq!(n, 8); + assert_eq!(buf[..], b"hello wo"[..]); +} diff --git a/third_party/rust/tokio/tests/io_read_line.rs b/third_party/rust/tokio/tests/io_read_line.rs new file mode 100644 index 0000000000..57ae37cef3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read_line.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncBufReadExt; +use tokio_test::assert_ok; + +use std::io::Cursor; + +#[tokio::test] +async fn read_line() { + let mut buf = String::new(); + let mut rd = Cursor::new(b"hello\nworld\n\n"); + + let n = assert_ok!(rd.read_line(&mut buf).await); + assert_eq!(n, 6); + assert_eq!(buf, "hello\n"); + buf.clear(); + let n = assert_ok!(rd.read_line(&mut buf).await); + assert_eq!(n, 6); + assert_eq!(buf, "world\n"); + buf.clear(); + let n = assert_ok!(rd.read_line(&mut buf).await); + assert_eq!(n, 1); + assert_eq!(buf, "\n"); + buf.clear(); + let n = assert_ok!(rd.read_line(&mut buf).await); + assert_eq!(n, 0); + assert_eq!(buf, ""); +} diff --git a/third_party/rust/tokio/tests/io_read_to_end.rs b/third_party/rust/tokio/tests/io_read_to_end.rs new file mode 100644 index 0000000000..ee636ba596 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read_to_end.rs @@ -0,0 +1,15 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn read_to_end() { + let mut buf = vec![]; + let mut rd: &[u8] = b"hello world"; + + let n = assert_ok!(rd.read_to_end(&mut buf).await); + assert_eq!(n, 11); + assert_eq!(buf[..], b"hello world"[..]); +} diff --git a/third_party/rust/tokio/tests/io_read_to_string.rs b/third_party/rust/tokio/tests/io_read_to_string.rs new file mode 100644 index 0000000000..6b384b8910 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read_to_string.rs @@ -0,0 +1,15 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn read_to_string() { + let mut buf = String::new(); + let mut rd: &[u8] = b"hello world"; + + let n = assert_ok!(rd.read_to_string(&mut buf).await); + assert_eq!(n, 11); + assert_eq!(buf[..], "hello world"[..]); +} diff --git a/third_party/rust/tokio/tests/io_read_until.rs b/third_party/rust/tokio/tests/io_read_until.rs new file mode 100644 index 0000000000..4e0e0d10d3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_read_until.rs @@ -0,0 +1,23 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncBufReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn read_until() { + let mut buf = vec![]; + let mut rd: &[u8] = b"hello world"; + + let n = assert_ok!(rd.read_until(b' ', &mut buf).await); + assert_eq!(n, 6); + assert_eq!(buf, b"hello "); + buf.clear(); + let n = assert_ok!(rd.read_until(b' ', &mut buf).await); + assert_eq!(n, 5); + assert_eq!(buf, b"world"); + buf.clear(); + let n = assert_ok!(rd.read_until(b' ', &mut buf).await); + assert_eq!(n, 0); + assert_eq!(buf, []); +} diff --git a/third_party/rust/tokio/tests/io_split.rs b/third_party/rust/tokio/tests/io_split.rs new file mode 100644 index 0000000000..e54bf24852 --- /dev/null +++ b/third_party/rust/tokio/tests/io_split.rs @@ -0,0 +1,78 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct RW; + +impl AsyncRead for RW { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(Ok(1)) + } +} + +impl AsyncWrite for RW { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll<Result<usize, io::Error>> { + Poll::Ready(Ok(1)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } +} + +#[test] +fn is_send_and_sync() { + fn assert_bound<T: Send + Sync>() {} + + assert_bound::<ReadHalf<RW>>(); + assert_bound::<WriteHalf<RW>>(); +} + +#[test] +fn split_stream_id() { + let (r1, w1) = split(RW); + let (r2, w2) = split(RW); + assert_eq!(r1.is_pair_of(&w1), true); + assert_eq!(r1.is_pair_of(&w2), false); + assert_eq!(r2.is_pair_of(&w2), true); + assert_eq!(r2.is_pair_of(&w1), false); +} + +#[test] +fn unsplit_ok() { + let (r, w) = split(RW); + r.unsplit(w); +} + +#[test] +#[should_panic] +fn unsplit_err1() { + let (r, _) = split(RW); + let (_, w) = split(RW); + r.unsplit(w); +} + +#[test] +#[should_panic] +fn unsplit_err2() { + let (_, w) = split(RW); + let (r, _) = split(RW); + r.unsplit(w); +} diff --git a/third_party/rust/tokio/tests/io_take.rs b/third_party/rust/tokio/tests/io_take.rs new file mode 100644 index 0000000000..683606f727 --- /dev/null +++ b/third_party/rust/tokio/tests/io_take.rs @@ -0,0 +1,16 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio_test::assert_ok; + +#[tokio::test] +async fn take() { + let mut buf = [0; 6]; + let rd: &[u8] = b"hello world"; + + let mut rd = rd.take(4); + let n = assert_ok!(rd.read(&mut buf).await); + assert_eq!(n, 4); + assert_eq!(&buf, &b"hell\0\0"[..]); +} diff --git a/third_party/rust/tokio/tests/io_write.rs b/third_party/rust/tokio/tests/io_write.rs new file mode 100644 index 0000000000..96cebc3313 --- /dev/null +++ b/third_party/rust/tokio/tests/io_write.rs @@ -0,0 +1,58 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio_test::assert_ok; + +use bytes::BytesMut; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn write() { + struct Wr { + buf: BytesMut, + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + assert_eq!(self.cnt, 0); + self.buf.extend(&buf[0..4]); + Ok(4).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + } + + let mut wr = Wr { + buf: BytesMut::with_capacity(64), + cnt: 0, + }; + + let n = assert_ok!(wr.write(b"hello world").await); + assert_eq!(n, 4); + assert_eq!(wr.buf, b"hell"[..]); +} + +#[tokio::test] +async fn write_cursor() { + use std::io::Cursor; + + let mut wr = Cursor::new(Vec::new()); + + let n = assert_ok!(wr.write(b"hello world").await); + assert_eq!(n, 11); + assert_eq!(wr.get_ref().as_slice(), &b"hello world"[..]); +} diff --git a/third_party/rust/tokio/tests/io_write_all.rs b/third_party/rust/tokio/tests/io_write_all.rs new file mode 100644 index 0000000000..7ca02228a3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_write_all.rs @@ -0,0 +1,51 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio_test::assert_ok; + +use bytes::BytesMut; +use std::cmp; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn write_all() { + struct Wr { + buf: BytesMut, + cnt: usize, + } + + impl AsyncWrite for Wr { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + let n = cmp::min(4, buf.len()); + let buf = &buf[0..n]; + + self.cnt += 1; + self.buf.extend(buf); + Ok(buf.len()).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + } + + let mut wr = Wr { + buf: BytesMut::with_capacity(64), + cnt: 0, + }; + + assert_ok!(wr.write_all(b"hello world").await); + assert_eq!(wr.buf, b"hello world"[..]); + assert_eq!(wr.cnt, 3); +} diff --git a/third_party/rust/tokio/tests/io_write_int.rs b/third_party/rust/tokio/tests/io_write_int.rs new file mode 100644 index 0000000000..48a583d8c3 --- /dev/null +++ b/third_party/rust/tokio/tests/io_write_int.rs @@ -0,0 +1,37 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[tokio::test] +async fn write_int_should_err_if_write_count_0() { + struct Wr {} + + impl AsyncWrite for Wr { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll<io::Result<usize>> { + Ok(0).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Ok(()).into() + } + } + + let mut wr = Wr {}; + + // should be ok just to test these 2, other cases actually expanded by same macro. + assert!(wr.write_i8(0).await.is_err()); + assert!(wr.write_i32(12).await.is_err()); +} diff --git a/third_party/rust/tokio/tests/macros_join.rs b/third_party/rust/tokio/tests/macros_join.rs new file mode 100644 index 0000000000..d9b748d9a7 --- /dev/null +++ b/third_party/rust/tokio/tests/macros_join.rs @@ -0,0 +1,71 @@ +use tokio::sync::oneshot; +use tokio_test::{assert_pending, assert_ready, task}; + +#[tokio::test] +async fn sync_one_lit_expr_comma() { + let foo = tokio::join!(async { 1 },); + + assert_eq!(foo, (1,)); +} + +#[tokio::test] +async fn sync_one_lit_expr_no_comma() { + let foo = tokio::join!(async { 1 }); + + assert_eq!(foo, (1,)); +} + +#[tokio::test] +async fn sync_two_lit_expr_comma() { + let foo = tokio::join!(async { 1 }, async { 2 },); + + assert_eq!(foo, (1, 2)); +} + +#[tokio::test] +async fn sync_two_lit_expr_no_comma() { + let foo = tokio::join!(async { 1 }, async { 2 }); + + assert_eq!(foo, (1, 2)); +} + +#[tokio::test] +async fn two_await() { + let (tx1, rx1) = oneshot::channel::<&str>(); + let (tx2, rx2) = oneshot::channel::<u32>(); + + let mut join = task::spawn(async { + tokio::join!(async { rx1.await.unwrap() }, async { rx2.await.unwrap() }) + }); + + assert_pending!(join.poll()); + + tx2.send(123).unwrap(); + assert!(join.is_woken()); + assert_pending!(join.poll()); + + tx1.send("hello").unwrap(); + assert!(join.is_woken()); + let res = assert_ready!(join.poll()); + + assert_eq!(("hello", 123), res); +} + +#[test] +fn join_size() { + use futures::future; + use std::mem; + + let fut = async { + let ready = future::ready(0i32); + tokio::join!(ready) + }; + assert_eq!(mem::size_of_val(&fut), 16); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + tokio::join!(ready1, ready2) + }; + assert_eq!(mem::size_of_val(&fut), 28); +} diff --git a/third_party/rust/tokio/tests/macros_pin.rs b/third_party/rust/tokio/tests/macros_pin.rs new file mode 100644 index 0000000000..da6e0be6ed --- /dev/null +++ b/third_party/rust/tokio/tests/macros_pin.rs @@ -0,0 +1,13 @@ +async fn one() {} +async fn two() {} + +#[tokio::test] +async fn multi_pin() { + tokio::pin! { + let f1 = one(); + let f2 = two(); + } + + (&mut f1).await; + (&mut f2).await; +} diff --git a/third_party/rust/tokio/tests/macros_select.rs b/third_party/rust/tokio/tests/macros_select.rs new file mode 100644 index 0000000000..c08e816a01 --- /dev/null +++ b/third_party/rust/tokio/tests/macros_select.rs @@ -0,0 +1,447 @@ +use tokio::sync::{mpsc, oneshot}; +use tokio::task; +use tokio_test::{assert_ok, assert_pending, assert_ready}; + +use futures::future::poll_fn; +use std::task::Poll::Ready; + +#[tokio::test] +async fn sync_one_lit_expr_comma() { + let foo = tokio::select! { + foo = async { 1 } => foo, + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn nested_one() { + let foo = tokio::select! { + foo = async { 1 } => tokio::select! { + bar = async { foo } => bar, + }, + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn sync_one_lit_expr_no_comma() { + let foo = tokio::select! { + foo = async { 1 } => foo + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn sync_one_lit_expr_block() { + let foo = tokio::select! { + foo = async { 1 } => { foo } + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn sync_one_await() { + let foo = tokio::select! { + foo = one() => foo, + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn sync_one_ident() { + let one = one(); + + let foo = tokio::select! { + foo = one => foo, + }; + + assert_eq!(foo, 1); +} + +#[tokio::test] +async fn sync_two() { + use std::cell::Cell; + + let cnt = Cell::new(0); + + let res = tokio::select! { + foo = async { + cnt.set(cnt.get() + 1); + 1 + } => foo, + bar = async { + cnt.set(cnt.get() + 1); + 2 + } => bar, + }; + + assert_eq!(1, cnt.get()); + assert!(res == 1 || res == 2); +} + +#[tokio::test] +async fn drop_in_fut() { + let s = "hello".to_string(); + + let res = tokio::select! { + foo = async { + let v = one().await; + drop(s); + v + } => foo + }; + + assert_eq!(res, 1); +} + +#[tokio::test] +async fn one_ready() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (_tx2, rx2) = oneshot::channel::<i32>(); + + tx1.send(1).unwrap(); + + let v = tokio::select! { + res = rx1 => { + assert_ok!(res) + }, + _ = rx2 => unreachable!(), + }; + + assert_eq!(1, v); +} + +#[tokio::test] +async fn select_streams() { + let (tx1, mut rx1) = mpsc::unbounded_channel::<i32>(); + let (tx2, mut rx2) = mpsc::unbounded_channel::<i32>(); + + tokio::spawn(async move { + assert_ok!(tx2.send(1)); + task::yield_now().await; + + assert_ok!(tx1.send(2)); + task::yield_now().await; + + assert_ok!(tx2.send(3)); + task::yield_now().await; + + drop((tx1, tx2)); + }); + + let mut rem = true; + let mut msgs = vec![]; + + while rem { + tokio::select! { + Some(x) = rx1.recv() => { + msgs.push(x); + } + Some(y) = rx2.recv() => { + msgs.push(y); + } + else => { + rem = false; + } + } + } + + msgs.sort(); + assert_eq!(&msgs[..], &[1, 2, 3]); +} + +#[tokio::test] +async fn move_uncompleted_futures() { + let (tx1, mut rx1) = oneshot::channel::<i32>(); + let (tx2, mut rx2) = oneshot::channel::<i32>(); + + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + + let ran; + + tokio::select! { + res = &mut rx1 => { + assert_eq!(1, assert_ok!(res)); + assert_eq!(2, assert_ok!(rx2.await)); + ran = true; + }, + res = &mut rx2 => { + assert_eq!(2, assert_ok!(res)); + assert_eq!(1, assert_ok!(rx1.await)); + ran = true; + }, + } + + assert!(ran); +} + +#[tokio::test] +async fn nested() { + let res = tokio::select! { + x = async { 1 } => { + tokio::select! { + y = async { 2 } => x + y, + } + } + }; + + assert_eq!(res, 3); +} + +#[tokio::test] +async fn struct_size() { + use futures::future; + use std::mem; + + let fut = async { + let ready = future::ready(0i32); + + tokio::select! { + _ = ready => {}, + } + }; + + assert!(mem::size_of_val(&fut) <= 32); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + + tokio::select! { + _ = ready1 => {}, + _ = ready2 => {}, + } + }; + + assert!(mem::size_of_val(&fut) <= 40); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + let ready3 = future::ready(0i32); + + tokio::select! { + _ = ready1 => {}, + _ = ready2 => {}, + _ = ready3 => {}, + } + }; + + assert!(mem::size_of_val(&fut) <= 48); +} + +#[tokio::test] +async fn mutable_borrowing_future_with_same_borrow_in_block() { + let mut value = 234; + + tokio::select! { + _ = require_mutable(&mut value) => { }, + _ = async_noop() => { + value += 5; + }, + } + + assert!(value >= 234); +} + +#[tokio::test] +async fn mutable_borrowing_future_with_same_borrow_in_block_and_else() { + let mut value = 234; + + tokio::select! { + _ = require_mutable(&mut value) => { }, + _ = async_noop() => { + value += 5; + }, + else => { + value += 27; + }, + } + + assert!(value >= 234); +} + +#[tokio::test] +async fn future_panics_after_poll() { + use tokio_test::task; + + let (tx, rx) = oneshot::channel(); + + let mut polled = false; + + let f = poll_fn(|_| { + assert!(!polled); + polled = true; + Ready(None::<()>) + }); + + let mut f = task::spawn(async { + tokio::select! { + Some(_) = f => unreachable!(), + ret = rx => ret.unwrap(), + } + }); + + assert_pending!(f.poll()); + assert_pending!(f.poll()); + + assert_ok!(tx.send(1)); + + let res = assert_ready!(f.poll()); + assert_eq!(1, res); +} + +#[tokio::test] +async fn disable_with_if() { + use tokio_test::task; + + let f = poll_fn(|_| panic!()); + let (tx, rx) = oneshot::channel(); + + let mut f = task::spawn(async { + tokio::select! { + _ = f, if false => unreachable!(), + _ = rx => (), + } + }); + + assert_pending!(f.poll()); + + assert_ok!(tx.send(())); + assert!(f.is_woken()); + + assert_ready!(f.poll()); +} + +#[tokio::test] +async fn join_with_select() { + use tokio_test::task; + + let (tx1, mut rx1) = oneshot::channel(); + let (tx2, mut rx2) = oneshot::channel(); + + let mut f = task::spawn(async { + let mut a = None; + let mut b = None; + + while a.is_none() || b.is_none() { + tokio::select! { + v1 = &mut rx1, if a.is_none() => a = Some(assert_ok!(v1)), + v2 = &mut rx2, if b.is_none() => b = Some(assert_ok!(v2)) + } + } + + (a.unwrap(), b.unwrap()) + }); + + assert_pending!(f.poll()); + + assert_ok!(tx1.send(123)); + assert!(f.is_woken()); + assert_pending!(f.poll()); + + assert_ok!(tx2.send(456)); + assert!(f.is_woken()); + let (a, b) = assert_ready!(f.poll()); + + assert_eq!(a, 123); + assert_eq!(b, 456); +} + +#[tokio::test] +async fn use_future_in_if_condition() { + use tokio::time::{self, Duration}; + + let mut delay = time::delay_for(Duration::from_millis(50)); + + tokio::select! { + _ = &mut delay, if !delay.is_elapsed() => { + } + _ = async { 1 } => { + } + } +} + +#[tokio::test] +async fn many_branches() { + let num = tokio::select! { + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + x = async { 1 } => x, + }; + + assert_eq!(1, num); +} + +async fn one() -> usize { + 1 +} + +async fn require_mutable(_: &mut i32) {} +async fn async_noop() {} diff --git a/third_party/rust/tokio/tests/macros_try_join.rs b/third_party/rust/tokio/tests/macros_try_join.rs new file mode 100644 index 0000000000..faa55421a2 --- /dev/null +++ b/third_party/rust/tokio/tests/macros_try_join.rs @@ -0,0 +1,100 @@ +use tokio::sync::oneshot; +use tokio_test::{assert_pending, assert_ready, task}; + +#[tokio::test] +async fn sync_one_lit_expr_comma() { + let foo = tokio::try_join!(async { ok(1) },); + + assert_eq!(foo, Ok((1,))); +} + +#[tokio::test] +async fn sync_one_lit_expr_no_comma() { + let foo = tokio::try_join!(async { ok(1) }); + + assert_eq!(foo, Ok((1,))); +} + +#[tokio::test] +async fn sync_two_lit_expr_comma() { + let foo = tokio::try_join!(async { ok(1) }, async { ok(2) },); + + assert_eq!(foo, Ok((1, 2))); +} + +#[tokio::test] +async fn sync_two_lit_expr_no_comma() { + let foo = tokio::try_join!(async { ok(1) }, async { ok(2) }); + + assert_eq!(foo, Ok((1, 2))); +} + +#[tokio::test] +async fn two_await() { + let (tx1, rx1) = oneshot::channel::<&str>(); + let (tx2, rx2) = oneshot::channel::<u32>(); + + let mut join = + task::spawn(async { tokio::try_join!(async { rx1.await }, async { rx2.await }) }); + + assert_pending!(join.poll()); + + tx2.send(123).unwrap(); + assert!(join.is_woken()); + assert_pending!(join.poll()); + + tx1.send("hello").unwrap(); + assert!(join.is_woken()); + let res: Result<(&str, u32), _> = assert_ready!(join.poll()); + + assert_eq!(Ok(("hello", 123)), res); +} + +#[tokio::test] +async fn err_abort_early() { + let (tx1, rx1) = oneshot::channel::<&str>(); + let (tx2, rx2) = oneshot::channel::<u32>(); + let (_tx3, rx3) = oneshot::channel::<u32>(); + + let mut join = task::spawn(async { + tokio::try_join!(async { rx1.await }, async { rx2.await }, async { + rx3.await + }) + }); + + assert_pending!(join.poll()); + + tx2.send(123).unwrap(); + assert!(join.is_woken()); + assert_pending!(join.poll()); + + drop(tx1); + assert!(join.is_woken()); + + let res = assert_ready!(join.poll()); + + assert!(res.is_err()); +} + +#[test] +fn join_size() { + use futures::future; + use std::mem; + + let fut = async { + let ready = future::ready(ok(0i32)); + tokio::try_join!(ready) + }; + assert_eq!(mem::size_of_val(&fut), 16); + + let fut = async { + let ready1 = future::ready(ok(0i32)); + let ready2 = future::ready(ok(0i32)); + tokio::try_join!(ready1, ready2) + }; + assert_eq!(mem::size_of_val(&fut), 28); +} + +fn ok<T>(val: T) -> Result<T, ()> { + Ok(val) +} diff --git a/third_party/rust/tokio/tests/net_bind_resource.rs b/third_party/rust/tokio/tests/net_bind_resource.rs new file mode 100644 index 0000000000..d4a0b8dab0 --- /dev/null +++ b/third_party/rust/tokio/tests/net_bind_resource.rs @@ -0,0 +1,14 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::TcpListener; + +use std::convert::TryFrom; +use std::net; + +#[test] +#[should_panic] +fn no_runtime_panics_binding_net_tcp_listener() { + let listener = net::TcpListener::bind("127.0.0.1:0").expect("failed to bind listener"); + let _ = TcpListener::try_from(listener); +} diff --git a/third_party/rust/tokio/tests/net_lookup_host.rs b/third_party/rust/tokio/tests/net_lookup_host.rs new file mode 100644 index 0000000000..4d06402988 --- /dev/null +++ b/third_party/rust/tokio/tests/net_lookup_host.rs @@ -0,0 +1,36 @@ +use tokio::net; +use tokio_test::assert_ok; + +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +#[tokio::test] +async fn lookup_socket_addr() { + let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + + let actual = assert_ok!(net::lookup_host(addr).await).collect::<Vec<_>>(); + assert_eq!(vec![addr], actual); +} + +#[tokio::test] +async fn lookup_str_socket_addr() { + let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + + let actual = assert_ok!(net::lookup_host("127.0.0.1:8000").await).collect::<Vec<_>>(); + assert_eq!(vec![addr], actual); +} + +#[tokio::test] +async fn resolve_dns() -> io::Result<()> { + let mut hosts = net::lookup_host("localhost:3000").await?; + let host = hosts.next().unwrap(); + + let expected = if host.is_ipv4() { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3000) + } else { + SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 3000) + }; + assert_eq!(host, expected); + + Ok(()) +} diff --git a/third_party/rust/tokio/tests/no_rt.rs b/third_party/rust/tokio/tests/no_rt.rs new file mode 100644 index 0000000000..962eed7952 --- /dev/null +++ b/third_party/rust/tokio/tests/no_rt.rs @@ -0,0 +1,27 @@ +use tokio::net::TcpStream; +use tokio::sync::oneshot; +use tokio::time::{timeout, Duration}; + +use futures::executor::block_on; + +use std::net::TcpListener; + +#[test] +#[should_panic(expected = "no timer running")] +fn panics_when_no_timer() { + block_on(timeout_value()); +} + +#[test] +#[should_panic(expected = "no reactor running")] +fn panics_when_no_reactor() { + let srv = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = srv.local_addr().unwrap(); + block_on(TcpStream::connect(&addr)).unwrap(); +} + +async fn timeout_value() { + let (_tx, rx) = oneshot::channel::<()>(); + let dur = Duration::from_millis(20); + let _ = timeout(dur, rx).await; +} diff --git a/third_party/rust/tokio/tests/process_issue_2174.rs b/third_party/rust/tokio/tests/process_issue_2174.rs new file mode 100644 index 0000000000..b5a63ceee8 --- /dev/null +++ b/third_party/rust/tokio/tests/process_issue_2174.rs @@ -0,0 +1,46 @@ +#![cfg(feature = "process")] +#![warn(rust_2018_idioms)] +// This test reveals a difference in behavior of kqueue on FreeBSD. When the +// reader disconnects, there does not seem to be an `EVFILT_WRITE` filter that +// is returned. +// +// It is expected that `EVFILT_WRITE` would be returned with either the +// `EV_EOF` or `EV_ERROR` flag set. If either flag is set a write would be +// attempted, but that does not seem to occur. +#![cfg(all(unix, not(target_os = "freebsd")))] + +use std::process::Stdio; +use std::time::Duration; +use tokio::prelude::*; +use tokio::process::Command; +use tokio::time; +use tokio_test::assert_err; + +#[tokio::test] +async fn issue_2174() { + let mut child = Command::new("sleep") + .arg("2") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .spawn() + .unwrap(); + let mut input = child.stdin.take().unwrap(); + + // Writes will buffer up to 65_636. This *should* loop at least 8 times + // and then register interest. + let handle = tokio::spawn(async move { + let data = [0u8; 8192]; + loop { + input.write_all(&data).await.unwrap(); + } + }); + + // Sleep enough time so that the child process's stdin's buffer fills. + time::delay_for(Duration::from_secs(1)).await; + + // Kill the child process. + child.kill().unwrap(); + let _ = child.await; + + assert_err!(handle.await); +} diff --git a/third_party/rust/tokio/tests/process_issue_42.rs b/third_party/rust/tokio/tests/process_issue_42.rs new file mode 100644 index 0000000000..aa70af3b56 --- /dev/null +++ b/third_party/rust/tokio/tests/process_issue_42.rs @@ -0,0 +1,36 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +use futures::future::join_all; +use std::process::Stdio; +use tokio::process::Command; +use tokio::task; + +#[tokio::test] +async fn issue_42() { + // We spawn a many batches of processes which should exit at roughly the + // same time (modulo OS scheduling delays), to make sure that consuming + // a readiness event for one process doesn't inadvertently starve another. + // We then do this many times (in parallel) in an effort to stress test the + // implementation to ensure there are no race conditions. + // See alexcrichton/tokio-process#42 for background + let join_handles = (0..10usize).map(|_| { + task::spawn(async { + let processes = (0..10usize).map(|i| { + Command::new("echo") + .arg(format!("I am spawned process #{}", i)) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .kill_on_drop(true) + .spawn() + .unwrap() + }); + + join_all(processes).await; + }) + }); + + join_all(join_handles).await; +} diff --git a/third_party/rust/tokio/tests/process_kill_on_drop.rs b/third_party/rust/tokio/tests/process_kill_on_drop.rs new file mode 100644 index 0000000000..f376c15475 --- /dev/null +++ b/third_party/rust/tokio/tests/process_kill_on_drop.rs @@ -0,0 +1,42 @@ +#![cfg(all(unix, feature = "process"))] +#![warn(rust_2018_idioms)] + +use std::process::Stdio; +use std::time::Duration; +use tokio::io::AsyncReadExt; +use tokio::process::Command; +use tokio::time::delay_for; +use tokio_test::assert_ok; + +#[tokio::test] +async fn kill_on_drop() { + let mut cmd = Command::new("sh"); + cmd.args(&[ + "-c", + " + # Fork another child that won't get killed + sh -c 'sleep 1; echo child ran' & + disown -a + + # Await our death + sleep 5 + echo hello from beyond the grave + ", + ]); + + let mut child = cmd + .kill_on_drop(true) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + delay_for(Duration::from_secs(2)).await; + + let mut out = child.stdout.take().unwrap(); + drop(child); + + let mut msg = String::new(); + assert_ok!(out.read_to_string(&mut msg).await); + + assert_eq!("child ran\n", msg); +} diff --git a/third_party/rust/tokio/tests/process_smoke.rs b/third_party/rust/tokio/tests/process_smoke.rs new file mode 100644 index 0000000000..d16d1d72c1 --- /dev/null +++ b/third_party/rust/tokio/tests/process_smoke.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::process::Command; +use tokio_test::assert_ok; + +#[tokio::test] +async fn simple() { + let mut cmd; + + if cfg!(windows) { + cmd = Command::new("cmd"); + cmd.arg("/c"); + } else { + cmd = Command::new("sh"); + cmd.arg("-c"); + } + + let mut child = cmd.arg("exit 2").spawn().unwrap(); + + let id = child.id(); + assert!(id > 0); + + let status = assert_ok!((&mut child).await); + assert_eq!(status.code(), Some(2)); + + assert_eq!(child.id(), id); + drop(child.kill()); +} diff --git a/third_party/rust/tokio/tests/rt_basic.rs b/third_party/rust/tokio/tests/rt_basic.rs new file mode 100644 index 0000000000..b9e373b88f --- /dev/null +++ b/third_party/rust/tokio/tests/rt_basic.rs @@ -0,0 +1,135 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::{mpsc, oneshot}; +use tokio_test::{assert_err, assert_ok}; + +use std::thread; +use std::time::Duration; + +#[test] +fn spawned_task_does_not_progress_without_block_on() { + let (tx, mut rx) = oneshot::channel(); + + let mut rt = rt(); + + rt.spawn(async move { + assert_ok!(tx.send("hello")); + }); + + thread::sleep(Duration::from_millis(50)); + + assert_err!(rx.try_recv()); + + let out = rt.block_on(async { assert_ok!(rx.await) }); + + assert_eq!(out, "hello"); +} + +#[test] +fn no_extra_poll() { + use std::pin::Pin; + use std::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }; + use std::task::{Context, Poll}; + use tokio::stream::{Stream, StreamExt}; + + struct TrackPolls<S> { + npolls: Arc<AtomicUsize>, + s: S, + } + + impl<S> Stream for TrackPolls<S> + where + S: Stream, + { + type Item = S::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // safety: we do not move s + let this = unsafe { self.get_unchecked_mut() }; + this.npolls.fetch_add(1, SeqCst); + // safety: we are pinned, and so is s + unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx) + } + } + + let (tx, rx) = mpsc::unbounded_channel(); + let mut rx = TrackPolls { + npolls: Arc::new(AtomicUsize::new(0)), + s: rx, + }; + let npolls = Arc::clone(&rx.npolls); + + let mut rt = rt(); + + rt.spawn(async move { while let Some(_) = rx.next().await {} }); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled exactly once: the initial poll + assert_eq!(npolls.load(SeqCst), 1); + + tx.send(()).unwrap(); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled twice more: once to yield Some(), then once to yield Pending + assert_eq!(npolls.load(SeqCst), 1 + 2); + + drop(tx); + rt.block_on(async { + tokio::task::yield_now().await; + }); + + // should have been polled once more: to yield None + assert_eq!(npolls.load(SeqCst), 1 + 2 + 1); +} + +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + use tokio::task; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut rt = rt(); + + rt.spawn(async move { + let _ = rx2.await; + unreachable!(); + }); + + rt.spawn(async move { + let _ = rx1.await; + tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + rt.spawn(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/tests/rt_common.rs b/third_party/rust/tokio/tests/rt_common.rs new file mode 100644 index 0000000000..8dc0da3c5a --- /dev/null +++ b/third_party/rust/tokio/tests/rt_common.rs @@ -0,0 +1,1009 @@ +#![allow(clippy::needless_range_loop)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +// Tests to run on both current-thread & therad-pool runtime variants. + +macro_rules! rt_test { + ($($t:tt)*) => { + mod basic_scheduler { + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_4_threads { + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new() + .threaded_scheduler() + .core_threads(4) + .enable_all() + .build() + .unwrap() + } + } + + mod threaded_scheduler_1_thread { + $($t)* + + fn rt() -> Runtime { + tokio::runtime::Builder::new() + .threaded_scheduler() + .core_threads(1) + .enable_all() + .build() + .unwrap() + } + } + } +} + +#[test] +fn send_sync_bound() { + use tokio::runtime::Runtime; + fn is_send<T: Send + Sync>() {} + + is_send::<Runtime>(); +} + +rt_test! { + use tokio::net::{TcpListener, TcpStream, UdpSocket}; + use tokio::prelude::*; + use tokio::runtime::Runtime; + use tokio::sync::oneshot; + use tokio::{task, time}; + use tokio_test::{assert_err, assert_ok}; + + use futures::future::poll_fn; + use std::future::Future; + use std::pin::Pin; + use std::sync::{mpsc, Arc}; + use std::task::{Context, Poll}; + use std::thread; + use std::time::{Duration, Instant}; + + #[test] + fn block_on_sync() { + let mut rt = rt(); + + let mut win = false; + rt.block_on(async { + win = true; + }); + + assert!(win); + } + + #[test] + fn block_on_async() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + tx.send("ZOMG").unwrap(); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn spawn_one_bg() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + }); + + assert_ok!(rx.await) + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn spawn_one_join() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + let handle = tokio::spawn(async move { + tx.send("ZOMG").unwrap(); + "DONE" + }); + + let msg = assert_ok!(rx.await); + + let out = assert_ok!(handle.await); + assert_eq!(out, "DONE"); + + msg + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn spawn_two() { + let mut rt = rt(); + + let out = rt.block_on(async { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(async move { + assert_ok!(tx1.send("ZOMG")); + }); + + tokio::spawn(async move { + let msg = assert_ok!(rx1.await); + assert_ok!(tx2.send(msg)); + }); + + assert_ok!(rx2.await) + }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn spawn_many_from_block_on() { + use tokio::sync::mpsc; + + const ITER: usize = 200; + + let mut rt = rt(); + + let out = rt.block_on(async { + let (done_tx, mut done_rx) = mpsc::unbounded_channel(); + + let mut txs = (0..ITER) + .map(|i| { + let (tx, rx) = oneshot::channel(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let msg = assert_ok!(rx.await); + assert_eq!(i, msg); + assert_ok!(done_tx.send(msg)); + }); + + tx + }) + .collect::<Vec<_>>(); + + drop(done_tx); + + thread::spawn(move || { + for (i, tx) in txs.drain(..).enumerate() { + assert_ok!(tx.send(i)); + } + }); + + let mut out = vec![]; + while let Some(i) = done_rx.recv().await { + out.push(i); + } + + out.sort(); + out + }); + + assert_eq!(ITER, out.len()); + + for i in 0..ITER { + assert_eq!(i, out[i]); + } + } + + #[test] + fn spawn_many_from_task() { + use tokio::sync::mpsc; + + const ITER: usize = 500; + + let mut rt = rt(); + + let out = rt.block_on(async { + tokio::spawn(async move { + let (done_tx, mut done_rx) = mpsc::unbounded_channel(); + + /* + for _ in 0..100 { + tokio::spawn(async move { }); + } + + tokio::task::yield_now().await; + */ + + let mut txs = (0..ITER) + .map(|i| { + let (tx, rx) = oneshot::channel(); + let done_tx = done_tx.clone(); + + tokio::spawn(async move { + let msg = assert_ok!(rx.await); + assert_eq!(i, msg); + assert_ok!(done_tx.send(msg)); + }); + + tx + }) + .collect::<Vec<_>>(); + + drop(done_tx); + + thread::spawn(move || { + for (i, tx) in txs.drain(..).enumerate() { + assert_ok!(tx.send(i)); + } + }); + + let mut out = vec![]; + while let Some(i) = done_rx.recv().await { + out.push(i); + } + + out.sort(); + out + }).await.unwrap() + }); + + assert_eq!(ITER, out.len()); + + for i in 0..ITER { + assert_eq!(i, out[i]); + } + } + + #[test] + fn spawn_await_chain() { + let mut rt = rt(); + + let out = rt.block_on(async { + assert_ok!(tokio::spawn(async { + assert_ok!(tokio::spawn(async { + "hello" + }).await) + }).await) + }); + + assert_eq!(out, "hello"); + } + + #[test] + fn outstanding_tasks_dropped() { + let mut rt = rt(); + + let cnt = Arc::new(()); + + rt.block_on(async { + let cnt = cnt.clone(); + + tokio::spawn(poll_fn(move |_| { + assert_eq!(2, Arc::strong_count(&cnt)); + Poll::<()>::Pending + })); + }); + + assert_eq!(2, Arc::strong_count(&cnt)); + + drop(rt); + + assert_eq!(1, Arc::strong_count(&cnt)); + } + + #[test] + #[should_panic] + fn nested_rt() { + let mut rt1 = rt(); + let mut rt2 = rt(); + + rt1.block_on(async { rt2.block_on(async { "hello" }) }); + } + + #[test] + fn create_rt_in_block_on() { + let mut rt1 = rt(); + let mut rt2 = rt1.block_on(async { rt() }); + let out = rt2.block_on(async { "ZOMG" }); + + assert_eq!(out, "ZOMG"); + } + + #[test] + fn complete_block_on_under_load() { + let mut rt = rt(); + + rt.block_on(async { + let (tx, rx) = oneshot::channel(); + + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + assert_ok!(tx.send(())); + }); + + assert_ok!(rx.await); + }); + } + + #[test] + fn complete_task_under_load() { + let mut rt = rt(); + + rt.block_on(async { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + assert_ok!(tx1.send(())); + }); + + tokio::spawn(async move { + assert_ok!(rx1.await); + assert_ok!(tx2.send(())); + }); + + assert_ok!(rx2.await); + }); + } + + #[test] + fn spawn_from_other_thread_idle() { + let mut rt = rt(); + let handle = rt.handle().clone(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(50)); + + handle.spawn(async move { + assert_ok!(tx.send(())); + }); + }); + + rt.block_on(async move { + assert_ok!(rx.await); + }); + } + + #[test] + fn spawn_from_other_thread_under_load() { + let mut rt = rt(); + let handle = rt.handle().clone(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + handle.spawn(async move { + assert_ok!(tx.send(())); + }); + }); + + rt.block_on(async move { + // Spin hard + tokio::spawn(async { + loop { + yield_once().await; + } + }); + + assert_ok!(rx.await); + }); + } + + #[test] + fn delay_at_root() { + let mut rt = rt(); + + let now = Instant::now(); + let dur = Duration::from_millis(50); + + rt.block_on(async move { + time::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); + } + + #[test] + fn delay_in_spawn() { + let mut rt = rt(); + + let now = Instant::now(); + let dur = Duration::from_millis(50); + + rt.block_on(async move { + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + time::delay_for(dur).await; + assert_ok!(tx.send(())); + }); + + assert_ok!(rx.await); + }); + + assert!(now.elapsed() >= dur); + } + + #[test] + fn block_on_socket() { + let mut rt = rt(); + + rt.block_on(async move { + let (tx, rx) = oneshot::channel(); + + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let _ = listener.accept().await; + tx.send(()).unwrap(); + }); + + TcpStream::connect(&addr).await.unwrap(); + rx.await.unwrap(); + }); + } + + #[test] + fn spawn_from_blocking() { + let mut rt = rt(); + + let out = rt.block_on(async move { + let inner = assert_ok!(tokio::task::spawn_blocking(|| { + tokio::spawn(async move { "hello" }) + }).await); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn spawn_blocking_from_blocking() { + let mut rt = rt(); + + let out = rt.block_on(async move { + let inner = assert_ok!(tokio::task::spawn_blocking(|| { + tokio::task::spawn_blocking(|| "hello") + }).await); + + assert_ok!(inner.await) + }); + + assert_eq!(out, "hello") + } + + #[test] + fn delay_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + assert_ok!(tokio::task::spawn_blocking(|| { + let now = std::time::Instant::now(); + let dur = Duration::from_millis(1); + + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + tokio::time::delay_for(dur).await; + }); + + assert!(now.elapsed() >= dur); + }).await); + }); + } + + #[test] + fn socket_from_blocking() { + let mut rt = rt(); + + rt.block_on(async move { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let peer = tokio::task::spawn_blocking(move || { + // use the futures' block_on fn to make sure we aren't setting + // any Tokio context + futures::executor::block_on(async { + assert_ok!(TcpStream::connect(addr).await); + }); + }); + + // Wait for the client to connect + let _ = assert_ok!(listener.accept().await); + + assert_ok!(peer.await); + }); + } + + #[test] + fn spawn_blocking_after_shutdown() { + let rt = rt(); + let handle = rt.handle().clone(); + + // Shutdown + drop(rt); + + handle.enter(|| { + let res = task::spawn_blocking(|| unreachable!()); + + // Avoid using a tokio runtime + let out = futures::executor::block_on(res); + assert!(out.is_err()); + }); + } + + #[test] + fn io_driver_called_when_under_load() { + let mut rt = rt(); + + // Create a lot of constant load. The scheduler will always be busy. + for _ in 0..100 { + rt.spawn(async { + loop { + tokio::task::yield_now().await; + } + }); + } + + // Do some I/O work + rt.block_on(async { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + + let srv = tokio::spawn(async move { + let (mut stream, _) = assert_ok!(listener.accept().await); + assert_ok!(stream.write_all(b"hello world").await); + }); + + let cli = tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(addr).await); + let mut dst = vec![0; 11]; + + assert_ok!(stream.read_exact(&mut dst).await); + assert_eq!(dst, b"hello world"); + }); + + assert_ok!(srv.await); + assert_ok!(cli.await); + }); + } + + #[test] + fn client_server_block_on() { + let mut rt = rt(); + let (tx, rx) = mpsc::channel(); + + rt.block_on(async move { client_server(tx).await }); + + assert_ok!(rx.try_recv()); + assert_err!(rx.try_recv()); + } + + #[test] + fn panic_in_task() { + let mut rt = rt(); + let (tx, rx) = oneshot::channel(); + + struct Boom(Option<oneshot::Sender<()>>); + + impl Future for Boom { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + panic!(); + } + } + + impl Drop for Boom { + fn drop(&mut self) { + assert!(std::thread::panicking()); + self.0.take().unwrap().send(()).unwrap(); + } + } + + rt.spawn(Boom(Some(tx))); + assert_ok!(rt.block_on(rx)); + } + + #[test] + #[should_panic] + fn panic_in_block_on() { + let mut rt = rt(); + rt.block_on(async { panic!() }); + } + + async fn yield_once() { + let mut yielded = false; + poll_fn(|cx| { + if yielded { + Poll::Ready(()) + } else { + yielded = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .await + } + + #[test] + fn enter_and_spawn() { + let mut rt = rt(); + let handle = rt.enter(|| { + tokio::spawn(async {}) + }); + + assert_ok!(rt.block_on(handle)); + } + + #[test] + fn eagerly_drops_futures_on_shutdown() { + use std::sync::mpsc; + + struct Never { + drop_tx: mpsc::Sender<()>, + } + + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.drop_tx.send(()).unwrap(); + } + } + + let mut rt = rt(); + + let (drop_tx, drop_rx) = mpsc::channel(); + let (run_tx, run_rx) = oneshot::channel(); + + rt.block_on(async move { + tokio::spawn(async move { + assert_ok!(run_tx.send(())); + + Never { drop_tx }.await + }); + + assert_ok!(run_rx.await); + }); + + drop(rt); + + assert_ok!(drop_rx.recv()); + } + + #[test] + fn wake_while_rt_is_dropping() { + use tokio::task; + + struct OnDrop<F: FnMut()>(F); + + impl<F: FnMut()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0)() + } + } + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + let mut rt = rt(); + + let h1 = rt.handle().clone(); + + rt.handle().spawn(async move { + // Ensure a waker gets stored in oneshot 1. + let _ = rx1.await; + tx3.send(()).unwrap(); + }); + + rt.handle().spawn(async move { + // When this task is dropped, we'll be "closing remotes". + // We spawn a new task that owns the `tx1`, to move its Drop + // out of here. + // + // Importantly, the oneshot 1 has a waker already stored, so + // the eventual drop here will try to re-schedule again. + let mut opt_tx1 = Some(tx1); + let _d = OnDrop(move || { + let tx1 = opt_tx1.take().unwrap(); + h1.spawn(async move { + tx1.send(()).unwrap(); + }); + }); + let _ = rx2.await; + }); + + rt.handle().spawn(async move { + let _ = rx3.await; + // We'll never get here, but once task 3 drops, this will + // force task 2 to re-schedule since it's waiting on oneshot 2. + tx2.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); + } + + #[test] + fn io_notify_while_shutting_down() { + use std::net::Ipv6Addr; + + for _ in 1..10 { + let mut runtime = rt(); + + runtime.block_on(async { + let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); + let addr = socket.local_addr().unwrap(); + let (mut recv_half, mut send_half) = socket.split(); + + tokio::spawn(async move { + let mut buf = [0]; + loop { + recv_half.recv_from(&mut buf).await.unwrap(); + std::thread::sleep(Duration::from_millis(2)); + } + }); + + tokio::spawn(async move { + let buf = [0]; + loop { + send_half.send_to(&buf, &addr).await.unwrap(); + tokio::time::delay_for(Duration::from_millis(1)).await; + } + }); + + tokio::time::delay_for(Duration::from_millis(5)).await; + }); + } + } + + #[test] + fn shutdown_timeout() { + let (tx, rx) = oneshot::channel(); + let mut runtime = rt(); + + runtime.block_on(async move { + task::spawn_blocking(move || { + tx.send(()).unwrap(); + thread::sleep(Duration::from_secs(10_000)); + }); + + rx.await.unwrap(); + }); + + runtime.shutdown_timeout(Duration::from_millis(100)); + } + + #[test] + fn runtime_in_thread_local() { + use std::cell::RefCell; + use std::thread; + + thread_local!( + static R: RefCell<Option<Runtime>> = RefCell::new(None); + ); + + thread::spawn(|| { + R.with(|cell| { + *cell.borrow_mut() = Some(rt()); + }); + + let _rt = rt(); + }).join().unwrap(); + } + + async fn client_server(tx: mpsc::Sender<()>) { + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + + // Get the assigned address + let addr = assert_ok!(server.local_addr()); + + // Spawn the server + tokio::spawn(async move { + // Accept a socket + let (mut socket, _) = server.accept().await.unwrap(); + + // Write some data + socket.write_all(b"hello").await.unwrap(); + }); + + let mut client = TcpStream::connect(&addr).await.unwrap(); + + let mut buf = vec![]; + client.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, b"hello"); + tx.send(()).unwrap(); + } + + #[test] + fn local_set_block_on_socket() { + let mut rt = rt(); + let local = task::LocalSet::new(); + + local.block_on(&mut rt, async move { + let (tx, rx) = oneshot::channel(); + + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + task::spawn_local(async move { + let _ = listener.accept().await; + tx.send(()).unwrap(); + }); + + TcpStream::connect(&addr).await.unwrap(); + rx.await.unwrap(); + }); + } + + #[test] + fn local_set_client_server_block_on() { + let mut rt = rt(); + let (tx, rx) = mpsc::channel(); + + let local = task::LocalSet::new(); + + local.block_on(&mut rt, async move { client_server_local(tx).await }); + + assert_ok!(rx.try_recv()); + assert_err!(rx.try_recv()); + } + + async fn client_server_local(tx: mpsc::Sender<()>) { + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + + // Get the assigned address + let addr = assert_ok!(server.local_addr()); + + // Spawn the server + task::spawn_local(async move { + // Accept a socket + let (mut socket, _) = server.accept().await.unwrap(); + + // Write some data + socket.write_all(b"hello").await.unwrap(); + }); + + let mut client = TcpStream::connect(&addr).await.unwrap(); + + let mut buf = vec![]; + client.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, b"hello"); + tx.send(()).unwrap(); + } + + #[test] + fn coop() { + use std::task::Poll::Ready; + + let mut rt = rt(); + + rt.block_on(async { + // Create a bunch of tasks + let mut tasks = (0..1_000).map(|_| { + tokio::spawn(async { }) + }).collect::<Vec<_>>(); + + // Hope that all the tasks complete... + time::delay_for(Duration::from_millis(100)).await; + + poll_fn(|cx| { + // At least one task should not be ready + for task in &mut tasks { + if Pin::new(task).poll(cx).is_pending() { + return Ready(()); + } + } + + panic!("did not yield"); + }).await; + }); + } + + // Tests that the "next task" scheduler optimization is not able to starve + // other tasks. + #[test] + fn ping_pong_saturation() { + use tokio::sync::mpsc; + + const NUM: usize = 100; + + let mut rt = rt(); + + rt.block_on(async { + let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel(); + + // Spawn a bunch of tasks that ping ping between each other to + // saturate the runtime. + for _ in 0..NUM { + let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); + let spawned_tx = spawned_tx.clone(); + + task::spawn(async move { + spawned_tx.send(()).unwrap(); + + tx1.send(()).unwrap(); + + loop { + rx2.recv().await.unwrap(); + tx1.send(()).unwrap(); + } + }); + + task::spawn(async move { + loop { + rx1.recv().await.unwrap(); + tx2.send(()).unwrap(); + } + }); + } + + for _ in 0..NUM { + spawned_rx.recv().await.unwrap(); + } + + // spawn another task and wait for it to complete + let handle = task::spawn(async { + for _ in 0..5 { + // Yielding forces it back into the local queue. + task::yield_now().await; + } + }); + handle.await.unwrap(); + }); + } +} diff --git a/third_party/rust/tokio/tests/rt_threaded.rs b/third_party/rust/tokio/tests/rt_threaded.rs new file mode 100644 index 0000000000..9c95afd5ae --- /dev/null +++ b/third_party/rust/tokio/tests/rt_threaded.rs @@ -0,0 +1,327 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::{self, Runtime}; +use tokio::sync::oneshot; +use tokio_test::{assert_err, assert_ok}; + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{mpsc, Arc}; +use std::task::{Context, Poll}; + +#[test] +fn single_thread() { + // No panic when starting a runtime w/ a single thread + let _ = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .core_threads(1) + .build(); +} + +#[test] +fn many_oneshot_futures() { + // used for notifying the main thread + const NUM: usize = 1_000; + + for _ in 0..5 { + let (tx, rx) = mpsc::channel(); + + let rt = rt(); + let cnt = Arc::new(AtomicUsize::new(0)); + + for _ in 0..NUM { + let cnt = cnt.clone(); + let tx = tx.clone(); + + rt.spawn(async move { + let num = cnt.fetch_add(1, Relaxed) + 1; + + if num == NUM { + tx.send(()).unwrap(); + } + }); + } + + rx.recv().unwrap(); + + // Wait for the pool to shutdown + drop(rt); + } +} +#[test] +fn many_multishot_futures() { + use tokio::sync::mpsc; + + const CHAIN: usize = 200; + const CYCLES: usize = 5; + const TRACKS: usize = 50; + + for _ in 0..50 { + let mut rt = rt(); + let mut start_txs = Vec::with_capacity(TRACKS); + let mut final_rxs = Vec::with_capacity(TRACKS); + + for _ in 0..TRACKS { + let (start_tx, mut chain_rx) = mpsc::channel(10); + + for _ in 0..CHAIN { + let (mut next_tx, next_rx) = mpsc::channel(10); + + // Forward all the messages + rt.spawn(async move { + while let Some(v) = chain_rx.recv().await { + next_tx.send(v).await.unwrap(); + } + }); + + chain_rx = next_rx; + } + + // This final task cycles if needed + let (mut final_tx, final_rx) = mpsc::channel(10); + let mut cycle_tx = start_tx.clone(); + let mut rem = CYCLES; + + rt.spawn(async move { + for _ in 0..CYCLES { + let msg = chain_rx.recv().await.unwrap(); + + rem -= 1; + + if rem == 0 { + final_tx.send(msg).await.unwrap(); + } else { + cycle_tx.send(msg).await.unwrap(); + } + } + }); + + start_txs.push(start_tx); + final_rxs.push(final_rx); + } + + { + rt.block_on(async move { + for mut start_tx in start_txs { + start_tx.send("ping").await.unwrap(); + } + + for mut final_rx in final_rxs { + final_rx.recv().await.unwrap(); + } + }); + } + } +} + +#[test] +fn spawn_shutdown() { + let mut rt = rt(); + let (tx, rx) = mpsc::channel(); + + rt.block_on(async { + tokio::spawn(client_server(tx.clone())); + }); + + // Use spawner + rt.spawn(client_server(tx)); + + assert_ok!(rx.recv()); + assert_ok!(rx.recv()); + + drop(rt); + assert_err!(rx.try_recv()); +} + +async fn client_server(tx: mpsc::Sender<()>) { + let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + + // Get the assigned address + let addr = assert_ok!(server.local_addr()); + + // Spawn the server + tokio::spawn(async move { + // Accept a socket + let (mut socket, _) = server.accept().await.unwrap(); + + // Write some data + socket.write_all(b"hello").await.unwrap(); + }); + + let mut client = TcpStream::connect(&addr).await.unwrap(); + + let mut buf = vec![]; + client.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, b"hello"); + tx.send(()).unwrap(); +} + +#[test] +fn drop_threadpool_drops_futures() { + for _ in 0..1_000 { + let num_inc = Arc::new(AtomicUsize::new(0)); + let num_dec = Arc::new(AtomicUsize::new(0)); + let num_drop = Arc::new(AtomicUsize::new(0)); + + struct Never(Arc<AtomicUsize>); + + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Pending + } + } + + impl Drop for Never { + fn drop(&mut self) { + self.0.fetch_add(1, Relaxed); + } + } + + let a = num_inc.clone(); + let b = num_dec.clone(); + + let rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .on_thread_start(move || { + a.fetch_add(1, Relaxed); + }) + .on_thread_stop(move || { + b.fetch_add(1, Relaxed); + }) + .build() + .unwrap(); + + rt.spawn(Never(num_drop.clone())); + + // Wait for the pool to shutdown + drop(rt); + + // Assert that only a single thread was spawned. + let a = num_inc.load(Relaxed); + assert!(a >= 1); + + // Assert that all threads shutdown + let b = num_dec.load(Relaxed); + assert_eq!(a, b); + + // Assert that the future was dropped + let c = num_drop.load(Relaxed); + assert_eq!(c, 1); + } +} + +#[test] +fn start_stop_callbacks_called() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + let after_start = Arc::new(AtomicUsize::new(0)); + let before_stop = Arc::new(AtomicUsize::new(0)); + + let after_inner = after_start.clone(); + let before_inner = before_stop.clone(); + let mut rt = tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .on_thread_start(move || { + after_inner.clone().fetch_add(1, Ordering::Relaxed); + }) + .on_thread_stop(move || { + before_inner.clone().fetch_add(1, Ordering::Relaxed); + }) + .build() + .unwrap(); + + let (tx, rx) = oneshot::channel(); + + rt.spawn(async move { + assert_ok!(tx.send(())); + }); + + assert_ok!(rt.block_on(rx)); + + drop(rt); + + assert!(after_start.load(Ordering::Relaxed) > 0); + assert!(before_stop.load(Ordering::Relaxed) > 0); +} + +#[test] +fn blocking() { + // used for notifying the main thread + const NUM: usize = 1_000; + + for _ in 0..10 { + let (tx, rx) = mpsc::channel(); + + let rt = rt(); + let cnt = Arc::new(AtomicUsize::new(0)); + + // there are four workers in the pool + // so, if we run 4 blocking tasks, we know that handoff must have happened + let block = Arc::new(std::sync::Barrier::new(5)); + for _ in 0..4 { + let block = block.clone(); + rt.spawn(async move { + tokio::task::block_in_place(move || { + block.wait(); + block.wait(); + }) + }); + } + block.wait(); + + for _ in 0..NUM { + let cnt = cnt.clone(); + let tx = tx.clone(); + + rt.spawn(async move { + let num = cnt.fetch_add(1, Relaxed) + 1; + + if num == NUM { + tx.send(()).unwrap(); + } + }); + } + + rx.recv().unwrap(); + + // Wait for the pool to shutdown + block.wait(); + } +} + +#[test] +fn multi_threadpool() { + use tokio::sync::oneshot; + + let rt1 = rt(); + let rt2 = rt(); + + let (tx, rx) = oneshot::channel(); + let (done_tx, done_rx) = mpsc::channel(); + + rt2.spawn(async move { + rx.await.unwrap(); + done_tx.send(()).unwrap(); + }); + + rt1.spawn(async move { + tx.send(()).unwrap(); + }); + + done_rx.recv().unwrap(); +} + +fn rt() -> Runtime { + Runtime::new().unwrap() +} diff --git a/third_party/rust/tokio/tests/signal_ctrl_c.rs b/third_party/rust/tokio/tests/signal_ctrl_c.rs new file mode 100644 index 0000000000..4b057ee7e1 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_ctrl_c.rs @@ -0,0 +1,30 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +#[tokio::test] +async fn ctrl_c() { + let ctrl_c = signal::ctrl_c(); + + let (fire, wait) = oneshot::channel(); + + // NB: simulate a signal coming in by exercising our signal handler + // to avoid complications with sending SIGINT to the test process + tokio::spawn(async { + wait.await.expect("wait failed"); + send_signal(libc::SIGINT); + }); + + let _ = fire.send(()); + + assert_ok!(ctrl_c.await); +} diff --git a/third_party/rust/tokio/tests/signal_drop_recv.rs b/third_party/rust/tokio/tests/signal_drop_recv.rs new file mode 100644 index 0000000000..b0d9213e61 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_drop_recv.rs @@ -0,0 +1,22 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn drop_then_get_a_signal() { + let kind = SignalKind::user_defined1(); + let sig = signal(kind).expect("failed to create first signal"); + drop(sig); + + send_signal(libc::SIGUSR1); + let mut sig = signal(kind).expect("failed to create second signal"); + + let _ = sig.recv().await; +} diff --git a/third_party/rust/tokio/tests/signal_drop_rt.rs b/third_party/rust/tokio/tests/signal_drop_rt.rs new file mode 100644 index 0000000000..aeedd96e4e --- /dev/null +++ b/third_party/rust/tokio/tests/signal_drop_rt.rs @@ -0,0 +1,45 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::runtime::Runtime; +use tokio::signal::unix::{signal, SignalKind}; + +#[test] +fn dropping_loops_does_not_cause_starvation() { + let kind = SignalKind::user_defined1(); + + let mut first_rt = rt(); + let mut first_signal = + first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); + + let mut second_rt = rt(); + let mut second_signal = + second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); + + send_signal(libc::SIGUSR1); + + first_rt + .block_on(first_signal.recv()) + .expect("failed to await first signal"); + + drop(first_rt); + drop(first_signal); + + send_signal(libc::SIGUSR1); + + second_rt.block_on(second_signal.recv()); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/tests/signal_drop_signal.rs b/third_party/rust/tokio/tests/signal_drop_signal.rs new file mode 100644 index 0000000000..92ac4050d5 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_drop_signal.rs @@ -0,0 +1,26 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn dropping_signal_does_not_deregister_any_other_instances() { + let kind = SignalKind::user_defined1(); + + // Signals should not starve based on ordering + let first_duplicate_signal = signal(kind).expect("failed to register first duplicate signal"); + let mut sig = signal(kind).expect("failed to register signal"); + let second_duplicate_signal = signal(kind).expect("failed to register second duplicate signal"); + + drop(first_duplicate_signal); + drop(second_duplicate_signal); + + send_signal(libc::SIGUSR1); + let _ = sig.recv().await; +} diff --git a/third_party/rust/tokio/tests/signal_multi_rt.rs b/third_party/rust/tokio/tests/signal_multi_rt.rs new file mode 100644 index 0000000000..9d78469578 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_multi_rt.rs @@ -0,0 +1,55 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::runtime::Runtime; +use tokio::signal::unix::{signal, SignalKind}; + +use std::sync::mpsc::channel; +use std::thread; + +#[test] +fn multi_loop() { + // An "ordinary" (non-future) channel + let (sender, receiver) = channel(); + // Run multiple times, to make sure there are no race conditions + for _ in 0..10 { + // Run multiple event loops, each one in its own thread + let threads: Vec<_> = (0..4) + .map(|_| { + let sender = sender.clone(); + thread::spawn(move || { + let mut rt = rt(); + let _ = rt.block_on(async { + let mut signal = signal(SignalKind::hangup()).unwrap(); + sender.send(()).unwrap(); + signal.recv().await + }); + }) + }) + .collect(); + // Wait for them to declare they're ready + for &_ in threads.iter() { + receiver.recv().unwrap(); + } + // Send a signal + send_signal(libc::SIGHUP); + // Make sure the threads terminated correctly + for t in threads { + t.join().unwrap(); + } + } +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/tests/signal_no_rt.rs b/third_party/rust/tokio/tests/signal_no_rt.rs new file mode 100644 index 0000000000..b0f32b2d10 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_no_rt.rs @@ -0,0 +1,11 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +use tokio::signal::unix::{signal, SignalKind}; + +#[test] +#[should_panic] +fn no_runtime_panics_creating_signals() { + let _ = signal(SignalKind::hangup()); +} diff --git a/third_party/rust/tokio/tests/signal_notify_both.rs b/third_party/rust/tokio/tests/signal_notify_both.rs new file mode 100644 index 0000000000..3481f808b3 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_notify_both.rs @@ -0,0 +1,23 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn notify_both() { + let kind = SignalKind::user_defined2(); + + let mut signal1 = signal(kind).expect("failed to create signal1"); + let mut signal2 = signal(kind).expect("failed to create signal2"); + + send_signal(libc::SIGUSR2); + + signal1.recv().await; + signal2.recv().await; +} diff --git a/third_party/rust/tokio/tests/signal_twice.rs b/third_party/rust/tokio/tests/signal_twice.rs new file mode 100644 index 0000000000..8f33d22a82 --- /dev/null +++ b/third_party/rust/tokio/tests/signal_twice.rs @@ -0,0 +1,22 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal::unix::{signal, SignalKind}; + +#[tokio::test] +async fn twice() { + let kind = SignalKind::user_defined1(); + let mut sig = signal(kind).expect("failed to get signal"); + + for _ in 0..2 { + send_signal(libc::SIGUSR1); + + assert!(sig.recv().await.is_some()); + } +} diff --git a/third_party/rust/tokio/tests/signal_usr1.rs b/third_party/rust/tokio/tests/signal_usr1.rs new file mode 100644 index 0000000000..d74c7d31ab --- /dev/null +++ b/third_party/rust/tokio/tests/signal_usr1.rs @@ -0,0 +1,23 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +mod support { + pub mod signal; +} +use support::signal::send_signal; + +use tokio::signal::unix::{signal, SignalKind}; +use tokio_test::assert_ok; + +#[tokio::test] +async fn signal_usr1() { + let mut signal = assert_ok!( + signal(SignalKind::user_defined1()), + "failed to create signal" + ); + + send_signal(libc::SIGUSR1); + + signal.recv().await; +} diff --git a/third_party/rust/tokio/tests/stream_chain.rs b/third_party/rust/tokio/tests/stream_chain.rs new file mode 100644 index 0000000000..0e14618b49 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_chain.rs @@ -0,0 +1,71 @@ +use tokio::stream::{self, Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_test::{assert_pending, assert_ready, task}; + +#[tokio::test] +async fn basic_usage() { + let one = stream::iter(vec![1, 2, 3]); + let two = stream::iter(vec![4, 5, 6]); + + let mut stream = one.chain(two); + + assert_eq!(stream.size_hint(), (6, Some(6))); + assert_eq!(stream.next().await, Some(1)); + + assert_eq!(stream.size_hint(), (5, Some(5))); + assert_eq!(stream.next().await, Some(2)); + + assert_eq!(stream.size_hint(), (4, Some(4))); + assert_eq!(stream.next().await, Some(3)); + + assert_eq!(stream.size_hint(), (3, Some(3))); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (2, Some(2))); + assert_eq!(stream.next().await, Some(5)); + + assert_eq!(stream.size_hint(), (1, Some(1))); + assert_eq!(stream.next().await, Some(6)); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); +} + +#[tokio::test] +async fn pending_first() { + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + let mut stream = task::spawn(rx1.chain(rx2)); + assert_eq!(stream.size_hint(), (0, None)); + + assert_pending!(stream.poll_next()); + + tx2.send(2).unwrap(); + assert!(!stream.is_woken()); + + assert_pending!(stream.poll_next()); + + tx1.send(1).unwrap(); + assert!(stream.is_woken()); + assert_eq!(Some(1), assert_ready!(stream.poll_next())); + + assert_pending!(stream.poll_next()); + + drop(tx1); + + assert_eq!(stream.size_hint(), (0, None)); + + assert!(stream.is_woken()); + assert_eq!(Some(2), assert_ready!(stream.poll_next())); + + assert_eq!(stream.size_hint(), (0, None)); + + drop(tx2); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(None, assert_ready!(stream.poll_next())); +} diff --git a/third_party/rust/tokio/tests/stream_collect.rs b/third_party/rust/tokio/tests/stream_collect.rs new file mode 100644 index 0000000000..70051e7f67 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_collect.rs @@ -0,0 +1,172 @@ +use tokio::stream::{self, StreamExt}; +use tokio::sync::mpsc; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; + +use bytes::{Bytes, BytesMut}; + +#[allow(clippy::let_unit_value)] +#[tokio::test] +async fn empty_unit() { + // Drains the stream. + let mut iter = vec![(), (), ()].into_iter(); + let _: () = stream::iter(&mut iter).collect().await; + assert!(iter.next().is_none()); +} + +#[tokio::test] +async fn empty_vec() { + let coll: Vec<u32> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_box_slice() { + let coll: Box<[u32]> = stream::empty().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_bytes() { + let coll: Bytes = stream::empty::<&[u8]>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_bytes_mut() { + let coll: BytesMut = stream::empty::<&[u8]>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_string() { + let coll: String = stream::empty::<&str>().collect().await; + assert!(coll.is_empty()); +} + +#[tokio::test] +async fn empty_result() { + let coll: Result<Vec<u32>, &str> = stream::empty().collect().await; + assert_eq!(Ok(vec![]), coll); +} + +#[tokio::test] +async fn collect_vec_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<Vec<i32>>()); + + assert_pending!(fut.poll()); + + tx.send(1).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(2).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(vec![1, 2], coll); +} + +#[tokio::test] +async fn collect_string_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world".to_string()).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_str_items() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<String>()); + + assert_pending!(fut.poll()); + + tx.send("hello ").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send("world").unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_bytes() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<Bytes>()); + + assert_pending!(fut.poll()); + + tx.send(&b"hello "[..]).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(&b"world"[..]).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready!(fut.poll()); + assert_eq!(&b"hello world"[..], coll); +} + +#[tokio::test] +async fn collect_results_ok() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Ok("world")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + drop(tx); + assert!(fut.is_woken()); + let coll = assert_ready_ok!(fut.poll()); + assert_eq!("hello world", coll); +} + +#[tokio::test] +async fn collect_results_err() { + let (tx, rx) = mpsc::unbounded_channel(); + let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); + + assert_pending!(fut.poll()); + + tx.send(Ok("hello ")).unwrap(); + assert!(fut.is_woken()); + assert_pending!(fut.poll()); + + tx.send(Err("oh no")).unwrap(); + assert!(fut.is_woken()); + let err = assert_ready_err!(fut.poll()); + assert_eq!("oh no", err); +} diff --git a/third_party/rust/tokio/tests/stream_empty.rs b/third_party/rust/tokio/tests/stream_empty.rs new file mode 100644 index 0000000000..f278076d1a --- /dev/null +++ b/third_party/rust/tokio/tests/stream_empty.rs @@ -0,0 +1,11 @@ +use tokio::stream::{self, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::empty::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(None, stream.next().await); + } +} diff --git a/third_party/rust/tokio/tests/stream_fuse.rs b/third_party/rust/tokio/tests/stream_fuse.rs new file mode 100644 index 0000000000..9d7d969f8b --- /dev/null +++ b/third_party/rust/tokio/tests/stream_fuse.rs @@ -0,0 +1,50 @@ +use tokio::stream::{Stream, StreamExt}; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +// a stream which alternates between Some and None +struct Alternate { + state: i32, +} + +impl Stream for Alternate { + type Item = i32; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> { + let val = self.state; + self.state += 1; + + // if it's even, Some(i32), else None + if val % 2 == 0 { + Poll::Ready(Some(val)) + } else { + Poll::Ready(None) + } + } +} + +#[tokio::test] +async fn basic_usage() { + let mut stream = Alternate { state: 0 }; + + // the stream goes back and forth + assert_eq!(stream.next().await, Some(0)); + assert_eq!(stream.next().await, None); + assert_eq!(stream.next().await, Some(2)); + assert_eq!(stream.next().await, None); + + // however, once it is fused + let mut stream = stream.fuse(); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, Some(4)); + + assert_eq!(stream.size_hint(), (0, None)); + assert_eq!(stream.next().await, None); + + // it will always return `None` after the first time. + assert_eq!(stream.size_hint(), (0, Some(0))); + assert_eq!(stream.next().await, None); + assert_eq!(stream.size_hint(), (0, Some(0))); +} diff --git a/third_party/rust/tokio/tests/stream_iter.rs b/third_party/rust/tokio/tests/stream_iter.rs new file mode 100644 index 0000000000..45148a7a8b --- /dev/null +++ b/third_party/rust/tokio/tests/stream_iter.rs @@ -0,0 +1,18 @@ +use tokio::stream; +use tokio_test::task; + +use std::iter; + +#[tokio::test] +async fn coop() { + let mut stream = task::spawn(stream::iter(iter::repeat(1))); + + for _ in 0..10_000 { + if stream.poll_next().is_pending() { + assert!(stream.is_woken()); + return; + } + } + + panic!("did not yield"); +} diff --git a/third_party/rust/tokio/tests/stream_merge.rs b/third_party/rust/tokio/tests/stream_merge.rs new file mode 100644 index 0000000000..f0168d72ee --- /dev/null +++ b/third_party/rust/tokio/tests/stream_merge.rs @@ -0,0 +1,54 @@ +use tokio::stream::{self, Stream, StreamExt}; +use tokio::sync::mpsc; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready}; + +#[tokio::test] +async fn merge_sync_streams() { + let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5])); + + for i in 0..7 { + let rem = 7 - i; + assert_eq!(s.size_hint(), (rem, Some(rem))); + assert_eq!(Some(i), s.next().await); + } + + assert!(s.next().await.is_none()); +} + +#[tokio::test] +async fn merge_async_streams() { + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + let mut rx = task::spawn(rx1.merge(rx2)); + + assert_eq!(rx.size_hint(), (0, None)); + + assert_pending!(rx.poll_next()); + + tx1.send(1).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(1), assert_ready!(rx.poll_next())); + + assert_pending!(rx.poll_next()); + tx2.send(2).unwrap(); + + assert!(rx.is_woken()); + assert_eq!(Some(2), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx1); + assert!(rx.is_woken()); + assert_pending!(rx.poll_next()); + + tx2.send(3).unwrap(); + assert!(rx.is_woken()); + assert_eq!(Some(3), assert_ready!(rx.poll_next())); + assert_pending!(rx.poll_next()); + + drop(tx2); + assert!(rx.is_woken()); + assert_eq!(None, assert_ready!(rx.poll_next())); +} diff --git a/third_party/rust/tokio/tests/stream_once.rs b/third_party/rust/tokio/tests/stream_once.rs new file mode 100644 index 0000000000..bb4635ac9e --- /dev/null +++ b/third_party/rust/tokio/tests/stream_once.rs @@ -0,0 +1,12 @@ +use tokio::stream::{self, Stream, StreamExt}; + +#[tokio::test] +async fn basic_usage() { + let mut one = stream::once(1); + + assert_eq!(one.size_hint(), (1, Some(1))); + assert_eq!(Some(1), one.next().await); + + assert_eq!(one.size_hint(), (0, Some(0))); + assert_eq!(None, one.next().await); +} diff --git a/third_party/rust/tokio/tests/stream_pending.rs b/third_party/rust/tokio/tests/stream_pending.rs new file mode 100644 index 0000000000..f4d3080de8 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_pending.rs @@ -0,0 +1,14 @@ +use tokio::stream::{self, Stream, StreamExt}; +use tokio_test::{assert_pending, task}; + +#[tokio::test] +async fn basic_usage() { + let mut stream = stream::pending::<i32>(); + + for _ in 0..2 { + assert_eq!(stream.size_hint(), (0, None)); + + let mut next = task::spawn(async { stream.next().await }); + assert_pending!(next.poll()); + } +} diff --git a/third_party/rust/tokio/tests/stream_reader.rs b/third_party/rust/tokio/tests/stream_reader.rs new file mode 100644 index 0000000000..8370df4dac --- /dev/null +++ b/third_party/rust/tokio/tests/stream_reader.rs @@ -0,0 +1,35 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use bytes::Bytes; +use tokio::io::{stream_reader, AsyncReadExt}; +use tokio::stream::iter; + +#[tokio::test] +async fn test_stream_reader() -> std::io::Result<()> { + let stream = iter(vec![ + Ok(Bytes::from_static(&[])), + Ok(Bytes::from_static(&[0, 1, 2, 3])), + Ok(Bytes::from_static(&[])), + Ok(Bytes::from_static(&[4, 5, 6, 7])), + Ok(Bytes::from_static(&[])), + Ok(Bytes::from_static(&[8, 9, 10, 11])), + Ok(Bytes::from_static(&[])), + ]); + + let mut read = stream_reader(stream); + + let mut buf = [0; 5]; + read.read_exact(&mut buf).await?; + assert_eq!(buf, [0, 1, 2, 3, 4]); + + assert_eq!(read.read(&mut buf).await?, 3); + assert_eq!(&buf[..3], [5, 6, 7]); + + assert_eq!(read.read(&mut buf).await?, 4); + assert_eq!(&buf[..4], [8, 9, 10, 11]); + + assert_eq!(read.read(&mut buf).await?, 0); + + Ok(()) +} diff --git a/third_party/rust/tokio/tests/stream_stream_map.rs b/third_party/rust/tokio/tests/stream_stream_map.rs new file mode 100644 index 0000000000..6b49803234 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_stream_map.rs @@ -0,0 +1,374 @@ +use tokio::stream::{self, pending, Stream, StreamExt, StreamMap}; +use tokio::sync::mpsc; +use tokio_test::{assert_ok, assert_pending, assert_ready, task}; + +use std::pin::Pin; + +macro_rules! assert_ready_some { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + Some(v) => v, + None => panic!("expected `Some`, got `None`"), + } + }; +} + +macro_rules! assert_ready_none { + ($($t:tt)*) => { + match assert_ready!($($t)*) { + None => {} + Some(v) => panic!("expected `None`, got `Some({:?})`", v), + } + }; +} + +#[tokio::test] +async fn empty() { + let mut map = StreamMap::<&str, stream::Pending<()>>::new(); + + assert_eq!(map.len(), 0); + assert!(map.is_empty()); + + assert!(map.next().await.is_none()); + assert!(map.next().await.is_none()); + + assert!(map.remove("foo").is_none()); +} + +#[tokio::test] +async fn single_entry() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel(); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + assert!(map.contains_key("foo")); + assert!(!map.contains_key("bar")); + + assert_eq!(map.len(), 1); + assert!(!map.is_empty()); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + drop(tx); + assert!(map.is_woken()); + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn multiple_entries() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + map.insert("foo", rx1); + map.insert("bar", rx2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(1)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "foo"); + assert_eq!(v, 1); + + assert_pending!(map.poll_next()); + + assert_ok!(tx2.send(2)); + + assert!(map.is_woken()); + let (k, v) = assert_ready_some!(map.poll_next()); + assert_eq!(k, "bar"); + assert_eq!(v, 2); + + assert_pending!(map.poll_next()); + + assert_ok!(tx1.send(3)); + assert_ok!(tx2.send(4)); + + assert!(map.is_woken()); + + // Given the randomization, there is no guarantee what order the values will + // be received in. + let mut v = (0..2) + .map(|_| assert_ready_some!(map.poll_next())) + .collect::<Vec<_>>(); + + assert_pending!(map.poll_next()); + + v.sort(); + assert_eq!(v[0].0, "bar"); + assert_eq!(v[0].1, 4); + assert_eq!(v[1].0, "foo"); + assert_eq!(v[1].1, 3); + + drop(tx1); + assert!(map.is_woken()); + assert_pending!(map.poll_next()); + drop(tx2); + + assert_ready_none!(map.poll_next()); +} + +#[tokio::test] +async fn insert_remove() { + let mut map = task::spawn(StreamMap::new()); + let (tx, rx) = mpsc::unbounded_channel(); + + assert_ready_none!(map.poll_next()); + + assert!(map.insert("foo", rx).is_none()); + let rx = map.remove("foo").unwrap(); + + assert_ok!(tx.send(1)); + + assert!(!map.is_woken()); + assert_ready_none!(map.poll_next()); + + assert!(map.insert("bar", rx).is_none()); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "bar"); + assert_eq!(v.1, 1); + + assert!(map.remove("bar").is_some()); + assert_ready_none!(map.poll_next()); + + assert!(map.is_empty()); + assert_eq!(0, map.len()); +} + +#[tokio::test] +async fn replace() { + let mut map = task::spawn(StreamMap::new()); + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + + assert!(map.insert("foo", rx1).is_none()); + + assert_pending!(map.poll_next()); + + let _rx1 = map.insert("foo", rx2).unwrap(); + + assert_pending!(map.poll_next()); + + tx1.send(1).unwrap(); + assert_pending!(map.poll_next()); + + tx2.send(2).unwrap(); + assert!(map.is_woken()); + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v.0, "foo"); + assert_eq!(v.1, 2); +} + +#[test] +fn size_hint_with_upper() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_eq!(3, map.len()); + assert!(!map.is_empty()); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (6, Some(6))); +} + +#[test] +fn size_hint_without_upper() { + let mut map = StreamMap::new(); + + map.insert("a", pin_box(stream::iter(vec![1]))); + map.insert("b", pin_box(stream::iter(vec![1, 2]))); + map.insert("c", pin_box(pending())); + + let size_hint = map.size_hint(); + assert_eq!(size_hint, (3, None)); +} + +#[test] +fn new_capacity_zero() { + let map = StreamMap::<&str, stream::Pending<()>>::new(); + assert_eq!(0, map.capacity()); + + let keys = map.keys().collect::<Vec<_>>(); + assert!(keys.is_empty()); +} + +#[test] +fn with_capacity() { + let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10); + assert!(10 <= map.capacity()); + + let keys = map.keys().collect::<Vec<_>>(); + assert!(keys.is_empty()); +} + +#[test] +fn iter_keys() { + let mut map = StreamMap::new(); + + map.insert("a", pending::<i32>()); + map.insert("b", pending()); + map.insert("c", pending()); + + let mut keys = map.keys().collect::<Vec<_>>(); + keys.sort(); + + assert_eq!(&keys[..], &[&"a", &"b", &"c"]); +} + +#[test] +fn iter_values() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>(); + + size_hints.sort(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn iter_values_mut() { + let mut map = StreamMap::new(); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + let mut size_hints = map + .values_mut() + .map(|s: &mut _| s.size_hint().0) + .collect::<Vec<_>>(); + + size_hints.sort(); + + assert_eq!(&size_hints[..], &[1, 2, 3]); +} + +#[test] +fn clear() { + let mut map = task::spawn(StreamMap::new()); + + map.insert("a", stream::iter(vec![1])); + map.insert("b", stream::iter(vec![1, 2])); + map.insert("c", stream::iter(vec![1, 2, 3])); + + assert_ready_some!(map.poll_next()); + + map.clear(); + + assert_ready_none!(map.poll_next()); + assert!(map.is_empty()); +} + +#[test] +fn contains_key_borrow() { + let mut map = StreamMap::new(); + map.insert("foo".to_string(), pending::<()>()); + + assert!(map.contains_key("foo")); +} + +#[test] +fn one_ready_many_none() { + // Run a few times because of randomness + for _ in 0..100 { + let mut map = task::spawn(StreamMap::new()); + + map.insert(0, pin_box(stream::empty())); + map.insert(1, pin_box(stream::empty())); + map.insert(2, pin_box(stream::once("hello"))); + map.insert(3, pin_box(stream::pending())); + + let v = assert_ready_some!(map.poll_next()); + assert_eq!(v, (2, "hello")); + } +} + +proptest::proptest! { + #[test] + fn fuzz_pending_complete_mix(kinds: Vec<bool>) { + use std::task::{Context, Poll}; + + struct DidPoll<T> { + did_poll: bool, + inner: T, + } + + impl<T: Stream + Unpin> Stream for DidPoll<T> { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll<Option<T::Item>> + { + self.did_poll = true; + Pin::new(&mut self.inner).poll_next(cx) + } + } + + for _ in 0..10 { + let mut map = task::spawn(StreamMap::new()); + let mut expect = 0; + + for (i, &is_empty) in kinds.iter().enumerate() { + let inner = if is_empty { + pin_box(stream::empty::<()>()) + } else { + expect += 1; + pin_box(stream::pending::<()>()) + }; + + let stream = DidPoll { + did_poll: false, + inner, + }; + + map.insert(i, stream); + } + + if expect == 0 { + assert_ready_none!(map.poll_next()); + } else { + assert_pending!(map.poll_next()); + + assert_eq!(expect, map.values().count()); + + for stream in map.values() { + assert!(stream.did_poll); + } + } + } + } +} + +fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { + Box::pin(s) +} diff --git a/third_party/rust/tokio/tests/stream_timeout.rs b/third_party/rust/tokio/tests/stream_timeout.rs new file mode 100644 index 0000000000..f65c835196 --- /dev/null +++ b/third_party/rust/tokio/tests/stream_timeout.rs @@ -0,0 +1,109 @@ +#![cfg(feature = "full")] + +use tokio::stream::{self, StreamExt}; +use tokio::time::{self, delay_for, Duration}; +use tokio_test::*; + +use futures::StreamExt as _; + +async fn maybe_delay(idx: i32) -> i32 { + if idx % 2 == 0 { + delay_for(ms(200)).await; + } + idx +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} + +#[tokio::test] +async fn basic_usage() { + time::pause(); + + // Items 2 and 4 time out. If we run the stream until it completes, + // we end up with the following items: + // + // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] + + let stream = stream::iter(1..=4).then(maybe_delay).timeout(ms(100)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(150)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); + + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + + // Third item is ready immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Fourth item is delayed 200ms, times out after 100ms + assert_pending!(stream.poll_next()); + + time::advance(ms(60)).await; + assert_pending!(stream.poll_next()); // nothing ready yet + + time::advance(ms(60)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + time::advance(ms(120)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(4))); + + // Done. + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn return_elapsed_errors_only_once() { + time::pause(); + + let stream = stream::iter(1..=3).then(maybe_delay).timeout(ms(50)); + let mut stream = task::spawn(stream); + + // First item completes immediately + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + + // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` + // error is returned. + assert_pending!(stream.poll_next()); + // + time::advance(ms(50)).await; + let v = assert_ready!(stream.poll_next()); + assert!(v.unwrap().is_err()); // timeout! + + // deadline elapses again, but no error is returned + time::advance(ms(50)).await; + assert_pending!(stream.poll_next()); + + time::advance(ms(100)).await; + assert_ready_eq!(stream.poll_next(), Some(Ok(2))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + + // Done + assert_ready_eq!(stream.poll_next(), None); +} + +#[tokio::test] +async fn no_timeouts() { + let stream = stream::iter(vec![1, 3, 5]) + .then(maybe_delay) + .timeout(ms(100)); + + let mut stream = task::spawn(stream); + + assert_ready_eq!(stream.poll_next(), Some(Ok(1))); + assert_ready_eq!(stream.poll_next(), Some(Ok(3))); + assert_ready_eq!(stream.poll_next(), Some(Ok(5))); + assert_ready_eq!(stream.poll_next(), None); +} diff --git a/third_party/rust/tokio/tests/support/mock_file.rs b/third_party/rust/tokio/tests/support/mock_file.rs new file mode 100644 index 0000000000..9895f835e6 --- /dev/null +++ b/third_party/rust/tokio/tests/support/mock_file.rs @@ -0,0 +1,281 @@ +#![allow(clippy::unnecessary_operation)] + +use std::collections::VecDeque; +use std::fmt; +use std::fs::{Metadata, Permissions}; +use std::io; +use std::io::prelude::*; +use std::io::SeekFrom; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +pub struct File { + shared: Arc<Mutex<Shared>>, +} + +pub struct Handle { + shared: Arc<Mutex<Shared>>, +} + +struct Shared { + calls: VecDeque<Call>, +} + +#[derive(Debug)] +enum Call { + Read(io::Result<Vec<u8>>), + Write(io::Result<Vec<u8>>), + Seek(SeekFrom, io::Result<u64>), + SyncAll(io::Result<()>), + SyncData(io::Result<()>), + SetLen(u64, io::Result<()>), +} + +impl Handle { + pub fn read(&self, data: &[u8]) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls.push_back(Call::Read(Ok(data.to_owned()))); + self + } + + pub fn read_err(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::Read(Err(io::ErrorKind::Other.into()))); + self + } + + pub fn write(&self, data: &[u8]) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls.push_back(Call::Write(Ok(data.to_owned()))); + self + } + + pub fn write_err(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::Write(Err(io::ErrorKind::Other.into()))); + self + } + + pub fn seek_start_ok(&self, offset: u64) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::Seek(SeekFrom::Start(offset), Ok(offset))); + self + } + + pub fn seek_current_ok(&self, offset: i64, ret: u64) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::Seek(SeekFrom::Current(offset), Ok(ret))); + self + } + + pub fn sync_all(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls.push_back(Call::SyncAll(Ok(()))); + self + } + + pub fn sync_all_err(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::SyncAll(Err(io::ErrorKind::Other.into()))); + self + } + + pub fn sync_data(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls.push_back(Call::SyncData(Ok(()))); + self + } + + pub fn sync_data_err(&self) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::SyncData(Err(io::ErrorKind::Other.into()))); + self + } + + pub fn set_len(&self, size: u64) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls.push_back(Call::SetLen(size, Ok(()))); + self + } + + pub fn set_len_err(&self, size: u64) -> &Self { + let mut s = self.shared.lock().unwrap(); + s.calls + .push_back(Call::SetLen(size, Err(io::ErrorKind::Other.into()))); + self + } + + pub fn remaining(&self) -> usize { + let s = self.shared.lock().unwrap(); + s.calls.len() + } +} + +impl Drop for Handle { + fn drop(&mut self) { + if !std::thread::panicking() { + let s = self.shared.lock().unwrap(); + assert_eq!(0, s.calls.len()); + } + } +} + +impl File { + pub fn open(_: PathBuf) -> io::Result<File> { + unimplemented!(); + } + + pub fn create(_: PathBuf) -> io::Result<File> { + unimplemented!(); + } + + pub fn mock() -> (Handle, File) { + let shared = Arc::new(Mutex::new(Shared { + calls: VecDeque::new(), + })); + + let handle = Handle { + shared: shared.clone(), + }; + let file = File { shared }; + + (handle, file) + } + + pub fn sync_all(&self) -> io::Result<()> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(SyncAll(ret)) => ret, + Some(op) => panic!("expected next call to be {:?}; was sync_all", op), + None => panic!("did not expect call"), + } + } + + pub fn sync_data(&self) -> io::Result<()> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(SyncData(ret)) => ret, + Some(op) => panic!("expected next call to be {:?}; was sync_all", op), + None => panic!("did not expect call"), + } + } + + pub fn set_len(&self, size: u64) -> io::Result<()> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(SetLen(arg, ret)) => { + assert_eq!(arg, size); + ret + } + Some(op) => panic!("expected next call to be {:?}; was sync_all", op), + None => panic!("did not expect call"), + } + } + + pub fn metadata(&self) -> io::Result<Metadata> { + unimplemented!(); + } + + pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()> { + unimplemented!(); + } + + pub fn try_clone(&self) -> io::Result<Self> { + unimplemented!(); + } +} + +impl Read for &'_ File { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(Read(Ok(data))) => { + assert!(dst.len() >= data.len()); + assert!(dst.len() <= 16 * 1024, "actual = {}", dst.len()); // max buffer + + &mut dst[..data.len()].copy_from_slice(&data); + Ok(data.len()) + } + Some(Read(Err(e))) => Err(e), + Some(op) => panic!("expected next call to be {:?}; was a read", op), + None => panic!("did not expect call"), + } + } +} + +impl Write for &'_ File { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(Write(Ok(data))) => { + assert_eq!(src, &data[..]); + Ok(src.len()) + } + Some(Write(Err(e))) => Err(e), + Some(op) => panic!("expected next call to be {:?}; was write", op), + None => panic!("did not expect call"), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Seek for &'_ File { + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + use self::Call::*; + + let mut s = self.shared.lock().unwrap(); + + match s.calls.pop_front() { + Some(Seek(expect, res)) => { + assert_eq!(expect, pos); + res + } + Some(op) => panic!("expected call {:?}; was `seek`", op), + None => panic!("did not expect call; was `seek`"), + } + } +} + +impl fmt::Debug for File { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("mock::File").finish() + } +} + +#[cfg(unix)] +impl std::os::unix::io::AsRawFd for File { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + unimplemented!(); + } +} + +#[cfg(windows)] +impl std::os::windows::io::AsRawHandle for File { + fn as_raw_handle(&self) -> std::os::windows::io::RawHandle { + unimplemented!(); + } +} diff --git a/third_party/rust/tokio/tests/support/mock_pool.rs b/third_party/rust/tokio/tests/support/mock_pool.rs new file mode 100644 index 0000000000..e1fdb42641 --- /dev/null +++ b/third_party/rust/tokio/tests/support/mock_pool.rs @@ -0,0 +1,66 @@ +use tokio::sync::oneshot; + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +thread_local! { + static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new()) +} + +#[derive(Debug)] +pub(crate) struct Blocking<T> { + rx: oneshot::Receiver<T>, +} + +pub(crate) fn run<F, R>(f: F) -> Blocking<R> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let task = Box::new(move || { + let _ = tx.send(f()); + }); + + QUEUE.with(|cell| cell.borrow_mut().push_back(task)); + + Blocking { rx } +} + +impl<T> Future for Blocking<T> { + type Output = Result<T, io::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::task::Poll::*; + + match Pin::new(&mut self.rx).poll(cx) { + Ready(Ok(v)) => Ready(Ok(v)), + Ready(Err(e)) => panic!("error = {:?}", e), + Pending => Pending, + } + } +} + +pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T> +where + F: FnOnce() -> io::Result<T> + Send + 'static, + T: Send + 'static, +{ + run(f).await? +} + +pub(crate) fn len() -> usize { + QUEUE.with(|cell| cell.borrow().len()) +} + +pub(crate) fn run_one() { + let task = QUEUE + .with(|cell| cell.borrow_mut().pop_front()) + .expect("expected task to run, but none ready"); + + task(); +} diff --git a/third_party/rust/tokio/tests/support/signal.rs b/third_party/rust/tokio/tests/support/signal.rs new file mode 100644 index 0000000000..ea06058764 --- /dev/null +++ b/third_party/rust/tokio/tests/support/signal.rs @@ -0,0 +1,7 @@ +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} diff --git a/third_party/rust/tokio/tests/sync_barrier.rs b/third_party/rust/tokio/tests/sync_barrier.rs new file mode 100644 index 0000000000..f280fe8600 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_barrier.rs @@ -0,0 +1,96 @@ +#![allow(clippy::unnecessary_operation)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::Barrier; + +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; + +struct IsSend<T: Send>(T); +#[test] +fn barrier_future_is_send() { + let b = Barrier::new(0); + IsSend(b.wait()); +} + +#[test] +fn zero_does_not_block() { + let b = Barrier::new(0); + + { + let mut w = spawn(b.wait()); + let wr = assert_ready!(w.poll()); + assert!(wr.is_leader()); + } + { + let mut w = spawn(b.wait()); + let wr = assert_ready!(w.poll()); + assert!(wr.is_leader()); + } +} + +#[test] +fn single() { + let b = Barrier::new(1); + + { + let mut w = spawn(b.wait()); + let wr = assert_ready!(w.poll()); + assert!(wr.is_leader()); + } + { + let mut w = spawn(b.wait()); + let wr = assert_ready!(w.poll()); + assert!(wr.is_leader()); + } + { + let mut w = spawn(b.wait()); + let wr = assert_ready!(w.poll()); + assert!(wr.is_leader()); + } +} + +#[test] +fn tango() { + let b = Barrier::new(2); + + let mut w1 = spawn(b.wait()); + assert_pending!(w1.poll()); + + let mut w2 = spawn(b.wait()); + let wr2 = assert_ready!(w2.poll()); + let wr1 = assert_ready!(w1.poll()); + + assert!(wr1.is_leader() || wr2.is_leader()); + assert!(!(wr1.is_leader() && wr2.is_leader())); +} + +#[test] +fn lots() { + let b = Barrier::new(100); + + for _ in 0..10 { + let mut wait = Vec::new(); + for _ in 0..99 { + let mut w = spawn(b.wait()); + assert_pending!(w.poll()); + wait.push(w); + } + for w in &mut wait { + assert_pending!(w.poll()); + } + + // pass the barrier + let mut w = spawn(b.wait()); + let mut found_leader = assert_ready!(w.poll()).is_leader(); + for mut w in wait { + let wr = assert_ready!(w.poll()); + if wr.is_leader() { + assert!(!found_leader); + found_leader = true; + } + } + assert!(found_leader); + } +} diff --git a/third_party/rust/tokio/tests/sync_broadcast.rs b/third_party/rust/tokio/tests/sync_broadcast.rs new file mode 100644 index 0000000000..e9e7b36610 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_broadcast.rs @@ -0,0 +1,357 @@ +#![allow(clippy::cognitive_complexity)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "sync")] + +use tokio::sync::broadcast; +use tokio_test::task; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; + +macro_rules! assert_recv { + ($e:expr) => { + match $e.try_recv() { + Ok(value) => value, + Err(e) => panic!("expected recv; got = {:?}", e), + } + }; +} + +macro_rules! assert_empty { + ($e:expr) => { + match $e.try_recv() { + Ok(value) => panic!("expected empty; got = {:?}", value), + Err(broadcast::TryRecvError::Empty) => {} + Err(e) => panic!("expected empty; got = {:?}", e), + } + }; +} + +macro_rules! assert_lagged { + ($e:expr, $n:expr) => { + match assert_err!($e) { + broadcast::TryRecvError::Lagged(n) => { + assert_eq!(n, $n); + } + _ => panic!("did not lag"), + } + }; +} + +trait AssertSend: Send {} +impl AssertSend for broadcast::Sender<i32> {} +impl AssertSend for broadcast::Receiver<i32> {} + +#[test] +fn send_try_recv_bounded() { + let (tx, mut rx) = broadcast::channel(16); + + assert_empty!(rx); + + let n = assert_ok!(tx.send("hello")); + assert_eq!(n, 1); + + let val = assert_recv!(rx); + assert_eq!(val, "hello"); + + assert_empty!(rx); +} + +#[test] +fn send_two_recv() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + assert_empty!(rx1); + assert_empty!(rx2); + + let n = assert_ok!(tx.send("hello")); + assert_eq!(n, 2); + + let val = assert_recv!(rx1); + assert_eq!(val, "hello"); + + let val = assert_recv!(rx2); + assert_eq!(val, "hello"); + + assert_empty!(rx1); + assert_empty!(rx2); +} + +#[tokio::test] +async fn send_recv_stream() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = broadcast::channel::<i32>(8); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(Some(Ok(1)), rx.next().await); + assert_eq!(Some(Ok(2)), rx.next().await); + + drop(tx); + + assert_eq!(None, rx.next().await); +} + +#[test] +fn send_recv_bounded() { + let (tx, mut rx) = broadcast::channel(16); + + let mut recv = task::spawn(rx.recv()); + + assert_pending!(recv.poll()); + + assert_ok!(tx.send("hello")); + + assert!(recv.is_woken()); + let val = assert_ready_ok!(recv.poll()); + assert_eq!(val, "hello"); +} + +#[test] +fn send_two_recv_bounded() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + assert_ok!(tx.send("hello")); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + let val1 = assert_ready_ok!(recv1.poll()); + let val2 = assert_ready_ok!(recv2.poll()); + assert_eq!(val1, "hello"); + assert_eq!(val2, "hello"); + + drop((recv1, recv2)); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + + assert_ok!(tx.send("world")); + + assert!(recv1.is_woken()); + assert!(!recv2.is_woken()); + + let val1 = assert_ready_ok!(recv1.poll()); + let val2 = assert_ready_ok!(recv2.poll()); + assert_eq!(val1, "world"); + assert_eq!(val2, "world"); +} + +#[test] +fn send_slow_rx() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + { + let mut recv2 = task::spawn(rx2.recv()); + + { + let mut recv1 = task::spawn(rx1.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + assert_ok!(tx.send("one")); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + assert_ok!(tx.send("two")); + + let val = assert_ready_ok!(recv1.poll()); + assert_eq!(val, "one"); + } + + let val = assert_ready_ok!(task::spawn(rx1.recv()).poll()); + assert_eq!(val, "two"); + + let mut recv1 = task::spawn(rx1.recv()); + + assert_pending!(recv1.poll()); + + assert_ok!(tx.send("three")); + + assert!(recv1.is_woken()); + + let val = assert_ready_ok!(recv1.poll()); + assert_eq!(val, "three"); + + let val = assert_ready_ok!(recv2.poll()); + assert_eq!(val, "one"); + } + + let val = assert_recv!(rx2); + assert_eq!(val, "two"); + + let val = assert_recv!(rx2); + assert_eq!(val, "three"); +} + +#[test] +fn drop_rx_while_values_remain() { + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + + assert_recv!(rx1); + assert_recv!(rx2); + + drop(rx2); + drop(rx1); +} + +#[test] +fn lagging_rx() { + let (tx, mut rx1) = broadcast::channel(2); + let mut rx2 = tx.subscribe(); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + + assert_eq!("one", assert_recv!(rx1)); + + assert_ok!(tx.send("three")); + + // Lagged too far + assert_lagged!(rx2.try_recv(), 1); + + // Calling again gets the next value + assert_eq!("two", assert_recv!(rx2)); + + assert_eq!("two", assert_recv!(rx1)); + assert_eq!("three", assert_recv!(rx1)); + + assert_ok!(tx.send("four")); + assert_ok!(tx.send("five")); + + assert_lagged!(rx2.try_recv(), 1); + + assert_ok!(tx.send("six")); + + assert_lagged!(rx2.try_recv(), 1); +} + +#[test] +fn send_no_rx() { + let (tx, _) = broadcast::channel(16); + + assert_err!(tx.send("hello")); + + let mut rx = tx.subscribe(); + + assert_ok!(tx.send("world")); + + let val = assert_recv!(rx); + assert_eq!("world", val); +} + +#[test] +#[should_panic] +fn zero_capacity() { + broadcast::channel::<()>(0); +} + +#[test] +#[should_panic] +fn capacity_too_big() { + use std::usize; + + broadcast::channel::<()>(1 + (usize::MAX >> 1)); +} + +#[test] +fn panic_in_clone() { + use std::panic::{self, AssertUnwindSafe}; + + #[derive(Eq, PartialEq, Debug)] + struct MyVal(usize); + + impl Clone for MyVal { + fn clone(&self) -> MyVal { + assert_ne!(0, self.0); + MyVal(self.0) + } + } + + let (tx, mut rx) = broadcast::channel(16); + + assert_ok!(tx.send(MyVal(0))); + assert_ok!(tx.send(MyVal(1))); + + let res = panic::catch_unwind(AssertUnwindSafe(|| { + let _ = rx.try_recv(); + })); + + assert_err!(res); + + let val = assert_recv!(rx); + assert_eq!(val, MyVal(1)); +} + +#[test] +fn dropping_tx_notifies_rx() { + let (tx, mut rx1) = broadcast::channel::<()>(16); + let mut rx2 = tx.subscribe(); + + let tx2 = tx.clone(); + + let mut recv1 = task::spawn(rx1.recv()); + let mut recv2 = task::spawn(rx2.recv()); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + drop(tx); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + drop(tx2); + + assert!(recv1.is_woken()); + assert!(recv2.is_woken()); + + let err = assert_ready_err!(recv1.poll()); + assert!(is_closed(err)); + + let err = assert_ready_err!(recv2.poll()); + assert!(is_closed(err)); +} + +#[test] +fn unconsumed_messages_are_dropped() { + let (tx, rx) = broadcast::channel(16); + + let msg = Arc::new(()); + + assert_ok!(tx.send(msg.clone())); + + assert_eq!(2, Arc::strong_count(&msg)); + + drop(rx); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +fn is_closed(err: broadcast::RecvError) -> bool { + match err { + broadcast::RecvError::Closed => true, + _ => false, + } +} diff --git a/third_party/rust/tokio/tests/sync_errors.rs b/third_party/rust/tokio/tests/sync_errors.rs new file mode 100644 index 0000000000..66e8f0c098 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_errors.rs @@ -0,0 +1,27 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +fn is_error<T: std::error::Error + Send + Sync>() {} + +#[test] +fn mpsc_error_bound() { + use tokio::sync::mpsc::error; + + is_error::<error::SendError<()>>(); + is_error::<error::TrySendError<()>>(); +} + +#[test] +fn oneshot_error_bound() { + use tokio::sync::oneshot::error; + + is_error::<error::RecvError>(); + is_error::<error::TryRecvError>(); +} + +#[test] +fn watch_error_bound() { + use tokio::sync::watch::error; + + is_error::<error::SendError<()>>(); +} diff --git a/third_party/rust/tokio/tests/sync_mpsc.rs b/third_party/rust/tokio/tests/sync_mpsc.rs new file mode 100644 index 0000000000..f02d90aa56 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_mpsc.rs @@ -0,0 +1,492 @@ +#![allow(clippy::redundant_clone)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio_test::task; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv_with_buffer() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); + + // Using poll_ready / try_send + assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); + tx.try_send(1).unwrap(); + + // Without poll_ready + tx.try_send(2).unwrap(); + + drop(tx); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert_eq!(val, Some(1)); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert_eq!(val, Some(2)); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +fn disarm() { + let (tx, rx) = mpsc::channel::<i32>(2); + let mut tx1 = task::spawn(tx.clone()); + let mut tx2 = task::spawn(tx.clone()); + let mut tx3 = task::spawn(tx.clone()); + let mut tx4 = task::spawn(tx); + let mut rx = task::spawn(rx); + + // We should be able to `poll_ready` two handles without problem + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); + + // But a third should not be ready + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Using one of the reserved slots should allow a new handle to become ready + tx1.try_send(1).unwrap(); + // We also need to receive for the slot to be free + let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); + // Now there's a free slot! + assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Dropping a ready handle should also open up a slot + drop(tx2); + assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Explicitly disarming a handle should also open a slot + assert!(tx3.disarm()); + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Disarming a non-armed sender does not free up a slot + assert!(!tx3.disarm()); + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); +} + +#[tokio::test] +async fn send_recv_stream_with_buffer() { + use tokio::stream::StreamExt; + + let (mut tx, mut rx) = mpsc::channel::<i32>(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[test] +fn start_send_past_cap() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); + + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + + assert_ok!(tx1.try_send(())); + + t1.enter(|cx, _| { + assert_pending!(tx1.poll_ready(cx)); + }); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + drop(tx1); + + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); + assert!(val.is_some()); + + assert!(t2.is_woken()); + assert!(!t1.is_woken()); + + drop(tx2); + + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +#[should_panic] +fn buffer_gteq_one() { + mpsc::channel::<i32>(0); +} + +#[test] +fn send_recv_unbounded() { + let mut t1 = task::spawn(()); + + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); + + // Using `try_send` + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some(1)); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some(2)); + + drop(tx); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[tokio::test] +async fn async_send_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[tokio::test] +async fn send_recv_stream_unbounded() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[test] +fn no_t_bounds_buffer() { + struct NoImpls; + + let mut t1 = task::spawn(()); + let (tx, mut rx) = mpsc::channel(100); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().try_send(NoImpls).is_ok()); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_some()); +} + +#[test] +fn no_t_bounds_unbounded() { + struct NoImpls; + + let mut t1 = task::spawn(()); + let (tx, mut rx) = mpsc::unbounded_channel(); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().send(NoImpls).is_ok()); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_some()); +} + +#[test] +fn send_recv_buffer_limited() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel::<i32>(1); + + // Run on a task context + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + + // Send first message + assert_ok!(tx.try_send(1)); + + // Not ready + assert_pending!(tx.poll_ready(cx)); + + // Send second message + assert_err!(tx.try_send(1337)); + }); + + t2.enter(|cx, _| { + // Take the value + let val = assert_ready!(rx.poll_recv(cx)); + assert_eq!(Some(1), val); + }); + + assert!(t1.is_woken()); + + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + + assert_ok!(tx.try_send(2)); + + // Not ready + assert_pending!(tx.poll_ready(cx)); + }); + + t2.enter(|cx, _| { + // Take the value + let val = assert_ready!(rx.poll_recv(cx)); + assert_eq!(Some(2), val); + }); + + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + }); +} + +#[test] +fn recv_close_gets_none_idle() { + let mut t1 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + rx.close(); + + t1.enter(|cx, _| { + let val = assert_ready!(rx.poll_recv(cx)); + assert!(val.is_none()); + assert_ready_err!(tx.poll_ready(cx)); + }); +} + +#[test] +fn recv_close_gets_none_reserved() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); + + let (mut tx1, mut rx) = mpsc::channel::<i32>(1); + let mut tx2 = tx1.clone(); + + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + rx.close(); + + assert!(t2.is_woken()); + + t2.enter(|cx, _| { + assert_ready_err!(tx2.poll_ready(cx)); + }); + + t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); + + assert!(!t1.is_woken()); + assert!(!t2.is_woken()); + + assert_ok!(tx1.try_send(123)); + + assert!(t3.is_woken()); + + t3.enter(|cx, _| { + let v = assert_ready!(rx.poll_recv(cx)); + assert_eq!(v, Some(123)); + + let v = assert_ready!(rx.poll_recv(cx)); + assert!(v.is_none()); + }); +} + +#[test] +fn tx_close_gets_none() { + let mut t1 = task::spawn(()); + + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + t1.enter(|cx, _| { + let v = assert_ready!(rx.poll_recv(cx)); + assert!(v.is_none()); + }); +} + +#[test] +fn try_send_fail() { + let mut t1 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some("goodbye")); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +fn drop_tx_with_permit_releases_permit() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + + // poll_ready reserves capacity, ensure that the capacity is released if tx + // is dropped w/o sending a value. + let (mut tx1, _rx) = mpsc::channel::<i32>(1); + let mut tx2 = tx1.clone(); + + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + drop(tx1); + + assert!(t2.is_woken()); + + assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); +} + +#[test] +fn dropping_rx_closes_channel() { + let mut t1 = task::spawn(()); + + let (mut tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + assert_ok!(tx.try_send(msg.clone())); + + drop(rx); + assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn dropping_rx_closes_channel_for_try() { + let (mut tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + tx.try_send(msg.clone()).unwrap(); + + drop(rx); + + { + let err = assert_err!(tx.try_send(msg.clone())); + match err { + TrySendError::Closed(..) => {} + _ => panic!(), + } + } + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn unconsumed_messages_are_dropped() { + let msg = Arc::new(()); + + let (mut tx, rx) = mpsc::channel(100); + + tx.try_send(msg.clone()).unwrap(); + + assert_eq!(2, Arc::strong_count(&msg)); + + drop((tx, rx)); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn try_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.try_send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} + +#[test] +fn try_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} diff --git a/third_party/rust/tokio/tests/sync_mutex.rs b/third_party/rust/tokio/tests/sync_mutex.rs new file mode 100644 index 0000000000..444ebd6a22 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_mutex.rs @@ -0,0 +1,154 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::Mutex; +use tokio::time::{interval, timeout}; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; + +use std::sync::Arc; +use std::time::Duration; + +#[test] +fn straight_execution() { + let l = Mutex::new(100); + + { + let mut t = spawn(l.lock()); + let mut g = assert_ready!(t.poll()); + assert_eq!(&*g, &100); + *g = 99; + } + { + let mut t = spawn(l.lock()); + let mut g = assert_ready!(t.poll()); + assert_eq!(&*g, &99); + *g = 98; + } + { + let mut t = spawn(l.lock()); + let g = assert_ready!(t.poll()); + assert_eq!(&*g, &98); + } +} + +#[test] +fn readiness() { + let l1 = Arc::new(Mutex::new(100)); + let l2 = Arc::clone(&l1); + let mut t1 = spawn(l1.lock()); + let mut t2 = spawn(l2.lock()); + + let g = assert_ready!(t1.poll()); + + // We can't now acquire the lease since it's already held in g + assert_pending!(t2.poll()); + + // But once g unlocks, we can acquire it + drop(g); + assert!(t2.is_woken()); + assert_ready!(t2.poll()); +} + +/* +#[test] +#[ignore] +fn lock() { + let mut lock = Mutex::new(false); + + let mut lock2 = lock.clone(); + std::thread::spawn(move || { + let l = lock2.lock(); + pin_mut!(l); + + let mut task = MockTask::new(); + let mut g = assert_ready!(task.poll(&mut l)); + std::thread::sleep(std::time::Duration::from_millis(500)); + *g = true; + drop(g); + }); + + std::thread::sleep(std::time::Duration::from_millis(50)); + let mut task = MockTask::new(); + let l = lock.lock(); + pin_mut!(l); + + assert_pending!(task.poll(&mut l)); + + std::thread::sleep(std::time::Duration::from_millis(500)); + assert!(task.is_woken()); + let result = assert_ready!(task.poll(&mut l)); + assert!(*result); +} +*/ + +#[tokio::test] +/// Ensure a mutex is unlocked if a future holding the lock +/// is aborted prematurely. +async fn aborted_future_1() { + let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0)); + { + let m2 = m1.clone(); + // Try to lock mutex in a future that is aborted prematurely + timeout(Duration::from_millis(1u64), async move { + let mut iv = interval(Duration::from_millis(1000)); + m2.lock().await; + iv.tick().await; + iv.tick().await; + }) + .await + .unwrap_err(); + } + // This should succeed as there is no lock left for the mutex. + timeout(Duration::from_millis(1u64), async move { + m1.lock().await; + }) + .await + .expect("Mutex is locked"); +} + +#[tokio::test] +/// This test is similar to `aborted_future_1` but this time the +/// aborted future is waiting for the lock. +async fn aborted_future_2() { + let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0)); + { + // Lock mutex + let _lock = m1.lock().await; + { + let m2 = m1.clone(); + // Try to lock mutex in a future that is aborted prematurely + timeout(Duration::from_millis(1u64), async move { + m2.lock().await; + }) + .await + .unwrap_err(); + } + } + // This should succeed as there is no lock left for the mutex. + timeout(Duration::from_millis(1u64), async move { + m1.lock().await; + }) + .await + .expect("Mutex is locked"); +} + +#[test] +fn try_lock() { + let m: Mutex<usize> = Mutex::new(0); + { + let g1 = m.try_lock(); + assert_eq!(g1.is_ok(), true); + let g2 = m.try_lock(); + assert_eq!(g2.is_ok(), false); + } + let g3 = m.try_lock(); + assert_eq!(g3.is_ok(), true); +} + +#[tokio::test] +async fn debug_format() { + let s = "debug"; + let m = Mutex::new(s.to_string()); + assert_eq!(format!("{:?}", s), format!("{:?}", m.lock().await)); +} diff --git a/third_party/rust/tokio/tests/sync_notify.rs b/third_party/rust/tokio/tests/sync_notify.rs new file mode 100644 index 0000000000..be39ce32df --- /dev/null +++ b/third_party/rust/tokio/tests/sync_notify.rs @@ -0,0 +1,102 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::Notify; +use tokio_test::task::spawn; +use tokio_test::*; + +trait AssertSend: Send + Sync {} +impl AssertSend for Notify {} + +#[test] +fn notify_notified_one() { + let notify = Notify::new(); + let mut notified = spawn(async { notify.notified().await }); + + notify.notify(); + assert_ready!(notified.poll()); +} + +#[test] +fn notified_one_notify() { + let notify = Notify::new(); + let mut notified = spawn(async { notify.notified().await }); + + assert_pending!(notified.poll()); + + notify.notify(); + assert!(notified.is_woken()); + assert_ready!(notified.poll()); +} + +#[test] +fn notified_multi_notify() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + + notify.notify(); + assert!(notified1.is_woken()); + assert!(!notified2.is_woken()); + + assert_ready!(notified1.poll()); + assert_pending!(notified2.poll()); +} + +#[test] +fn notify_notified_multi() { + let notify = Notify::new(); + + notify.notify(); + + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + assert_ready!(notified1.poll()); + assert_pending!(notified2.poll()); + + notify.notify(); + + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); +} + +#[test] +fn notified_drop_notified_notify() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + assert_pending!(notified1.poll()); + + drop(notified1); + + assert_pending!(notified2.poll()); + + notify.notify(); + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); +} + +#[test] +fn notified_multi_notify_drop_one() { + let notify = Notify::new(); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); + + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); + + notify.notify(); + + assert!(notified1.is_woken()); + assert!(!notified2.is_woken()); + + drop(notified1); + + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); +} diff --git a/third_party/rust/tokio/tests/sync_oneshot.rs b/third_party/rust/tokio/tests/sync_oneshot.rs new file mode 100644 index 0000000000..13e526d48e --- /dev/null +++ b/third_party/rust/tokio/tests/sync_oneshot.rs @@ -0,0 +1,234 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::oneshot; +use tokio_test::*; + +use std::future::Future; +use std::pin::Pin; + +trait AssertSend: Send {} +impl AssertSend for oneshot::Sender<i32> {} +impl AssertSend for oneshot::Receiver<i32> {} + +#[test] +fn send_recv() { + let (tx, rx) = oneshot::channel(); + let mut rx = task::spawn(rx); + + assert_pending!(rx.poll()); + + assert_ok!(tx.send(1)); + + assert!(rx.is_woken()); + + let val = assert_ready_ok!(rx.poll()); + assert_eq!(val, 1); +} + +#[tokio::test] +async fn async_send_recv() { + let (tx, rx) = oneshot::channel(); + + assert_ok!(tx.send(1)); + assert_eq!(1, assert_ok!(rx.await)); +} + +#[test] +fn close_tx() { + let (tx, rx) = oneshot::channel::<i32>(); + let mut rx = task::spawn(rx); + + assert_pending!(rx.poll()); + + drop(tx); + + assert!(rx.is_woken()); + assert_ready_err!(rx.poll()); +} + +#[test] +fn close_rx() { + // First, without checking poll_closed() + // + let (tx, _) = oneshot::channel(); + + assert_err!(tx.send(1)); + + // Second, via poll_closed(); + + let (tx, rx) = oneshot::channel(); + let mut tx = task::spawn(tx); + + assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + drop(rx); + + assert!(tx.is_woken()); + assert!(tx.is_closed()); + assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + assert_err!(tx.into_inner().send(1)); +} + +#[tokio::test] +async fn async_rx_closed() { + let (mut tx, rx) = oneshot::channel::<()>(); + + tokio::spawn(async move { + drop(rx); + }); + + tx.closed().await; +} + +#[test] +fn explicit_close_poll() { + // First, with message sent + let (tx, rx) = oneshot::channel(); + let mut rx = task::spawn(rx); + + assert_ok!(tx.send(1)); + + rx.close(); + + let value = assert_ready_ok!(rx.poll()); + assert_eq!(value, 1); + + // Second, without the message sent + let (tx, rx) = oneshot::channel::<i32>(); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); + + assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + rx.close(); + + assert!(tx.is_woken()); + assert!(tx.is_closed()); + assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + assert_err!(tx.into_inner().send(1)); + assert_ready_err!(rx.poll()); + + // Again, but without sending the value this time + let (tx, rx) = oneshot::channel::<i32>(); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); + + assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + rx.close(); + + assert!(tx.is_woken()); + assert!(tx.is_closed()); + assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + assert_ready_err!(rx.poll()); +} + +#[test] +fn explicit_close_try_recv() { + // First, with message sent + let (tx, mut rx) = oneshot::channel(); + + assert_ok!(tx.send(1)); + + rx.close(); + + let val = assert_ok!(rx.try_recv()); + assert_eq!(1, val); + + // Second, without the message sent + let (tx, mut rx) = oneshot::channel::<i32>(); + let mut tx = task::spawn(tx); + + assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + rx.close(); + + assert!(tx.is_woken()); + assert!(tx.is_closed()); + assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); + + assert_err!(rx.try_recv()); +} + +#[test] +#[should_panic] +fn close_try_recv_poll() { + let (_tx, rx) = oneshot::channel::<i32>(); + let mut rx = task::spawn(rx); + + rx.close(); + + assert_err!(rx.try_recv()); + + let _ = rx.poll(); +} + +#[test] +fn drops_tasks() { + let (mut tx, mut rx) = oneshot::channel::<i32>(); + let mut tx_task = task::spawn(()); + let mut rx_task = task::spawn(()); + + assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx))); + assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx))); + + drop(tx); + drop(rx); + + assert_eq!(1, tx_task.waker_ref_count()); + assert_eq!(1, rx_task.waker_ref_count()); +} + +#[test] +fn receiver_changes_task() { + let (tx, mut rx) = oneshot::channel(); + + let mut task1 = task::spawn(()); + let mut task2 = task::spawn(()); + + assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx))); + + assert_eq!(2, task1.waker_ref_count()); + assert_eq!(1, task2.waker_ref_count()); + + assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); + + assert_eq!(1, task1.waker_ref_count()); + assert_eq!(2, task2.waker_ref_count()); + + assert_ok!(tx.send(1)); + + assert!(!task1.is_woken()); + assert!(task2.is_woken()); + + assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); +} + +#[test] +fn sender_changes_task() { + let (mut tx, rx) = oneshot::channel::<i32>(); + + let mut task1 = task::spawn(()); + let mut task2 = task::spawn(()); + + assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx))); + + assert_eq!(2, task1.waker_ref_count()); + assert_eq!(1, task2.waker_ref_count()); + + assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx))); + + assert_eq!(1, task1.waker_ref_count()); + assert_eq!(2, task2.waker_ref_count()); + + drop(rx); + + assert!(!task1.is_woken()); + assert!(task2.is_woken()); + + assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); +} diff --git a/third_party/rust/tokio/tests/sync_rwlock.rs b/third_party/rust/tokio/tests/sync_rwlock.rs new file mode 100644 index 0000000000..87010b658e --- /dev/null +++ b/third_party/rust/tokio/tests/sync_rwlock.rs @@ -0,0 +1,237 @@ +#![warn(rust_2018_idioms)] + +use std::sync::Arc; +use std::task::Poll; + +use futures::future::FutureExt; +use futures::stream; +use futures::stream::StreamExt; + +use tokio::sync::{Barrier, RwLock}; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; + +#[test] +fn into_inner() { + let rwlock = RwLock::new(42); + assert_eq!(rwlock.into_inner(), 42); +} + +// multiple reads should be Ready +#[test] +fn read_shared() { + let rwlock = RwLock::new(100); + + let mut t1 = spawn(rwlock.read()); + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.read()); + assert_ready!(t2.poll()); +} + +// When there is an active shared owner, exclusive access should not be possible +#[test] +fn write_shared_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); +} + +// When there is an active exclusive owner, subsequent exclusive access should not be possible +#[test] +fn read_exclusive_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.write()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.read()); + assert_pending!(t2.poll()); +} + +// If the max shared access is reached and subsquent shared access is pending +// should be made available when one of the shared acesses is dropped +#[test] +fn exhaust_reading() { + let rwlock = RwLock::new(100); + let mut reads = Vec::new(); + loop { + let mut t = spawn(rwlock.read()); + match t.poll() { + Poll::Ready(guard) => reads.push(guard), + Poll::Pending => break, + } + } + + let mut t1 = spawn(rwlock.read()); + assert_pending!(t1.poll()); + let g2 = reads.pop().unwrap(); + drop(g2); + assert!(t1.is_woken()); + assert_ready!(t1.poll()); +} + +// When there is an active exclusive owner, subsequent exclusive access should not be possible +#[test] +fn write_exclusive_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.write()); + + let _g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); +} + +// When there is an active shared owner, exclusive access should be possible after shared is dropped +#[test] +fn write_shared_drop() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + + let g1 = assert_ready!(t1.poll()); + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); + drop(g1); + assert!(t2.is_woken()); + assert_ready!(t2.poll()); +} + +// when there is an active shared owner, and exclusive access is triggered, +// subsequent shared access should not be possible as write gathers all the available semaphore permits +#[test] +fn write_read_shared_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + let _g1 = assert_ready!(t1.poll()); + + let mut t2 = spawn(rwlock.read()); + assert_ready!(t2.poll()); + + let mut t3 = spawn(rwlock.write()); + assert_pending!(t3.poll()); + + let mut t4 = spawn(rwlock.read()); + assert_pending!(t4.poll()); +} + +// when there is an active shared owner, and exclusive access is triggered, +// reading should be possible after pending exclusive access is dropped +#[test] +fn write_read_shared_drop_pending() { + let rwlock = RwLock::new(100); + let mut t1 = spawn(rwlock.read()); + let _g1 = assert_ready!(t1.poll()); + + let mut t2 = spawn(rwlock.write()); + assert_pending!(t2.poll()); + + let mut t3 = spawn(rwlock.read()); + assert_pending!(t3.poll()); + drop(t2); + + assert!(t3.is_woken()); + assert_ready!(t3.poll()); +} + +// Acquire an RwLock nonexclusively by a single task +#[tokio::test] +async fn read_uncontested() { + let rwlock = RwLock::new(100); + let result = *rwlock.read().await; + + assert_eq!(result, 100); +} + +// Acquire an uncontested RwLock in exclusive mode +#[tokio::test] +async fn write_uncontested() { + let rwlock = RwLock::new(100); + let mut result = rwlock.write().await; + *result += 50; + assert_eq!(*result, 150); +} + +// RwLocks should be acquired in the order that their Futures are waited upon. +#[tokio::test] +async fn write_order() { + let rwlock = RwLock::<Vec<u32>>::new(vec![]); + let fut2 = rwlock.write().map(|mut guard| guard.push(2)); + let fut1 = rwlock.write().map(|mut guard| guard.push(1)); + fut1.await; + fut2.await; + + let g = rwlock.read().await; + assert_eq!(*g, vec![1, 2]); +} + +// A single RwLock is contested by tasks in multiple threads +#[tokio::test(threaded_scheduler)] +async fn multithreaded() { + let barrier = Arc::new(Barrier::new(5)); + let rwlock = Arc::new(RwLock::<u32>::new(0)); + let rwclone1 = rwlock.clone(); + let rwclone2 = rwlock.clone(); + let rwclone3 = rwlock.clone(); + let rwclone4 = rwlock.clone(); + + let b1 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone1.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 2; + } + }) + .await; + b1.wait().await; + }); + + let b2 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone2.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 3; + } + }) + .await; + b2.wait().await; + }); + + let b3 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone3.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 5; + } + }) + .await; + b3.wait().await; + }); + + let b4 = barrier.clone(); + tokio::spawn(async move { + stream::iter(0..1000) + .for_each(move |_| { + let rwlock = rwclone4.clone(); + async move { + let mut guard = rwlock.write().await; + *guard += 7; + } + }) + .await; + b4.wait().await; + }); + + barrier.wait().await; + let g = rwlock.read().await; + assert_eq!(*g, 17_000); +} diff --git a/third_party/rust/tokio/tests/sync_semaphore.rs b/third_party/rust/tokio/tests/sync_semaphore.rs new file mode 100644 index 0000000000..1cb0c749db --- /dev/null +++ b/third_party/rust/tokio/tests/sync_semaphore.rs @@ -0,0 +1,81 @@ +#![cfg(feature = "full")] + +use std::sync::Arc; +use tokio::sync::Semaphore; + +#[test] +fn no_permits() { + // this should not panic + Semaphore::new(0); +} + +#[test] +fn try_acquire() { + let sem = Semaphore::new(1); + { + let p1 = sem.try_acquire(); + assert!(p1.is_ok()); + let p2 = sem.try_acquire(); + assert!(p2.is_err()); + } + let p3 = sem.try_acquire(); + assert!(p3.is_ok()); +} + +#[tokio::test] +async fn acquire() { + let sem = Arc::new(Semaphore::new(1)); + let p1 = sem.try_acquire().unwrap(); + let sem_clone = sem.clone(); + let j = tokio::spawn(async move { + let _p2 = sem_clone.acquire().await; + }); + drop(p1); + j.await.unwrap(); +} + +#[tokio::test] +async fn add_permits() { + let sem = Arc::new(Semaphore::new(0)); + let sem_clone = sem.clone(); + let j = tokio::spawn(async move { + let _p2 = sem_clone.acquire().await; + }); + sem.add_permits(1); + j.await.unwrap(); +} + +#[test] +fn forget() { + let sem = Arc::new(Semaphore::new(1)); + { + let p = sem.try_acquire().unwrap(); + assert_eq!(sem.available_permits(), 0); + p.forget(); + assert_eq!(sem.available_permits(), 0); + } + assert_eq!(sem.available_permits(), 0); + assert!(sem.try_acquire().is_err()); +} + +#[tokio::test] +async fn stresstest() { + let sem = Arc::new(Semaphore::new(5)); + let mut join_handles = Vec::new(); + for _ in 0..1000 { + let sem_clone = sem.clone(); + join_handles.push(tokio::spawn(async move { + let _p = sem_clone.acquire().await; + })); + } + for j in join_handles { + j.await.unwrap(); + } + // there should be exactly 5 semaphores available now + let _p1 = sem.try_acquire().unwrap(); + let _p2 = sem.try_acquire().unwrap(); + let _p3 = sem.try_acquire().unwrap(); + let _p4 = sem.try_acquire().unwrap(); + let _p5 = sem.try_acquire().unwrap(); + assert!(sem.try_acquire().is_err()); +} diff --git a/third_party/rust/tokio/tests/sync_watch.rs b/third_party/rust/tokio/tests/sync_watch.rs new file mode 100644 index 0000000000..2bc5bb2a85 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_watch.rs @@ -0,0 +1,231 @@ +#![allow(clippy::cognitive_complexity)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::watch; +use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; + +#[test] +fn single_rx_recv() { + let (tx, mut rx) = watch::channel("one"); + + { + let mut t = spawn(rx.recv()); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "one"); + } + + { + let mut t = spawn(rx.recv()); + + assert_pending!(t.poll()); + + tx.broadcast("two").unwrap(); + + assert!(t.is_woken()); + + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "two"); + } + + { + let mut t = spawn(rx.recv()); + + assert_pending!(t.poll()); + + drop(tx); + + let res = assert_ready!(t.poll()); + assert!(res.is_none()); + } +} + +#[test] +fn multi_rx() { + let (tx, mut rx1) = watch::channel("one"); + let mut rx2 = rx1.clone(); + + { + let mut t1 = spawn(rx1.recv()); + let mut t2 = spawn(rx2.recv()); + + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "one"); + + let res = assert_ready!(t2.poll()); + assert_eq!(res.unwrap(), "one"); + } + + let mut t2 = spawn(rx2.recv()); + + { + let mut t1 = spawn(rx1.recv()); + + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); + + tx.broadcast("two").unwrap(); + + assert!(t1.is_woken()); + assert!(t2.is_woken()); + + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "two"); + } + + { + let mut t1 = spawn(rx1.recv()); + + assert_pending!(t1.poll()); + + tx.broadcast("three").unwrap(); + + assert!(t1.is_woken()); + assert!(t2.is_woken()); + + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "three"); + + let res = assert_ready!(t2.poll()); + assert_eq!(res.unwrap(), "three"); + } + + drop(t2); + + { + let mut t1 = spawn(rx1.recv()); + let mut t2 = spawn(rx2.recv()); + + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); + + tx.broadcast("four").unwrap(); + + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "four"); + drop(t1); + + let mut t1 = spawn(rx1.recv()); + assert_pending!(t1.poll()); + + drop(tx); + + assert!(t1.is_woken()); + let res = assert_ready!(t1.poll()); + assert!(res.is_none()); + + let res = assert_ready!(t2.poll()); + assert_eq!(res.unwrap(), "four"); + + drop(t2); + let mut t2 = spawn(rx2.recv()); + let res = assert_ready!(t2.poll()); + assert!(res.is_none()); + } +} + +#[test] +fn rx_observes_final_value() { + // Initial value + + let (tx, mut rx) = watch::channel("one"); + drop(tx); + + { + let mut t1 = spawn(rx.recv()); + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "one"); + } + + { + let mut t1 = spawn(rx.recv()); + let res = assert_ready!(t1.poll()); + assert!(res.is_none()); + } + + // Sending a value + + let (tx, mut rx) = watch::channel("one"); + + tx.broadcast("two").unwrap(); + + { + let mut t1 = spawn(rx.recv()); + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "two"); + } + + { + let mut t1 = spawn(rx.recv()); + assert_pending!(t1.poll()); + + tx.broadcast("three").unwrap(); + drop(tx); + + assert!(t1.is_woken()); + + let res = assert_ready!(t1.poll()); + assert_eq!(res.unwrap(), "three"); + } + + { + let mut t1 = spawn(rx.recv()); + let res = assert_ready!(t1.poll()); + assert!(res.is_none()); + } +} + +#[test] +fn poll_close() { + let (mut tx, rx) = watch::channel("one"); + + { + let mut t = spawn(tx.closed()); + assert_pending!(t.poll()); + + drop(rx); + + assert!(t.is_woken()); + assert_ready!(t.poll()); + } + + assert!(tx.broadcast("two").is_err()); +} + +#[test] +fn stream_impl() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = watch::channel("one"); + + { + let mut t = spawn(rx.next()); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "one"); + } + + { + let mut t = spawn(rx.next()); + + assert_pending!(t.poll()); + + tx.broadcast("two").unwrap(); + + assert!(t.is_woken()); + + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "two"); + } + + { + let mut t = spawn(rx.next()); + + assert_pending!(t.poll()); + + drop(tx); + + let res = assert_ready!(t.poll()); + assert!(res.is_none()); + } +} diff --git a/third_party/rust/tokio/tests/task_blocking.rs b/third_party/rust/tokio/tests/task_blocking.rs new file mode 100644 index 0000000000..4cd83d8a0d --- /dev/null +++ b/third_party/rust/tokio/tests/task_blocking.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::task; +use tokio_test::assert_ok; + +use std::thread; +use std::time::Duration; + +#[tokio::test] +async fn basic_blocking() { + // Run a few times + for _ in 0..100 { + let out = assert_ok!( + tokio::spawn(async { + assert_ok!( + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(5)); + "hello" + }) + .await + ) + }) + .await + ); + + assert_eq!(out, "hello"); + } +} diff --git a/third_party/rust/tokio/tests/task_local.rs b/third_party/rust/tokio/tests/task_local.rs new file mode 100644 index 0000000000..7f508997f2 --- /dev/null +++ b/third_party/rust/tokio/tests/task_local.rs @@ -0,0 +1,31 @@ +tokio::task_local! { + static REQ_ID: u32; + pub static FOO: bool; +} + +#[tokio::test(threaded_scheduler)] +async fn local() { + let j1 = tokio::spawn(REQ_ID.scope(1, async move { + assert_eq!(REQ_ID.get(), 1); + assert_eq!(REQ_ID.get(), 1); + })); + + let j2 = tokio::spawn(REQ_ID.scope(2, async move { + REQ_ID.with(|v| { + assert_eq!(REQ_ID.get(), 2); + assert_eq!(*v, 2); + }); + + tokio::time::delay_for(std::time::Duration::from_millis(10)).await; + + assert_eq!(REQ_ID.get(), 2); + })); + + let j3 = tokio::spawn(FOO.scope(true, async move { + assert!(FOO.get()); + })); + + j1.await.unwrap(); + j2.await.unwrap(); + j3.await.unwrap(); +} diff --git a/third_party/rust/tokio/tests/task_local_set.rs b/third_party/rust/tokio/tests/task_local_set.rs new file mode 100644 index 0000000000..1a10fefa68 --- /dev/null +++ b/third_party/rust/tokio/tests/task_local_set.rs @@ -0,0 +1,466 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::{self, Runtime}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::{self, LocalSet}; +use tokio::time; + +use std::cell::Cell; +use std::sync::atomic::Ordering::{self, SeqCst}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::time::Duration; + +#[tokio::test(basic_scheduler)] +async fn local_basic_scheduler() { + LocalSet::new() + .run_until(async { + task::spawn_local(async {}).await.unwrap(); + }) + .await; +} + +#[tokio::test(threaded_scheduler)] +async fn local_threadpool() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + LocalSet::new() + .run_until(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + .await + .unwrap(); + }) + .await; +} + +#[tokio::test(threaded_scheduler)] +async fn localset_future_threadpool() { + thread_local! { + static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false); + } + + ON_LOCAL_THREAD.with(|cell| cell.set(true)); + + let local = LocalSet::new(); + local.spawn_local(async move { + assert!(ON_LOCAL_THREAD.with(|cell| cell.get())); + }); + local.await; +} + +#[tokio::test(threaded_scheduler)] +async fn localset_future_timers() { + static RAN1: AtomicBool = AtomicBool::new(false); + static RAN2: AtomicBool = AtomicBool::new(false); + + let local = LocalSet::new(); + local.spawn_local(async move { + time::delay_for(Duration::from_millis(10)).await; + RAN1.store(true, Ordering::SeqCst); + }); + local.spawn_local(async move { + time::delay_for(Duration::from_millis(20)).await; + RAN2.store(true, Ordering::SeqCst); + }); + local.await; + assert!(RAN1.load(Ordering::SeqCst)); + assert!(RAN2.load(Ordering::SeqCst)); +} + +#[tokio::test] +async fn localset_future_drives_all_local_futs() { + static RAN1: AtomicBool = AtomicBool::new(false); + static RAN2: AtomicBool = AtomicBool::new(false); + static RAN3: AtomicBool = AtomicBool::new(false); + + let local = LocalSet::new(); + local.spawn_local(async move { + task::spawn_local(async { + task::yield_now().await; + RAN3.store(true, Ordering::SeqCst); + }); + task::yield_now().await; + RAN1.store(true, Ordering::SeqCst); + }); + local.spawn_local(async move { + task::yield_now().await; + RAN2.store(true, Ordering::SeqCst); + }); + local.await; + assert!(RAN1.load(Ordering::SeqCst)); + assert!(RAN2.load(Ordering::SeqCst)); + assert!(RAN3.load(Ordering::SeqCst)); +} + +#[tokio::test(threaded_scheduler)] +async fn local_threadpool_timer() { + // This test ensures that runtime services like the timer are properly + // set for the local task set. + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + LocalSet::new() + .run_until(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = task::spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + time::delay_for(Duration::from_millis(10)).await; + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }) + .await; +} + +#[test] +// This will panic, since the thread that calls `block_on` cannot use +// in-place blocking inside of `block_on`. +#[should_panic] +fn local_threadpool_blocking_in_place() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + LocalSet::new().block_on(&mut rt, async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = task::spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::block_in_place(|| {}); + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }); +} + +#[tokio::test(threaded_scheduler)] +async fn local_threadpool_blocking_run() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + LocalSet::new() + .run_until(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let join = task::spawn_local(async move { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_blocking(|| { + assert!( + !ON_RT_THREAD.with(|cell| cell.get()), + "blocking must not run on the local task set's thread" + ); + }) + .await + .unwrap(); + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }); + join.await.unwrap(); + }) + .await; +} + +#[tokio::test(threaded_scheduler)] +async fn all_spawns_are_local() { + use futures::future; + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + LocalSet::new() + .run_until(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + let handles = (0..128) + .map(|_| { + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + }) + .collect::<Vec<_>>(); + for joined in future::join_all(handles).await { + joined.unwrap(); + } + }) + .await; +} + +#[tokio::test(threaded_scheduler)] +async fn nested_spawn_is_local() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + LocalSet::new() + .run_until(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + task::spawn_local(async { + assert!(ON_RT_THREAD.with(|cell| cell.get())); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + .await + .unwrap(); + }) + .await; +} + +#[test] +fn join_local_future_elsewhere() { + thread_local! { + static ON_RT_THREAD: Cell<bool> = Cell::new(false); + } + + ON_RT_THREAD.with(|cell| cell.set(true)); + + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + let local = LocalSet::new(); + local.block_on(&mut rt, async move { + let (tx, rx) = oneshot::channel(); + let join = task::spawn_local(async move { + println!("hello world running..."); + assert!( + ON_RT_THREAD.with(|cell| cell.get()), + "local task must run on local thread, no matter where it is awaited" + ); + rx.await.unwrap(); + + println!("hello world task done"); + "hello world" + }); + let join2 = task::spawn(async move { + assert!( + !ON_RT_THREAD.with(|cell| cell.get()), + "spawned task should be on a worker" + ); + + tx.send(()).expect("task shouldn't have ended yet"); + println!("waking up hello world..."); + + join.await.expect("task should complete successfully"); + + println!("hello world task joined"); + }); + join2.await.unwrap() + }); +} + +#[test] +fn drop_cancels_tasks() { + use std::rc::Rc; + + // This test reproduces issue #1842 + let mut rt = rt(); + let rc1 = Rc::new(()); + let rc2 = rc1.clone(); + + let (started_tx, started_rx) = oneshot::channel(); + + let local = LocalSet::new(); + local.spawn_local(async move { + // Move this in + let _rc2 = rc2; + + started_tx.send(()).unwrap(); + loop { + time::delay_for(Duration::from_secs(3600)).await; + } + }); + + local.block_on(&mut rt, async { + started_rx.await.unwrap(); + }); + drop(local); + drop(rt); + + assert_eq!(1, Rc::strong_count(&rc1)); +} + +#[test] +fn drop_cancels_remote_tasks() { + // This test reproduces issue #1885. + use std::sync::mpsc::RecvTimeoutError; + + let (done_tx, done_rx) = std::sync::mpsc::channel(); + let thread = std::thread::spawn(move || { + let (tx, mut rx) = mpsc::channel::<()>(1024); + + let mut rt = rt(); + + let local = LocalSet::new(); + local.spawn_local(async move { while let Some(_) = rx.recv().await {} }); + local.block_on(&mut rt, async { + time::delay_for(Duration::from_millis(1)).await; + }); + + drop(tx); + + // This enters an infinite loop if the remote notified tasks are not + // properly cancelled. + drop(local); + + // Send a message on the channel so that the test thread can + // determine if we have entered an infinite loop: + done_tx.send(()).unwrap(); + }); + + // Since the failure mode of this test is an infinite loop, rather than + // something we can easily make assertions about, we'll run it in a + // thread. When the test thread finishes, it will send a message on a + // channel to this thread. We'll wait for that message with a fairly + // generous timeout, and if we don't recieve it, we assume the test + // thread has hung. + // + // Note that it should definitely complete in under a minute, but just + // in case CI is slow, we'll give it a long timeout. + match done_rx.recv_timeout(Duration::from_secs(60)) { + Err(RecvTimeoutError::Timeout) => panic!( + "test did not complete within 60 seconds, \ + we have (probably) entered an infinite loop!" + ), + // Did the test thread panic? We'll find out for sure when we `join` + // with it. + Err(RecvTimeoutError::Disconnected) => { + println!("done_rx dropped, did the test thread panic?"); + } + // Test completed successfully! + Ok(()) => {} + } + + thread.join().expect("test thread should not panic!") +} + +#[tokio::test] +async fn local_tasks_are_polled_after_tick() { + // Reproduces issues #1899 and #1900 + + static RX1: AtomicUsize = AtomicUsize::new(0); + static RX2: AtomicUsize = AtomicUsize::new(0); + static EXPECTED: usize = 500; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let local = LocalSet::new(); + + local + .run_until(async { + let task2 = task::spawn(async move { + // Wait a bit + time::delay_for(Duration::from_millis(100)).await; + + let mut oneshots = Vec::with_capacity(EXPECTED); + + // Send values + for _ in 0..EXPECTED { + let (oneshot_tx, oneshot_rx) = oneshot::channel(); + oneshots.push(oneshot_tx); + tx.send(oneshot_rx).unwrap(); + } + + time::delay_for(Duration::from_millis(100)).await; + + for tx in oneshots.drain(..) { + tx.send(()).unwrap(); + } + + time::delay_for(Duration::from_millis(300)).await; + let rx1 = RX1.load(SeqCst); + let rx2 = RX2.load(SeqCst); + println!("EXPECT = {}; RX1 = {}; RX2 = {}", EXPECTED, rx1, rx2); + assert_eq!(EXPECTED, rx1); + assert_eq!(EXPECTED, rx2); + }); + + while let Some(oneshot) = rx.recv().await { + RX1.fetch_add(1, SeqCst); + + task::spawn_local(async move { + oneshot.await.unwrap(); + RX2.fetch_add(1, SeqCst); + }); + } + + task2.await.unwrap(); + }) + .await; +} + +#[tokio::test] +async fn acquire_mutex_in_drop() { + use futures::future::pending; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let local = LocalSet::new(); + + local.spawn_local(async move { + let _ = rx2.await; + unreachable!(); + }); + + local.spawn_local(async move { + let _ = rx1.await; + tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + local.spawn_local(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + local + .run_until(async { + task::yield_now().await; + }) + .await; + + // Drop the LocalSet + drop(local); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/tests/tcp_accept.rs b/third_party/rust/tokio/tests/tcp_accept.rs new file mode 100644 index 0000000000..ff62fb96a2 --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_accept.rs @@ -0,0 +1,99 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{mpsc, oneshot}; +use tokio_test::assert_ok; + +use std::net::{IpAddr, SocketAddr}; + +macro_rules! test_accept { + ($(($ident:ident, $target:expr),)*) => { + $( + #[tokio::test] + async fn $ident() { + let mut listener = assert_ok!(TcpListener::bind($target).await); + let addr = listener.local_addr().unwrap(); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, _) = assert_ok!(listener.accept().await); + assert_ok!(tx.send(socket)); + }); + + let cli = assert_ok!(TcpStream::connect(&addr).await); + let srv = assert_ok!(rx.await); + + assert_eq!(cli.local_addr().unwrap(), srv.peer_addr().unwrap()); + } + )* + } +} + +test_accept! { + (ip_str, "127.0.0.1:0"), + (host_str, "localhost:0"), + (socket_addr, "127.0.0.1:0".parse::<SocketAddr>().unwrap()), + (str_port_tuple, ("127.0.0.1", 0)), + (ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)), +} + +use std::pin::Pin; +use std::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, +}; +use std::task::{Context, Poll}; +use tokio::stream::{Stream, StreamExt}; + +struct TrackPolls<S> { + npolls: Arc<AtomicUsize>, + s: S, +} + +impl<S> Stream for TrackPolls<S> +where + S: Stream, +{ + type Item = S::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // safety: we do not move s + let this = unsafe { self.get_unchecked_mut() }; + this.npolls.fetch_add(1, SeqCst); + // safety: we are pinned, and so is s + unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx) + } +} + +#[tokio::test] +async fn no_extra_poll() { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = listener.local_addr().unwrap(); + + let (tx, rx) = oneshot::channel(); + let (accepted_tx, mut accepted_rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + let mut incoming = TrackPolls { + npolls: Arc::new(AtomicUsize::new(0)), + s: listener.incoming(), + }; + assert_ok!(tx.send(Arc::clone(&incoming.npolls))); + while let Some(_) = incoming.next().await { + accepted_tx.send(()).unwrap(); + } + }); + + let npolls = assert_ok!(rx.await); + tokio::task::yield_now().await; + + // should have been polled exactly once: the initial poll + assert_eq!(npolls.load(SeqCst), 1); + + let _ = assert_ok!(TcpStream::connect(&addr).await); + accepted_rx.next().await.unwrap(); + + // should have been polled twice more: once to yield Some(), then once to yield Pending + assert_eq!(npolls.load(SeqCst), 1 + 2); +} diff --git a/third_party/rust/tokio/tests/tcp_connect.rs b/third_party/rust/tokio/tests/tcp_connect.rs new file mode 100644 index 0000000000..de1cead829 --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_connect.rs @@ -0,0 +1,229 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +use futures::join; + +#[tokio::test] +async fn connect_v4() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv4()); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, addr) = assert_ok!(srv.accept().await); + assert_eq!(addr, assert_ok!(socket.peer_addr())); + assert_ok!(tx.send(socket)); + }); + + let mine = assert_ok!(TcpStream::connect(&addr).await); + let theirs = assert_ok!(rx.await); + + assert_eq!( + assert_ok!(mine.local_addr()), + assert_ok!(theirs.peer_addr()) + ); + assert_eq!( + assert_ok!(theirs.local_addr()), + assert_ok!(mine.peer_addr()) + ); +} + +#[tokio::test] +async fn connect_v6() { + let mut srv = assert_ok!(TcpListener::bind("[::1]:0").await); + let addr = assert_ok!(srv.local_addr()); + assert!(addr.is_ipv6()); + + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let (socket, addr) = assert_ok!(srv.accept().await); + assert_eq!(addr, assert_ok!(socket.peer_addr())); + assert_ok!(tx.send(socket)); + }); + + let mine = assert_ok!(TcpStream::connect(&addr).await); + let theirs = assert_ok!(rx.await); + + assert_eq!( + assert_ok!(mine.local_addr()), + assert_ok!(theirs.peer_addr()) + ); + assert_eq!( + assert_ok!(theirs.local_addr()), + assert_ok!(mine.peer_addr()) + ); +} + +#[tokio::test] +async fn connect_addr_ip_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_slice() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("127.0.0.1:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr[..]).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_string() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = format!("localhost:{}", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = (addr.ip(), addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_ip_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("127.0.0.1", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +#[tokio::test] +async fn connect_addr_host_str_port_tuple() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + let addr = ("localhost", addr.port()); + + let server = async { + assert_ok!(srv.accept().await); + }; + + let client = async { + assert_ok!(TcpStream::connect(&addr).await); + }; + + join!(server, client); +} + +/* + * TODO: bring this back once TCP exposes HUP again + * +#[cfg(target_os = "linux")] +mod linux { + use tokio::net::{TcpListener, TcpStream}; + use tokio::prelude::*; + use tokio_test::assert_ok; + + use mio::unix::UnixReady; + + use futures_util::future::poll_fn; + use std::io::Write; + use std::time::Duration; + use std::{net, thread}; + + #[tokio::test] + fn poll_hup() { + let addr = assert_ok!("127.0.0.1:0".parse()); + let mut srv = assert_ok!(TcpListener::bind(&addr)); + let addr = assert_ok!(srv.local_addr()); + + tokio::spawn(async move { + let (mut client, _) = assert_ok!(srv.accept().await); + assert_ok!(client.set_linger(Some(Duration::from_millis(0)))); + assert_ok!(client.write_all(b"hello world").await); + + // TODO: Drop? + }); + + /* + let t = thread::spawn(move || { + let mut client = assert_ok!(srv.accept()).0; + client.set_linger(Some(Duration::from_millis(0))).unwrap(); + client.write(b"hello world").unwrap(); + thread::sleep(Duration::from_millis(200)); + }); + */ + + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + // Poll for HUP before reading. + future::poll_fn(|| stream.poll_read_ready(UnixReady::hup().into())) + .wait() + .unwrap(); + + // Same for write half + future::poll_fn(|| stream.poll_write_ready()) + .wait() + .unwrap(); + + let mut buf = vec![0; 11]; + + // Read the data + future::poll_fn(|| stream.poll_read(&mut buf)) + .wait() + .unwrap(); + + assert_eq!(b"hello world", &buf[..]); + + t.join().unwrap(); + } +} +*/ diff --git a/third_party/rust/tokio/tests/tcp_echo.rs b/third_party/rust/tokio/tests/tcp_echo.rs new file mode 100644 index 0000000000..1feba63ee7 --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_echo.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio::sync::oneshot; +use tokio_test::assert_ok; + +#[tokio::test] +async fn echo_server() { + const ITER: usize = 1024; + + let (tx, rx) = oneshot::channel(); + + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + + let msg = "foo bar baz"; + tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + for _ in 0..ITER { + // write + assert_ok!(stream.write_all(msg.as_bytes()).await); + + // read + let mut buf = [0; 11]; + assert_ok!(stream.read_exact(&mut buf).await); + assert_eq!(&buf[..], msg.as_bytes()); + } + + assert_ok!(tx.send(())); + }); + + let (mut stream, _) = assert_ok!(srv.accept().await); + let (mut rd, mut wr) = stream.split(); + + let n = assert_ok!(io::copy(&mut rd, &mut wr).await); + assert_eq!(n, (ITER * msg.len()) as u64); + + assert_ok!(rx.await); +} diff --git a/third_party/rust/tokio/tests/tcp_peek.rs b/third_party/rust/tokio/tests/tcp_peek.rs new file mode 100644 index 0000000000..aecc0ac19c --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_peek.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; + +use tokio_test::assert_ok; + +use std::thread; +use std::{convert::TryInto, io::Write, net}; + +#[tokio::test] +async fn peek() { + let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let t = thread::spawn(move || assert_ok!(listener.accept()).0); + + let left = net::TcpStream::connect(&addr).unwrap(); + let mut right = t.join().unwrap(); + let _ = right.write(&[1, 2, 3, 4]).unwrap(); + + let mut left: TcpStream = left.try_into().unwrap(); + let mut buf = [0u8; 16]; + let n = assert_ok!(left.peek(&mut buf).await); + assert_eq!([1, 2, 3, 4], buf[..n]); + + let n = assert_ok!(left.read(&mut buf).await); + assert_eq!([1, 2, 3, 4], buf[..n]); +} diff --git a/third_party/rust/tokio/tests/tcp_shutdown.rs b/third_party/rust/tokio/tests/tcp_shutdown.rs new file mode 100644 index 0000000000..bd43e143b8 --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_shutdown.rs @@ -0,0 +1,29 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::{self, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio_test::assert_ok; + +#[tokio::test] +async fn shutdown() { + let mut srv = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(srv.local_addr()); + + tokio::spawn(async move { + let mut stream = assert_ok!(TcpStream::connect(&addr).await); + + assert_ok!(AsyncWriteExt::shutdown(&mut stream).await); + + let mut buf = [0; 1]; + let n = assert_ok!(stream.read(&mut buf).await); + assert_eq!(n, 0); + }); + + let (mut stream, _) = assert_ok!(srv.accept().await); + let (mut rd, mut wr) = stream.split(); + + let n = assert_ok!(io::copy(&mut rd, &mut wr).await); + assert_eq!(n, 0); +} diff --git a/third_party/rust/tokio/tests/tcp_split.rs b/third_party/rust/tokio/tests/tcp_split.rs new file mode 100644 index 0000000000..42f797708c --- /dev/null +++ b/third_party/rust/tokio/tests/tcp_split.rs @@ -0,0 +1,42 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::io::Result; +use std::io::{Read, Write}; +use std::{net, thread}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +#[tokio::test] +async fn split() -> Result<()> { + const MSG: &[u8] = b"split"; + + let listener = net::TcpListener::bind("127.0.0.1:0")?; + let addr = listener.local_addr()?; + + let handle = thread::spawn(move || { + let (mut stream, _) = listener.accept().unwrap(); + stream.write(MSG).unwrap(); + + let mut read_buf = [0u8; 32]; + let read_len = stream.read(&mut read_buf).unwrap(); + assert_eq!(&read_buf[..read_len], MSG); + }); + + let mut stream = TcpStream::connect(&addr).await?; + let (mut read_half, mut write_half) = stream.split(); + + let mut read_buf = [0u8; 32]; + let peek_len1 = read_half.peek(&mut read_buf[..]).await?; + let peek_len2 = read_half.peek(&mut read_buf[..]).await?; + assert_eq!(peek_len1, peek_len2); + + let read_len = read_half.read(&mut read_buf[..]).await?; + assert_eq!(peek_len1, read_len); + assert_eq!(&read_buf[..read_len], MSG); + + write_half.write(MSG).await?; + handle.join().unwrap(); + Ok(()) +} diff --git a/third_party/rust/tokio/tests/test_clock.rs b/third_party/rust/tokio/tests/test_clock.rs new file mode 100644 index 0000000000..891636fdb2 --- /dev/null +++ b/third_party/rust/tokio/tests/test_clock.rs @@ -0,0 +1,50 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, Duration, Instant}; + +#[tokio::test] +async fn resume_lets_time_move_forward_instead_of_resetting_it() { + let start = Instant::now(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + let advanced_by_ten_secs = Instant::now(); + assert!(advanced_by_ten_secs - start > Duration::from_secs(10)); + assert!(advanced_by_ten_secs - start < Duration::from_secs(11)); + time::resume(); + assert!(advanced_by_ten_secs < Instant::now()); + assert!(Instant::now() - advanced_by_ten_secs < Duration::from_secs(1)); +} + +#[tokio::test] +async fn can_pause_after_resume() { + let start = Instant::now(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + time::resume(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + assert!(Instant::now() - start > Duration::from_secs(20)); + assert!(Instant::now() - start < Duration::from_secs(21)); +} + +#[tokio::test] +#[should_panic] +async fn freezing_time_while_frozen_panics() { + time::pause(); + time::pause(); +} + +#[tokio::test] +#[should_panic] +async fn advancing_time_when_time_is_not_frozen_panics() { + time::advance(Duration::from_secs(1)).await; +} + +#[tokio::test] +#[should_panic] +async fn resuming_time_when_not_frozen_panics() { + time::pause(); + time::resume(); + time::resume(); +} diff --git a/third_party/rust/tokio/tests/time_delay.rs b/third_party/rust/tokio/tests/time_delay.rs new file mode 100644 index 0000000000..e763ae03be --- /dev/null +++ b/third_party/rust/tokio/tests/time_delay.rs @@ -0,0 +1,176 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, Duration, Instant}; +use tokio_test::{assert_pending, task}; + +macro_rules! assert_elapsed { + ($now:expr, $ms:expr) => {{ + let elapsed = $now.elapsed(); + let lower = ms($ms); + + // Handles ms rounding + assert!( + elapsed >= lower && elapsed <= lower + ms(1), + "actual = {:?}, expected = {:?}", + elapsed, + lower + ); + }}; +} + +#[tokio::test] +async fn immediate_delay() { + time::pause(); + + let now = Instant::now(); + + // Ready! + time::delay_until(now).await; + assert_elapsed!(now, 0); +} + +#[tokio::test] +async fn delayed_delay_level_0() { + time::pause(); + + for &i in &[1, 10, 60] { + let now = Instant::now(); + + time::delay_until(now + ms(i)).await; + + assert_elapsed!(now, i); + } +} + +#[tokio::test] +async fn sub_ms_delayed_delay() { + time::pause(); + + for _ in 0..5 { + let now = Instant::now(); + let deadline = now + ms(1) + Duration::new(0, 1); + + time::delay_until(deadline).await; + + assert_elapsed!(now, 1); + } +} + +#[tokio::test] +async fn delayed_delay_wrapping_level_0() { + time::pause(); + + time::delay_for(ms(5)).await; + + let now = Instant::now(); + time::delay_until(now + ms(60)).await; + + assert_elapsed!(now, 60); +} + +#[tokio::test] +async fn reset_future_delay_before_fire() { + time::pause(); + + let now = Instant::now(); + + let mut delay = task::spawn(time::delay_until(now + ms(100))); + assert_pending!(delay.poll()); + + let mut delay = delay.into_inner(); + + delay.reset(Instant::now() + ms(200)); + delay.await; + + assert_elapsed!(now, 200); +} + +#[tokio::test] +async fn reset_past_delay_before_turn() { + time::pause(); + + let now = Instant::now(); + + let mut delay = task::spawn(time::delay_until(now + ms(100))); + assert_pending!(delay.poll()); + + let mut delay = delay.into_inner(); + + delay.reset(now + ms(80)); + delay.await; + + assert_elapsed!(now, 80); +} + +#[tokio::test] +async fn reset_past_delay_before_fire() { + time::pause(); + + let now = Instant::now(); + + let mut delay = task::spawn(time::delay_until(now + ms(100))); + assert_pending!(delay.poll()); + + let mut delay = delay.into_inner(); + + time::delay_for(ms(10)).await; + + delay.reset(now + ms(80)); + delay.await; + + assert_elapsed!(now, 80); +} + +#[tokio::test] +async fn reset_future_delay_after_fire() { + time::pause(); + + let now = Instant::now(); + let mut delay = time::delay_until(now + ms(100)); + + (&mut delay).await; + assert_elapsed!(now, 100); + + delay.reset(now + ms(110)); + delay.await; + assert_elapsed!(now, 110); +} + +#[test] +#[should_panic] +fn creating_delay_outside_of_context() { + let now = Instant::now(); + + // This creates a delay outside of the context of a mock timer. This tests + // that it will panic. + let _fut = time::delay_until(now + ms(500)); +} + +#[should_panic] +#[tokio::test] +async fn greater_than_max() { + const YR_5: u64 = 5 * 365 * 24 * 60 * 60 * 1000; + + time::delay_until(Instant::now() + ms(YR_5)).await; +} + +const NUM_LEVELS: usize = 6; +const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; + +#[should_panic] +#[tokio::test] +async fn exactly_max() { + // TODO: this should not panic but `time::ms()` is acting up + time::delay_for(ms(MAX_DURATION)).await; +} + +#[tokio::test] +async fn no_out_of_bounds_close_to_max() { + time::pause(); + time::delay_for(ms(MAX_DURATION - 1)).await; +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/third_party/rust/tokio/tests/time_delay_queue.rs b/third_party/rust/tokio/tests/time_delay_queue.rs new file mode 100644 index 0000000000..214b9ebee6 --- /dev/null +++ b/third_party/rust/tokio/tests/time_delay_queue.rs @@ -0,0 +1,448 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, delay_for, DelayQueue, Duration, Instant}; +use tokio_test::{assert_ok, assert_pending, assert_ready, task}; + +macro_rules! poll { + ($queue:ident) => { + $queue.enter(|cx, mut queue| queue.poll_expired(cx)) + }; +} + +macro_rules! assert_ready_ok { + ($e:expr) => {{ + assert_ok!(match assert_ready!($e) { + Some(v) => v, + None => panic!("None"), + }) + }}; +} + +#[tokio::test] +async fn single_immediate_delay() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let _key = queue.insert_at("foo", Instant::now()); + + // Advance time by 1ms to handle thee rounding + delay_for(ms(1)).await; + + assert_ready_ok!(poll!(queue)); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +#[tokio::test] +async fn multi_immediate_delays() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let _k = queue.insert_at("1", Instant::now()); + let _k = queue.insert_at("2", Instant::now()); + let _k = queue.insert_at("3", Instant::now()); + + delay_for(ms(1)).await; + + let mut res = vec![]; + + while res.len() < 3 { + let entry = assert_ready_ok!(poll!(queue)); + res.push(entry.into_inner()); + } + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); + + res.sort(); + + assert_eq!("1", res[0]); + assert_eq!("2", res[1]); + assert_eq!("3", res[2]); +} + +#[tokio::test] +async fn single_short_delay() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let _key = queue.insert_at("foo", Instant::now() + ms(5)); + + assert_pending!(poll!(queue)); + + delay_for(ms(1)).await; + + assert!(!queue.is_woken()); + + delay_for(ms(5)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test] +async fn multi_delay_at_start() { + time::pause(); + + let long = 262_144 + 9 * 4096; + let delays = &[1000, 2, 234, long, 60, 10]; + + let mut queue = task::spawn(DelayQueue::new()); + + // Setup the delays + for &i in delays { + let _key = queue.insert_at(i, Instant::now() + ms(i)); + } + + assert_pending!(poll!(queue)); + assert!(!queue.is_woken()); + + for elapsed in 0..1200 { + delay_for(ms(1)).await; + let elapsed = elapsed + 1; + + if delays.contains(&elapsed) { + assert!(queue.is_woken()); + assert_ready!(poll!(queue)); + assert_pending!(poll!(queue)); + } else if queue.is_woken() { + let cascade = &[192, 960]; + assert!(cascade.contains(&elapsed), "elapsed={}", elapsed); + + assert_pending!(poll!(queue)); + } + } +} + +#[tokio::test] +async fn insert_in_past_fires_immediately() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + delay_for(ms(10)).await; + + queue.insert_at("foo", now); + + assert_ready!(poll!(queue)); +} + +#[tokio::test] +async fn remove_entry() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let key = queue.insert_at("foo", Instant::now() + ms(5)); + + assert_pending!(poll!(queue)); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); + + delay_for(ms(10)).await; + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()); +} + +#[tokio::test] +async fn reset_entry() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + let key = queue.insert_at("foo", now + ms(5)); + + assert_pending!(poll!(queue)); + delay_for(ms(1)).await; + + queue.reset_at(&key, now + ms(10)); + + assert_pending!(poll!(queue)); + + delay_for(ms(7)).await; + + assert!(!queue.is_woken()); + + assert_pending!(poll!(queue)); + + delay_for(ms(3)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)); + assert_eq!(*entry.get_ref(), "foo"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +// Reproduces tokio-rs/tokio#849. +#[tokio::test] +async fn reset_much_later() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + delay_for(ms(1)).await; + + let key = queue.insert_at("foo", now + ms(200)); + assert_pending!(poll!(queue)); + + delay_for(ms(3)).await; + + queue.reset_at(&key, now + ms(5)); + + delay_for(ms(20)).await; + + assert!(queue.is_woken()); +} + +// Reproduces tokio-rs/tokio#849. +#[tokio::test] +async fn reset_twice() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + let now = Instant::now(); + + delay_for(ms(1)).await; + + let key = queue.insert_at("foo", now + ms(200)); + + assert_pending!(poll!(queue)); + + delay_for(ms(3)).await; + + queue.reset_at(&key, now + ms(50)); + + delay_for(ms(20)).await; + + queue.reset_at(&key, now + ms(40)); + + delay_for(ms(20)).await; + + assert!(queue.is_woken()); +} + +#[tokio::test] +async fn remove_expired_item() { + time::pause(); + + let mut queue = DelayQueue::new(); + + let now = Instant::now(); + + delay_for(ms(10)).await; + + let key = queue.insert_at("foo", now); + + let entry = queue.remove(&key); + assert_eq!(entry.into_inner(), "foo"); +} + +#[tokio::test] +async fn expires_before_last_insert() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("foo", now + ms(10_000)); + + // Delay should be set to 8.192s here. + assert_pending!(poll!(queue)); + + // Delay should be set to the delay of the new item here + queue.insert_at("bar", now + ms(600)); + + assert_pending!(poll!(queue)); + + delay_for(ms(600)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "bar"); +} + +#[tokio::test] +async fn multi_reset() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + let two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(300)); + queue.reset_at(&two, now + ms(350)); + queue.reset_at(&one, now + ms(400)); + + delay_for(ms(310)).await; + + assert_pending!(poll!(queue)); + + delay_for(ms(50)).await; + + let entry = assert_ready_ok!(poll!(queue)); + assert_eq!(*entry.get_ref(), "two"); + + assert_pending!(poll!(queue)); + + delay_for(ms(50)).await; + + let entry = assert_ready_ok!(poll!(queue)); + assert_eq!(*entry.get_ref(), "one"); + + let entry = assert_ready!(poll!(queue)); + assert!(entry.is_none()) +} + +#[tokio::test] +async fn expire_first_key_when_reset_to_expire_earlier() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(100)); + + delay_for(ms(100)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "one"); +} + +#[tokio::test] +async fn expire_second_key_when_reset_to_expire_earlier() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("one", now + ms(200)); + let two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&two, now + ms(100)); + + delay_for(ms(100)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn reset_first_expiring_item_to_expire_later() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let one = queue.insert_at("one", now + ms(200)); + let _two = queue.insert_at("two", now + ms(250)); + + assert_pending!(poll!(queue)); + + queue.reset_at(&one, now + ms(300)); + delay_for(ms(250)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn insert_before_first_after_poll() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + let _one = queue.insert_at("one", now + ms(200)); + + assert_pending!(poll!(queue)); + + let _two = queue.insert_at("two", now + ms(100)); + + delay_for(ms(99)).await; + + assert!(!queue.is_woken()); + + delay_for(ms(1)).await; + + assert!(queue.is_woken()); + + let entry = assert_ready_ok!(poll!(queue)).into_inner(); + assert_eq!(entry, "two"); +} + +#[tokio::test] +async fn insert_after_ready_poll() { + time::pause(); + + let mut queue = task::spawn(DelayQueue::new()); + + let now = Instant::now(); + + queue.insert_at("1", now + ms(100)); + queue.insert_at("2", now + ms(100)); + queue.insert_at("3", now + ms(100)); + + assert_pending!(poll!(queue)); + + delay_for(ms(100)).await; + + assert!(queue.is_woken()); + + let mut res = vec![]; + + while res.len() < 3 { + let entry = assert_ready_ok!(poll!(queue)); + res.push(entry.into_inner()); + queue.insert_at("foo", now + ms(500)); + } + + res.sort(); + + assert_eq!("1", res[0]); + assert_eq!("2", res[1]); + assert_eq!("3", res[2]); +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/third_party/rust/tokio/tests/time_interval.rs b/third_party/rust/tokio/tests/time_interval.rs new file mode 100644 index 0000000000..1123681f49 --- /dev/null +++ b/third_party/rust/tokio/tests/time_interval.rs @@ -0,0 +1,66 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, Duration, Instant}; +use tokio_test::{assert_pending, assert_ready_eq, task}; + +use std::task::Poll; + +#[tokio::test] +#[should_panic] +async fn interval_zero_duration() { + let _ = time::interval_at(Instant::now(), ms(0)); +} + +#[tokio::test] +async fn usage() { + time::pause(); + + let start = Instant::now(); + + // TODO: Skip this + time::advance(ms(1)).await; + + let mut i = task::spawn(time::interval_at(start, ms(300))); + + assert_ready_eq!(poll_next(&mut i), start); + assert_pending!(poll_next(&mut i)); + + time::advance(ms(100)).await; + assert_pending!(poll_next(&mut i)); + + time::advance(ms(200)).await; + assert_ready_eq!(poll_next(&mut i), start + ms(300)); + assert_pending!(poll_next(&mut i)); + + time::advance(ms(400)).await; + assert_ready_eq!(poll_next(&mut i), start + ms(600)); + assert_pending!(poll_next(&mut i)); + + time::advance(ms(500)).await; + assert_ready_eq!(poll_next(&mut i), start + ms(900)); + assert_ready_eq!(poll_next(&mut i), start + ms(1200)); + assert_pending!(poll_next(&mut i)); +} + +#[tokio::test] +async fn usage_stream() { + use tokio::stream::StreamExt; + + let start = Instant::now(); + let mut interval = time::interval(ms(10)); + + for _ in 0..3 { + interval.next().await.unwrap(); + } + + assert!(start.elapsed() > ms(20)); +} + +fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> { + interval.enter(|cx, mut interval| interval.poll_tick(cx)) +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/third_party/rust/tokio/tests/time_rt.rs b/third_party/rust/tokio/tests/time_rt.rs new file mode 100644 index 0000000000..b739f1b2f6 --- /dev/null +++ b/third_party/rust/tokio/tests/time_rt.rs @@ -0,0 +1,93 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::*; + +use std::sync::mpsc; + +#[test] +fn timer_with_threaded_runtime() { + use tokio::runtime::Runtime; + + let rt = Runtime::new().unwrap(); + let (tx, rx) = mpsc::channel(); + + rt.spawn(async move { + let when = Instant::now() + Duration::from_millis(100); + + delay_until(when).await; + assert!(Instant::now() >= when); + + tx.send(()).unwrap(); + }); + + rx.recv().unwrap(); +} + +#[test] +fn timer_with_basic_scheduler() { + use tokio::runtime::Builder; + + let mut rt = Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + let (tx, rx) = mpsc::channel(); + + rt.block_on(async move { + let when = Instant::now() + Duration::from_millis(100); + + delay_until(when).await; + assert!(Instant::now() >= when); + + tx.send(()).unwrap(); + }); + + rx.recv().unwrap(); +} + +#[tokio::test] +async fn starving() { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct Starve<T: Future<Output = ()> + Unpin>(T, u64); + + impl<T: Future<Output = ()> + Unpin> Future for Starve<T> { + type Output = u64; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> { + if Pin::new(&mut self.0).poll(cx).is_ready() { + return Poll::Ready(self.1); + } + + self.1 += 1; + + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + + let when = Instant::now() + Duration::from_millis(20); + let starve = Starve(delay_until(when), 0); + + starve.await; + assert!(Instant::now() >= when); +} + +#[tokio::test] +async fn timeout_value() { + use tokio::sync::oneshot; + + let (_tx, rx) = oneshot::channel::<()>(); + + let now = Instant::now(); + let dur = Duration::from_millis(20); + + let res = timeout(dur, rx).await; + assert!(res.is_err()); + assert!(Instant::now() >= now + dur); +} diff --git a/third_party/rust/tokio/tests/time_throttle.rs b/third_party/rust/tokio/tests/time_throttle.rs new file mode 100644 index 0000000000..7102d17343 --- /dev/null +++ b/third_party/rust/tokio/tests/time_throttle.rs @@ -0,0 +1,30 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, throttle}; +use tokio_test::*; + +use std::time::Duration; + +#[tokio::test] +async fn usage() { + time::pause(); + + let mut stream = task::spawn(throttle( + Duration::from_millis(100), + futures::stream::repeat(()), + )); + + assert_ready!(stream.poll_next()); + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(90)).await; + + assert_pending!(stream.poll_next()); + + time::advance(Duration::from_millis(101)).await; + + assert!(stream.is_woken()); + + assert_ready!(stream.poll_next()); +} diff --git a/third_party/rust/tokio/tests/time_timeout.rs b/third_party/rust/tokio/tests/time_timeout.rs new file mode 100644 index 0000000000..4efcd8ca82 --- /dev/null +++ b/third_party/rust/tokio/tests/time_timeout.rs @@ -0,0 +1,110 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::oneshot; +use tokio::time::{self, timeout, timeout_at, Instant}; +use tokio_test::*; + +use futures::future::pending; +use std::time::Duration; + +#[tokio::test] +async fn simultaneous_deadline_future_completion() { + // Create a future that is immediately ready + let mut fut = task::spawn(timeout_at(Instant::now(), async {})); + + // Ready! + assert_ready_ok!(fut.poll()); +} + +#[tokio::test] +async fn completed_future_past_deadline() { + // Wrap it with a deadline + let mut fut = task::spawn(timeout_at(Instant::now() - ms(1000), async {})); + + // Ready! + assert_ready_ok!(fut.poll()); +} + +#[tokio::test] +async fn future_and_deadline_in_future() { + time::pause(); + + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // Wrap it with a deadline + let mut fut = task::spawn(timeout_at(Instant::now() + ms(100), rx)); + + assert_pending!(fut.poll()); + + // Turn the timer, it runs for the elapsed time + time::advance(ms(90)).await; + + assert_pending!(fut.poll()); + + // Complete the future + tx.send(()).unwrap(); + assert!(fut.is_woken()); + + assert_ready_ok!(fut.poll()).unwrap(); +} + +#[tokio::test] +async fn future_and_timeout_in_future() { + time::pause(); + + // Not yet complete + let (tx, rx) = oneshot::channel(); + + // Wrap it with a deadline + let mut fut = task::spawn(timeout(ms(100), rx)); + + // Ready! + assert_pending!(fut.poll()); + + // Turn the timer, it runs for the elapsed time + time::advance(ms(90)).await; + + assert_pending!(fut.poll()); + + // Complete the future + tx.send(()).unwrap(); + + assert_ready_ok!(fut.poll()).unwrap(); +} + +#[tokio::test] +async fn deadline_now_elapses() { + use futures::future::pending; + + time::pause(); + + // Wrap it with a deadline + let mut fut = task::spawn(timeout_at(Instant::now(), pending::<()>())); + + // Factor in jitter + // TODO: don't require this + time::advance(ms(1)).await; + + assert_ready_err!(fut.poll()); +} + +#[tokio::test] +async fn deadline_future_elapses() { + time::pause(); + + // Wrap it with a deadline + let mut fut = task::spawn(timeout_at(Instant::now() + ms(300), pending::<()>())); + + assert_pending!(fut.poll()); + + time::advance(ms(301)).await; + + assert!(fut.is_woken()); + assert_ready_err!(fut.poll()); +} + +fn ms(n: u64) -> Duration { + Duration::from_millis(n) +} diff --git a/third_party/rust/tokio/tests/udp.rs b/third_party/rust/tokio/tests/udp.rs new file mode 100644 index 0000000000..71c282a5cd --- /dev/null +++ b/third_party/rust/tokio/tests/udp.rs @@ -0,0 +1,73 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::net::UdpSocket; + +#[tokio::test] +async fn send_recv() -> std::io::Result<()> { + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + + sender.connect(receiver.local_addr()?).await?; + receiver.connect(sender.local_addr()?).await?; + + let message = b"hello!"; + sender.send(message).await?; + + let mut recv_buf = [0u8; 32]; + let len = receiver.recv(&mut recv_buf[..]).await?; + + assert_eq!(&recv_buf[..len], message); + Ok(()) +} + +#[tokio::test] +async fn send_to_recv_from() -> std::io::Result<()> { + let mut sender = UdpSocket::bind("127.0.0.1:0").await?; + let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + + let message = b"hello!"; + let receiver_addr = receiver.local_addr()?; + sender.send_to(message, &receiver_addr).await?; + + let mut recv_buf = [0u8; 32]; + let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?; + + assert_eq!(&recv_buf[..len], message); + assert_eq!(addr, sender.local_addr()?); + Ok(()) +} + +#[tokio::test] +async fn split() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let (mut r, mut s) = socket.split(); + + let msg = b"hello"; + let addr = s.as_ref().local_addr()?; + tokio::spawn(async move { + s.send_to(msg, &addr).await.unwrap(); + }); + let mut recv_buf = [0u8; 32]; + let (len, _) = r.recv_from(&mut recv_buf[..]).await?; + assert_eq!(&recv_buf[..len], msg); + Ok(()) +} + +#[tokio::test] +async fn reunite() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let (s, r) = socket.split(); + assert!(s.reunite(r).is_ok()); + Ok(()) +} + +#[tokio::test] +async fn reunite_error() -> std::io::Result<()> { + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let socket1 = UdpSocket::bind("127.0.0.1:0").await?; + let (s, _) = socket.split(); + let (_, r1) = socket1.split(); + assert!(s.reunite(r1).is_err()); + Ok(()) +} diff --git a/third_party/rust/tokio/tests/uds_cred.rs b/third_party/rust/tokio/tests/uds_cred.rs new file mode 100644 index 0000000000..c02b2aee4a --- /dev/null +++ b/third_party/rust/tokio/tests/uds_cred.rs @@ -0,0 +1,30 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(all(unix, not(target_os = "dragonfly")))] + +use tokio::net::UnixStream; + +use libc::getegid; +use libc::geteuid; + +#[tokio::test] +#[cfg_attr( + target_os = "freebsd", + ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419" +)] +#[cfg_attr( + target_os = "netbsd", + ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()" +)] +async fn test_socket_pair() { + let (a, b) = UnixStream::pair().unwrap(); + let cred_a = a.peer_cred().unwrap(); + let cred_b = b.peer_cred().unwrap(); + assert_eq!(cred_a, cred_b); + + let uid = unsafe { geteuid() }; + let gid = unsafe { getegid() }; + + assert_eq!(cred_a.uid, uid); + assert_eq!(cred_a.gid, gid); +} diff --git a/third_party/rust/tokio/tests/uds_datagram.rs b/third_party/rust/tokio/tests/uds_datagram.rs new file mode 100644 index 0000000000..dd9952378f --- /dev/null +++ b/third_party/rust/tokio/tests/uds_datagram.rs @@ -0,0 +1,43 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +use tokio::net::UnixDatagram; + +use std::io; + +async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> { + let mut recv_buf = vec![0u8; 1024]; + loop { + let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?; + if let Some(path) = peer_addr.as_pathname() { + socket.send_to(&recv_buf[..len], path).await?; + } + } +} + +#[tokio::test] +async fn echo() -> io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + let server_socket = UnixDatagram::bind(server_path.clone())?; + + tokio::spawn(async move { + if let Err(e) = echo_server(server_socket).await { + eprintln!("Error in echo server: {}", e); + } + }); + + { + let mut socket = UnixDatagram::bind(&client_path).unwrap(); + socket.connect(server_path)?; + socket.send(b"ECHO").await?; + let mut recv_buf = [0u8; 16]; + let len = socket.recv(&mut recv_buf[..]).await?; + assert_eq!(&recv_buf[..len], b"ECHO"); + } + + Ok(()) +} diff --git a/third_party/rust/tokio/tests/uds_split.rs b/third_party/rust/tokio/tests/uds_split.rs new file mode 100644 index 0000000000..76ff4613cd --- /dev/null +++ b/third_party/rust/tokio/tests/uds_split.rs @@ -0,0 +1,43 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] +#![cfg(unix)] + +use tokio::net::UnixStream; +use tokio::prelude::*; + +/// Checks that `UnixStream` can be split into a read half and a write half using +/// `UnixStream::split` and `UnixStream::split_mut`. +/// +/// Verifies that the implementation of `AsyncWrite::poll_shutdown` shutdowns the stream for +/// writing by reading to the end of stream on the other side of the connection. +#[tokio::test] +async fn split() -> std::io::Result<()> { + let (mut a, mut b) = UnixStream::pair()?; + + let (mut a_read, mut a_write) = a.split(); + let (mut b_read, mut b_write) = b.split(); + + let (a_response, b_response) = futures::future::try_join( + send_recv_all(&mut a_read, &mut a_write, b"A"), + send_recv_all(&mut b_read, &mut b_write, b"B"), + ) + .await?; + + assert_eq!(a_response, b"B"); + assert_eq!(b_response, b"A"); + + Ok(()) +} + +async fn send_recv_all( + read: &mut (dyn AsyncRead + Unpin), + write: &mut (dyn AsyncWrite + Unpin), + input: &[u8], +) -> std::io::Result<Vec<u8>> { + write.write_all(input).await?; + write.shutdown().await?; + + let mut output = Vec::new(); + read.read_to_end(&mut output).await?; + Ok(output) +} diff --git a/third_party/rust/tokio/tests/uds_stream.rs b/third_party/rust/tokio/tests/uds_stream.rs new file mode 100644 index 0000000000..29f118a2d4 --- /dev/null +++ b/third_party/rust/tokio/tests/uds_stream.rs @@ -0,0 +1,58 @@ +#![cfg(feature = "full")] +#![warn(rust_2018_idioms)] +#![cfg(unix)] + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{UnixListener, UnixStream}; + +use futures::future::try_join; + +#[tokio::test] +async fn accept_read_write() -> std::io::Result<()> { + let dir = tempfile::Builder::new() + .prefix("tokio-uds-tests") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let mut listener = UnixListener::bind(&sock_path)?; + + let accept = listener.accept(); + let connect = UnixStream::connect(&sock_path); + let ((mut server, _), mut client) = try_join(accept, connect).await?; + + // Write to the client. TODO: Switch to write_all. + let write_len = client.write(b"hello").await?; + assert_eq!(write_len, 5); + drop(client); + // Read from the server. TODO: Switch to read_to_end. + let mut buf = [0u8; 5]; + server.read_exact(&mut buf).await?; + assert_eq!(&buf, b"hello"); + let len = server.read(&mut buf).await?; + assert_eq!(len, 0); + Ok(()) +} + +#[tokio::test] +async fn shutdown() -> std::io::Result<()> { + let dir = tempfile::Builder::new() + .prefix("tokio-uds-tests") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let mut listener = UnixListener::bind(&sock_path)?; + + let accept = listener.accept(); + let connect = UnixStream::connect(&sock_path); + let ((mut server, _), mut client) = try_join(accept, connect).await?; + + // Shut down the client + AsyncWriteExt::shutdown(&mut client).await?; + // Read from the server should return 0 to indicate the channel has been closed. + let mut buf = [0u8; 1]; + let n = server.read(&mut buf).await?; + assert_eq!(n, 0); + Ok(()) +} |