//! Asynchronous sinks //! //! This module contains the `Sink` trait, along with a number of adapter types //! for it. An overview is available in the documentation for the trait itself. //! //! You can find more information/tutorials about streams [online at //! https://tokio.rs][online] //! //! [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ use {IntoFuture, Poll, StartSend}; use stream::Stream; mod with; mod with_flat_map; // mod with_map; // mod with_filter; // mod with_filter_map; mod flush; mod from_err; mod send; mod send_all; mod map_err; mod fanout; if_std! { mod buffer; mod wait; pub use self::buffer::Buffer; pub use self::wait::Wait; // TODO: consider expanding this via e.g. FromIterator impl Sink for ::std::vec::Vec { type SinkItem = T; type SinkError = (); // Change this to ! once it stabilizes fn start_send(&mut self, item: Self::SinkItem) -> StartSend { self.push(item); Ok(::AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { Ok(::Async::Ready(())) } fn close(&mut self) -> Poll<(), Self::SinkError> { Ok(::Async::Ready(())) } } /// A type alias for `Box` pub type BoxSink = ::std::boxed::Box + ::core::marker::Send>; impl Sink for ::std::boxed::Box { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { (**self).start_send(item) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { (**self).poll_complete() } fn close(&mut self) -> Poll<(), Self::SinkError> { (**self).close() } } } pub use self::with::With; pub use self::with_flat_map::WithFlatMap; pub use self::flush::Flush; pub use self::send::Send; pub use self::send_all::SendAll; pub use self::map_err::SinkMapErr; pub use self::from_err::SinkFromErr; pub use self::fanout::Fanout; /// A `Sink` is a value into which other values can be sent, asynchronously. /// /// Basic examples of sinks include the sending side of: /// /// - Channels /// - Sockets /// - Pipes /// /// In addition to such "primitive" sinks, it's typical to layer additional /// functionality, such as buffering, on top of an existing sink. /// /// Sending to a sink is "asynchronous" in the sense that the value may not be /// sent in its entirety immediately. Instead, values are sent in a two-phase /// way: first by initiating a send, and then by polling for completion. This /// two-phase setup is analogous to buffered writing in synchronous code, where /// writes often succeed immediately, but internally are buffered and are /// *actually* written only upon flushing. /// /// In addition, the `Sink` may be *full*, in which case it is not even possible /// to start the sending process. /// /// As with `Future` and `Stream`, the `Sink` trait is built from a few core /// required methods, and a host of default methods for working in a /// higher-level way. The `Sink::send_all` combinator is of particular /// importance: you can use it to send an entire stream to a sink, which is /// the simplest way to ultimately consume a sink. /// /// You can find more information/tutorials about streams [online at /// https://tokio.rs][online] /// /// [online]: https://tokio.rs/docs/getting-started/streams-and-sinks/ pub trait Sink { /// The type of value that the sink accepts. type SinkItem; /// The type of value produced by the sink when an error occurs. type SinkError; /// Begin the process of sending a value to the sink. /// /// As the name suggests, this method only *begins* the process of sending /// the item. If the sink employs buffering, the item isn't fully processed /// until the buffer is fully flushed. Since sinks are designed to work with /// asynchronous I/O, the process of actually writing out the data to an /// underlying object takes place asynchronously. **You *must* use /// `poll_complete` in order to drive completion of a send**. In particular, /// `start_send` does not begin the flushing process /// /// # Return value /// /// This method returns `AsyncSink::Ready` if the sink was able to start /// sending `item`. In that case, you *must* ensure that you call /// `poll_complete` to process the sent item to completion. Note, however, /// that several calls to `start_send` can be made prior to calling /// `poll_complete`, which will work on completing all pending items. /// /// The method returns `AsyncSink::NotReady` if the sink was unable to begin /// sending, usually due to being full. The sink must have attempted to /// complete processing any outstanding requests (equivalent to /// `poll_complete`) before yielding this result. The current task will be /// automatically scheduled for notification when the sink may be ready to /// receive new values. /// /// # Errors /// /// If the sink encounters an error other than being temporarily full, it /// uses the `Err` variant to signal that error. In most cases, such errors /// mean that the sink will permanently be unable to receive items. /// /// # Panics /// /// This method may panic in a few situations, depending on the specific /// sink: /// /// - It is called outside of the context of a task. /// - A previous call to `start_send` or `poll_complete` yielded an error. fn start_send(&mut self, item: Self::SinkItem) -> StartSend; /// Flush all output from this sink, if necessary. /// /// Some sinks may buffer intermediate data as an optimization to improve /// throughput. In other words, if a sink has a corresponding receiver then /// a successful `start_send` above may not guarantee that the value is /// actually ready to be received by the receiver. This function is intended /// to be used to ensure that values do indeed make their way to the /// receiver. /// /// This function will attempt to process any pending requests on behalf of /// the sink and drive it to completion. /// /// # Return value /// /// Returns `Ok(Async::Ready(()))` when no buffered items remain. If this /// value is returned then it is guaranteed that all previous values sent /// via `start_send` will be guaranteed to be available to a listening /// receiver. /// /// Returns `Ok(Async::NotReady)` if there is more work left to do, in which /// case the current task is scheduled to wake up when more progress may be /// possible. /// /// # Errors /// /// Returns `Err` if the sink encounters an error while processing one of /// its pending requests. Due to the buffered nature of requests, it is not /// generally possible to correlate the error with a particular request. As /// with `start_send`, these errors are generally "fatal" for continued use /// of the sink. /// /// # Panics /// /// This method may panic in a few situations, depending on the specific sink: /// /// - It is called outside of the context of a task. /// - A previous call to `start_send` or `poll_complete` yielded an error. /// /// # Compatibility nodes /// /// The name of this method may be slightly misleading as the original /// intention was to have this method be more general than just flushing /// requests. Over time though it was decided to trim back the ambitions of /// this method to what it's always done, just flushing. /// /// In the 0.2 release series of futures this method will be renamed to /// `poll_flush`. For 0.1, however, the breaking change is not happening /// yet. fn poll_complete(&mut self) -> Poll<(), Self::SinkError>; /// A method to indicate that no more values will ever be pushed into this /// sink. /// /// This method is used to indicate that a sink will no longer even be given /// another value by the caller. That is, the `start_send` method above will /// be called no longer (nor `poll_complete`). This method is intended to /// model "graceful shutdown" in various protocols where the intent to shut /// down is followed by a little more blocking work. /// /// Callers of this function should work it it in a similar fashion to /// `poll_complete`. Once called it may return `NotReady` which indicates /// that more external work needs to happen to make progress. The current /// task will be scheduled to receive a notification in such an event, /// however. /// /// Note that this function will imply `poll_complete` above. That is, if a /// sink has buffered data, then it'll be flushed out during a `close` /// operation. It is not necessary to have `poll_complete` return `Ready` /// before a `close` is called. Once a `close` is called, though, /// `poll_complete` cannot be called. /// /// # Return value /// /// This function, like `poll_complete`, returns a `Poll`. The value is /// `Ready` once the close operation has completed. At that point it should /// be safe to drop the sink and deallocate associated resources. /// /// If the value returned is `NotReady` then the sink is not yet closed and /// work needs to be done to close it. The work has been scheduled and the /// current task will receive a notification when it's next ready to call /// this method again. /// /// Finally, this function may also return an error. /// /// # Errors /// /// This function will return an `Err` if any operation along the way during /// the close operation fails. An error typically is fatal for a sink and is /// unable to be recovered from, but in specific situations this may not /// always be true. /// /// Note that it's also typically an error to call `start_send` or /// `poll_complete` after the `close` function is called. This method will /// *initiate* a close, and continuing to send values after that (or attempt /// to flush) may result in strange behavior, panics, errors, etc. Once this /// method is called, it must be the only method called on this `Sink`. /// /// # Panics /// /// This method may panic or cause panics if: /// /// * It is called outside the context of a future's task /// * It is called and then `start_send` or `poll_complete` is called /// /// # Compatibility notes /// /// Note that this function is currently by default a provided function, /// defaulted to calling `poll_complete` above. This function was added /// in the 0.1 series of the crate as a backwards-compatible addition. It /// is intended that in the 0.2 series the method will no longer be a /// default method. /// /// It is highly recommended to consider this method a required method and /// to implement it whenever you implement `Sink` locally. It is especially /// crucial to be sure to close inner sinks, if applicable. #[cfg(feature = "with-deprecated")] fn close(&mut self) -> Poll<(), Self::SinkError> { self.poll_complete() } /// dox (you should see the above, not this) #[cfg(not(feature = "with-deprecated"))] fn close(&mut self) -> Poll<(), Self::SinkError>; /// Creates a new object which will produce a synchronous sink. /// /// The sink returned does **not** implement the `Sink` trait, and instead /// only has two methods: `send` and `flush`. These two methods correspond /// to `start_send` and `poll_complete` above except are executed in a /// blocking fashion. #[cfg(feature = "use_std")] fn wait(self) -> Wait where Self: Sized { wait::new(self) } /// Composes a function *in front of* the sink. /// /// This adapter produces a new sink that passes each value through the /// given function `f` before sending it to `self`. /// /// To process each value, `f` produces a *future*, which is then polled to /// completion before passing its result down to the underlying sink. If the /// future produces an error, that error is returned by the new sink. /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. fn with(self, f: F) -> With where F: FnMut(U) -> Fut, Fut: IntoFuture, Fut::Error: From, Self: Sized { with::new(self, f) } /// Composes a function *in front of* the sink. /// /// This adapter produces a new sink that passes each value through the /// given function `f` before sending it to `self`. /// /// To process each value, `f` produces a *stream*, of which each value /// is passed to the underlying sink. A new value will not be accepted until /// the stream has been drained /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::flat_map`. /// /// # Examples /// --- /// Using this function with an iterator through use of the `stream::iter_ok()` /// function /// /// ``` /// use futures::prelude::*; /// use futures::stream; /// use futures::sync::mpsc; /// /// let (tx, rx) = mpsc::channel::(5); /// /// let tx = tx.with_flat_map(|x| { /// stream::iter_ok(vec![42; x].into_iter().map(|y| y)) /// }); /// tx.send(5).wait().unwrap(); /// assert_eq!(rx.collect().wait(), Ok(vec![42, 42, 42, 42, 42])) /// ``` fn with_flat_map(self, f: F) -> WithFlatMap where F: FnMut(U) -> St, St: Stream, Self: Sized { with_flat_map::new(self, f) } /* fn with_map(self, f: F) -> WithMap where F: FnMut(U) -> Self::SinkItem, Self: Sized; fn with_filter(self, f: F) -> WithFilter where F: FnMut(Self::SinkItem) -> bool, Self: Sized; fn with_filter_map(self, f: F) -> WithFilterMap where F: FnMut(U) -> Option, Self: Sized; */ /// Transforms the error returned by the sink. fn sink_map_err(self, f: F) -> SinkMapErr where F: FnOnce(Self::SinkError) -> E, Self: Sized, { map_err::new(self, f) } /// Map this sink's error to any error implementing `From` for this sink's /// `Error`, returning a new sink. /// /// If wanting to map errors of a `Sink + Stream`, use `.sink_from_err().from_err()`. fn sink_from_err>(self) -> from_err::SinkFromErr where Self: Sized, { from_err::new(self) } /// Adds a fixed-size buffer to the current sink. /// /// The resulting sink will buffer up to `amt` items when the underlying /// sink is unwilling to accept additional items. Calling `poll_complete` on /// the buffered sink will attempt to both empty the buffer and complete /// processing on the underlying sink. /// /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. /// /// This method is only available when the `use_std` feature of this /// library is activated, and it is activated by default. #[cfg(feature = "use_std")] fn buffer(self, amt: usize) -> Buffer where Self: Sized { buffer::new(self, amt) } /// Fanout items to multiple sinks. /// /// This adapter clones each incoming item and forwards it to both this as well as /// the other sink at the same time. fn fanout(self, other: S) -> Fanout where Self: Sized, Self::SinkItem: Clone, S: Sink { fanout::new(self, other) } /// A future that completes when the sink has finished processing all /// pending requests. /// /// The sink itself is returned after flushing is complete; this adapter is /// intended to be used when you want to stop sending to the sink until /// all current requests are processed. fn flush(self) -> Flush where Self: Sized { flush::new(self) } /// A future that completes after the given item has been fully processed /// into the sink, including flushing. /// /// Note that, **because of the flushing requirement, it is usually better /// to batch together items to send via `send_all`, rather than flushing /// between each item.** /// /// On completion, the sink is returned. fn send(self, item: Self::SinkItem) -> Send where Self: Sized { send::new(self, item) } /// A future that completes after the given stream has been fully processed /// into the sink, including flushing. /// /// This future will drive the stream to keep producing items until it is /// exhausted, sending each item to the sink. It will complete once both the /// stream is exhausted, the sink has received all items, the sink has been /// flushed, and the sink has been closed. /// /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from /// `stream` and send them to `self`, closing `self` when all items have been /// received. /// /// On completion, the pair `(sink, source)` is returned. fn send_all(self, stream: S) -> SendAll where S: Stream, Self::SinkError: From, Self: Sized { send_all::new(self, stream) } } impl<'a, S: ?Sized + Sink> Sink for &'a mut S { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: Self::SinkItem) -> StartSend { (**self).start_send(item) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { (**self).poll_complete() } fn close(&mut self) -> Poll<(), Self::SinkError> { (**self).close() } }