use crate::io::sys; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use std::cmp; use std::future::Future; use std::io; use std::io::prelude::*; use std::pin::Pin; use std::task::Poll::*; use std::task::{Context, Poll}; use self::State::*; /// `T` should not implement _both_ Read and Write. #[derive(Debug)] pub(crate) struct Blocking { inner: Option, state: State, /// `true` if the lower IO layer needs flushing. need_flush: bool, } #[derive(Debug)] pub(crate) struct Buf { buf: Vec, pos: usize, } pub(crate) const MAX_BUF: usize = 2 * 1024 * 1024; #[derive(Debug)] enum State { Idle(Option), Busy(sys::Blocking<(io::Result, Buf, T)>), } cfg_io_blocking! { impl Blocking { #[cfg_attr(feature = "fs", allow(dead_code))] pub(crate) fn new(inner: T) -> Blocking { Blocking { inner: Some(inner), state: State::Idle(Some(Buf::with_capacity(0))), need_flush: false, } } } } impl AsyncRead for Blocking where T: Read + Unpin + Send + 'static, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, dst: &mut ReadBuf<'_>, ) -> Poll> { loop { match self.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); if !buf.is_empty() { buf.copy_to(dst); *buf_cell = Some(buf); return Ready(Ok(())); } buf.ensure_capacity_for(dst); let mut inner = self.inner.take().unwrap(); self.state = Busy(sys::run(move || { let res = buf.read_from(&mut inner); (res, buf, inner) })); } Busy(ref mut rx) => { let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?; self.inner = Some(inner); match res { Ok(_) => { buf.copy_to(dst); self.state = Idle(Some(buf)); return Ready(Ok(())); } Err(e) => { assert!(buf.is_empty()); self.state = Idle(Some(buf)); return Ready(Err(e)); } } } } } } } impl AsyncWrite for Blocking where T: Write + Unpin + Send + 'static, { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, src: &[u8], ) -> Poll> { loop { match self.state { Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); assert!(buf.is_empty()); let n = buf.copy_from(src); let mut inner = self.inner.take().unwrap(); self.state = Busy(sys::run(move || { let n = buf.len(); let res = buf.write_to(&mut inner).map(|_| n); (res, buf, inner) })); self.need_flush = true; return Ready(Ok(n)); } Busy(ref mut rx) => { let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; self.state = Idle(Some(buf)); self.inner = Some(inner); // If error, return res?; } } } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { let need_flush = self.need_flush; match self.state { // The buffer is not used here Idle(ref mut buf_cell) => { if need_flush { let buf = buf_cell.take().unwrap(); let mut inner = self.inner.take().unwrap(); self.state = Busy(sys::run(move || { let res = inner.flush().map(|_| 0); (res, buf, inner) })); self.need_flush = false; } else { return Ready(Ok(())); } } Busy(ref mut rx) => { let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; self.state = Idle(Some(buf)); self.inner = Some(inner); // If error, return res?; } } } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } /// Repeats operations that are interrupted. macro_rules! uninterruptibly { ($e:expr) => {{ loop { match $e { Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} res => break res, } } }}; } impl Buf { pub(crate) fn with_capacity(n: usize) -> Buf { Buf { buf: Vec::with_capacity(n), pos: 0, } } pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } pub(crate) fn len(&self) -> usize { self.buf.len() - self.pos } pub(crate) fn copy_to(&mut self, dst: &mut ReadBuf<'_>) -> usize { let n = cmp::min(self.len(), dst.remaining()); dst.put_slice(&self.bytes()[..n]); self.pos += n; if self.pos == self.buf.len() { self.buf.truncate(0); self.pos = 0; } n } pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize { assert!(self.is_empty()); let n = cmp::min(src.len(), MAX_BUF); self.buf.extend_from_slice(&src[..n]); n } pub(crate) fn bytes(&self) -> &[u8] { &self.buf[self.pos..] } pub(crate) fn ensure_capacity_for(&mut self, bytes: &ReadBuf<'_>) { assert!(self.is_empty()); let len = cmp::min(bytes.remaining(), MAX_BUF); if self.buf.len() < len { self.buf.reserve(len - self.buf.len()); } unsafe { self.buf.set_len(len); } } pub(crate) fn read_from(&mut self, rd: &mut T) -> io::Result { let res = uninterruptibly!(rd.read(&mut self.buf)); if let Ok(n) = res { self.buf.truncate(n); } else { self.buf.clear(); } assert_eq!(self.pos, 0); res } pub(crate) fn write_to(&mut self, wr: &mut T) -> io::Result<()> { assert_eq!(self.pos, 0); // `write_all` already ignores interrupts let res = wr.write_all(&self.buf); self.buf.clear(); res } } cfg_fs! { impl Buf { pub(crate) fn discard_read(&mut self) -> i64 { let ret = -(self.bytes().len() as i64); self.pos = 0; self.buf.truncate(0); ret } } }