diff options
Diffstat (limited to 'third_party/rust/futures-0.1.29/src/sink')
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/buffer.rs | 108 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/fanout.rs | 135 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/flush.rs | 46 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/from_err.rs | 71 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/map_err.rs | 64 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/mod.rs | 489 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/send.rs | 59 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/send_all.rs | 88 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/wait.rs | 59 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/with.rs | 153 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs | 126 |
11 files changed, 1398 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/src/sink/buffer.rs b/third_party/rust/futures-0.1.29/src/sink/buffer.rs new file mode 100644 index 0000000000..419579d9a0 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/buffer.rs @@ -0,0 +1,108 @@ +use std::collections::VecDeque; + +use {Poll, Async}; +use {StartSend, AsyncSink}; +use sink::Sink; +use stream::Stream; + +/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed +/// number of values when the underlying sink is unable to accept them. +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Buffer<S: Sink> { + sink: S, + buf: VecDeque<S::SinkItem>, + + // Track capacity separately from the `VecDeque`, which may be rounded up + cap: usize, +} + +pub fn new<S: Sink>(sink: S, amt: usize) -> Buffer<S> { + Buffer { + sink: sink, + buf: VecDeque::with_capacity(amt), + cap: amt, + } +} + +impl<S: Sink> Buffer<S> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } + + fn try_empty_buffer(&mut self) -> Poll<(), S::SinkError> { + while let Some(item) = self.buf.pop_front() { + if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { + self.buf.push_front(item); + + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S> Stream for Buffer<S> where S: Sink + Stream { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} + +impl<S: Sink> Sink for Buffer<S> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + if self.cap == 0 { + return self.sink.start_send(item); + } + + self.try_empty_buffer()?; + if self.buf.len() == self.cap { + return Ok(AsyncSink::NotReady(item)); + } + self.buf.push_back(item); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.poll_complete(); + } + + try_ready!(self.try_empty_buffer()); + debug_assert!(self.buf.is_empty()); + self.sink.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.close(); + } + + if self.buf.len() > 0 { + try_ready!(self.try_empty_buffer()); + } + assert_eq!(self.buf.len(), 0); + self.sink.close() + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/fanout.rs b/third_party/rust/futures-0.1.29/src/sink/fanout.rs new file mode 100644 index 0000000000..8d2456e7e8 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/fanout.rs @@ -0,0 +1,135 @@ +use core::fmt::{Debug, Formatter, Result as FmtResult}; +use core::mem::replace; + +use {Async, AsyncSink, Poll, Sink, StartSend}; + +/// Sink that clones incoming items and forwards them to two sinks at the same time. +/// +/// Backpressure from any downstream sink propagates up, which means that this sink +/// can only process items as fast as its _slowest_ downstream sink. +pub struct Fanout<A: Sink, B: Sink> { + left: Downstream<A>, + right: Downstream<B> +} + +impl<A: Sink, B: Sink> Fanout<A, B> { + /// Consumes this combinator, returning the underlying sinks. + /// + /// Note that this may discard intermediate state of this combinator, + /// so care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> (A, B) { + (self.left.sink, self.right.sink) + } +} + +impl<A: Sink + Debug, B: Sink + Debug> Debug for Fanout<A, B> + where A::SinkItem: Debug, + B::SinkItem: Debug +{ + fn fmt(&self, f: &mut Formatter) -> FmtResult { + f.debug_struct("Fanout") + .field("left", &self.left) + .field("right", &self.right) + .finish() + } +} + +pub fn new<A: Sink, B: Sink>(left: A, right: B) -> Fanout<A, B> { + Fanout { + left: Downstream::new(left), + right: Downstream::new(right) + } +} + +impl<A, B> Sink for Fanout<A, B> + where A: Sink, + A::SinkItem: Clone, + B: Sink<SinkItem=A::SinkItem, SinkError=A::SinkError> +{ + type SinkItem = A::SinkItem; + type SinkError = A::SinkError; + + fn start_send( + &mut self, + item: Self::SinkItem + ) -> StartSend<Self::SinkItem, Self::SinkError> { + // Attempt to complete processing any outstanding requests. + self.left.keep_flushing()?; + self.right.keep_flushing()?; + // Only if both downstream sinks are ready, start sending the next item. + if self.left.is_ready() && self.right.is_ready() { + self.left.state = self.left.sink.start_send(item.clone())?; + self.right.state = self.right.sink.start_send(item)?; + Ok(AsyncSink::Ready) + } else { + Ok(AsyncSink::NotReady(item)) + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + let left_async = self.left.poll_complete()?; + let right_async = self.right.poll_complete()?; + // Only if both downstream sinks are ready, signal readiness. + if left_async.is_ready() && right_async.is_ready() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + let left_async = self.left.close()?; + let right_async = self.right.close()?; + // Only if both downstream sinks are ready, signal readiness. + if left_async.is_ready() && right_async.is_ready() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } +} + +#[derive(Debug)] +struct Downstream<S: Sink> { + sink: S, + state: AsyncSink<S::SinkItem> +} + +impl<S: Sink> Downstream<S> { + fn new(sink: S) -> Self { + Downstream { sink: sink, state: AsyncSink::Ready } + } + + fn is_ready(&self) -> bool { + self.state.is_ready() + } + + fn keep_flushing(&mut self) -> Result<(), S::SinkError> { + if let AsyncSink::NotReady(item) = replace(&mut self.state, AsyncSink::Ready) { + self.state = self.sink.start_send(item)?; + } + Ok(()) + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + self.keep_flushing()?; + let async = self.sink.poll_complete()?; + // Only if all values have been sent _and_ the underlying + // sink is completely flushed, signal readiness. + if self.state.is_ready() && async.is_ready() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + self.keep_flushing()?; + // If all items have been flushed, initiate close. + if self.state.is_ready() { + self.sink.close() + } else { + Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/flush.rs b/third_party/rust/futures-0.1.29/src/sink/flush.rs new file mode 100644 index 0000000000..f66811e03d --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/flush.rs @@ -0,0 +1,46 @@ +use {Poll, Async, Future}; +use sink::Sink; + +/// Future for the `Sink::flush` combinator, which polls the sink until all data +/// has been flushed. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Flush<S> { + sink: Option<S>, +} + +pub fn new<S: Sink>(sink: S) -> Flush<S> { + Flush { sink: Some(sink) } +} + +impl<S: Sink> Flush<S> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + self.sink.as_ref().expect("Attempted `Flush::get_ref` after the flush completed") + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + self.sink.as_mut().expect("Attempted `Flush::get_mut` after the flush completed") + } + + /// Consume the `Flush` and return the inner sink. + pub fn into_inner(self) -> S { + self.sink.expect("Attempted `Flush::into_inner` after the flush completed") + } +} + +impl<S: Sink> Future for Flush<S> { + type Item = S; + type Error = S::SinkError; + + fn poll(&mut self) -> Poll<S, S::SinkError> { + let mut sink = self.sink.take().expect("Attempted to poll Flush after it completed"); + if sink.poll_complete()?.is_ready() { + Ok(Async::Ready(sink)) + } else { + self.sink = Some(sink); + Ok(Async::NotReady) + } + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/from_err.rs b/third_party/rust/futures-0.1.29/src/sink/from_err.rs new file mode 100644 index 0000000000..4880c30ef4 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/from_err.rs @@ -0,0 +1,71 @@ +use core::marker::PhantomData; + +use {Sink, Poll, StartSend}; + +/// A sink combinator to change the error type of a sink. +/// +/// This is created by the `Sink::from_err` method. +#[derive(Clone, Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct SinkFromErr<S, E> { + sink: S, + f: PhantomData<E> +} + +pub fn new<S, E>(sink: S) -> SinkFromErr<S, E> + where S: Sink +{ + SinkFromErr { + sink: sink, + f: PhantomData + } +} + +impl<S, E> SinkFromErr<S, E> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } +} + +impl<S, E> Sink for SinkFromErr<S, E> + where S: Sink, + E: From<S::SinkError> +{ + type SinkItem = S::SinkItem; + type SinkError = E; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.sink.start_send(item).map_err(|e| e.into()) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.sink.poll_complete().map_err(|e| e.into()) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.sink.close().map_err(|e| e.into()) + } +} + +impl<S: ::stream::Stream, E> ::stream::Stream for SinkFromErr<S, E> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/map_err.rs b/third_party/rust/futures-0.1.29/src/sink/map_err.rs new file mode 100644 index 0000000000..25c168c071 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/map_err.rs @@ -0,0 +1,64 @@ +use sink::Sink; + +use {Poll, StartSend, Stream}; + +/// Sink for the `Sink::sink_map_err` combinator. +#[derive(Clone,Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct SinkMapErr<S, F> { + sink: S, + f: Option<F>, +} + +pub fn new<S, F>(s: S, f: F) -> SinkMapErr<S, F> { + SinkMapErr { sink: s, f: Some(f) } +} + +impl<S, E> SinkMapErr<S, E> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } +} + +impl<S, F, E> Sink for SinkMapErr<S, F> + where S: Sink, + F: FnOnce(S::SinkError) -> E, +{ + type SinkItem = S::SinkItem; + type SinkError = E; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.sink.start_send(item).map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.sink.poll_complete().map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.sink.close().map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e)) + } +} + +impl<S: Stream, F> Stream for SinkMapErr<S, F> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/mod.rs b/third_party/rust/futures-0.1.29/src/sink/mod.rs new file mode 100644 index 0000000000..e5ea97f92a --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/mod.rs @@ -0,0 +1,489 @@ +//! Asynchronous sinks +//! +//! This module contains the `Sink` trait, along with a number of adapter types +//! for it. An overview is available in the documentation for the trait itself. +//! +//! You can find more information/tutorials about streams [online at +//! https://tokio.rs][online] +//! +//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ + +use {IntoFuture, Poll, StartSend}; +use stream::Stream; + +mod with; +mod with_flat_map; +// mod with_map; +// mod with_filter; +// mod with_filter_map; +mod flush; +mod from_err; +mod send; +mod send_all; +mod map_err; +mod fanout; + +if_std! { + mod buffer; + mod wait; + + pub use self::buffer::Buffer; + pub use self::wait::Wait; + + // TODO: consider expanding this via e.g. FromIterator + impl<T> Sink for ::std::vec::Vec<T> { + type SinkItem = T; + type SinkError = (); // Change this to ! once it stabilizes + + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend<Self::SinkItem, Self::SinkError> + { + self.push(item); + Ok(::AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + Ok(::Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + Ok(::Async::Ready(())) + } + } + + /// A type alias for `Box<Sink + Send>` + pub type BoxSink<T, E> = ::std::boxed::Box<Sink<SinkItem = T, SinkError = E> + + ::core::marker::Send>; + + impl<S: ?Sized + Sink> Sink for ::std::boxed::Box<S> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend<Self::SinkItem, Self::SinkError> { + (**self).start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + (**self).poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + (**self).close() + } + } +} + +pub use self::with::With; +pub use self::with_flat_map::WithFlatMap; +pub use self::flush::Flush; +pub use self::send::Send; +pub use self::send_all::SendAll; +pub use self::map_err::SinkMapErr; +pub use self::from_err::SinkFromErr; +pub use self::fanout::Fanout; + +/// A `Sink` is a value into which other values can be sent, asynchronously. +/// +/// Basic examples of sinks include the sending side of: +/// +/// - Channels +/// - Sockets +/// - Pipes +/// +/// In addition to such "primitive" sinks, it's typical to layer additional +/// functionality, such as buffering, on top of an existing sink. +/// +/// Sending to a sink is "asynchronous" in the sense that the value may not be +/// sent in its entirety immediately. Instead, values are sent in a two-phase +/// way: first by initiating a send, and then by polling for completion. This +/// two-phase setup is analogous to buffered writing in synchronous code, where +/// writes often succeed immediately, but internally are buffered and are +/// *actually* written only upon flushing. +/// +/// In addition, the `Sink` may be *full*, in which case it is not even possible +/// to start the sending process. +/// +/// As with `Future` and `Stream`, the `Sink` trait is built from a few core +/// required methods, and a host of default methods for working in a +/// higher-level way. The `Sink::send_all` combinator is of particular +/// importance: you can use it to send an entire stream to a sink, which is +/// the simplest way to ultimately consume a sink. +/// +/// You can find more information/tutorials about streams [online at +/// https://tokio.rs][online] +/// +/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ +pub trait Sink { + /// The type of value that the sink accepts. + type SinkItem; + + /// The type of value produced by the sink when an error occurs. + type SinkError; + + /// Begin the process of sending a value to the sink. + /// + /// As the name suggests, this method only *begins* the process of sending + /// the item. If the sink employs buffering, the item isn't fully processed + /// until the buffer is fully flushed. Since sinks are designed to work with + /// asynchronous I/O, the process of actually writing out the data to an + /// underlying object takes place asynchronously. **You *must* use + /// `poll_complete` in order to drive completion of a send**. In particular, + /// `start_send` does not begin the flushing process + /// + /// # Return value + /// + /// This method returns `AsyncSink::Ready` if the sink was able to start + /// sending `item`. In that case, you *must* ensure that you call + /// `poll_complete` to process the sent item to completion. Note, however, + /// that several calls to `start_send` can be made prior to calling + /// `poll_complete`, which will work on completing all pending items. + /// + /// The method returns `AsyncSink::NotReady` if the sink was unable to begin + /// sending, usually due to being full. The sink must have attempted to + /// complete processing any outstanding requests (equivalent to + /// `poll_complete`) before yielding this result. The current task will be + /// automatically scheduled for notification when the sink may be ready to + /// receive new values. + /// + /// # Errors + /// + /// If the sink encounters an error other than being temporarily full, it + /// uses the `Err` variant to signal that error. In most cases, such errors + /// mean that the sink will permanently be unable to receive items. + /// + /// # Panics + /// + /// This method may panic in a few situations, depending on the specific + /// sink: + /// + /// - It is called outside of the context of a task. + /// - A previous call to `start_send` or `poll_complete` yielded an error. + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend<Self::SinkItem, Self::SinkError>; + + /// Flush all output from this sink, if necessary. + /// + /// Some sinks may buffer intermediate data as an optimization to improve + /// throughput. In other words, if a sink has a corresponding receiver then + /// a successful `start_send` above may not guarantee that the value is + /// actually ready to be received by the receiver. This function is intended + /// to be used to ensure that values do indeed make their way to the + /// receiver. + /// + /// This function will attempt to process any pending requests on behalf of + /// the sink and drive it to completion. + /// + /// # Return value + /// + /// Returns `Ok(Async::Ready(()))` when no buffered items remain. If this + /// value is returned then it is guaranteed that all previous values sent + /// via `start_send` will be guaranteed to be available to a listening + /// receiver. + /// + /// Returns `Ok(Async::NotReady)` if there is more work left to do, in which + /// case the current task is scheduled to wake up when more progress may be + /// possible. + /// + /// # Errors + /// + /// Returns `Err` if the sink encounters an error while processing one of + /// its pending requests. Due to the buffered nature of requests, it is not + /// generally possible to correlate the error with a particular request. As + /// with `start_send`, these errors are generally "fatal" for continued use + /// of the sink. + /// + /// # Panics + /// + /// This method may panic in a few situations, depending on the specific sink: + /// + /// - It is called outside of the context of a task. + /// - A previous call to `start_send` or `poll_complete` yielded an error. + /// + /// # Compatibility nodes + /// + /// The name of this method may be slightly misleading as the original + /// intention was to have this method be more general than just flushing + /// requests. Over time though it was decided to trim back the ambitions of + /// this method to what it's always done, just flushing. + /// + /// In the 0.2 release series of futures this method will be renamed to + /// `poll_flush`. For 0.1, however, the breaking change is not happening + /// yet. + fn poll_complete(&mut self) -> Poll<(), Self::SinkError>; + + /// A method to indicate that no more values will ever be pushed into this + /// sink. + /// + /// This method is used to indicate that a sink will no longer even be given + /// another value by the caller. That is, the `start_send` method above will + /// be called no longer (nor `poll_complete`). This method is intended to + /// model "graceful shutdown" in various protocols where the intent to shut + /// down is followed by a little more blocking work. + /// + /// Callers of this function should work it it in a similar fashion to + /// `poll_complete`. Once called it may return `NotReady` which indicates + /// that more external work needs to happen to make progress. The current + /// task will be scheduled to receive a notification in such an event, + /// however. + /// + /// Note that this function will imply `poll_complete` above. That is, if a + /// sink has buffered data, then it'll be flushed out during a `close` + /// operation. It is not necessary to have `poll_complete` return `Ready` + /// before a `close` is called. Once a `close` is called, though, + /// `poll_complete` cannot be called. + /// + /// # Return value + /// + /// This function, like `poll_complete`, returns a `Poll`. The value is + /// `Ready` once the close operation has completed. At that point it should + /// be safe to drop the sink and deallocate associated resources. + /// + /// If the value returned is `NotReady` then the sink is not yet closed and + /// work needs to be done to close it. The work has been scheduled and the + /// current task will receive a notification when it's next ready to call + /// this method again. + /// + /// Finally, this function may also return an error. + /// + /// # Errors + /// + /// This function will return an `Err` if any operation along the way during + /// the close operation fails. An error typically is fatal for a sink and is + /// unable to be recovered from, but in specific situations this may not + /// always be true. + /// + /// Note that it's also typically an error to call `start_send` or + /// `poll_complete` after the `close` function is called. This method will + /// *initiate* a close, and continuing to send values after that (or attempt + /// to flush) may result in strange behavior, panics, errors, etc. Once this + /// method is called, it must be the only method called on this `Sink`. + /// + /// # Panics + /// + /// This method may panic or cause panics if: + /// + /// * It is called outside the context of a future's task + /// * It is called and then `start_send` or `poll_complete` is called + /// + /// # Compatibility notes + /// + /// Note that this function is currently by default a provided function, + /// defaulted to calling `poll_complete` above. This function was added + /// in the 0.1 series of the crate as a backwards-compatible addition. It + /// is intended that in the 0.2 series the method will no longer be a + /// default method. + /// + /// It is highly recommended to consider this method a required method and + /// to implement it whenever you implement `Sink` locally. It is especially + /// crucial to be sure to close inner sinks, if applicable. + #[cfg(feature = "with-deprecated")] + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.poll_complete() + } + + /// dox (you should see the above, not this) + #[cfg(not(feature = "with-deprecated"))] + fn close(&mut self) -> Poll<(), Self::SinkError>; + + /// Creates a new object which will produce a synchronous sink. + /// + /// The sink returned does **not** implement the `Sink` trait, and instead + /// only has two methods: `send` and `flush`. These two methods correspond + /// to `start_send` and `poll_complete` above except are executed in a + /// blocking fashion. + #[cfg(feature = "use_std")] + fn wait(self) -> Wait<Self> + where Self: Sized + { + wait::new(self) + } + + /// Composes a function *in front of* the sink. + /// + /// This adapter produces a new sink that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// To process each value, `f` produces a *future*, which is then polled to + /// completion before passing its result down to the underlying sink. If the + /// future produces an error, that error is returned by the new sink. + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::map`. + fn with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut> + where F: FnMut(U) -> Fut, + Fut: IntoFuture<Item = Self::SinkItem>, + Fut::Error: From<Self::SinkError>, + Self: Sized + { + with::new(self, f) + } + + /// Composes a function *in front of* the sink. + /// + /// This adapter produces a new sink that passes each value through the + /// given function `f` before sending it to `self`. + /// + /// To process each value, `f` produces a *stream*, of which each value + /// is passed to the underlying sink. A new value will not be accepted until + /// the stream has been drained + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::flat_map`. + /// + /// # Examples + /// --- + /// Using this function with an iterator through use of the `stream::iter_ok()` + /// function + /// + /// ``` + /// use futures::prelude::*; + /// use futures::stream; + /// use futures::sync::mpsc; + /// + /// let (tx, rx) = mpsc::channel::<i32>(5); + /// + /// let tx = tx.with_flat_map(|x| { + /// stream::iter_ok(vec![42; x].into_iter().map(|y| y)) + /// }); + /// tx.send(5).wait().unwrap(); + /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42])) + /// ``` + fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St> + where F: FnMut(U) -> St, + St: Stream<Item = Self::SinkItem, Error=Self::SinkError>, + Self: Sized + { + with_flat_map::new(self, f) + } + + /* + fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F> + where F: FnMut(U) -> Self::SinkItem, + Self: Sized; + + fn with_filter<F>(self, f: F) -> WithFilter<Self, F> + where F: FnMut(Self::SinkItem) -> bool, + Self: Sized; + + fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F> + where F: FnMut(U) -> Option<Self::SinkItem>, + Self: Sized; + */ + + /// Transforms the error returned by the sink. + fn sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F> + where F: FnOnce(Self::SinkError) -> E, + Self: Sized, + { + map_err::new(self, f) + } + + /// Map this sink's error to any error implementing `From` for this sink's + /// `Error`, returning a new sink. + /// + /// If wanting to map errors of a `Sink + Stream`, use `.sink_from_err().from_err()`. + fn sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E> + where Self: Sized, + { + from_err::new(self) + } + + + /// Adds a fixed-size buffer to the current sink. + /// + /// The resulting sink will buffer up to `amt` items when the underlying + /// sink is unwilling to accept additional items. Calling `poll_complete` on + /// the buffered sink will attempt to both empty the buffer and complete + /// processing on the underlying sink. + /// + /// Note that this function consumes the given sink, returning a wrapped + /// version, much like `Iterator::map`. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + #[cfg(feature = "use_std")] + fn buffer(self, amt: usize) -> Buffer<Self> + where Self: Sized + { + buffer::new(self, amt) + } + + /// Fanout items to multiple sinks. + /// + /// This adapter clones each incoming item and forwards it to both this as well as + /// the other sink at the same time. + fn fanout<S>(self, other: S) -> Fanout<Self, S> + where Self: Sized, + Self::SinkItem: Clone, + S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError> + { + fanout::new(self, other) + } + + /// A future that completes when the sink has finished processing all + /// pending requests. + /// + /// The sink itself is returned after flushing is complete; this adapter is + /// intended to be used when you want to stop sending to the sink until + /// all current requests are processed. + fn flush(self) -> Flush<Self> + where Self: Sized + { + flush::new(self) + } + + /// A future that completes after the given item has been fully processed + /// into the sink, including flushing. + /// + /// Note that, **because of the flushing requirement, it is usually better + /// to batch together items to send via `send_all`, rather than flushing + /// between each item.** + /// + /// On completion, the sink is returned. + fn send(self, item: Self::SinkItem) -> Send<Self> + where Self: Sized + { + send::new(self, item) + } + + /// A future that completes after the given stream has been fully processed + /// into the sink, including flushing. + /// + /// This future will drive the stream to keep producing items until it is + /// exhausted, sending each item to the sink. It will complete once both the + /// stream is exhausted, the sink has received all items, the sink has been + /// flushed, and the sink has been closed. + /// + /// Doing `sink.send_all(stream)` is roughly equivalent to + /// `stream.forward(sink)`. The returned future will exhaust all items from + /// `stream` and send them to `self`, closing `self` when all items have been + /// received. + /// + /// On completion, the pair `(sink, source)` is returned. + fn send_all<S>(self, stream: S) -> SendAll<Self, S> + where S: Stream<Item = Self::SinkItem>, + Self::SinkError: From<S::Error>, + Self: Sized + { + send_all::new(self, stream) + } +} + +impl<'a, S: ?Sized + Sink> Sink for &'a mut S { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend<Self::SinkItem, Self::SinkError> { + (**self).start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + (**self).poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + (**self).close() + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/send.rs b/third_party/rust/futures-0.1.29/src/sink/send.rs new file mode 100644 index 0000000000..71173fa836 --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/send.rs @@ -0,0 +1,59 @@ +use {Poll, Async, Future, AsyncSink}; +use sink::Sink; + +/// Future for the `Sink::send` combinator, which sends a value to a sink and +/// then waits until the sink has fully flushed. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Send<S: Sink> { + sink: Option<S>, + item: Option<S::SinkItem>, +} + +pub fn new<S: Sink>(sink: S, item: S::SinkItem) -> Send<S> { + Send { + sink: Some(sink), + item: Some(item), + } +} + +impl<S: Sink> Send<S> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + self.sink.as_ref().take().expect("Attempted Send::get_ref after completion") + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + self.sink.as_mut().take().expect("Attempted Send::get_mut after completion") + } + + fn sink_mut(&mut self) -> &mut S { + self.sink.as_mut().take().expect("Attempted to poll Send after completion") + } + + fn take_sink(&mut self) -> S { + self.sink.take().expect("Attempted to poll Send after completion") + } +} + +impl<S: Sink> Future for Send<S> { + type Item = S; + type Error = S::SinkError; + + fn poll(&mut self) -> Poll<S, S::SinkError> { + if let Some(item) = self.item.take() { + if let AsyncSink::NotReady(item) = self.sink_mut().start_send(item)? { + self.item = Some(item); + return Ok(Async::NotReady); + } + } + + // we're done sending the item, but want to block on flushing the + // sink + try_ready!(self.sink_mut().poll_complete()); + + // now everything's emptied, so return the sink for further use + Ok(Async::Ready(self.take_sink())) + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/send_all.rs b/third_party/rust/futures-0.1.29/src/sink/send_all.rs new file mode 100644 index 0000000000..a230903d1c --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/send_all.rs @@ -0,0 +1,88 @@ +use {Poll, Async, Future, AsyncSink}; +use stream::{Stream, Fuse}; +use sink::Sink; + +/// Future for the `Sink::send_all` combinator, which sends a stream of values +/// to a sink and then waits until the sink has fully flushed those values. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct SendAll<T, U: Stream> { + sink: Option<T>, + stream: Option<Fuse<U>>, + buffered: Option<U::Item>, +} + +pub fn new<T, U>(sink: T, stream: U) -> SendAll<T, U> + where T: Sink, + U: Stream<Item = T::SinkItem>, + T::SinkError: From<U::Error>, +{ + SendAll { + sink: Some(sink), + stream: Some(stream.fuse()), + buffered: None, + } +} + +impl<T, U> SendAll<T, U> + where T: Sink, + U: Stream<Item = T::SinkItem>, + T::SinkError: From<U::Error>, +{ + fn sink_mut(&mut self) -> &mut T { + self.sink.as_mut().take().expect("Attempted to poll SendAll after completion") + } + + fn stream_mut(&mut self) -> &mut Fuse<U> { + self.stream.as_mut().take() + .expect("Attempted to poll SendAll after completion") + } + + fn take_result(&mut self) -> (T, U) { + let sink = self.sink.take() + .expect("Attempted to poll Forward after completion"); + let fuse = self.stream.take() + .expect("Attempted to poll Forward after completion"); + (sink, fuse.into_inner()) + } + + fn try_start_send(&mut self, item: U::Item) -> Poll<(), T::SinkError> { + debug_assert!(self.buffered.is_none()); + if let AsyncSink::NotReady(item) = self.sink_mut().start_send(item)? { + self.buffered = Some(item); + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + } +} + +impl<T, U> Future for SendAll<T, U> + where T: Sink, + U: Stream<Item = T::SinkItem>, + T::SinkError: From<U::Error>, +{ + type Item = (T, U); + type Error = T::SinkError; + + fn poll(&mut self) -> Poll<(T, U), T::SinkError> { + // If we've got an item buffered already, we need to write it to the + // sink before we can do anything else + if let Some(item) = self.buffered.take() { + try_ready!(self.try_start_send(item)) + } + + loop { + match self.stream_mut().poll()? { + Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)), + Async::Ready(None) => { + try_ready!(self.sink_mut().close()); + return Ok(Async::Ready(self.take_result())) + } + Async::NotReady => { + try_ready!(self.sink_mut().poll_complete()); + return Ok(Async::NotReady) + } + } + } + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/wait.rs b/third_party/rust/futures-0.1.29/src/sink/wait.rs new file mode 100644 index 0000000000..940a58862f --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/wait.rs @@ -0,0 +1,59 @@ +use sink::Sink; +use executor; + +/// A sink combinator which converts an asynchronous sink to a **blocking +/// sink**. +/// +/// Created by the `Sink::wait` method, this function transforms any sink into a +/// blocking version. This is implemented by blocking the current thread when a +/// sink is otherwise unable to make progress. +#[must_use = "sinks do nothing unless used"] +#[derive(Debug)] +pub struct Wait<S> { + sink: executor::Spawn<S>, +} + +pub fn new<S: Sink>(s: S) -> Wait<S> { + Wait { + sink: executor::spawn(s), + } +} + +impl<S: Sink> Wait<S> { + /// Sends a value to this sink, blocking the current thread until it's able + /// to do so. + /// + /// This function will take the `value` provided and call the underlying + /// sink's `start_send` function until it's ready to accept the value. If + /// the function returns `NotReady` then the current thread is blocked + /// until it is otherwise ready to accept the value. + /// + /// # Return value + /// + /// If `Ok(())` is returned then the `value` provided was successfully sent + /// along the sink, and if `Err(e)` is returned then an error occurred + /// which prevented the value from being sent. + pub fn send(&mut self, value: S::SinkItem) -> Result<(), S::SinkError> { + self.sink.wait_send(value) + } + + /// Flushes any buffered data in this sink, blocking the current thread + /// until it's entirely flushed. + /// + /// This function will call the underlying sink's `poll_complete` method + /// until it returns that it's ready to proceed. If the method returns + /// `NotReady` the current thread will be blocked until it's otherwise + /// ready to proceed. + pub fn flush(&mut self) -> Result<(), S::SinkError> { + self.sink.wait_flush() + } + + /// Close this sink, blocking the current thread until it's entirely closed. + /// + /// This function will call the underlying sink's `close` method + /// until it returns that it's closed. If the method returns + /// `NotReady` the current thread will be blocked until it's otherwise closed. + pub fn close(&mut self) -> Result<(), S::SinkError> { + self.sink.wait_close() + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/with.rs b/third_party/rust/futures-0.1.29/src/sink/with.rs new file mode 100644 index 0000000000..3326b6e49c --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/with.rs @@ -0,0 +1,153 @@ +use core::mem; +use core::marker::PhantomData; + +use {IntoFuture, Future, Poll, Async, StartSend, AsyncSink}; +use sink::Sink; +use stream::Stream; + +/// Sink for the `Sink::with` combinator, chaining a computation to run *prior* +/// to pushing a value into the underlying sink. +#[derive(Clone, Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct With<S, U, F, Fut> + where S: Sink, + F: FnMut(U) -> Fut, + Fut: IntoFuture, +{ + sink: S, + f: F, + state: State<Fut::Future, S::SinkItem>, + _phantom: PhantomData<fn(U)>, +} + +#[derive(Clone, Debug)] +enum State<Fut, T> { + Empty, + Process(Fut), + Buffered(T), +} + +impl<Fut, T> State<Fut, T> { + fn is_empty(&self) -> bool { + if let State::Empty = *self { + true + } else { + false + } + } +} + +pub fn new<S, U, F, Fut>(sink: S, f: F) -> With<S, U, F, Fut> + where S: Sink, + F: FnMut(U) -> Fut, + Fut: IntoFuture<Item = S::SinkItem>, + Fut::Error: From<S::SinkError>, +{ + With { + state: State::Empty, + sink: sink, + f: f, + _phantom: PhantomData, + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S, U, F, Fut> Stream for With<S, U, F, Fut> + where S: Stream + Sink, + F: FnMut(U) -> Fut, + Fut: IntoFuture +{ + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} + +impl<S, U, F, Fut> With<S, U, F, Fut> + where S: Sink, + F: FnMut(U) -> Fut, + Fut: IntoFuture<Item = S::SinkItem>, + Fut::Error: From<S::SinkError>, +{ + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } + + fn poll(&mut self) -> Poll<(), Fut::Error> { + loop { + match mem::replace(&mut self.state, State::Empty) { + State::Empty => break, + State::Process(mut fut) => { + match fut.poll()? { + Async::Ready(item) => { + self.state = State::Buffered(item); + } + Async::NotReady => { + self.state = State::Process(fut); + break + } + } + } + State::Buffered(item) => { + if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { + self.state = State::Buffered(item); + break + } + } + } + } + + if self.state.is_empty() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } +} + +impl<S, U, F, Fut> Sink for With<S, U, F, Fut> + where S: Sink, + F: FnMut(U) -> Fut, + Fut: IntoFuture<Item = S::SinkItem>, + Fut::Error: From<S::SinkError>, +{ + type SinkItem = U; + type SinkError = Fut::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Fut::Error> { + if self.poll()?.is_not_ready() { + return Ok(AsyncSink::NotReady(item)) + } + self.state = State::Process((self.f)(item).into_future()); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Fut::Error> { + // poll ourselves first, to push data downward + let me_ready = self.poll()?; + // always propagate `poll_complete` downward to attempt to make progress + try_ready!(self.sink.poll_complete()); + Ok(me_ready) + } + + fn close(&mut self) -> Poll<(), Fut::Error> { + try_ready!(self.poll()); + Ok(self.sink.close()?) + } +} diff --git a/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs b/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs new file mode 100644 index 0000000000..80c4f6605a --- /dev/null +++ b/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs @@ -0,0 +1,126 @@ +use core::marker::PhantomData; + +use {Poll, Async, StartSend, AsyncSink}; +use sink::Sink; +use stream::Stream; + +/// Sink for the `Sink::with_flat_map` combinator, chaining a computation that returns an iterator +/// to run prior to pushing a value into the underlying sink +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct WithFlatMap<S, U, F, St> +where + S: Sink, + F: FnMut(U) -> St, + St: Stream<Item = S::SinkItem, Error=S::SinkError>, +{ + sink: S, + f: F, + stream: Option<St>, + buffer: Option<S::SinkItem>, + _phantom: PhantomData<fn(U)>, +} + +pub fn new<S, U, F, St>(sink: S, f: F) -> WithFlatMap<S, U, F, St> +where + S: Sink, + F: FnMut(U) -> St, + St: Stream<Item = S::SinkItem, Error=S::SinkError>, +{ + WithFlatMap { + sink: sink, + f: f, + stream: None, + buffer: None, + _phantom: PhantomData, + } +} + +impl<S, U, F, St> WithFlatMap<S, U, F, St> +where + S: Sink, + F: FnMut(U) -> St, + St: Stream<Item = S::SinkItem, Error=S::SinkError>, +{ + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } + + fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> { + if let Some(x) = self.buffer.take() { + if let AsyncSink::NotReady(x) = self.sink.start_send(x)? { + self.buffer = Some(x); + return Ok(Async::NotReady); + } + } + if let Some(mut stream) = self.stream.take() { + while let Some(x) = try_ready!(stream.poll()) { + if let AsyncSink::NotReady(x) = self.sink.start_send(x)? { + self.stream = Some(stream); + self.buffer = Some(x); + return Ok(Async::NotReady); + } + } + } + Ok(Async::Ready(())) + } +} + +impl<S, U, F, St> Stream for WithFlatMap<S, U, F, St> +where + S: Stream + Sink, + F: FnMut(U) -> St, + St: Stream<Item = S::SinkItem, Error=S::SinkError>, +{ + type Item = S::Item; + type Error = S::Error; + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} + +impl<S, U, F, St> Sink for WithFlatMap<S, U, F, St> +where + S: Sink, + F: FnMut(U) -> St, + St: Stream<Item = S::SinkItem, Error=S::SinkError>, +{ + type SinkItem = U; + type SinkError = S::SinkError; + fn start_send(&mut self, i: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + if self.try_empty_stream()?.is_not_ready() { + return Ok(AsyncSink::NotReady(i)); + } + assert!(self.stream.is_none()); + self.stream = Some((self.f)(i)); + self.try_empty_stream()?; + Ok(AsyncSink::Ready) + } + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if self.try_empty_stream()?.is_not_ready() { + return Ok(Async::NotReady); + } + self.sink.poll_complete() + } + fn close(&mut self) -> Poll<(), Self::SinkError> { + if self.try_empty_stream()?.is_not_ready() { + return Ok(Async::NotReady); + } + assert!(self.stream.is_none()); + self.sink.close() + } +} |