summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.29/src/sink
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.29/src/sink')
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/buffer.rs108
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/fanout.rs135
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/flush.rs46
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/from_err.rs71
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/map_err.rs64
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/mod.rs489
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/send.rs59
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/send_all.rs88
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/wait.rs59
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/with.rs153
-rw-r--r--third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs126
11 files changed, 1398 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/src/sink/buffer.rs b/third_party/rust/futures-0.1.29/src/sink/buffer.rs
new file mode 100644
index 0000000000..419579d9a0
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/buffer.rs
@@ -0,0 +1,108 @@
+use std::collections::VecDeque;
+
+use {Poll, Async};
+use {StartSend, AsyncSink};
+use sink::Sink;
+use stream::Stream;
+
+/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed
+/// number of values when the underlying sink is unable to accept them.
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct Buffer<S: Sink> {
+ sink: S,
+ buf: VecDeque<S::SinkItem>,
+
+ // Track capacity separately from the `VecDeque`, which may be rounded up
+ cap: usize,
+}
+
+pub fn new<S: Sink>(sink: S, amt: usize) -> Buffer<S> {
+ Buffer {
+ sink: sink,
+ buf: VecDeque::with_capacity(amt),
+ cap: amt,
+ }
+}
+
+impl<S: Sink> Buffer<S> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+
+ fn try_empty_buffer(&mut self) -> Poll<(), S::SinkError> {
+ while let Some(item) = self.buf.pop_front() {
+ if let AsyncSink::NotReady(item) = self.sink.start_send(item)? {
+ self.buf.push_front(item);
+
+ return Ok(Async::NotReady);
+ }
+ }
+
+ Ok(Async::Ready(()))
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S> Stream for Buffer<S> where S: Sink + Stream {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
+
+impl<S: Sink> Sink for Buffer<S> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.start_send(item);
+ }
+
+ self.try_empty_buffer()?;
+ if self.buf.len() == self.cap {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ self.buf.push_back(item);
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.poll_complete();
+ }
+
+ try_ready!(self.try_empty_buffer());
+ debug_assert!(self.buf.is_empty());
+ self.sink.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.close();
+ }
+
+ if self.buf.len() > 0 {
+ try_ready!(self.try_empty_buffer());
+ }
+ assert_eq!(self.buf.len(), 0);
+ self.sink.close()
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/fanout.rs b/third_party/rust/futures-0.1.29/src/sink/fanout.rs
new file mode 100644
index 0000000000..8d2456e7e8
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/fanout.rs
@@ -0,0 +1,135 @@
+use core::fmt::{Debug, Formatter, Result as FmtResult};
+use core::mem::replace;
+
+use {Async, AsyncSink, Poll, Sink, StartSend};
+
+/// Sink that clones incoming items and forwards them to two sinks at the same time.
+///
+/// Backpressure from any downstream sink propagates up, which means that this sink
+/// can only process items as fast as its _slowest_ downstream sink.
+pub struct Fanout<A: Sink, B: Sink> {
+ left: Downstream<A>,
+ right: Downstream<B>
+}
+
+impl<A: Sink, B: Sink> Fanout<A, B> {
+ /// Consumes this combinator, returning the underlying sinks.
+ ///
+ /// Note that this may discard intermediate state of this combinator,
+ /// so care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> (A, B) {
+ (self.left.sink, self.right.sink)
+ }
+}
+
+impl<A: Sink + Debug, B: Sink + Debug> Debug for Fanout<A, B>
+ where A::SinkItem: Debug,
+ B::SinkItem: Debug
+{
+ fn fmt(&self, f: &mut Formatter) -> FmtResult {
+ f.debug_struct("Fanout")
+ .field("left", &self.left)
+ .field("right", &self.right)
+ .finish()
+ }
+}
+
+pub fn new<A: Sink, B: Sink>(left: A, right: B) -> Fanout<A, B> {
+ Fanout {
+ left: Downstream::new(left),
+ right: Downstream::new(right)
+ }
+}
+
+impl<A, B> Sink for Fanout<A, B>
+ where A: Sink,
+ A::SinkItem: Clone,
+ B: Sink<SinkItem=A::SinkItem, SinkError=A::SinkError>
+{
+ type SinkItem = A::SinkItem;
+ type SinkError = A::SinkError;
+
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem
+ ) -> StartSend<Self::SinkItem, Self::SinkError> {
+ // Attempt to complete processing any outstanding requests.
+ self.left.keep_flushing()?;
+ self.right.keep_flushing()?;
+ // Only if both downstream sinks are ready, start sending the next item.
+ if self.left.is_ready() && self.right.is_ready() {
+ self.left.state = self.left.sink.start_send(item.clone())?;
+ self.right.state = self.right.sink.start_send(item)?;
+ Ok(AsyncSink::Ready)
+ } else {
+ Ok(AsyncSink::NotReady(item))
+ }
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ let left_async = self.left.poll_complete()?;
+ let right_async = self.right.poll_complete()?;
+ // Only if both downstream sinks are ready, signal readiness.
+ if left_async.is_ready() && right_async.is_ready() {
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ let left_async = self.left.close()?;
+ let right_async = self.right.close()?;
+ // Only if both downstream sinks are ready, signal readiness.
+ if left_async.is_ready() && right_async.is_ready() {
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+#[derive(Debug)]
+struct Downstream<S: Sink> {
+ sink: S,
+ state: AsyncSink<S::SinkItem>
+}
+
+impl<S: Sink> Downstream<S> {
+ fn new(sink: S) -> Self {
+ Downstream { sink: sink, state: AsyncSink::Ready }
+ }
+
+ fn is_ready(&self) -> bool {
+ self.state.is_ready()
+ }
+
+ fn keep_flushing(&mut self) -> Result<(), S::SinkError> {
+ if let AsyncSink::NotReady(item) = replace(&mut self.state, AsyncSink::Ready) {
+ self.state = self.sink.start_send(item)?;
+ }
+ Ok(())
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ self.keep_flushing()?;
+ let async = self.sink.poll_complete()?;
+ // Only if all values have been sent _and_ the underlying
+ // sink is completely flushed, signal readiness.
+ if self.state.is_ready() && async.is_ready() {
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ self.keep_flushing()?;
+ // If all items have been flushed, initiate close.
+ if self.state.is_ready() {
+ self.sink.close()
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/flush.rs b/third_party/rust/futures-0.1.29/src/sink/flush.rs
new file mode 100644
index 0000000000..f66811e03d
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/flush.rs
@@ -0,0 +1,46 @@
+use {Poll, Async, Future};
+use sink::Sink;
+
+/// Future for the `Sink::flush` combinator, which polls the sink until all data
+/// has been flushed.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct Flush<S> {
+ sink: Option<S>,
+}
+
+pub fn new<S: Sink>(sink: S) -> Flush<S> {
+ Flush { sink: Some(sink) }
+}
+
+impl<S: Sink> Flush<S> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ self.sink.as_ref().expect("Attempted `Flush::get_ref` after the flush completed")
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.sink.as_mut().expect("Attempted `Flush::get_mut` after the flush completed")
+ }
+
+ /// Consume the `Flush` and return the inner sink.
+ pub fn into_inner(self) -> S {
+ self.sink.expect("Attempted `Flush::into_inner` after the flush completed")
+ }
+}
+
+impl<S: Sink> Future for Flush<S> {
+ type Item = S;
+ type Error = S::SinkError;
+
+ fn poll(&mut self) -> Poll<S, S::SinkError> {
+ let mut sink = self.sink.take().expect("Attempted to poll Flush after it completed");
+ if sink.poll_complete()?.is_ready() {
+ Ok(Async::Ready(sink))
+ } else {
+ self.sink = Some(sink);
+ Ok(Async::NotReady)
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/from_err.rs b/third_party/rust/futures-0.1.29/src/sink/from_err.rs
new file mode 100644
index 0000000000..4880c30ef4
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/from_err.rs
@@ -0,0 +1,71 @@
+use core::marker::PhantomData;
+
+use {Sink, Poll, StartSend};
+
+/// A sink combinator to change the error type of a sink.
+///
+/// This is created by the `Sink::from_err` method.
+#[derive(Clone, Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct SinkFromErr<S, E> {
+ sink: S,
+ f: PhantomData<E>
+}
+
+pub fn new<S, E>(sink: S) -> SinkFromErr<S, E>
+ where S: Sink
+{
+ SinkFromErr {
+ sink: sink,
+ f: PhantomData
+ }
+}
+
+impl<S, E> SinkFromErr<S, E> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+}
+
+impl<S, E> Sink for SinkFromErr<S, E>
+ where S: Sink,
+ E: From<S::SinkError>
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = E;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.sink.start_send(item).map_err(|e| e.into())
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.sink.poll_complete().map_err(|e| e.into())
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.sink.close().map_err(|e| e.into())
+ }
+}
+
+impl<S: ::stream::Stream, E> ::stream::Stream for SinkFromErr<S, E> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/map_err.rs b/third_party/rust/futures-0.1.29/src/sink/map_err.rs
new file mode 100644
index 0000000000..25c168c071
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/map_err.rs
@@ -0,0 +1,64 @@
+use sink::Sink;
+
+use {Poll, StartSend, Stream};
+
+/// Sink for the `Sink::sink_map_err` combinator.
+#[derive(Clone,Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct SinkMapErr<S, F> {
+ sink: S,
+ f: Option<F>,
+}
+
+pub fn new<S, F>(s: S, f: F) -> SinkMapErr<S, F> {
+ SinkMapErr { sink: s, f: Some(f) }
+}
+
+impl<S, E> SinkMapErr<S, E> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+}
+
+impl<S, F, E> Sink for SinkMapErr<S, F>
+ where S: Sink,
+ F: FnOnce(S::SinkError) -> E,
+{
+ type SinkItem = S::SinkItem;
+ type SinkError = E;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.sink.start_send(item).map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e))
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.sink.poll_complete().map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e))
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.sink.close().map_err(|e| self.f.take().expect("cannot use MapErr after an error")(e))
+ }
+}
+
+impl<S: Stream, F> Stream for SinkMapErr<S, F> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/mod.rs b/third_party/rust/futures-0.1.29/src/sink/mod.rs
new file mode 100644
index 0000000000..e5ea97f92a
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/mod.rs
@@ -0,0 +1,489 @@
+//! Asynchronous sinks
+//!
+//! This module contains the `Sink` trait, along with a number of adapter types
+//! for it. An overview is available in the documentation for the trait itself.
+//!
+//! You can find more information/tutorials about streams [online at
+//! https://tokio.rs][online]
+//!
+//! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
+
+use {IntoFuture, Poll, StartSend};
+use stream::Stream;
+
+mod with;
+mod with_flat_map;
+// mod with_map;
+// mod with_filter;
+// mod with_filter_map;
+mod flush;
+mod from_err;
+mod send;
+mod send_all;
+mod map_err;
+mod fanout;
+
+if_std! {
+ mod buffer;
+ mod wait;
+
+ pub use self::buffer::Buffer;
+ pub use self::wait::Wait;
+
+ // TODO: consider expanding this via e.g. FromIterator
+ impl<T> Sink for ::std::vec::Vec<T> {
+ type SinkItem = T;
+ type SinkError = (); // Change this to ! once it stabilizes
+
+ fn start_send(&mut self, item: Self::SinkItem)
+ -> StartSend<Self::SinkItem, Self::SinkError>
+ {
+ self.push(item);
+ Ok(::AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ Ok(::Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ Ok(::Async::Ready(()))
+ }
+ }
+
+ /// A type alias for `Box<Sink + Send>`
+ pub type BoxSink<T, E> = ::std::boxed::Box<Sink<SinkItem = T, SinkError = E> +
+ ::core::marker::Send>;
+
+ impl<S: ?Sized + Sink> Sink for ::std::boxed::Box<S> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem)
+ -> StartSend<Self::SinkItem, Self::SinkError> {
+ (**self).start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ (**self).poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ (**self).close()
+ }
+ }
+}
+
+pub use self::with::With;
+pub use self::with_flat_map::WithFlatMap;
+pub use self::flush::Flush;
+pub use self::send::Send;
+pub use self::send_all::SendAll;
+pub use self::map_err::SinkMapErr;
+pub use self::from_err::SinkFromErr;
+pub use self::fanout::Fanout;
+
+/// A `Sink` is a value into which other values can be sent, asynchronously.
+///
+/// Basic examples of sinks include the sending side of:
+///
+/// - Channels
+/// - Sockets
+/// - Pipes
+///
+/// In addition to such "primitive" sinks, it's typical to layer additional
+/// functionality, such as buffering, on top of an existing sink.
+///
+/// Sending to a sink is "asynchronous" in the sense that the value may not be
+/// sent in its entirety immediately. Instead, values are sent in a two-phase
+/// way: first by initiating a send, and then by polling for completion. This
+/// two-phase setup is analogous to buffered writing in synchronous code, where
+/// writes often succeed immediately, but internally are buffered and are
+/// *actually* written only upon flushing.
+///
+/// In addition, the `Sink` may be *full*, in which case it is not even possible
+/// to start the sending process.
+///
+/// As with `Future` and `Stream`, the `Sink` trait is built from a few core
+/// required methods, and a host of default methods for working in a
+/// higher-level way. The `Sink::send_all` combinator is of particular
+/// importance: you can use it to send an entire stream to a sink, which is
+/// the simplest way to ultimately consume a sink.
+///
+/// You can find more information/tutorials about streams [online at
+/// https://tokio.rs][online]
+///
+/// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/
+pub trait Sink {
+ /// The type of value that the sink accepts.
+ type SinkItem;
+
+ /// The type of value produced by the sink when an error occurs.
+ type SinkError;
+
+ /// Begin the process of sending a value to the sink.
+ ///
+ /// As the name suggests, this method only *begins* the process of sending
+ /// the item. If the sink employs buffering, the item isn't fully processed
+ /// until the buffer is fully flushed. Since sinks are designed to work with
+ /// asynchronous I/O, the process of actually writing out the data to an
+ /// underlying object takes place asynchronously. **You *must* use
+ /// `poll_complete` in order to drive completion of a send**. In particular,
+ /// `start_send` does not begin the flushing process
+ ///
+ /// # Return value
+ ///
+ /// This method returns `AsyncSink::Ready` if the sink was able to start
+ /// sending `item`. In that case, you *must* ensure that you call
+ /// `poll_complete` to process the sent item to completion. Note, however,
+ /// that several calls to `start_send` can be made prior to calling
+ /// `poll_complete`, which will work on completing all pending items.
+ ///
+ /// The method returns `AsyncSink::NotReady` if the sink was unable to begin
+ /// sending, usually due to being full. The sink must have attempted to
+ /// complete processing any outstanding requests (equivalent to
+ /// `poll_complete`) before yielding this result. The current task will be
+ /// automatically scheduled for notification when the sink may be ready to
+ /// receive new values.
+ ///
+ /// # Errors
+ ///
+ /// If the sink encounters an error other than being temporarily full, it
+ /// uses the `Err` variant to signal that error. In most cases, such errors
+ /// mean that the sink will permanently be unable to receive items.
+ ///
+ /// # Panics
+ ///
+ /// This method may panic in a few situations, depending on the specific
+ /// sink:
+ ///
+ /// - It is called outside of the context of a task.
+ /// - A previous call to `start_send` or `poll_complete` yielded an error.
+ fn start_send(&mut self, item: Self::SinkItem)
+ -> StartSend<Self::SinkItem, Self::SinkError>;
+
+ /// Flush all output from this sink, if necessary.
+ ///
+ /// Some sinks may buffer intermediate data as an optimization to improve
+ /// throughput. In other words, if a sink has a corresponding receiver then
+ /// a successful `start_send` above may not guarantee that the value is
+ /// actually ready to be received by the receiver. This function is intended
+ /// to be used to ensure that values do indeed make their way to the
+ /// receiver.
+ ///
+ /// This function will attempt to process any pending requests on behalf of
+ /// the sink and drive it to completion.
+ ///
+ /// # Return value
+ ///
+ /// Returns `Ok(Async::Ready(()))` when no buffered items remain. If this
+ /// value is returned then it is guaranteed that all previous values sent
+ /// via `start_send` will be guaranteed to be available to a listening
+ /// receiver.
+ ///
+ /// Returns `Ok(Async::NotReady)` if there is more work left to do, in which
+ /// case the current task is scheduled to wake up when more progress may be
+ /// possible.
+ ///
+ /// # Errors
+ ///
+ /// Returns `Err` if the sink encounters an error while processing one of
+ /// its pending requests. Due to the buffered nature of requests, it is not
+ /// generally possible to correlate the error with a particular request. As
+ /// with `start_send`, these errors are generally "fatal" for continued use
+ /// of the sink.
+ ///
+ /// # Panics
+ ///
+ /// This method may panic in a few situations, depending on the specific sink:
+ ///
+ /// - It is called outside of the context of a task.
+ /// - A previous call to `start_send` or `poll_complete` yielded an error.
+ ///
+ /// # Compatibility nodes
+ ///
+ /// The name of this method may be slightly misleading as the original
+ /// intention was to have this method be more general than just flushing
+ /// requests. Over time though it was decided to trim back the ambitions of
+ /// this method to what it's always done, just flushing.
+ ///
+ /// In the 0.2 release series of futures this method will be renamed to
+ /// `poll_flush`. For 0.1, however, the breaking change is not happening
+ /// yet.
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError>;
+
+ /// A method to indicate that no more values will ever be pushed into this
+ /// sink.
+ ///
+ /// This method is used to indicate that a sink will no longer even be given
+ /// another value by the caller. That is, the `start_send` method above will
+ /// be called no longer (nor `poll_complete`). This method is intended to
+ /// model "graceful shutdown" in various protocols where the intent to shut
+ /// down is followed by a little more blocking work.
+ ///
+ /// Callers of this function should work it it in a similar fashion to
+ /// `poll_complete`. Once called it may return `NotReady` which indicates
+ /// that more external work needs to happen to make progress. The current
+ /// task will be scheduled to receive a notification in such an event,
+ /// however.
+ ///
+ /// Note that this function will imply `poll_complete` above. That is, if a
+ /// sink has buffered data, then it'll be flushed out during a `close`
+ /// operation. It is not necessary to have `poll_complete` return `Ready`
+ /// before a `close` is called. Once a `close` is called, though,
+ /// `poll_complete` cannot be called.
+ ///
+ /// # Return value
+ ///
+ /// This function, like `poll_complete`, returns a `Poll`. The value is
+ /// `Ready` once the close operation has completed. At that point it should
+ /// be safe to drop the sink and deallocate associated resources.
+ ///
+ /// If the value returned is `NotReady` then the sink is not yet closed and
+ /// work needs to be done to close it. The work has been scheduled and the
+ /// current task will receive a notification when it's next ready to call
+ /// this method again.
+ ///
+ /// Finally, this function may also return an error.
+ ///
+ /// # Errors
+ ///
+ /// This function will return an `Err` if any operation along the way during
+ /// the close operation fails. An error typically is fatal for a sink and is
+ /// unable to be recovered from, but in specific situations this may not
+ /// always be true.
+ ///
+ /// Note that it's also typically an error to call `start_send` or
+ /// `poll_complete` after the `close` function is called. This method will
+ /// *initiate* a close, and continuing to send values after that (or attempt
+ /// to flush) may result in strange behavior, panics, errors, etc. Once this
+ /// method is called, it must be the only method called on this `Sink`.
+ ///
+ /// # Panics
+ ///
+ /// This method may panic or cause panics if:
+ ///
+ /// * It is called outside the context of a future's task
+ /// * It is called and then `start_send` or `poll_complete` is called
+ ///
+ /// # Compatibility notes
+ ///
+ /// Note that this function is currently by default a provided function,
+ /// defaulted to calling `poll_complete` above. This function was added
+ /// in the 0.1 series of the crate as a backwards-compatible addition. It
+ /// is intended that in the 0.2 series the method will no longer be a
+ /// default method.
+ ///
+ /// It is highly recommended to consider this method a required method and
+ /// to implement it whenever you implement `Sink` locally. It is especially
+ /// crucial to be sure to close inner sinks, if applicable.
+ #[cfg(feature = "with-deprecated")]
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.poll_complete()
+ }
+
+ /// dox (you should see the above, not this)
+ #[cfg(not(feature = "with-deprecated"))]
+ fn close(&mut self) -> Poll<(), Self::SinkError>;
+
+ /// Creates a new object which will produce a synchronous sink.
+ ///
+ /// The sink returned does **not** implement the `Sink` trait, and instead
+ /// only has two methods: `send` and `flush`. These two methods correspond
+ /// to `start_send` and `poll_complete` above except are executed in a
+ /// blocking fashion.
+ #[cfg(feature = "use_std")]
+ fn wait(self) -> Wait<Self>
+ where Self: Sized
+ {
+ wait::new(self)
+ }
+
+ /// Composes a function *in front of* the sink.
+ ///
+ /// This adapter produces a new sink that passes each value through the
+ /// given function `f` before sending it to `self`.
+ ///
+ /// To process each value, `f` produces a *future*, which is then polled to
+ /// completion before passing its result down to the underlying sink. If the
+ /// future produces an error, that error is returned by the new sink.
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::map`.
+ fn with<U, F, Fut>(self, f: F) -> With<Self, U, F, Fut>
+ where F: FnMut(U) -> Fut,
+ Fut: IntoFuture<Item = Self::SinkItem>,
+ Fut::Error: From<Self::SinkError>,
+ Self: Sized
+ {
+ with::new(self, f)
+ }
+
+ /// Composes a function *in front of* the sink.
+ ///
+ /// This adapter produces a new sink that passes each value through the
+ /// given function `f` before sending it to `self`.
+ ///
+ /// To process each value, `f` produces a *stream*, of which each value
+ /// is passed to the underlying sink. A new value will not be accepted until
+ /// the stream has been drained
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::flat_map`.
+ ///
+ /// # Examples
+ /// ---
+ /// Using this function with an iterator through use of the `stream::iter_ok()`
+ /// function
+ ///
+ /// ```
+ /// use futures::prelude::*;
+ /// use futures::stream;
+ /// use futures::sync::mpsc;
+ ///
+ /// let (tx, rx) = mpsc::channel::<i32>(5);
+ ///
+ /// let tx = tx.with_flat_map(|x| {
+ /// stream::iter_ok(vec![42; x].into_iter().map(|y| y))
+ /// });
+ /// tx.send(5).wait().unwrap();
+ /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42]))
+ /// ```
+ fn with_flat_map<U, F, St>(self, f: F) -> WithFlatMap<Self, U, F, St>
+ where F: FnMut(U) -> St,
+ St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
+ Self: Sized
+ {
+ with_flat_map::new(self, f)
+ }
+
+ /*
+ fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
+ where F: FnMut(U) -> Self::SinkItem,
+ Self: Sized;
+
+ fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
+ where F: FnMut(Self::SinkItem) -> bool,
+ Self: Sized;
+
+ fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
+ where F: FnMut(U) -> Option<Self::SinkItem>,
+ Self: Sized;
+ */
+
+ /// Transforms the error returned by the sink.
+ fn sink_map_err<F, E>(self, f: F) -> SinkMapErr<Self, F>
+ where F: FnOnce(Self::SinkError) -> E,
+ Self: Sized,
+ {
+ map_err::new(self, f)
+ }
+
+ /// Map this sink's error to any error implementing `From` for this sink's
+ /// `Error`, returning a new sink.
+ ///
+ /// If wanting to map errors of a `Sink + Stream`, use `.sink_from_err().from_err()`.
+ fn sink_from_err<E: From<Self::SinkError>>(self) -> from_err::SinkFromErr<Self, E>
+ where Self: Sized,
+ {
+ from_err::new(self)
+ }
+
+
+ /// Adds a fixed-size buffer to the current sink.
+ ///
+ /// The resulting sink will buffer up to `amt` items when the underlying
+ /// sink is unwilling to accept additional items. Calling `poll_complete` on
+ /// the buffered sink will attempt to both empty the buffer and complete
+ /// processing on the underlying sink.
+ ///
+ /// Note that this function consumes the given sink, returning a wrapped
+ /// version, much like `Iterator::map`.
+ ///
+ /// This method is only available when the `use_std` feature of this
+ /// library is activated, and it is activated by default.
+ #[cfg(feature = "use_std")]
+ fn buffer(self, amt: usize) -> Buffer<Self>
+ where Self: Sized
+ {
+ buffer::new(self, amt)
+ }
+
+ /// Fanout items to multiple sinks.
+ ///
+ /// This adapter clones each incoming item and forwards it to both this as well as
+ /// the other sink at the same time.
+ fn fanout<S>(self, other: S) -> Fanout<Self, S>
+ where Self: Sized,
+ Self::SinkItem: Clone,
+ S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>
+ {
+ fanout::new(self, other)
+ }
+
+ /// A future that completes when the sink has finished processing all
+ /// pending requests.
+ ///
+ /// The sink itself is returned after flushing is complete; this adapter is
+ /// intended to be used when you want to stop sending to the sink until
+ /// all current requests are processed.
+ fn flush(self) -> Flush<Self>
+ where Self: Sized
+ {
+ flush::new(self)
+ }
+
+ /// A future that completes after the given item has been fully processed
+ /// into the sink, including flushing.
+ ///
+ /// Note that, **because of the flushing requirement, it is usually better
+ /// to batch together items to send via `send_all`, rather than flushing
+ /// between each item.**
+ ///
+ /// On completion, the sink is returned.
+ fn send(self, item: Self::SinkItem) -> Send<Self>
+ where Self: Sized
+ {
+ send::new(self, item)
+ }
+
+ /// A future that completes after the given stream has been fully processed
+ /// into the sink, including flushing.
+ ///
+ /// This future will drive the stream to keep producing items until it is
+ /// exhausted, sending each item to the sink. It will complete once both the
+ /// stream is exhausted, the sink has received all items, the sink has been
+ /// flushed, and the sink has been closed.
+ ///
+ /// Doing `sink.send_all(stream)` is roughly equivalent to
+ /// `stream.forward(sink)`. The returned future will exhaust all items from
+ /// `stream` and send them to `self`, closing `self` when all items have been
+ /// received.
+ ///
+ /// On completion, the pair `(sink, source)` is returned.
+ fn send_all<S>(self, stream: S) -> SendAll<Self, S>
+ where S: Stream<Item = Self::SinkItem>,
+ Self::SinkError: From<S::Error>,
+ Self: Sized
+ {
+ send_all::new(self, stream)
+ }
+}
+
+impl<'a, S: ?Sized + Sink> Sink for &'a mut S {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem)
+ -> StartSend<Self::SinkItem, Self::SinkError> {
+ (**self).start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ (**self).poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ (**self).close()
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/send.rs b/third_party/rust/futures-0.1.29/src/sink/send.rs
new file mode 100644
index 0000000000..71173fa836
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/send.rs
@@ -0,0 +1,59 @@
+use {Poll, Async, Future, AsyncSink};
+use sink::Sink;
+
+/// Future for the `Sink::send` combinator, which sends a value to a sink and
+/// then waits until the sink has fully flushed.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct Send<S: Sink> {
+ sink: Option<S>,
+ item: Option<S::SinkItem>,
+}
+
+pub fn new<S: Sink>(sink: S, item: S::SinkItem) -> Send<S> {
+ Send {
+ sink: Some(sink),
+ item: Some(item),
+ }
+}
+
+impl<S: Sink> Send<S> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ self.sink.as_ref().take().expect("Attempted Send::get_ref after completion")
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ self.sink.as_mut().take().expect("Attempted Send::get_mut after completion")
+ }
+
+ fn sink_mut(&mut self) -> &mut S {
+ self.sink.as_mut().take().expect("Attempted to poll Send after completion")
+ }
+
+ fn take_sink(&mut self) -> S {
+ self.sink.take().expect("Attempted to poll Send after completion")
+ }
+}
+
+impl<S: Sink> Future for Send<S> {
+ type Item = S;
+ type Error = S::SinkError;
+
+ fn poll(&mut self) -> Poll<S, S::SinkError> {
+ if let Some(item) = self.item.take() {
+ if let AsyncSink::NotReady(item) = self.sink_mut().start_send(item)? {
+ self.item = Some(item);
+ return Ok(Async::NotReady);
+ }
+ }
+
+ // we're done sending the item, but want to block on flushing the
+ // sink
+ try_ready!(self.sink_mut().poll_complete());
+
+ // now everything's emptied, so return the sink for further use
+ Ok(Async::Ready(self.take_sink()))
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/send_all.rs b/third_party/rust/futures-0.1.29/src/sink/send_all.rs
new file mode 100644
index 0000000000..a230903d1c
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/send_all.rs
@@ -0,0 +1,88 @@
+use {Poll, Async, Future, AsyncSink};
+use stream::{Stream, Fuse};
+use sink::Sink;
+
+/// Future for the `Sink::send_all` combinator, which sends a stream of values
+/// to a sink and then waits until the sink has fully flushed those values.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct SendAll<T, U: Stream> {
+ sink: Option<T>,
+ stream: Option<Fuse<U>>,
+ buffered: Option<U::Item>,
+}
+
+pub fn new<T, U>(sink: T, stream: U) -> SendAll<T, U>
+ where T: Sink,
+ U: Stream<Item = T::SinkItem>,
+ T::SinkError: From<U::Error>,
+{
+ SendAll {
+ sink: Some(sink),
+ stream: Some(stream.fuse()),
+ buffered: None,
+ }
+}
+
+impl<T, U> SendAll<T, U>
+ where T: Sink,
+ U: Stream<Item = T::SinkItem>,
+ T::SinkError: From<U::Error>,
+{
+ fn sink_mut(&mut self) -> &mut T {
+ self.sink.as_mut().take().expect("Attempted to poll SendAll after completion")
+ }
+
+ fn stream_mut(&mut self) -> &mut Fuse<U> {
+ self.stream.as_mut().take()
+ .expect("Attempted to poll SendAll after completion")
+ }
+
+ fn take_result(&mut self) -> (T, U) {
+ let sink = self.sink.take()
+ .expect("Attempted to poll Forward after completion");
+ let fuse = self.stream.take()
+ .expect("Attempted to poll Forward after completion");
+ (sink, fuse.into_inner())
+ }
+
+ fn try_start_send(&mut self, item: U::Item) -> Poll<(), T::SinkError> {
+ debug_assert!(self.buffered.is_none());
+ if let AsyncSink::NotReady(item) = self.sink_mut().start_send(item)? {
+ self.buffered = Some(item);
+ return Ok(Async::NotReady)
+ }
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T, U> Future for SendAll<T, U>
+ where T: Sink,
+ U: Stream<Item = T::SinkItem>,
+ T::SinkError: From<U::Error>,
+{
+ type Item = (T, U);
+ type Error = T::SinkError;
+
+ fn poll(&mut self) -> Poll<(T, U), T::SinkError> {
+ // If we've got an item buffered already, we need to write it to the
+ // sink before we can do anything else
+ if let Some(item) = self.buffered.take() {
+ try_ready!(self.try_start_send(item))
+ }
+
+ loop {
+ match self.stream_mut().poll()? {
+ Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)),
+ Async::Ready(None) => {
+ try_ready!(self.sink_mut().close());
+ return Ok(Async::Ready(self.take_result()))
+ }
+ Async::NotReady => {
+ try_ready!(self.sink_mut().poll_complete());
+ return Ok(Async::NotReady)
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/wait.rs b/third_party/rust/futures-0.1.29/src/sink/wait.rs
new file mode 100644
index 0000000000..940a58862f
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/wait.rs
@@ -0,0 +1,59 @@
+use sink::Sink;
+use executor;
+
+/// A sink combinator which converts an asynchronous sink to a **blocking
+/// sink**.
+///
+/// Created by the `Sink::wait` method, this function transforms any sink into a
+/// blocking version. This is implemented by blocking the current thread when a
+/// sink is otherwise unable to make progress.
+#[must_use = "sinks do nothing unless used"]
+#[derive(Debug)]
+pub struct Wait<S> {
+ sink: executor::Spawn<S>,
+}
+
+pub fn new<S: Sink>(s: S) -> Wait<S> {
+ Wait {
+ sink: executor::spawn(s),
+ }
+}
+
+impl<S: Sink> Wait<S> {
+ /// Sends a value to this sink, blocking the current thread until it's able
+ /// to do so.
+ ///
+ /// This function will take the `value` provided and call the underlying
+ /// sink's `start_send` function until it's ready to accept the value. If
+ /// the function returns `NotReady` then the current thread is blocked
+ /// until it is otherwise ready to accept the value.
+ ///
+ /// # Return value
+ ///
+ /// If `Ok(())` is returned then the `value` provided was successfully sent
+ /// along the sink, and if `Err(e)` is returned then an error occurred
+ /// which prevented the value from being sent.
+ pub fn send(&mut self, value: S::SinkItem) -> Result<(), S::SinkError> {
+ self.sink.wait_send(value)
+ }
+
+ /// Flushes any buffered data in this sink, blocking the current thread
+ /// until it's entirely flushed.
+ ///
+ /// This function will call the underlying sink's `poll_complete` method
+ /// until it returns that it's ready to proceed. If the method returns
+ /// `NotReady` the current thread will be blocked until it's otherwise
+ /// ready to proceed.
+ pub fn flush(&mut self) -> Result<(), S::SinkError> {
+ self.sink.wait_flush()
+ }
+
+ /// Close this sink, blocking the current thread until it's entirely closed.
+ ///
+ /// This function will call the underlying sink's `close` method
+ /// until it returns that it's closed. If the method returns
+ /// `NotReady` the current thread will be blocked until it's otherwise closed.
+ pub fn close(&mut self) -> Result<(), S::SinkError> {
+ self.sink.wait_close()
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/with.rs b/third_party/rust/futures-0.1.29/src/sink/with.rs
new file mode 100644
index 0000000000..3326b6e49c
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/with.rs
@@ -0,0 +1,153 @@
+use core::mem;
+use core::marker::PhantomData;
+
+use {IntoFuture, Future, Poll, Async, StartSend, AsyncSink};
+use sink::Sink;
+use stream::Stream;
+
+/// Sink for the `Sink::with` combinator, chaining a computation to run *prior*
+/// to pushing a value into the underlying sink.
+#[derive(Clone, Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct With<S, U, F, Fut>
+ where S: Sink,
+ F: FnMut(U) -> Fut,
+ Fut: IntoFuture,
+{
+ sink: S,
+ f: F,
+ state: State<Fut::Future, S::SinkItem>,
+ _phantom: PhantomData<fn(U)>,
+}
+
+#[derive(Clone, Debug)]
+enum State<Fut, T> {
+ Empty,
+ Process(Fut),
+ Buffered(T),
+}
+
+impl<Fut, T> State<Fut, T> {
+ fn is_empty(&self) -> bool {
+ if let State::Empty = *self {
+ true
+ } else {
+ false
+ }
+ }
+}
+
+pub fn new<S, U, F, Fut>(sink: S, f: F) -> With<S, U, F, Fut>
+ where S: Sink,
+ F: FnMut(U) -> Fut,
+ Fut: IntoFuture<Item = S::SinkItem>,
+ Fut::Error: From<S::SinkError>,
+{
+ With {
+ state: State::Empty,
+ sink: sink,
+ f: f,
+ _phantom: PhantomData,
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S, U, F, Fut> Stream for With<S, U, F, Fut>
+ where S: Stream + Sink,
+ F: FnMut(U) -> Fut,
+ Fut: IntoFuture
+{
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
+
+impl<S, U, F, Fut> With<S, U, F, Fut>
+ where S: Sink,
+ F: FnMut(U) -> Fut,
+ Fut: IntoFuture<Item = S::SinkItem>,
+ Fut::Error: From<S::SinkError>,
+{
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+
+ fn poll(&mut self) -> Poll<(), Fut::Error> {
+ loop {
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Empty => break,
+ State::Process(mut fut) => {
+ match fut.poll()? {
+ Async::Ready(item) => {
+ self.state = State::Buffered(item);
+ }
+ Async::NotReady => {
+ self.state = State::Process(fut);
+ break
+ }
+ }
+ }
+ State::Buffered(item) => {
+ if let AsyncSink::NotReady(item) = self.sink.start_send(item)? {
+ self.state = State::Buffered(item);
+ break
+ }
+ }
+ }
+ }
+
+ if self.state.is_empty() {
+ Ok(Async::Ready(()))
+ } else {
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+impl<S, U, F, Fut> Sink for With<S, U, F, Fut>
+ where S: Sink,
+ F: FnMut(U) -> Fut,
+ Fut: IntoFuture<Item = S::SinkItem>,
+ Fut::Error: From<S::SinkError>,
+{
+ type SinkItem = U;
+ type SinkError = Fut::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Fut::Error> {
+ if self.poll()?.is_not_ready() {
+ return Ok(AsyncSink::NotReady(item))
+ }
+ self.state = State::Process((self.f)(item).into_future());
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Fut::Error> {
+ // poll ourselves first, to push data downward
+ let me_ready = self.poll()?;
+ // always propagate `poll_complete` downward to attempt to make progress
+ try_ready!(self.sink.poll_complete());
+ Ok(me_ready)
+ }
+
+ fn close(&mut self) -> Poll<(), Fut::Error> {
+ try_ready!(self.poll());
+ Ok(self.sink.close()?)
+ }
+}
diff --git a/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs b/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs
new file mode 100644
index 0000000000..80c4f6605a
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/src/sink/with_flat_map.rs
@@ -0,0 +1,126 @@
+use core::marker::PhantomData;
+
+use {Poll, Async, StartSend, AsyncSink};
+use sink::Sink;
+use stream::Stream;
+
+/// Sink for the `Sink::with_flat_map` combinator, chaining a computation that returns an iterator
+/// to run prior to pushing a value into the underlying sink
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct WithFlatMap<S, U, F, St>
+where
+ S: Sink,
+ F: FnMut(U) -> St,
+ St: Stream<Item = S::SinkItem, Error=S::SinkError>,
+{
+ sink: S,
+ f: F,
+ stream: Option<St>,
+ buffer: Option<S::SinkItem>,
+ _phantom: PhantomData<fn(U)>,
+}
+
+pub fn new<S, U, F, St>(sink: S, f: F) -> WithFlatMap<S, U, F, St>
+where
+ S: Sink,
+ F: FnMut(U) -> St,
+ St: Stream<Item = S::SinkItem, Error=S::SinkError>,
+{
+ WithFlatMap {
+ sink: sink,
+ f: f,
+ stream: None,
+ buffer: None,
+ _phantom: PhantomData,
+ }
+}
+
+impl<S, U, F, St> WithFlatMap<S, U, F, St>
+where
+ S: Sink,
+ F: FnMut(U) -> St,
+ St: Stream<Item = S::SinkItem, Error=S::SinkError>,
+{
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+
+ fn try_empty_stream(&mut self) -> Poll<(), S::SinkError> {
+ if let Some(x) = self.buffer.take() {
+ if let AsyncSink::NotReady(x) = self.sink.start_send(x)? {
+ self.buffer = Some(x);
+ return Ok(Async::NotReady);
+ }
+ }
+ if let Some(mut stream) = self.stream.take() {
+ while let Some(x) = try_ready!(stream.poll()) {
+ if let AsyncSink::NotReady(x) = self.sink.start_send(x)? {
+ self.stream = Some(stream);
+ self.buffer = Some(x);
+ return Ok(Async::NotReady);
+ }
+ }
+ }
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<S, U, F, St> Stream for WithFlatMap<S, U, F, St>
+where
+ S: Stream + Sink,
+ F: FnMut(U) -> St,
+ St: Stream<Item = S::SinkItem, Error=S::SinkError>,
+{
+ type Item = S::Item;
+ type Error = S::Error;
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
+
+impl<S, U, F, St> Sink for WithFlatMap<S, U, F, St>
+where
+ S: Sink,
+ F: FnMut(U) -> St,
+ St: Stream<Item = S::SinkItem, Error=S::SinkError>,
+{
+ type SinkItem = U;
+ type SinkError = S::SinkError;
+ fn start_send(&mut self, i: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ if self.try_empty_stream()?.is_not_ready() {
+ return Ok(AsyncSink::NotReady(i));
+ }
+ assert!(self.stream.is_none());
+ self.stream = Some((self.f)(i));
+ self.try_empty_stream()?;
+ Ok(AsyncSink::Ready)
+ }
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ if self.try_empty_stream()?.is_not_ready() {
+ return Ok(Async::NotReady);
+ }
+ self.sink.poll_complete()
+ }
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ if self.try_empty_stream()?.is_not_ready() {
+ return Ok(Async::NotReady);
+ }
+ assert!(self.stream.is_none());
+ self.sink.close()
+ }
+}