//! Tests for `tokio::io::bsd::Aio`, exercising both plain AIO operations
//! (`mod aio`) and `lio_listio` operations (`mod lio`).
//!
//! Only compiled on FreeBSD with the `net` feature enabled.
#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]

use mio_aio::{AioCb, AioFsyncMode, LioCb};
use std::{
    future::Future,
    mem,
    os::unix::io::{AsRawFd, RawFd},
    pin::Pin,
    task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::bsd::{Aio, AioSource};
use tokio_test::assert_pending;

mod aio {
    use super::*;

    /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
    struct WrappedAioCb<'a>(AioCb<'a>);

    impl<'a> AioSource for WrappedAioCb<'a> {
        /// Forwards the kqueue descriptor and token to the wrapped `AioCb`.
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }

        /// Forwards deregistration to the wrapped `AioCb`.
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude implementation of an AIO-based future
    struct FsyncFut(Aio<WrappedAioCb<'static>>);

    impl Future for FsyncFut {
        type Output = std::io::Result<()>;

        /// Waits for the `Aio` to become ready, then collects the operation's
        /// final status with `aio_return` and maps it to `Ok(())` or an error.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness. But there's no
                    // point, since we're about to drop the Aio.
                    let result = (*self.0).0.aio_return();
                    match result {
                        Ok(_) => Poll::Ready(Ok(())),
                        Err(e) => Poll::Ready(Err(e.into())),
                    }
                }
            }
        }
    }

    /// Low-level AIO Source
    ///
    /// An example bypassing mio_aio and Nix to demonstrate how the kevent
    /// registration actually works, under the hood.
    struct LlSource(Pin<Box<libc::aiocb>>);

    impl AioSource for LlSource {
        /// Fills in the control block's `aio_sigevent` so that completion is
        /// reported through kevent: the kqueue descriptor is stored in
        /// `sigev_signo` and the token in `sigev_value`.
        fn register(&mut self, kq: RawFd, token: usize) {
            // SAFETY: `libc::sigevent` is a plain C struct; an all-zero bit
            // pattern is assumed to be a valid value for it.
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.0.aio_sigevent = sev;
        }

        /// Clears the notification settings by zeroing `aio_sigevent`.
        fn deregister(&mut self) {
            // SAFETY: `libc::sigevent` is a plain C struct; an all-zero bit
            // pattern is assumed to be a valid value for it.
            unsafe {
                self.0.aio_sigevent = mem::zeroed();
            }
        }
    }

    /// Future that completes when the operation behind an `LlSource` finishes.
    struct LlFut(Aio<LlSource>);

    impl Future for LlFut {
        type Output = std::io::Result<()>;

        /// Waits for readiness, then calls `libc::aio_return` directly and
        /// asserts that it reported `0`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // SAFETY: the pinned control block is not moved; only a
                    // pointer to it is handed to libc.
                    let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
                    assert_eq!(0, r);
                    Poll::Ready(Ok(()))
                }
            }
        }
    }

    /// A very simple object that can implement AioSource and can be reused.
    ///
    /// mio_aio normally assumes that each AioCb will be consumed on completion.
    /// This somewhat contrived example shows how an Aio object can be reused
    /// anyway.
    struct ReusableFsyncSource {
        aiocb: Pin<Box<AioCb<'static>>>,
        fd: RawFd,
        token: usize,
    }

    impl ReusableFsyncSource {
        /// Registers the current `AioCb` with the remembered kqueue descriptor
        /// and token, then issues the fsync.
        fn fsync(&mut self) {
            self.aiocb.register_raw(self.fd, self.token);
            self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
        }

        /// Wraps `aiocb`; `fd` and `token` start at zero until `register` is
        /// called.
        fn new(aiocb: AioCb<'static>) -> Self {
            ReusableFsyncSource {
                aiocb: Box::pin(aiocb),
                fd: 0,
                token: 0,
            }
        }

        /// Replaces the stored `AioCb` with a fresh one, keeping `fd` and
        /// `token` untouched.
        fn reset(&mut self, aiocb: AioCb<'static>) {
            self.aiocb = Box::pin(aiocb);
        }
    }

    impl AioSource for ReusableFsyncSource {
        /// Only records the kqueue descriptor and token; the `AioCb` itself is
        /// registered later, in `fsync`.
        fn register(&mut self, kq: RawFd, token: usize) {
            self.fd = kq;
            self.token = token;
        }

        /// Forgets the recorded kqueue descriptor.
        fn deregister(&mut self) {
            self.fd = 0;
        }
    }

    /// Future borrowing a reusable `Aio`, so the `Aio` outlives the future.
    struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);

    impl<'a> Future for ReusableFsyncFut<'a> {
        type Output = std::io::Result<()>;

        /// Waits for readiness, clears it, and returns the result of
        /// `aio_return` mapped to `Ok(())` or an error.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Since this future uses a reusable Aio, we must clear
                    // its readiness here. That makes the future
                    // non-idempotent; the caller can't poll it repeatedly after
                    // it has already returned Ready. But that's ok; most
                    // futures behave this way.
                    self.0.clear_ready(ev);
                    let result = (*self.0).aiocb.aio_return();
                    match result {
                        Ok(_) => Poll::Ready(Ok(())),
                        Err(e) => Poll::Ready(Err(e.into())),
                    }
                }
            }
        }
    }

    /// An fsync issued through mio_aio and awaited via `FsyncFut`
    #[tokio::test]
    async fn fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let aiocb = AioCb::from_fd(fd, 0);
        let source = WrappedAioCb(aiocb);
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
        let fut = FsyncFut(poll_aio);
        fut.await.unwrap();
    }

    /// An fsync issued with raw libc calls and awaited via `LlFut`
    #[tokio::test]
    async fn ll_fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        // SAFETY: `libc::aiocb` is a plain C struct; an all-zero bit pattern
        // is assumed to be a valid value for it.
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        // SAFETY: the pinned control block is not moved; only a pointer to it
        // is handed to libc.
        let r = unsafe {
            let p = (*poll_aio).0.as_mut().get_unchecked_mut();
            libc::aio_fsync(libc::O_SYNC, p)
        };
        assert_eq!(0, r);
        let fut = LlFut(poll_aio);
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let aiocb0 = AioCb::from_fd(fd, 0);
        let source = ReusableFsyncSource::new(aiocb0);
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        poll_aio.fsync();
        let fut0 = ReusableFsyncFut(&mut poll_aio);
        fut0.await.unwrap();

        // Swap in a fresh AioCb; before it is submitted the Aio must report
        // Pending, since the first future cleared its readiness.
        let aiocb1 = AioCb::from_fd(fd, 0);
        poll_aio.reset(aiocb1);
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(poll_aio.poll_ready(&mut ctx));
        poll_aio.fsync();
        let fut1 = ReusableFsyncFut(&mut poll_aio);
        fut1.await.unwrap();
    }
}

mod lio {
    use super::*;

    /// Adapts mio_aio::LioCb to AioSource
    struct WrappedLioCb<'a>(LioCb<'a>);

    impl<'a> AioSource for WrappedLioCb<'a> {
        /// Forwards the kqueue descriptor and token to the wrapped `LioCb`.
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }

        /// Forwards deregistration to the wrapped `LioCb`.
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude lio_listio-based Future
    ///
    /// The `Aio` is held in an `Option` so that `poll` can take ownership of
    /// it and consume the inner `LioCb` once the operation is ready.
    struct LioFut(Option<Aio<WrappedLioCb<'static>>>);

    impl Future for LioFut {
        type Output = std::io::Result<Vec<isize>>;

        /// Waits for readiness, then consumes the `Aio` and collects the
        /// result of every list element. Panics if any element failed, or if
        /// polled again after returning `Ready`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness. But there's no
                    // point, since we're about to drop the Aio.
                    let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
                        iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
                    });
                    Poll::Ready(Ok(r))
                }
            }
        }
    }

    /// Minimal example demonstrating reuse of an Aio object with lio
    /// readiness. mio_aio::LioCb actually does something similar under the
    /// hood.
    struct ReusableLioSource {
        liocb: Option<LioCb<'static>>,
        fd: RawFd,
        token: usize,
    }

    impl ReusableLioSource {
        /// Wraps `liocb`; `fd` and `token` start at zero until `register` is
        /// called.
        fn new(liocb: LioCb<'static>) -> Self {
            ReusableLioSource {
                liocb: Some(liocb),
                fd: 0,
                token: 0,
            }
        }

        /// Installs a fresh `LioCb`, keeping `fd` and `token` untouched.
        fn reset(&mut self, liocb: LioCb<'static>) {
            self.liocb = Some(liocb);
        }

        /// Registers the stored `LioCb` with the remembered kqueue descriptor
        /// and token, then submits it. Panics if no `LioCb` is stored.
        fn submit(&mut self) {
            self.liocb
                .as_mut()
                .unwrap()
                .register_raw(self.fd, self.token);
            self.liocb.as_mut().unwrap().submit().unwrap();
        }
    }

    impl AioSource for ReusableLioSource {
        /// Only records the kqueue descriptor and token; the `LioCb` itself is
        /// registered later, in `submit`.
        fn register(&mut self, kq: RawFd, token: usize) {
            self.fd = kq;
            self.token = token;
        }

        /// Forgets the recorded kqueue descriptor.
        fn deregister(&mut self) {
            self.fd = 0;
        }
    }

    /// Future borrowing a reusable `Aio`, so the `Aio` outlives the future.
    struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);

    impl<'a> Future for ReusableLioFut<'a> {
        type Output = std::io::Result<Vec<isize>>;

        /// Waits for readiness, clears it, then takes the stored `LioCb` and
        /// collects the result of every list element. Panics if any element
        /// failed.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Since this future uses a reusable Aio, we must clear
                    // its readiness here. That makes the future
                    // non-idempotent; the caller can't poll it repeatedly after
                    // it has already returned Ready. But that's ok; most
                    // futures behave this way.
                    self.0.clear_ready(ev);
                    let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
                        iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
                    });
                    Poll::Ready(Ok(r))
                }
            }
        }
    }

    /// An lio_listio operation with one write element
    #[tokio::test]
    async fn onewrite() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
        builder = builder.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb = builder.finish();
        let source = WrappedLioCb(liocb);
        let mut poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel
        (*poll_aio).0.submit().unwrap();
        let fut = LioFut(Some(poll_aio));
        let v = fut.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
        builder0 = builder0.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb0 = builder0.finish();
        let source = ReusableLioSource::new(liocb0);
        // NOTE(review): this lio source is wrapped with `new_for_aio`, whereas
        // `onewrite` uses `new_for_lio` — confirm whether that is intentional.
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        poll_aio.submit();
        let fut0 = ReusableLioFut(&mut poll_aio);
        let v = fut0.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());

        // Now reuse the same Aio
        let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
        builder1 = builder1.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb1 = builder1.finish();
        poll_aio.reset(liocb1);
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(poll_aio.poll_ready(&mut ctx));
        poll_aio.submit();
        let fut1 = ReusableLioFut(&mut poll_aio);
        let v = fut1.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());
    }
}