diff options
Diffstat (limited to 'third_party/rust/tokio-io/src/io')
-rw-r--r-- | third_party/rust/tokio-io/src/io/copy.rs | 100 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/flush.rs | 43 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/mod.rs | 32 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/read.rs | 60 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/read_exact.rs | 85 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/read_to_end.rs | 66 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/read_until.rs | 76 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/shutdown.rs | 44 | ||||
-rw-r--r-- | third_party/rust/tokio-io/src/io/write_all.rs | 88 |
9 files changed, 594 insertions, 0 deletions
diff --git a/third_party/rust/tokio-io/src/io/copy.rs b/third_party/rust/tokio-io/src/io/copy.rs new file mode 100644 index 0000000000..e8a1dac95d --- /dev/null +++ b/third_party/rust/tokio-io/src/io/copy.rs @@ -0,0 +1,100 @@ +use std::io; + +use futures::{Future, Poll}; + +use {AsyncRead, AsyncWrite}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the [`copy`] function, this future will resolve to the number of +/// bytes copied or an error if one happens. +/// +/// [`copy`]: fn.copy.html +#[derive(Debug)] +pub struct Copy<R, W> { + reader: Option<R>, + read_done: bool, + writer: Option<W>, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + Copy { + reader: Some(reader), + read_done: false, + writer: Some(writer), + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + type Item = (u64, R, W); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(u64, R, W), io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let reader = self.reader.as_mut().unwrap(); + let n = try_ready!(reader.poll_read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let writer = self.writer.as_mut().unwrap(); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); + if i == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "write zero byte into writer", + )); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush()); + let reader = self.reader.take().unwrap(); + let writer = self.writer.take().unwrap(); + return Ok((self.amt, reader, writer).into()); + } + } + } +} diff --git a/third_party/rust/tokio-io/src/io/flush.rs b/third_party/rust/tokio-io/src/io/flush.rs new file mode 100644 index 0000000000..febc7ee1b1 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/flush.rs @@ -0,0 +1,43 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the [`flush`] function. +/// +/// [`flush`]: fn.flush.html +#[derive(Debug)] +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> +where + A: AsyncWrite, +{ + Flush { a: Some(a) } +} + +impl<A> Future for Flush<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().poll_flush()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/mod.rs b/third_party/rust/tokio-io/src/io/mod.rs new file mode 100644 index 0000000000..763cfaee7f --- /dev/null +++ b/third_party/rust/tokio-io/src/io/mod.rs @@ -0,0 +1,32 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +mod copy; +mod flush; +mod read; +mod read_exact; +mod read_to_end; +mod read_until; +mod shutdown; +mod write_all; + +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read::{read, Read}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::read_until::{read_until, ReadUntil}; +pub use self::shutdown::{shutdown, Shutdown}; +pub use self::write_all::{write_all, WriteAll}; +pub use allow_std::AllowStdIo; +pub use lines::{lines, Lines}; +pub use split::{ReadHalf, WriteHalf}; +pub use window::Window; diff --git a/third_party/rust/tokio-io/src/io/read.rs b/third_party/rust/tokio-io/src/io/read.rs new file mode 100644 index 0000000000..632cef4d28 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read.rs @@ -0,0 +1,60 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +#[derive(Debug)] +enum State<R, T> { + Pending { rd: R, buf: T }, + Empty, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read<R, T>(rd: R, buf: T) -> Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + Read { + state: State::Pending { rd: rd, buf: buf }, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read<R, T> { + state: State<R, T>, +} + +impl<R, T> Future for Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (R, T, usize); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(R, T, usize), io::Error> { + let nread = match self.state { + State::Pending { + ref mut rd, + ref mut buf, + } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { rd, buf } => Ok((rd, buf, nread).into()), + State::Empty => panic!("invalid internal state"), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_exact.rs b/third_party/rust/tokio-io/src/io/read_exact.rs new file mode 100644 index 0000000000..3b98621ae4 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_exact.rs @@ -0,0 +1,85 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read exactly enough bytes to fill +/// a buffer. +/// +/// Created by the [`read_exact`] function. +/// +/// [`read_exact`]: fn.read_exact.html +#[derive(Debug)] +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Reading { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + ref mut pos, + } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_ready!(a.poll_read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Err(eof()); + } + } + } + State::Empty => panic!("poll a ReadExact after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_to_end.rs b/third_party/rust/tokio-io/src/io/read_to_end.rs new file mode 100644 index 0000000000..296af6e304 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_to_end.rs @@ -0,0 +1,66 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the [`read_to_end`] function. +/// +/// [`read_to_end`]: fn.read_to_end.html +#[derive(Debug)] +pub struct ReadToEnd<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success both the object and the buffer +/// will be returned, with all data read from the stream appended to the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> +where + A: AsyncRead, +{ + ReadToEnd { + state: State::Reading { a: a, buf: buf }, + } +} + +impl<A> Future for ReadToEnd<A> +where + A: AsyncRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + } + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_until.rs b/third_party/rust/tokio-io/src/io/read_until.rs new file mode 100644 index 0000000000..d0be4c9448 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_until.rs @@ -0,0 +1,76 @@ +use std::io::{self, BufRead}; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the contents of a stream into a +/// vector until the delimiter is reached. +/// +/// Created by the [`read_until`] function. +/// +/// [`read_until`]: fn.read_until.html +#[derive(Debug)] +pub struct ReadUntil<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, byte: u8, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided until the delimiter `byte` is reached. +/// This method is the async equivalent to [`BufRead::read_until`]. +/// +/// In case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all bytes up to, and including, the delimiter +/// (if found). +/// +/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until +pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + ReadUntil { + state: State::Reading { + a: a, + byte: byte, + buf: buf, + }, + } +} + +impl<A> Future for ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + byte, + ref mut buf, + } => { + // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter. + // and just return it, as we are finished. + // If we hit "would block" then all the read data so far + // is in our buffer, and otherwise we propagate errors. + try_nb!(a.read_until(byte, buf)); + } + State::Empty => panic!("poll ReadUntil after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, byte: _, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/shutdown.rs b/third_party/rust/tokio-io/src/io/shutdown.rs new file mode 100644 index 0000000000..d963a813ad --- /dev/null +++ b/third_party/rust/tokio-io/src/io/shutdown.rs @@ -0,0 +1,44 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully shutdown an I/O object. +/// +/// Resolves to the underlying I/O object once the shutdown operation is +/// complete. +/// +/// Created by the [`shutdown`] function. +/// +/// [`shutdown`]: fn.shutdown.html +#[derive(Debug)] +pub struct Shutdown<A> { + a: Option<A>, +} + +/// Creates a future which will entirely shutdown an I/O object and then yield +/// the object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`, +/// scheduling a retry if `WouldBlock` is seen along the way. +pub fn shutdown<A>(a: A) -> Shutdown<A> +where + A: AsyncWrite, +{ + Shutdown { a: Some(a) } +} + +impl<A> Future for Shutdown<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().shutdown()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/write_all.rs b/third_party/rust/tokio-io/src/io/write_all.rs new file mode 100644 index 0000000000..ba8af4a222 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/write_all.rs @@ -0,0 +1,88 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncWrite; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +#[derive(Debug)] +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Writing { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl<A, T> Future for WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { + ref mut a, + ref buf, + ref mut pos, + } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_ready!(a.poll_write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Err(zero_write()); + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} |