diff options
Diffstat (limited to 'third_party/rust/futures-0.1.29/src/stream/mod.rs')
-rw-r--r-- | third_party/rust/futures-0.1.29/src/stream/mod.rs | 1146 |
1 files changed, 1146 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/src/stream/mod.rs b/third_party/rust/futures-0.1.29/src/stream/mod.rs new file mode 100644 index 0000000000..2d90362470 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/stream/mod.rs @@ -0,0 +1,1146 @@ +//! Asynchronous streams +//! +//! This module contains the `Stream` trait and a number of adaptors for this +//! trait. This trait is very similar to the `Iterator` trait in the standard +//! library except that it expresses the concept of blocking as well. A stream +//! here is a sequential sequence of values which may take some amount of time +//! in between to produce. +//! +//! A stream may request that it is blocked between values while the next value +//! is calculated, and provides a way to get notified once the next value is +//! ready as well. +//! +//! You can find more information/tutorials about streams [online at +//! https://tokio.rs][online] +//! +//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ + +use {IntoFuture, Poll}; + +mod iter; +#[allow(deprecated)] +pub use self::iter::{iter, Iter}; +#[cfg(feature = "with-deprecated")] +#[allow(deprecated)] +pub use self::Iter as IterStream; +mod iter_ok; +pub use self::iter_ok::{iter_ok, IterOk}; +mod iter_result; +pub use self::iter_result::{iter_result, IterResult}; + +mod repeat; +pub use self::repeat::{repeat, Repeat}; + +mod and_then; +mod chain; +mod concat; +mod empty; +mod filter; +mod filter_map; +mod flatten; +mod fold; +mod for_each; +mod from_err; +mod fuse; +mod future; +mod inspect; +mod inspect_err; +mod map; +mod map_err; +mod merge; +mod once; +mod or_else; +mod peek; +mod poll_fn; +mod select; +mod skip; +mod skip_while; +mod take; +mod take_while; +mod then; +mod unfold; +mod zip; +mod forward; +pub use self::and_then::AndThen; +pub use self::chain::Chain; +#[allow(deprecated)] +pub use self::concat::Concat; +pub use self::concat::Concat2; +pub use self::empty::{Empty, empty}; +pub use self::filter::Filter; +pub use self::filter_map::FilterMap; +pub use self::flatten::Flatten; +pub use self::fold::Fold; +pub use self::for_each::ForEach; +pub use self::from_err::FromErr; +pub use self::fuse::Fuse; +pub use self::future::StreamFuture; +pub use self::inspect::Inspect; +pub use self::inspect_err::InspectErr; +pub use self::map::Map; +pub use self::map_err::MapErr; +#[allow(deprecated)] +pub use self::merge::{Merge, MergedItem}; +pub use self::once::{Once, once}; +pub use self::or_else::OrElse; +pub use self::peek::Peekable; +pub use self::poll_fn::{poll_fn, PollFn}; +pub use self::select::Select; +pub use self::skip::Skip; +pub use self::skip_while::SkipWhile; +pub use self::take::Take; +pub use self::take_while::TakeWhile; +pub use self::then::Then; +pub use self::unfold::{Unfold, unfold}; +pub use self::zip::Zip; +pub use self::forward::Forward; +use sink::{Sink}; + +if_std! { + use std; + + mod buffered; + mod buffer_unordered; + mod catch_unwind; + mod chunks; + mod collect; + mod wait; + mod channel; + mod split; + pub mod futures_unordered; + mod futures_ordered; + pub use self::buffered::Buffered; + pub use self::buffer_unordered::BufferUnordered; + pub use self::catch_unwind::CatchUnwind; + pub use self::chunks::Chunks; + pub use self::collect::Collect; + pub use self::wait::Wait; + pub use self::split::{SplitStream, SplitSink, ReuniteError}; + pub use self::futures_unordered::FuturesUnordered; + pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; + + #[doc(hidden)] + #[cfg(feature = "with-deprecated")] + #[allow(deprecated)] + pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError}; + + /// A type alias for `Box<Stream + Send>` + #[doc(hidden)] + #[deprecated(note = "removed without replacement, recommended to use a \ + local extension trait or function if needed, more \ + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] + pub type BoxStream<T, E> = ::std::boxed::Box<Stream<Item = T, Error = E> + Send>; + + impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + (**self).poll() + } + } +} + +/// A stream of values, not all of which may have been produced yet. +/// +/// `Stream` is a trait to represent any source of sequential events or items +/// which acts like an iterator but long periods of time may pass between +/// items. Like `Future` the methods of `Stream` never block and it is thus +/// suitable for programming in an asynchronous fashion. This trait is very +/// similar to the `Iterator` trait in the standard library where `Some` is +/// used to signal elements of the stream and `None` is used to indicate that +/// the stream is finished. +/// +/// Like futures a stream has basic combinators to transform the stream, perform +/// more work on each item, etc. +/// +/// You can find more information/tutorials about streams [online at +/// https://tokio.rs][online] +/// +/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ +/// +/// # Streams as Futures +/// +/// Any instance of `Stream` can also be viewed as a `Future` where the resolved +/// value is the next item in the stream along with the rest of the stream. The +/// `into_future` adaptor can be used here to convert any stream into a future +/// for use with other future methods like `join` and `select`. +/// +/// # Errors +/// +/// Streams, like futures, can also model errors in their computation. All +/// streams have an associated `Error` type like with futures. Currently as of +/// the 0.1 release of this library an error on a stream **does not terminate +/// the stream**. That is, after one error is received, another error may be +/// received from the same stream (it's valid to keep polling). +/// +/// This property of streams, however, is [being considered] for change in 0.2 +/// where an error on a stream is similar to `None`, it terminates the stream +/// entirely. If one of these use cases suits you perfectly and not the other, +/// please feel welcome to comment on [the issue][being considered]! +/// +/// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206 +#[must_use = "streams do nothing unless polled"] +pub trait Stream { + /// The type of item this stream will yield on success. + type Item; + + /// The type of error this stream may generate. + type Error; + + /// Attempt to pull out the next value of this stream, returning `None` if + /// the stream is finished. + /// + /// This method, like `Future::poll`, is the sole method of pulling out a + /// value from a stream. This method must also be run within the context of + /// a task typically and implementors of this trait must ensure that + /// implementations of this method do not block, as it may cause consumers + /// to behave badly. + /// + /// # Return value + /// + /// If `NotReady` is returned then this stream's next value is not ready + /// yet and implementations will ensure that the current task will be + /// notified when the next value may be ready. If `Some` is returned then + /// the returned value represents the next value on the stream. `Err` + /// indicates an error happened, while `Ok` indicates whether there was a + /// new item on the stream or whether the stream has terminated. + /// + /// # Panics + /// + /// Once a stream is finished, that is `Ready(None)` has been returned, + /// further calls to `poll` may result in a panic or other "bad behavior". + /// If this is difficult to guard against then the `fuse` adapter can be + /// used to ensure that `poll` always has well-defined semantics. + // TODO: more here + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>; + + // TODO: should there also be a method like `poll` but doesn't return an + // item? basically just says "please make more progress internally" + // seems crucial for buffering to actually make any sense. + + /// Creates an iterator which blocks the current thread until each item of + /// this stream is resolved. + /// + /// This method will consume ownership of this stream, returning an + /// implementation of a standard iterator. This iterator will *block the + /// current thread* on each call to `next` if the item in the stream isn't + /// ready yet. + /// + /// > **Note:** This method is not appropriate to call on event loops or + /// > similar I/O situations because it will prevent the event + /// > loop from making progress (this blocks the thread). This + /// > method should only be called when it's guaranteed that the + /// > blocking work associated with this stream will be completed + /// > by another thread. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// The returned iterator does not attempt to catch panics. If the `poll` + /// function panics, panics will be propagated to the caller of `next`. + #[cfg(feature = "use_std")] + fn wait(self) -> Wait<Self> + where Self: Sized + { + wait::new(self) + } + + /// Convenience function for turning this stream into a trait object. + /// + /// This simply avoids the need to write `Box::new` and can often help with + /// type inference as well by always returning a trait object. Note that + /// this method requires the `Send` bound and returns a `BoxStream`, which + /// also encodes this. If you'd like to create a `Box<Stream>` without the + /// `Send` bound, then the `Box::new` function can be used instead. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// use futures::stream::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel(1); + /// let a: BoxStream<i32, ()> = rx.boxed(); + /// ``` + #[cfg(feature = "use_std")] + #[doc(hidden)] + #[deprecated(note = "removed without replacement, recommended to use a \ + local extension trait or function if needed, more \ + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] + #[allow(deprecated)] + fn boxed(self) -> BoxStream<Self::Item, Self::Error> + where Self: Sized + Send + 'static, + { + ::std::boxed::Box::new(self) + } + + /// Converts this stream into a `Future`. + /// + /// A stream can be viewed as a future which will resolve to a pair containing + /// the next element of the stream plus the remaining stream. If the stream + /// terminates, then the next element is `None` and the remaining stream is + /// still passed back, to allow reclamation of its resources. + /// + /// The returned future can be used to compose streams and futures together by + /// placing everything into the "world of futures". + fn into_future(self) -> StreamFuture<Self> + where Self: Sized + { + future::new(self) + } + + /// Converts a stream of type `T` to a stream of type `U`. + /// + /// The provided closure is executed over all elements of this stream as + /// they are made available, and the callback will be executed inline with + /// calls to `poll`. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let rx = rx.map(|x| x + 3); + /// ``` + fn map<U, F>(self, f: F) -> Map<Self, F> + where F: FnMut(Self::Item) -> U, + Self: Sized + { + map::new(self, f) + } + + /// Converts a stream of error type `T` to a stream of error type `U`. + /// + /// The provided closure is executed over all errors of this stream as + /// they are made available, and the callback will be executed inline with + /// calls to `poll`. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `map_err` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let rx = rx.map_err(|()| 3); + /// ``` + fn map_err<U, F>(self, f: F) -> MapErr<Self, F> + where F: FnMut(Self::Error) -> U, + Self: Sized + { + map_err::new(self, f) + } + + /// Filters the values produced by this stream according to the provided + /// predicate. + /// + /// As values of this stream are made available, the provided predicate will + /// be run against them. If the predicate returns `true` then the stream + /// will yield the value, but if the predicate returns `false` then the + /// value will be discarded and the next value will be produced. + /// + /// All errors are passed through without filtering in this combinator. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `filter` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let evens = rx.filter(|x| x % 2 == 0); + /// ``` + fn filter<F>(self, f: F) -> Filter<Self, F> + where F: FnMut(&Self::Item) -> bool, + Self: Sized + { + filter::new(self, f) + } + + /// Filters the values produced by this stream while simultaneously mapping + /// them to a different type. + /// + /// As values of this stream are made available, the provided function will + /// be run on them. If the predicate returns `Some(e)` then the stream will + /// yield the value `e`, but if the predicate returns `None` then the next + /// value will be produced. + /// + /// All errors are passed through without filtering in this combinator. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it, similar to the existing `filter_map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// let evens_plus_one = rx.filter_map(|x| { + /// if x % 0 == 2 { + /// Some(x + 1) + /// } else { + /// None + /// } + /// }); + /// ``` + fn filter_map<F, B>(self, f: F) -> FilterMap<Self, F> + where F: FnMut(Self::Item) -> Option<B>, + Self: Sized + { + filter_map::new(self, f) + } + + /// Chain on a computation for when a value is ready, passing the resulting + /// item to the provided closure `f`. + /// + /// This function can be used to ensure a computation runs regardless of + /// the next value on the stream. The closure provided will be yielded a + /// `Result` once a value is ready, and the returned future will then be run + /// to completion to produce the next value on this stream. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// + /// let rx = rx.then(|result| { + /// match result { + /// Ok(e) => Ok(e + 3), + /// Err(()) => Err(4), + /// } + /// }); + /// ``` + fn then<F, U>(self, f: F) -> Then<Self, F, U> + where F: FnMut(Result<Self::Item, Self::Error>) -> U, + U: IntoFuture, + Self: Sized + { + then::new(self, f) + } + + /// Chain on a computation for when a value is ready, passing the successful + /// results to the provided closure `f`. + /// + /// This function can be used to run a unit of work when the next successful + /// value on a stream is ready. The closure provided will be yielded a value + /// when ready, and the returned future will then be run to completion to + /// produce the next value on this stream. + /// + /// Any errors produced by this stream will not be passed to the closure, + /// and will be passed through. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + /// + /// To process the entire stream and return a single future representing + /// success or error, use `for_each` instead. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (_tx, rx) = mpsc::channel::<i32>(1); + /// + /// let rx = rx.and_then(|result| { + /// if result % 2 == 0 { + /// Ok(result) + /// } else { + /// Err(()) + /// } + /// }); + /// ``` + fn and_then<F, U>(self, f: F) -> AndThen<Self, F, U> + where F: FnMut(Self::Item) -> U, + U: IntoFuture<Error = Self::Error>, + Self: Sized + { + and_then::new(self, f) + } + + /// Chain on a computation for when an error happens, passing the + /// erroneous result to the provided closure `f`. + /// + /// This function can be used to run a unit of work and attempt to recover from + /// an error if one happens. The closure provided will be yielded an error + /// when one appears, and the returned future will then be run to completion + /// to produce the next value on this stream. + /// + /// Any successful values produced by this stream will not be passed to the + /// closure, and will be passed through. + /// + /// The returned value of the closure must implement the `IntoFuture` trait + /// and can represent some more work to be done before the composed stream + /// is finished. Note that the `Result` type implements the `IntoFuture` + /// trait so it is possible to simply alter the `Result` yielded to the + /// closure and return it. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + fn or_else<F, U>(self, f: F) -> OrElse<Self, F, U> + where F: FnMut(Self::Error) -> U, + U: IntoFuture<Item = Self::Item>, + Self: Sized + { + or_else::new(self, f) + } + + /// Collect all of the values of this stream into a vector, returning a + /// future representing the result of that computation. + /// + /// This combinator will collect all successful results of this stream and + /// collect them into a `Vec<Self::Item>`. If an error happens then all + /// collected elements will be dropped and the error will be returned. + /// + /// The returned future will be resolved whenever an error happens or when + /// the stream returns `Ok(None)`. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(|| { + /// for i in (0..5).rev() { + /// tx = tx.send(i + 1).wait().unwrap(); + /// } + /// }); + /// + /// let mut result = rx.collect(); + /// assert_eq!(result.wait(), Ok(vec![5, 4, 3, 2, 1])); + /// ``` + #[cfg(feature = "use_std")] + fn collect(self) -> Collect<Self> + where Self: Sized + { + collect::new(self) + } + + /// Concatenate all results of a stream into a single extendable + /// destination, returning a future representing the end result. + /// + /// This combinator will extend the first item with the contents + /// of all the successful results of the stream. If the stream is + /// empty, the default value will be returned. If an error occurs, + /// all the results will be dropped and the error will be returned. + /// + /// The name `concat2` is an intermediate measure until the release of + /// futures 0.2, at which point it will be renamed back to `concat`. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(move || { + /// for i in (0..3).rev() { + /// let n = i * 3; + /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap(); + /// } + /// }); + /// let result = rx.concat2(); + /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); + /// ``` + fn concat2(self) -> Concat2<Self> + where Self: Sized, + Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default, + { + concat::new2(self) + } + + /// Concatenate all results of a stream into a single extendable + /// destination, returning a future representing the end result. + /// + /// This combinator will extend the first item with the contents + /// of all the successful results of the stream. If an error occurs, + /// all the results will be dropped and the error will be returned. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (mut tx, rx) = mpsc::channel(1); + /// + /// thread::spawn(move || { + /// for i in (0..3).rev() { + /// let n = i * 3; + /// tx = tx.send(vec![n + 1, n + 2, n + 3]).wait().unwrap(); + /// } + /// }); + /// let result = rx.concat(); + /// assert_eq!(result.wait(), Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3])); + /// ``` + /// + /// # Panics + /// + /// It's important to note that this function will panic if the stream + /// is empty, which is the reason for its deprecation. + #[deprecated(since="0.1.14", note="please use `Stream::concat2` instead")] + #[allow(deprecated)] + fn concat(self) -> Concat<Self> + where Self: Sized, + Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator, + { + concat::new(self) + } + + /// Execute an accumulating computation over a stream, collecting all the + /// values into one final result. + /// + /// This combinator will collect all successful results of this stream + /// according to the closure provided. The initial state is also provided to + /// this method and then is returned again by each execution of the closure. + /// Once the entire stream has been exhausted the returned future will + /// resolve to this value. + /// + /// If an error happens then collected state will be dropped and the error + /// will be returned. + /// + /// # Examples + /// + /// ``` + /// use futures::prelude::*; + /// use futures::stream; + /// use futures::future; + /// + /// let number_stream = stream::iter_ok::<_, ()>(0..6); + /// let sum = number_stream.fold(0, |acc, x| future::ok(acc + x)); + /// assert_eq!(sum.wait(), Ok(15)); + /// ``` + fn fold<F, T, Fut>(self, init: T, f: F) -> Fold<Self, F, Fut, T> + where F: FnMut(T, Self::Item) -> Fut, + Fut: IntoFuture<Item = T>, + Self::Error: From<Fut::Error>, + Self: Sized + { + fold::new(self, f, init) + } + + /// Flattens a stream of streams into just one continuous stream. + /// + /// If this stream's elements are themselves streams then this combinator + /// will flatten out the entire stream to one long chain of elements. Any + /// errors are passed through without looking at them, but otherwise each + /// individual stream will get exhausted before moving on to the next. + /// + /// ``` + /// use std::thread; + /// + /// use futures::prelude::*; + /// use futures::sync::mpsc; + /// + /// let (tx1, rx1) = mpsc::channel::<i32>(1); + /// let (tx2, rx2) = mpsc::channel::<i32>(1); + /// let (tx3, rx3) = mpsc::channel(1); + /// + /// thread::spawn(|| { + /// tx1.send(1).wait().unwrap() + /// .send(2).wait().unwrap(); + /// }); + /// thread::spawn(|| { + /// tx2.send(3).wait().unwrap() + /// .send(4).wait().unwrap(); + /// }); + /// thread::spawn(|| { + /// tx3.send(rx1).wait().unwrap() + /// .send(rx2).wait().unwrap(); + /// }); + /// + /// let mut result = rx3.flatten().collect(); + /// assert_eq!(result.wait(), Ok(vec![1, 2, 3, 4])); + /// ``` + fn flatten(self) -> Flatten<Self> + where Self::Item: Stream, + <Self::Item as Stream>::Error: From<Self::Error>, + Self: Sized + { + flatten::new(self) + } + + /// Skip elements on this stream while the predicate provided resolves to + /// `true`. + /// + /// This function, like `Iterator::skip_while`, will skip elements on the + /// stream until the `predicate` resolves to `false`. Once one element + /// returns false all future elements will be returned from the underlying + /// stream. + fn skip_while<P, R>(self, pred: P) -> SkipWhile<Self, P, R> + where P: FnMut(&Self::Item) -> R, + R: IntoFuture<Item=bool, Error=Self::Error>, + Self: Sized + { + skip_while::new(self, pred) + } + + /// Take elements from this stream while the predicate provided resolves to + /// `true`. + /// + /// This function, like `Iterator::take_while`, will take elements from the + /// stream until the `predicate` resolves to `false`. Once one element + /// returns false it will always return that the stream is done. + fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R> + where P: FnMut(&Self::Item) -> R, + R: IntoFuture<Item=bool, Error=Self::Error>, + Self: Sized + { + take_while::new(self, pred) + } + + /// Runs this stream to completion, executing the provided closure for each + /// element on the stream. + /// + /// The closure provided will be called for each item this stream resolves + /// to successfully, producing a future. That future will then be executed + /// to completion before moving on to the next item. + /// + /// The returned value is a `Future` where the `Item` type is `()` and + /// errors are otherwise threaded through. Any error on the stream or in the + /// closure will cause iteration to be halted immediately and the future + /// will resolve to that error. + /// + /// To process each item in the stream and produce another stream instead + /// of a single future, use `and_then` instead. + fn for_each<F, U>(self, f: F) -> ForEach<Self, F, U> + where F: FnMut(Self::Item) -> U, + U: IntoFuture<Item=(), Error = Self::Error>, + Self: Sized + { + for_each::new(self, f) + } + + /// Map this stream's error to any error implementing `From` for + /// this stream's `Error`, returning a new stream. + /// + /// This function does for streams what `try!` does for `Result`, + /// by letting the compiler infer the type of the resulting error. + /// Just as `map_err` above, this is useful for example to ensure + /// that streams have the same error type when used with + /// combinators. + /// + /// Note that this function consumes the receiving stream and returns a + /// wrapped version of it. + fn from_err<E: From<Self::Error>>(self) -> FromErr<Self, E> + where Self: Sized, + { + from_err::new(self) + } + + /// Creates a new stream of at most `amt` items of the underlying stream. + /// + /// Once `amt` items have been yielded from this stream then it will always + /// return that the stream is done. + /// + /// # Errors + /// + /// Any errors yielded from underlying stream, before the desired amount of + /// items is reached, are passed through and do not affect the total number + /// of items taken. + fn take(self, amt: u64) -> Take<Self> + where Self: Sized + { + take::new(self, amt) + } + + /// Creates a new stream which skips `amt` items of the underlying stream. + /// + /// Once `amt` items have been skipped from this stream then it will always + /// return the remaining items on this stream. + /// + /// # Errors + /// + /// All errors yielded from underlying stream are passed through and do not + /// affect the total number of items skipped. + fn skip(self, amt: u64) -> Skip<Self> + where Self: Sized + { + skip::new(self, amt) + } + + /// Fuse a stream such that `poll` will never again be called once it has + /// finished. + /// + /// Currently once a stream has returned `None` from `poll` any further + /// calls could exhibit bad behavior such as block forever, panic, never + /// return, etc. If it is known that `poll` may be called after stream has + /// already finished, then this method can be used to ensure that it has + /// defined semantics. + /// + /// Once a stream has been `fuse`d and it finishes, then it will forever + /// return `None` from `poll`. This, unlike for the traits `poll` method, + /// is guaranteed. + /// + /// Also note that as soon as this stream returns `None` it will be dropped + /// to reclaim resources associated with it. + fn fuse(self) -> Fuse<Self> + where Self: Sized + { + fuse::new(self) + } + + /// Borrows a stream, rather than consuming it. + /// + /// This is useful to allow applying stream adaptors while still retaining + /// ownership of the original stream. + /// + /// ``` + /// use futures::prelude::*; + /// use futures::stream; + /// use futures::future; + /// + /// let mut stream = stream::iter_ok::<_, ()>(1..5); + /// + /// let sum = stream.by_ref().take(2).fold(0, |a, b| future::ok(a + b)).wait(); + /// assert_eq!(sum, Ok(3)); + /// + /// // You can use the stream again + /// let sum = stream.take(2).fold(0, |a, b| future::ok(a + b)).wait(); + /// assert_eq!(sum, Ok(7)); + /// ``` + fn by_ref(&mut self) -> &mut Self + where Self: Sized + { + self + } + + /// Catches unwinding panics while polling the stream. + /// + /// Caught panic (if any) will be the last element of the resulting stream. + /// + /// In general, panics within a stream can propagate all the way out to the + /// task level. This combinator makes it possible to halt unwinding within + /// the stream itself. It's most commonly used within task executors. This + /// method should not be used for error handling. + /// + /// Note that this method requires the `UnwindSafe` bound from the standard + /// library. This isn't always applied automatically, and the standard + /// library provides an `AssertUnwindSafe` wrapper type to apply it + /// after-the fact. To assist using this method, the `Stream` trait is also + /// implemented for `AssertUnwindSafe<S>` where `S` implements `Stream`. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// ```rust + /// use futures::prelude::*; + /// use futures::stream; + /// + /// let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + /// // panic on second element + /// let stream_panicking = stream.map(|o| o.unwrap()); + /// let mut iter = stream_panicking.catch_unwind().wait(); + /// + /// assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + /// assert!(iter.next().unwrap().is_err()); + /// assert!(iter.next().is_none()); + /// ``` + #[cfg(feature = "use_std")] + fn catch_unwind(self) -> CatchUnwind<Self> + where Self: Sized + std::panic::UnwindSafe + { + catch_unwind::new(self) + } + + /// An adaptor for creating a buffered list of pending futures. + /// + /// If this stream's item can be converted into a future, then this adaptor + /// will buffer up to at most `amt` futures and then return results in the + /// same order as the underlying stream. No more than `amt` futures will be + /// buffered at any point in time, and less than `amt` may also be buffered + /// depending on the state of each future. + /// + /// The returned stream will be a stream of each future's result, with + /// errors passed through whenever they occur. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn buffered(self, amt: usize) -> Buffered<Self> + where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, + Self: Sized + { + buffered::new(self, amt) + } + + /// An adaptor for creating a buffered list of pending futures (unordered). + /// + /// If this stream's item can be converted into a future, then this adaptor + /// will buffer up to `amt` futures and then return results in the order + /// in which they complete. No more than `amt` futures will be buffered at + /// any point in time, and less than `amt` may also be buffered depending on + /// the state of each future. + /// + /// The returned stream will be a stream of each future's result, with + /// errors passed through whenever they occur. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn buffer_unordered(self, amt: usize) -> BufferUnordered<Self> + where Self::Item: IntoFuture<Error = <Self as Stream>::Error>, + Self: Sized + { + buffer_unordered::new(self, amt) + } + + /// An adapter for merging the output of two streams. + /// + /// The merged stream produces items from one or both of the underlying + /// streams as they become available. Errors, however, are not merged: you + /// get at most one error at a time. + #[deprecated(note = "functionality provided by `select` now")] + #[allow(deprecated)] + fn merge<S>(self, other: S) -> Merge<Self, S> + where S: Stream<Error = Self::Error>, + Self: Sized, + { + merge::new(self, other) + } + + /// An adapter for zipping two streams together. + /// + /// The zipped stream waits for both streams to produce an item, and then + /// returns that pair. If an error happens, then that error will be returned + /// immediately. If either stream ends then the zipped stream will also end. + fn zip<S>(self, other: S) -> Zip<Self, S> + where S: Stream<Error = Self::Error>, + Self: Sized, + { + zip::new(self, other) + } + + /// Adapter for chaining two stream. + /// + /// The resulting stream emits elements from the first stream, and when + /// first stream reaches the end, emits the elements from the second stream. + /// + /// ```rust + /// use futures::prelude::*; + /// use futures::stream; + /// + /// let stream1 = stream::iter_result(vec![Ok(10), Err(false)]); + /// let stream2 = stream::iter_result(vec![Err(true), Ok(20)]); + /// let mut chain = stream1.chain(stream2).wait(); + /// + /// assert_eq!(Some(Ok(10)), chain.next()); + /// assert_eq!(Some(Err(false)), chain.next()); + /// assert_eq!(Some(Err(true)), chain.next()); + /// assert_eq!(Some(Ok(20)), chain.next()); + /// assert_eq!(None, chain.next()); + /// ``` + fn chain<S>(self, other: S) -> Chain<Self, S> + where S: Stream<Item = Self::Item, Error = Self::Error>, + Self: Sized + { + chain::new(self, other) + } + + /// Creates a new stream which exposes a `peek` method. + /// + /// Calling `peek` returns a reference to the next item in the stream. + fn peekable(self) -> Peekable<Self> + where Self: Sized + { + peek::new(self) + } + + /// An adaptor for chunking up items of the stream inside a vector. + /// + /// This combinator will attempt to pull items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// Errors are passed through the stream unbuffered. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// This method will panic of `capacity` is zero. + #[cfg(feature = "use_std")] + fn chunks(self, capacity: usize) -> Chunks<Self> + where Self: Sized + { + chunks::new(self, capacity) + } + + /// Creates a stream that selects the next element from either this stream + /// or the provided one, whichever is ready first. + /// + /// This combinator will attempt to pull items from both streams. Each + /// stream will be polled in a round-robin fashion, and whenever a stream is + /// ready to yield an item that item is yielded. + /// + /// The `select` function is similar to `merge` except that it requires both + /// streams to have the same item and error types. + /// + /// Error are passed through from either stream. + fn select<S>(self, other: S) -> Select<Self, S> + where S: Stream<Item = Self::Item, Error = Self::Error>, + Self: Sized, + { + select::new(self, other) + } + + /// A future that completes after the given stream has been fully processed + /// into the sink, including flushing. + /// + /// This future will drive the stream to keep producing items until it is + /// exhausted, sending each item to the sink. It will complete once both the + /// stream is exhausted, and the sink has fully processed received item, + /// flushed successfully, and closed successfully. + /// + /// Doing `stream.forward(sink)` is roughly equivalent to + /// `sink.send_all(stream)`. The returned future will exhaust all items from + /// `self`, sending them all to `sink`. Furthermore the `sink` will be + /// closed and flushed. + /// + /// On completion, the pair `(stream, sink)` is returned. + fn forward<S>(self, sink: S) -> Forward<Self, S> + where S: Sink<SinkItem = Self::Item>, + Self::Error: From<S::SinkError>, + Self: Sized + { + forward::new(self, sink) + } + + /// Splits this `Stream + Sink` object into separate `Stream` and `Sink` + /// objects. + /// + /// This can be useful when you want to split ownership between tasks, or + /// allow direct interaction between the two objects (e.g. via + /// `Sink::send_all`). + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn split(self) -> (SplitSink<Self>, SplitStream<Self>) + where Self: super::sink::Sink + Sized + { + split::split(self) + } + + /// Do something with each item of this stream, afterwards passing it on. + /// + /// This is similar to the `Iterator::inspect` method in the standard + /// library where it allows easily inspecting each value as it passes + /// through the stream, for example to debug what's going on. + fn inspect<F>(self, f: F) -> Inspect<Self, F> + where F: FnMut(&Self::Item), + Self: Sized, + { + inspect::new(self, f) + } + + /// Do something with the error of this stream, afterwards passing it on. + /// + /// This is similar to the `Stream::inspect` method where it allows + /// easily inspecting the error as it passes through the stream, for + /// example to debug what's going on. + fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> + where F: FnMut(&Self::Error), + Self: Sized, + { + inspect_err::new(self, f) + } +} + +impl<'a, S: ?Sized + Stream> Stream for &'a mut S { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + (**self).poll() + } +} + +/// Converts a list of futures into a `Stream` of results from the futures. +/// +/// This function will take an list of futures (e.g. a vector, an iterator, +/// etc), and return a stream. The stream will yield items as they become +/// available on the futures internally, in the order that they become +/// available. This function is similar to `buffer_unordered` in that it may +/// return items in a different order than in the list specified. +/// +/// Note that the returned set can also be used to dynamically push more +/// futures into the set as they become available. +#[cfg(feature = "use_std")] +pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future> + where I: IntoIterator, + I::Item: IntoFuture +{ + let mut set = FuturesUnordered::new(); + + for future in futures { + set.push(future.into_future()); + } + + return set +} |