diff options
Diffstat (limited to 'third_party/rust/tokio/tests/io_poll_aio.rs')
-rw-r--r-- | third_party/rust/tokio/tests/io_poll_aio.rs | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/io_poll_aio.rs b/third_party/rust/tokio/tests/io_poll_aio.rs new file mode 100644 index 0000000000..f044af5cc4 --- /dev/null +++ b/third_party/rust/tokio/tests/io_poll_aio.rs @@ -0,0 +1,375 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(target_os = "freebsd", feature = "net"))] + +use mio_aio::{AioCb, AioFsyncMode, LioCb}; +use std::{ + future::Future, + mem, + os::unix::io::{AsRawFd, RawFd}, + pin::Pin, + task::{Context, Poll}, +}; +use tempfile::tempfile; +use tokio::io::bsd::{Aio, AioSource}; +use tokio_test::assert_pending; + +mod aio { + use super::*; + + /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource + struct WrappedAioCb<'a>(AioCb<'a>); + impl<'a> AioSource for WrappedAioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude implementation of an AIO-based future + struct FsyncFut(Aio<WrappedAioCb<'static>>); + + impl Future for FsyncFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let result = (*self.0).0.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + /// Low-level AIO Source + /// + /// An example bypassing mio_aio and Nix to demonstrate how the kevent + /// registration actually works, under the hood. + struct LlSource(Pin<Box<libc::aiocb>>); + + impl AioSource for LlSource { + fn register(&mut self, kq: RawFd, token: usize) { + let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + sev.sigev_notify = libc::SIGEV_KEVENT; + sev.sigev_signo = kq; + sev.sigev_value = libc::sigval { + sival_ptr: token as *mut libc::c_void, + }; + self.0.aio_sigevent = sev; + } + + fn deregister(&mut self) { + unsafe { + self.0.aio_sigevent = mem::zeroed(); + } + } + } + + struct LlFut(Aio<LlSource>); + + impl Future for LlFut { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) }; + assert_eq!(0, r); + Poll::Ready(Ok(())) + } + } + } + } + + /// A very simple object that can implement AioSource and can be reused. + /// + /// mio_aio normally assumes that each AioCb will be consumed on completion. + /// This somewhat contrived example shows how an Aio object can be reused + /// anyway. + struct ReusableFsyncSource { + aiocb: Pin<Box<AioCb<'static>>>, + fd: RawFd, + token: usize, + } + impl ReusableFsyncSource { + fn fsync(&mut self) { + self.aiocb.register_raw(self.fd, self.token); + self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap(); + } + fn new(aiocb: AioCb<'static>) -> Self { + ReusableFsyncSource { + aiocb: Box::pin(aiocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, aiocb: AioCb<'static>) { + self.aiocb = Box::pin(aiocb); + } + } + impl AioSource for ReusableFsyncSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + + struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>); + impl<'a> Future for ReusableFsyncFut<'a> { + type Output = std::io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let result = (*self.0).aiocb.aio_return(); + match result { + Ok(_) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(e.into())), + } + } + } + } + } + + #[tokio::test] + async fn fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb = AioCb::from_fd(fd, 0); + let source = WrappedAioCb(aiocb); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap(); + let fut = FsyncFut(poll_aio); + fut.await.unwrap(); + } + + #[tokio::test] + async fn ll_fsync() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() }; + aiocb.aio_fildes = fd; + let source = LlSource(Box::pin(aiocb)); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + let r = unsafe { + let p = (*poll_aio).0.as_mut().get_unchecked_mut(); + libc::aio_fsync(libc::O_SYNC, p) + }; + assert_eq!(0, r); + let fut = LlFut(poll_aio); + fut.await.unwrap(); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + let f = tempfile().unwrap(); + let fd = f.as_raw_fd(); + let aiocb0 = AioCb::from_fd(fd, 0); + let source = ReusableFsyncSource::new(aiocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.fsync(); + let fut0 = ReusableFsyncFut(&mut poll_aio); + fut0.await.unwrap(); + + let aiocb1 = AioCb::from_fd(fd, 0); + poll_aio.reset(aiocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.fsync(); + let fut1 = ReusableFsyncFut(&mut poll_aio); + fut1.await.unwrap(); + } +} + +mod lio { + use super::*; + + struct WrappedLioCb<'a>(LioCb<'a>); + impl<'a> AioSource for WrappedLioCb<'a> { + fn register(&mut self, kq: RawFd, token: usize) { + self.0.register_raw(kq, token) + } + fn deregister(&mut self) { + self.0.deregister_raw() + } + } + + /// A very crude lio_listio-based Future + struct LioFut(Option<Aio<WrappedLioCb<'static>>>); + + impl Future for LioFut { + type Output = std::io::Result<Vec<isize>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.as_mut().unwrap().poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_ev)) => { + // At this point, we could clear readiness. But there's no + // point, since we're about to drop the Aio. + let r = self.0.take().unwrap().into_inner().0.into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// Minimal example demonstrating reuse of an Aio object with lio + /// readiness. mio_aio::LioCb actually does something similar under the + /// hood. + struct ReusableLioSource { + liocb: Option<LioCb<'static>>, + fd: RawFd, + token: usize, + } + impl ReusableLioSource { + fn new(liocb: LioCb<'static>) -> Self { + ReusableLioSource { + liocb: Some(liocb), + fd: 0, + token: 0, + } + } + fn reset(&mut self, liocb: LioCb<'static>) { + self.liocb = Some(liocb); + } + fn submit(&mut self) { + self.liocb + .as_mut() + .unwrap() + .register_raw(self.fd, self.token); + self.liocb.as_mut().unwrap().submit().unwrap(); + } + } + impl AioSource for ReusableLioSource { + fn register(&mut self, kq: RawFd, token: usize) { + self.fd = kq; + self.token = token; + } + fn deregister(&mut self) { + self.fd = 0; + } + } + struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>); + impl<'a> Future for ReusableLioFut<'a> { + type Output = std::io::Result<Vec<isize>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let poll_result = self.0.poll_ready(cx); + match poll_result { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(ev)) => { + // Since this future uses a reusable Aio, we must clear + // its readiness here. That makes the future + // non-idempotent; the caller can't poll it repeatedly after + // it has already returned Ready. But that's ok; most + // futures behave this way. + self.0.clear_ready(ev); + let r = (*self.0).liocb.take().unwrap().into_results(|iter| { + iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>() + }); + Poll::Ready(Ok(r)) + } + } + } + } + + /// An lio_listio operation with one write element + #[tokio::test] + async fn onewrite() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder = mio_aio::LioCbBuilder::with_capacity(1); + builder = builder.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb = builder.finish(); + let source = WrappedLioCb(liocb); + let mut poll_aio = Aio::new_for_lio(source).unwrap(); + + // Send the operation to the kernel + (*poll_aio).0.submit().unwrap(); + let fut = LioFut(Some(poll_aio)); + let v = fut.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } + + /// A suitably crafted future type can reuse an Aio object + #[tokio::test] + async fn reuse() { + const WBUF: &[u8] = b"abcdef"; + let f = tempfile().unwrap(); + + let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1); + builder0 = builder0.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb0 = builder0.finish(); + let source = ReusableLioSource::new(liocb0); + let mut poll_aio = Aio::new_for_aio(source).unwrap(); + poll_aio.submit(); + let fut0 = ReusableLioFut(&mut poll_aio); + let v = fut0.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + + // Now reuse the same Aio + let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1); + builder1 = builder1.emplace_slice( + f.as_raw_fd(), + 0, + &WBUF[..], + 0, + mio_aio::LioOpcode::LIO_WRITE, + ); + let liocb1 = builder1.finish(); + poll_aio.reset(liocb1); + let mut ctx = Context::from_waker(futures::task::noop_waker_ref()); + assert_pending!(poll_aio.poll_ready(&mut ctx)); + poll_aio.submit(); + let fut1 = ReusableLioFut(&mut poll_aio); + let v = fut1.await.unwrap(); + assert_eq!(v.len(), 1); + assert_eq!(v[0] as usize, WBUF.len()); + } +} |