diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.22/src/util')
-rw-r--r-- | third_party/rust/tokio-0.1.22/src/util/enumerate.rs | 84 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/src/util/future.rs | 93 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/src/util/mod.rs | 15 | ||||
-rw-r--r-- | third_party/rust/tokio-0.1.22/src/util/stream.rs | 95 |
4 files changed, 287 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.22/src/util/enumerate.rs b/third_party/rust/tokio-0.1.22/src/util/enumerate.rs new file mode 100644 index 0000000000..8f6926fa4f --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/enumerate.rs @@ -0,0 +1,84 @@ +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A stream combinator which combines the yields the current item +/// plus its count starting from 0. +/// +/// This structure is produced by the `Stream::enumerate` method. +#[derive(Debug)] +#[must_use = "Does nothing unless polled"] +pub struct Enumerate<T> { + inner: T, + count: usize, +} + +impl<T> Enumerate<T> { + pub(crate) fn new(stream: T) -> Self { + Self { + inner: stream, + count: 0, + } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<T> Stream for Enumerate<T> +where + T: Stream, +{ + type Item = (usize, T::Item); + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> { + match try_ready!(self.inner.poll()) { + Some(item) => { + let ret = Some((self.count, item)); + self.count += 1; + Ok(Async::Ready(ret)) + } + None => return Ok(Async::Ready(None)), + } + } +} + +// Forwarding impl of Sink from the underlying stream +impl<T> Sink for Enumerate<T> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), T::SinkError> { + self.inner.close() + } +} diff --git a/third_party/rust/tokio-0.1.22/src/util/future.rs b/third_party/rust/tokio-0.1.22/src/util/future.rs new file mode 100644 index 0000000000..5a3818101c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/future.rs @@ -0,0 +1,93 @@ +#[cfg(feature = "timer")] +#[allow(deprecated)] +use tokio_timer::Deadline; +#[cfg(feature = "timer")] +use tokio_timer::Timeout; + +use futures::Future; + +#[cfg(feature = "timer")] +use std::time::{Duration, Instant}; + +/// An extension trait for `Future` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there only is a [`timeout`] function, but this will increase +/// over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Future` already implement `FutureExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait FutureExt: Future { + /// Creates a new future which allows `self` until `timeout`. + /// + /// This combinator creates a new future which wraps the receiving future + /// with a timeout. The returned future is allowed to execute until it + /// completes or `timeout` has elapsed, whichever happens first. + /// + /// If the future completes before `timeout` then the future will resolve + /// with that item. Otherwise the future will resolve to an error. + /// + /// The future is guaranteed to be polled at least once, even if `timeout` + /// is set to zero. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let future = long_future() + /// .timeout(Duration::from_secs(1)) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(future); + /// # } + /// ``` + #[cfg(feature = "timer")] + fn timeout(self, timeout: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, timeout) + } + + #[cfg(feature = "timer")] + #[deprecated(since = "0.1.8", note = "use `timeout` instead")] + #[allow(deprecated)] + #[doc(hidden)] + fn deadline(self, deadline: Instant) -> Deadline<Self> + where + Self: Sized, + { + Deadline::new(self, deadline) + } +} + +impl<T: ?Sized> FutureExt for T where T: Future {} + +#[cfg(test)] +mod test { + use super::*; + use prelude::future; + + #[cfg(feature = "timer")] + #[test] + fn timeout_polls_at_least_once() { + let base_future = future::result::<(), ()>(Ok(())); + let timeouted_future = base_future.timeout(Duration::new(0, 0)); + assert!(timeouted_future.wait().is_ok()); + } +} diff --git a/third_party/rust/tokio-0.1.22/src/util/mod.rs b/third_party/rust/tokio-0.1.22/src/util/mod.rs new file mode 100644 index 0000000000..58fd3d0b05 --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/mod.rs @@ -0,0 +1,15 @@ +//! Utilities for working with Tokio. +//! +//! This module contains utilities that are useful for working with Tokio. +//! Currently, this only includes [`FutureExt`] and [`StreamExt`], but this +//! may grow over time. +//! +//! [`FutureExt`]: trait.FutureExt.html +//! [`StreamExt`]: trait.StreamExt.html + +mod enumerate; +mod future; +mod stream; + +pub use self::future::FutureExt; +pub use self::stream::StreamExt; diff --git a/third_party/rust/tokio-0.1.22/src/util/stream.rs b/third_party/rust/tokio-0.1.22/src/util/stream.rs new file mode 100644 index 0000000000..3b7aa2686c --- /dev/null +++ b/third_party/rust/tokio-0.1.22/src/util/stream.rs @@ -0,0 +1,95 @@ +#[cfg(feature = "timer")] +use tokio_timer::{throttle::Throttle, Timeout}; + +use futures::Stream; + +#[cfg(feature = "timer")] +use std::time::Duration; +pub use util::enumerate::Enumerate; + +/// An extension trait for `Stream` that provides a variety of convenient +/// combinator functions. +/// +/// Currently, there are only [`timeout`] and [`throttle`] functions, but +/// this will increase over time. +/// +/// Users are not expected to implement this trait. All types that implement +/// `Stream` already implement `StreamExt`. +/// +/// This trait can be imported directly or via the Tokio prelude: `use +/// tokio::prelude::*`. +/// +/// [`timeout`]: #method.timeout +pub trait StreamExt: Stream { + /// Throttle down the stream by enforcing a fixed delay between items. + /// + /// Errors are also delayed. + #[cfg(feature = "timer")] + fn throttle(self, duration: Duration) -> Throttle<Self> + where + Self: Sized, + { + Throttle::new(self, duration) + } + + /// Creates a new stream which gives the current iteration count as well + /// as the next value. + /// + /// The stream returned yields pairs `(i, val)`, where `i` is the + /// current index of iteration and `val` is the value returned by the + /// iterator. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of + /// an iterator with more than [`std::usize::MAX`] elements either produces the + /// wrong result or panics. + fn enumerate(self) -> Enumerate<Self> + where + Self: Sized, + { + Enumerate::new(self) + } + + /// Creates a new stream which allows `self` until `timeout`. + /// + /// This combinator creates a new stream which wraps the receiving stream + /// with a timeout. For each item, the returned stream is allowed to execute + /// until it completes or `timeout` has elapsed, whichever happens first. + /// + /// If an item completes before `timeout` then the stream will yield + /// with that item. Otherwise the stream will yield to an error. + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio; + /// # extern crate futures; + /// use tokio::prelude::*; + /// use std::time::Duration; + /// # use futures::future::{self, FutureResult}; + /// + /// # fn long_future() -> FutureResult<(), ()> { + /// # future::ok(()) + /// # } + /// # + /// # fn main() { + /// let stream = long_future() + /// .into_stream() + /// .timeout(Duration::from_secs(1)) + /// .for_each(|i| future::ok(println!("item = {:?}", i))) + /// .map_err(|e| println!("error = {:?}", e)); + /// + /// tokio::run(stream); + /// # } + /// ``` + #[cfg(feature = "timer")] + fn timeout(self, timeout: Duration) -> Timeout<Self> + where + Self: Sized, + { + Timeout::new(self, timeout) + } +} + +impl<T: ?Sized> StreamExt for T where T: Stream {} |