summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-io/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-io/src/io')
-rw-r--r--third_party/rust/tokio-io/src/io/copy.rs100
-rw-r--r--third_party/rust/tokio-io/src/io/flush.rs43
-rw-r--r--third_party/rust/tokio-io/src/io/mod.rs32
-rw-r--r--third_party/rust/tokio-io/src/io/read.rs60
-rw-r--r--third_party/rust/tokio-io/src/io/read_exact.rs85
-rw-r--r--third_party/rust/tokio-io/src/io/read_to_end.rs66
-rw-r--r--third_party/rust/tokio-io/src/io/read_until.rs76
-rw-r--r--third_party/rust/tokio-io/src/io/shutdown.rs44
-rw-r--r--third_party/rust/tokio-io/src/io/write_all.rs88
9 files changed, 594 insertions, 0 deletions
diff --git a/third_party/rust/tokio-io/src/io/copy.rs b/third_party/rust/tokio-io/src/io/copy.rs
new file mode 100644
index 0000000000..e8a1dac95d
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/copy.rs
@@ -0,0 +1,100 @@
+use std::io;
+
+use futures::{Future, Poll};
+
+use {AsyncRead, AsyncWrite};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the [`copy`] function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+///
+/// [`copy`]: fn.copy.html
+#[derive(Debug)]
+pub struct Copy<R, W> {
+ reader: Option<R>,
+ read_done: bool,
+ writer: Option<W>,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite,
+{
+ Copy {
+ reader: Some(reader),
+ read_done: false,
+ writer: Some(writer),
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite,
+{
+ type Item = (u64, R, W);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let reader = self.reader.as_mut().unwrap();
+ let n = try_ready!(reader.poll_read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let writer = self.writer.as_mut().unwrap();
+ let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
+ if i == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "write zero byte into writer",
+ ));
+ } else {
+ self.pos += i;
+ self.amt += i as u64;
+ }
+ }
+
+ // If we've written al the data and we've seen EOF, flush out the
+ // data and finish the transfer.
+ // done with the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_ready!(self.writer.as_mut().unwrap().poll_flush());
+ let reader = self.reader.take().unwrap();
+ let writer = self.writer.take().unwrap();
+ return Ok((self.amt, reader, writer).into());
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/flush.rs b/third_party/rust/tokio-io/src/io/flush.rs
new file mode 100644
index 0000000000..febc7ee1b1
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/flush.rs
@@ -0,0 +1,43 @@
+use std::io;
+
+use futures::{Async, Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the [`flush`] function.
+///
+/// [`flush`]: fn.flush.html
+#[derive(Debug)]
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+where
+ A: AsyncWrite,
+{
+ Flush { a: Some(a) }
+}
+
+impl<A> Future for Flush<A>
+where
+ A: AsyncWrite,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_ready!(self.a.as_mut().unwrap().poll_flush());
+ Ok(Async::Ready(self.a.take().unwrap()))
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/mod.rs b/third_party/rust/tokio-io/src/io/mod.rs
new file mode 100644
index 0000000000..763cfaee7f
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/mod.rs
@@ -0,0 +1,32 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+//!
+//! A description of the high-level I/O combinators can be [found online] in
+//! addition to a description of the [low level details].
+//!
+//! [found online]: https://tokio.rs/docs/getting-started/core/
+//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
+
+mod copy;
+mod flush;
+mod read;
+mod read_exact;
+mod read_to_end;
+mod read_until;
+mod shutdown;
+mod write_all;
+
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read::{read, Read};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::read_until::{read_until, ReadUntil};
+pub use self::shutdown::{shutdown, Shutdown};
+pub use self::write_all::{write_all, WriteAll};
+pub use allow_std::AllowStdIo;
+pub use lines::{lines, Lines};
+pub use split::{ReadHalf, WriteHalf};
+pub use window::Window;
diff --git a/third_party/rust/tokio-io/src/io/read.rs b/third_party/rust/tokio-io/src/io/read.rs
new file mode 100644
index 0000000000..632cef4d28
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read.rs
@@ -0,0 +1,60 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+#[derive(Debug)]
+enum State<R, T> {
+ Pending { rd: R, buf: T },
+ Empty,
+}
+
+/// Tries to read some bytes directly into the given `buf` in asynchronous
+/// manner, returning a future type.
+///
+/// The returned future will resolve to both the I/O stream and the buffer
+/// as well as the number of bytes read once the read operation is completed.
+pub fn read<R, T>(rd: R, buf: T) -> Read<R, T>
+where
+ R: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ Read {
+ state: State::Pending { rd: rd, buf: buf },
+ }
+}
+
+/// A future which can be used to easily read available number of bytes to fill
+/// a buffer.
+///
+/// Created by the [`read`] function.
+#[derive(Debug)]
+pub struct Read<R, T> {
+ state: State<R, T>,
+}
+
+impl<R, T> Future for Read<R, T>
+where
+ R: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ type Item = (R, T, usize);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(R, T, usize), io::Error> {
+ let nread = match self.state {
+ State::Pending {
+ ref mut rd,
+ ref mut buf,
+ } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])),
+ State::Empty => panic!("poll a Read after it's done"),
+ };
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Pending { rd, buf } => Ok((rd, buf, nread).into()),
+ State::Empty => panic!("invalid internal state"),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_exact.rs b/third_party/rust/tokio-io/src/io/read_exact.rs
new file mode 100644
index 0000000000..3b98621ae4
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_exact.rs
@@ -0,0 +1,85 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read exactly enough bytes to fill
+/// a buffer.
+///
+/// Created by the [`read_exact`] function.
+///
+/// [`read_exact`]: fn.read_exact.html
+#[derive(Debug)]
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+ Reading { a: A, buf: T, pos: usize },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+where
+ A: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+where
+ A: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ ref mut buf,
+ ref mut pos,
+ } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_ready!(a.poll_read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Err(eof());
+ }
+ }
+ }
+ State::Empty => panic!("poll a ReadExact after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Ok((a, buf).into()),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_to_end.rs b/third_party/rust/tokio-io/src/io/read_to_end.rs
new file mode 100644
index 0000000000..296af6e304
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_to_end.rs
@@ -0,0 +1,66 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the [`read_to_end`] function.
+///
+/// [`read_to_end`]: fn.read_to_end.html
+#[derive(Debug)]
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+ Reading { a: A, buf: Vec<u8> },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success both the object and the buffer
+/// will be returned, with all data read from the stream appended to the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+where
+ A: AsyncRead,
+{
+ ReadToEnd {
+ state: State::Reading { a: a, buf: buf },
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+where
+ A: AsyncRead,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ ref mut buf,
+ } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ }
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Ok((a, buf).into()),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_until.rs b/third_party/rust/tokio-io/src/io/read_until.rs
new file mode 100644
index 0000000000..d0be4c9448
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_until.rs
@@ -0,0 +1,76 @@
+use std::io::{self, BufRead};
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read the contents of a stream into a
+/// vector until the delimiter is reached.
+///
+/// Created by the [`read_until`] function.
+///
+/// [`read_until`]: fn.read_until.html
+#[derive(Debug)]
+pub struct ReadUntil<A> {
+ state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+ Reading { a: A, byte: u8, buf: Vec<u8> },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided until the delimiter `byte` is reached.
+/// This method is the async equivalent to [`BufRead::read_until`].
+///
+/// In case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all bytes up to, and including, the delimiter
+/// (if found).
+///
+/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until
+pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A>
+where
+ A: AsyncRead + BufRead,
+{
+ ReadUntil {
+ state: State::Reading {
+ a: a,
+ byte: byte,
+ buf: buf,
+ },
+ }
+}
+
+impl<A> Future for ReadUntil<A>
+where
+ A: AsyncRead + BufRead,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ byte,
+ ref mut buf,
+ } => {
+ // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter.
+ // and just return it, as we are finished.
+ // If we hit "would block" then all the read data so far
+ // is in our buffer, and otherwise we propagate errors.
+ try_nb!(a.read_until(byte, buf));
+ }
+ State::Empty => panic!("poll ReadUntil after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, byte: _, buf } => Ok((a, buf).into()),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/shutdown.rs b/third_party/rust/tokio-io/src/io/shutdown.rs
new file mode 100644
index 0000000000..d963a813ad
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/shutdown.rs
@@ -0,0 +1,44 @@
+use std::io;
+
+use futures::{Async, Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to fully shutdown an I/O object.
+///
+/// Resolves to the underlying I/O object once the shutdown operation is
+/// complete.
+///
+/// Created by the [`shutdown`] function.
+///
+/// [`shutdown`]: fn.shutdown.html
+#[derive(Debug)]
+pub struct Shutdown<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely shutdown an I/O object and then yield
+/// the object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`,
+/// scheduling a retry if `WouldBlock` is seen along the way.
+pub fn shutdown<A>(a: A) -> Shutdown<A>
+where
+ A: AsyncWrite,
+{
+ Shutdown { a: Some(a) }
+}
+
+impl<A> Future for Shutdown<A>
+where
+ A: AsyncWrite,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_ready!(self.a.as_mut().unwrap().shutdown());
+ Ok(Async::Ready(self.a.take().unwrap()))
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/write_all.rs b/third_party/rust/tokio-io/src/io/write_all.rs
new file mode 100644
index 0000000000..ba8af4a222
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/write_all.rs
@@ -0,0 +1,88 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the [`write_all`] top-level method.
+///
+/// [`write_all`]: fn.write_all.html
+#[derive(Debug)]
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+ Writing { a: A, buf: T, pos: usize },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ WriteAll {
+ state: State::Writing {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Writing {
+ ref mut a,
+ ref buf,
+ ref mut pos,
+ } => {
+ let buf = buf.as_ref();
+ while *pos < buf.len() {
+ let n = try_ready!(a.poll_write(&buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Err(zero_write());
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Writing { a, buf, .. } => Ok((a, buf).into()),
+ State::Empty => panic!(),
+ }
+ }
+}