diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) | |
tree | 173a775858bd501c378080a10dca74132f05bc50 /vendor/tokio/tests/io_async_fd.rs | |
parent | Initial commit. (diff) | |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/tests/io_async_fd.rs')
-rw-r--r-- | vendor/tokio/tests/io_async_fd.rs | 599 |
1 files changed, 599 insertions, 0 deletions
diff --git a/vendor/tokio/tests/io_async_fd.rs b/vendor/tokio/tests/io_async_fd.rs new file mode 100644 index 000000000..dc21e426f --- /dev/null +++ b/vendor/tokio/tests/io_async_fd.rs @@ -0,0 +1,599 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(unix, feature = "full"))] + +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use std::time::Duration; +use std::{ + future::Future, + io::{self, ErrorKind, Read, Write}, + task::{Context, Waker}, +}; + +use nix::unistd::{close, read, write}; + +use futures::{poll, FutureExt}; + +use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard}; +use tokio_test::{assert_err, assert_pending}; + +struct TestWaker { + inner: Arc<TestWakerInner>, + waker: Waker, +} + +#[derive(Default)] +struct TestWakerInner { + awoken: AtomicBool, +} + +impl futures::task::ArcWake for TestWakerInner { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.awoken.store(true, Ordering::SeqCst); + } +} + +impl TestWaker { + fn new() -> Self { + let inner: Arc<TestWakerInner> = Default::default(); + + Self { + inner: inner.clone(), + waker: futures::task::waker(inner), + } + } + + fn awoken(&self) -> bool { + self.inner.awoken.swap(false, Ordering::SeqCst) + } + + fn context(&self) -> Context<'_> { + Context::from_waker(&self.waker) + } +} + +#[derive(Debug)] +struct FileDescriptor { + fd: RawFd, +} + +impl AsRawFd for FileDescriptor { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} + +impl Read for &FileDescriptor { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + read(self.fd, buf).map_err(io::Error::from) + } +} + +impl Read for FileDescriptor { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (self as &Self).read(buf) + } +} + +impl Write for &FileDescriptor { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + write(self.fd, buf).map_err(io::Error::from) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Write for FileDescriptor { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (self as &Self).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (self as &Self).flush() + } +} + +impl Drop for FileDescriptor { + fn drop(&mut self) { + let _ = close(self.fd); + } +} + +fn set_nonblocking(fd: RawFd) { + use nix::fcntl::{OFlag, F_GETFL, F_SETFL}; + + let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)"); + + if flags < 0 { + panic!( + "bad return value from fcntl(F_GETFL): {} ({:?})", + flags, + nix::Error::last() + ); + } + + let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK; + + nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)"); +} + +fn socketpair() -> (FileDescriptor, FileDescriptor) { + use nix::sys::socket::{self, AddressFamily, SockFlag, SockType}; + + let (fd_a, fd_b) = socket::socketpair( + AddressFamily::Unix, + SockType::Stream, + None, + SockFlag::empty(), + ) + .expect("socketpair"); + let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b }); + + set_nonblocking(fds.0.fd); + set_nonblocking(fds.1.fd); + + fds +} + +fn drain(mut fd: &FileDescriptor) { + let mut buf = [0u8; 512]; + + loop { + match fd.read(&mut buf[..]) { + Err(e) if e.kind() == ErrorKind::WouldBlock => break, + Ok(0) => panic!("unexpected EOF"), + Err(e) => panic!("unexpected error: {:?}", e), + Ok(_) => continue, + } + } +} + +#[tokio::test] +async fn initially_writable() { + let (a, b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + let afd_b = AsyncFd::new(b).unwrap(); + + afd_a.writable().await.unwrap().clear_ready(); + afd_b.writable().await.unwrap().clear_ready(); + + futures::select_biased! { + _ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {}, + _ = afd_a.readable().fuse() => panic!("Unexpected readable state"), + _ = afd_b.readable().fuse() => panic!("Unexpected readable state"), + } +} + +#[tokio::test] +async fn reset_readable() { + let (a, mut b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + b.write_all(b"0").unwrap(); + + let mut guard = readable.await.unwrap(); + + guard + .try_io(|_| afd_a.get_ref().read(&mut [0])) + .unwrap() + .unwrap(); + + // `a` is not readable, but the reactor still thinks it is + // (because we have not observed a not-ready error yet) + afd_a.readable().await.unwrap().retain_ready(); + + // Explicitly clear the ready state + guard.clear_ready(); + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + b.write_all(b"0").unwrap(); + + // We can observe the new readable event + afd_a.readable().await.unwrap().clear_ready(); +} + +#[tokio::test] +async fn reset_writable() { + let (a, b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let mut guard = afd_a.writable().await.unwrap(); + + // Write until we get a WouldBlock. This also clears the ready state. + while guard + .try_io(|_| afd_a.get_ref().write(&[0; 512][..])) + .is_ok() + {} + + // Writable state should be cleared now. + let writable = afd_a.writable(); + tokio::pin!(writable); + + tokio::select! { + _ = writable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + // Read from the other side; we should become writable now. + drain(&b); + + let _ = writable.await.unwrap(); +} + +#[derive(Debug)] +struct ArcFd<T>(Arc<T>); +impl<T: AsRawFd> AsRawFd for ArcFd<T> { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[tokio::test] +async fn drop_closes() { + let (a, mut b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); + + std::mem::drop(afd_a); + + assert_eq!(0, b.read(&mut [0]).unwrap()); + + // into_inner does not close the fd + + let (a, mut b) = socketpair(); + let afd_a = AsyncFd::new(a).unwrap(); + let _a: FileDescriptor = afd_a.into_inner(); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); + + // Drop closure behavior is delegated to the inner object + let (a, mut b) = socketpair(); + let arc_fd = Arc::new(a); + let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap(); + std::mem::drop(afd_a); + + assert_eq!( + ErrorKind::WouldBlock, + b.read(&mut [0]).err().unwrap().kind() + ); + + std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning +} + +#[tokio::test] +async fn reregister() { + let (a, _b) = socketpair(); + + let afd_a = AsyncFd::new(a).unwrap(); + let a = afd_a.into_inner(); + AsyncFd::new(a).unwrap(); +} + +#[tokio::test] +async fn try_io() { + let (a, mut b) = socketpair(); + + b.write_all(b"0").unwrap(); + + let afd_a = AsyncFd::new(a).unwrap(); + + let mut guard = afd_a.readable().await.unwrap(); + + afd_a.get_ref().read_exact(&mut [0]).unwrap(); + + // Should not clear the readable state + let _ = guard.try_io(|_| Ok(())); + + // Still readable... + let _ = afd_a.readable().await.unwrap(); + + // Should clear the readable state + let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into())); + + // Assert not readable + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = readable.as_mut() => panic!(), + _ = tokio::time::sleep(Duration::from_millis(10)) => {} + } + + // Write something down b again and make sure we're reawoken + b.write_all(b"0").unwrap(); + let _ = readable.await.unwrap(); +} + +#[tokio::test] +async fn multiple_waiters() { + let (a, mut b) = socketpair(); + let afd_a = Arc::new(AsyncFd::new(a).unwrap()); + + let barrier = Arc::new(tokio::sync::Barrier::new(11)); + + let mut tasks = Vec::new(); + for _ in 0..10 { + let afd_a = afd_a.clone(); + let barrier = barrier.clone(); + + let f = async move { + let notify_barrier = async { + barrier.wait().await; + futures::future::pending::<()>().await; + }; + + futures::select_biased! { + guard = afd_a.readable().fuse() => { + tokio::task::yield_now().await; + guard.unwrap().clear_ready() + }, + _ = notify_barrier.fuse() => unreachable!(), + } + + std::mem::drop(afd_a); + }; + + tasks.push(tokio::spawn(f)); + } + + let mut all_tasks = futures::future::try_join_all(tasks); + + tokio::select! { + r = std::pin::Pin::new(&mut all_tasks) => { + r.unwrap(); // propagate panic + panic!("Tasks exited unexpectedly") + }, + _ = barrier.wait() => {} + }; + + b.write_all(b"0").unwrap(); + + all_tasks.await.unwrap(); +} + +#[tokio::test] +async fn poll_fns() { + let (a, b) = socketpair(); + let afd_a = Arc::new(AsyncFd::new(a).unwrap()); + let afd_b = Arc::new(AsyncFd::new(b).unwrap()); + + // Fill up the write side of A + while afd_a.get_ref().write(&[0; 512]).is_ok() {} + + let waker = TestWaker::new(); + + assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context())); + + let afd_a_2 = afd_a.clone(); + let r_barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_clone = r_barrier.clone(); + + let read_fut = tokio::spawn(async move { + // Move waker onto this task first + assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + .as_ref() + .poll_read_ready(cx)))); + barrier_clone.wait().await; + + let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await; + }); + + let afd_a_2 = afd_a.clone(); + let w_barrier = Arc::new(tokio::sync::Barrier::new(2)); + let barrier_clone = w_barrier.clone(); + + let mut write_fut = tokio::spawn(async move { + // Move waker onto this task first + assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2 + .as_ref() + .poll_write_ready(cx)))); + barrier_clone.wait().await; + + let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await; + }); + + r_barrier.wait().await; + w_barrier.wait().await; + + let readable = afd_a.readable(); + tokio::pin!(readable); + + tokio::select! { + _ = &mut readable => unreachable!(), + _ = tokio::task::yield_now() => {} + } + + // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly + afd_b.get_ref().write_all(b"0").unwrap(); + + let _ = tokio::join!(readable, read_fut); + + // Our original waker should _not_ be awoken (poll_read_ready retains only the last context) + assert!(!waker.awoken()); + + // The writable side should not be awoken + tokio::select! { + _ = &mut write_fut => unreachable!(), + _ = tokio::time::sleep(Duration::from_millis(5)) => {} + } + + // Make it writable now + drain(afd_b.get_ref()); + + // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side) + let _ = write_fut.await; +} + +fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> { + let mut pinned = Box::pin(f); + + assert_pending!(pinned + .as_mut() + .poll(&mut Context::from_waker(futures::task::noop_waker_ref()))); + + pinned +} + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() +} + +#[test] +fn driver_shutdown_wakes_currently_pending() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + let readable = assert_pending(afd_a.readable()); + + std::mem::drop(rt); + + // The future was initialized **before** dropping the rt + assert_err!(futures::executor::block_on(readable)); + + // The future is initialized **after** dropping the rt. + assert_err!(futures::executor::block_on(afd_a.readable())); +} + +#[test] +fn driver_shutdown_wakes_future_pending() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + std::mem::drop(rt); + + assert_err!(futures::executor::block_on(afd_a.readable())); +} + +#[test] +fn driver_shutdown_wakes_pending_race() { + // TODO: make this a loom test + for _ in 0..100 { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + let _ = std::thread::spawn(move || std::mem::drop(rt)); + + // This may or may not return an error (but will be awoken) + let _ = futures::executor::block_on(afd_a.readable()); + + // However retrying will always return an error + assert_err!(futures::executor::block_on(afd_a.readable())); + } +} + +async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> { + futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await +} + +async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> { + futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await +} + +#[test] +fn driver_shutdown_wakes_currently_pending_polls() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable + + let readable = assert_pending(poll_readable(&afd_a)); + let writable = assert_pending(poll_writable(&afd_a)); + + std::mem::drop(rt); + + // Attempting to poll readiness when the rt is dropped is an error + assert_err!(futures::executor::block_on(readable)); + assert_err!(futures::executor::block_on(writable)); +} + +#[test] +fn driver_shutdown_wakes_poll() { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + std::mem::drop(rt); + + assert_err!(futures::executor::block_on(poll_readable(&afd_a))); + assert_err!(futures::executor::block_on(poll_writable(&afd_a))); +} + +#[test] +fn driver_shutdown_wakes_poll_race() { + // TODO: make this a loom test + for _ in 0..100 { + let rt = rt(); + + let (a, _b) = socketpair(); + let afd_a = { + let _enter = rt.enter(); + AsyncFd::new(a).unwrap() + }; + + while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable + + let _ = std::thread::spawn(move || std::mem::drop(rt)); + + // The poll variants will always return an error in this case + assert_err!(futures::executor::block_on(poll_readable(&afd_a))); + assert_err!(futures::executor::block_on(poll_writable(&afd_a))); + } +} |