//! IO //! //! This module contains a number of functions for working with //! `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and `AsyncBufRead` types, including //! the `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt`, and `AsyncBufReadExt` //! traits which add methods to the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, //! and `AsyncBufRead` types. //! //! This module is only available when the `io` and `std` features of this //! library is activated, and it is activated by default. #[cfg(feature = "io-compat")] use crate::compat::Compat; use std::ptr; pub use futures_io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom, }; #[cfg(feature = "read-initializer")] pub use futures_io::Initializer; // used by `BufReader` and `BufWriter` // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 const DEFAULT_BUF_SIZE: usize = 8 * 1024; /// Initializes a buffer if necessary. /// /// A buffer is always initialized if `read-initializer` feature is disabled. #[inline] unsafe fn initialize(_reader: &R, buf: &mut [u8]) { #[cfg(feature = "read-initializer")] { if !_reader.initializer().should_initialize() { return; } } ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } mod allow_std; pub use self::allow_std::AllowStdIo; mod buf_reader; pub use self::buf_reader::BufReader; mod buf_writer; pub use self::buf_writer::BufWriter; mod chain; pub use self::chain::Chain; mod close; pub use self::close::Close; mod copy; pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; mod cursor; pub use self::cursor::Cursor; mod empty; pub use self::empty::{empty, Empty}; mod flush; pub use self::flush::Flush; #[cfg(feature = "sink")] mod into_sink; #[cfg(feature = "sink")] pub use self::into_sink::IntoSink; mod lines; pub use self::lines::Lines; mod read; pub use self::read::Read; mod read_vectored; pub use self::read_vectored::ReadVectored; mod read_exact; pub use self::read_exact::ReadExact; mod read_line; pub use self::read_line::ReadLine; mod read_to_end; pub use self::read_to_end::ReadToEnd; mod read_to_string; pub use self::read_to_string::ReadToString; mod read_until; pub use self::read_until::ReadUntil; mod repeat; pub use self::repeat::{repeat, Repeat}; mod seek; pub use self::seek::Seek; mod sink; pub use self::sink::{sink, Sink}; mod split; pub use self::split::{ReadHalf, WriteHalf}; mod take; pub use self::take::Take; mod window; pub use self::window::Window; mod write; pub use self::write::Write; mod write_vectored; pub use self::write_vectored::WriteVectored; mod write_all; pub use self::write_all::WriteAll; /// An extension trait which adds utility methods to `AsyncRead` types. pub trait AsyncReadExt: AsyncRead { /// Creates an adaptor which will chain this stream with another. /// /// The returned `AsyncRead` instance will first read all bytes from this object /// until EOF is encountered. Afterwards the output is equivalent to the /// output of `next`. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let reader1 = Cursor::new([1, 2, 3, 4]); /// let reader2 = Cursor::new([5, 6, 7, 8]); /// /// let mut reader = reader1.chain(reader2); /// let mut buffer = Vec::new(); /// /// // read the value into a Vec. /// reader.read_to_end(&mut buffer).await?; /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn chain(self, next: R) -> Chain where Self: Sized, R: AsyncRead, { Chain::new(self, next) } /// Tries to read some bytes directly into the given `buf` in asynchronous /// manner, returning a future type. /// /// The returned future will resolve to the number of bytes read once the read /// operation is completed. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let mut reader = Cursor::new([1, 2, 3, 4]); /// let mut output = [0u8; 5]; /// /// let bytes = reader.read(&mut output[..]).await?; /// /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous /// // reader. In a real system you could get anywhere from 1 to /// // `output.len()` bytes in a single read. /// assert_eq!(bytes, 4); /// assert_eq!(output, [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin, { Read::new(self, buf) } /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored /// IO operations. /// /// The returned future will resolve to the number of bytes read once the read /// operation is completed. fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin, { ReadVectored::new(self, bufs) } /// Creates a future which will read exactly enough bytes to fill `buf`, /// returning an error if end of file (EOF) is hit sooner. /// /// The returned future will resolve once the read operation is completed. /// /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let mut reader = Cursor::new([1, 2, 3, 4]); /// let mut output = [0u8; 4]; /// /// reader.read_exact(&mut output).await?; /// /// assert_eq!(output, [1, 2, 3, 4]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` /// /// ## EOF is hit before `buf` is filled /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{self, AsyncReadExt, Cursor}; /// /// let mut reader = Cursor::new([1, 2, 3, 4]); /// let mut output = [0u8; 5]; /// /// let result = reader.read_exact(&mut output).await; /// /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); /// # }); /// ``` fn read_exact<'a>( &'a mut self, buf: &'a mut [u8], ) -> ReadExact<'a, Self> where Self: Unpin, { ReadExact::new(self, buf) } /// Creates a future which will read all the bytes from this `AsyncRead`. /// /// On success the total number of bytes read is returned. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let mut reader = Cursor::new([1, 2, 3, 4]); /// let mut output = Vec::with_capacity(4); /// /// let bytes = reader.read_to_end(&mut output).await?; /// /// assert_eq!(bytes, 4); /// assert_eq!(output, vec![1, 2, 3, 4]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn read_to_end<'a>( &'a mut self, buf: &'a mut Vec, ) -> ReadToEnd<'a, Self> where Self: Unpin, { ReadToEnd::new(self, buf) } /// Creates a future which will read all the bytes from this `AsyncRead`. /// /// On success the total number of bytes read is returned. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let mut reader = Cursor::new(&b"1234"[..]); /// let mut buffer = String::with_capacity(4); /// /// let bytes = reader.read_to_string(&mut buffer).await?; /// /// assert_eq!(bytes, 4); /// assert_eq!(buffer, String::from("1234")); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn read_to_string<'a>( &'a mut self, buf: &'a mut String, ) -> ReadToString<'a, Self> where Self: Unpin, { ReadToString::new(self, buf) } /// Helper method for splitting this read/write object into two halves. /// /// The two halves returned implement the `AsyncRead` and `AsyncWrite` /// traits, respectively. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{self, AsyncReadExt, Cursor}; /// /// // Note that for `Cursor` the read and write halves share a single /// // seek position. This may or may not be true for other types that /// // implement both `AsyncRead` and `AsyncWrite`. /// /// let reader = Cursor::new([1, 2, 3, 4]); /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]); /// let mut writer = Cursor::new(vec![0u8; 5]); /// /// { /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split(); /// io::copy(reader, &mut buffer_writer).await?; /// io::copy(buffer_reader, &mut writer).await?; /// } /// /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]); /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn split(self) -> (ReadHalf, WriteHalf) where Self: AsyncWrite + Sized, { split::split(self) } /// Creates an AsyncRead adapter which will read at most `limit` bytes /// from the underlying reader. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncReadExt, Cursor}; /// /// let reader = Cursor::new(&b"12345678"[..]); /// let mut buffer = [0; 5]; /// /// let mut take = reader.take(4); /// let n = take.read(&mut buffer).await?; /// /// assert_eq!(n, 4); /// assert_eq!(&buffer, b"1234\0"); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn take(self, limit: u64) -> Take where Self: Sized { Take::new(self, limit) } /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type /// implements [`AsyncWrite`] as well, the result will also implement the /// futures 0.1 / tokio 0.1 `AsyncWrite` trait. /// /// Requires the `io-compat` feature to enable. #[cfg(feature = "io-compat")] fn compat(self) -> Compat where Self: Sized + Unpin, { Compat::new(self) } } impl AsyncReadExt for R {} /// An extension trait which adds utility methods to `AsyncWrite` types. pub trait AsyncWriteExt: AsyncWrite { /// Creates a future which will entirely flush this `AsyncWrite`. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AllowStdIo, AsyncWriteExt}; /// use std::io::{BufWriter, Cursor}; /// /// let mut output = vec![0u8; 5]; /// /// { /// let writer = Cursor::new(&mut output); /// let mut buffered = AllowStdIo::new(BufWriter::new(writer)); /// buffered.write_all(&[1, 2]).await?; /// buffered.write_all(&[3, 4]).await?; /// buffered.flush().await?; /// } /// /// assert_eq!(output, [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn flush(&mut self) -> Flush<'_, Self> where Self: Unpin, { Flush::new(self) } /// Creates a future which will entirely close this `AsyncWrite`. fn close(&mut self) -> Close<'_, Self> where Self: Unpin, { Close::new(self) } /// Creates a future which will write bytes from `buf` into the object. /// /// The returned future will resolve to the number of bytes written once the write /// operation is completed. fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin, { Write::new(self, buf) } /// Creates a future which will write bytes from `bufs` into the object using vectored /// IO operations. /// /// The returned future will resolve to the number of bytes written once the write /// operation is completed. fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin, { WriteVectored::new(self, bufs) } /// Write data into this object. /// /// Creates a future that will write the entire contents of the buffer `buf` into /// this `AsyncWrite`. /// /// The returned future will not complete until all the data has been written. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncWriteExt, Cursor}; /// /// let mut writer = Cursor::new(vec![0u8; 5]); /// /// writer.write_all(&[1, 2, 3, 4]).await?; /// /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin, { WriteAll::new(self, buf) } /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`. /// Requires the `io-compat` feature to enable. #[cfg(feature = "io-compat")] fn compat_write(self) -> Compat where Self: Sized + Unpin, { Compat::new(self) } /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`>`. /// /// This adapter produces a sink that will write each value passed to it /// into the underlying writer. /// /// Note that this function consumes the given writer, returning a wrapped /// version. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::AsyncWriteExt; /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]); /// /// let mut writer = vec![]; /// /// stream.forward((&mut writer).into_sink()).await?; /// /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]); /// # Ok::<(), Box>(()) /// # })?; /// # Ok::<(), Box>(()) /// ``` #[cfg(feature = "sink")] fn into_sink>(self) -> IntoSink where Self: Sized, { IntoSink::new(self) } } impl AsyncWriteExt for W {} /// An extension trait which adds utility methods to `AsyncSeek` types. pub trait AsyncSeekExt: AsyncSeek { /// Creates a future which will seek an IO object, and then yield the /// new position in the object and the object itself. /// /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin, { Seek::new(self, pos) } } impl AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). /// /// This function will read bytes from the underlying stream until the /// delimiter or EOF is found. Once found, all bytes up to, and including, /// the delimiter (if found) will be appended to `buf`. /// /// The returned future will resolve to the number of bytes read once the read /// operation is completed. /// /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncBufReadExt, Cursor}; /// /// let mut cursor = Cursor::new(b"lorem-ipsum"); /// let mut buf = vec![]; /// /// // cursor is at 'l' /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; /// assert_eq!(num_bytes, 6); /// assert_eq!(buf, b"lorem-"); /// buf.clear(); /// /// // cursor is at 'i' /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; /// assert_eq!(num_bytes, 5); /// assert_eq!(buf, b"ipsum"); /// buf.clear(); /// /// // cursor is at EOF /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; /// assert_eq!(num_bytes, 0); /// assert_eq!(buf, b""); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn read_until<'a>( &'a mut self, byte: u8, buf: &'a mut Vec, ) -> ReadUntil<'a, Self> where Self: Unpin, { ReadUntil::new(self, byte, buf) } /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until a newline (the 0xA byte) or EOF is reached, /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). /// /// This function will read bytes from the underlying stream until the /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes /// up to, and including, the delimiter (if found) will be appended to /// `buf`. /// /// The returned future will resolve to the number of bytes read once the read /// operation is completed. /// /// In the case of an error the buffer and the object will be discarded, with /// the error yielded. /// /// # Errors /// /// This function has the same error semantics as [`read_until`] and will /// also return an error if the read bytes are not valid UTF-8. If an I/O /// error is encountered then `buf` may contain some bytes already read in /// the event that all data read so far was valid UTF-8. /// /// [`read_until`]: AsyncBufReadExt::read_until /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncBufReadExt, Cursor}; /// /// let mut cursor = Cursor::new(b"foo\nbar"); /// let mut buf = String::new(); /// /// // cursor is at 'f' /// let num_bytes = cursor.read_line(&mut buf).await?; /// assert_eq!(num_bytes, 4); /// assert_eq!(buf, "foo\n"); /// buf.clear(); /// /// // cursor is at 'b' /// let num_bytes = cursor.read_line(&mut buf).await?; /// assert_eq!(num_bytes, 3); /// assert_eq!(buf, "bar"); /// buf.clear(); /// /// // cursor is at EOF /// let num_bytes = cursor.read_line(&mut buf).await?; /// assert_eq!(num_bytes, 0); /// assert_eq!(buf, ""); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin, { ReadLine::new(self, buf) } /// Returns a stream over the lines of this reader. /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). /// /// The stream returned from this function will yield instances of /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. /// /// [`io::Result`]: std::io::Result /// [`String`]: String /// /// # Errors /// /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. /// /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line /// /// # Examples /// /// ``` /// # futures::executor::block_on(async { /// use futures::io::{AsyncBufReadExt, Cursor}; /// use futures::stream::StreamExt; /// /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); /// /// let mut lines_stream = cursor.lines().map(|l| l.unwrap()); /// assert_eq!(lines_stream.next().await, Some(String::from("lorem"))); /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum"))); /// assert_eq!(lines_stream.next().await, Some(String::from("dolor"))); /// assert_eq!(lines_stream.next().await, None); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` fn lines(self) -> Lines where Self: Sized, { Lines::new(self) } } impl AsyncBufReadExt for R {}