diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/split.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/stream/split.rs | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/split.rs b/third_party/rust/futures-0.1.31/src/stream/split.rs new file mode 100644 index 0000000000..ddaa52997d --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/split.rs @@ -0,0 +1,105 @@ +use std::any::Any; +use std::error::Error; +use std::fmt; + +use {StartSend, Sink, Stream, Poll, Async, AsyncSink}; +use sync::BiLock; + +/// A `Stream` part of the split pair +#[derive(Debug)] +pub struct SplitStream<S>(BiLock<S>); + +impl<S> SplitStream<S> { + /// Attempts to put the two "halves" of a split `Stream + Sink` back + /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are + /// a matching pair originating from the same call to `Stream::split`. + pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> { + other.reunite(self) + } +} + +impl<S: Stream> Stream for SplitStream<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.poll(), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// A `Sink` part of the split pair +#[derive(Debug)] +pub struct SplitSink<S>(BiLock<S>); + +impl<S> SplitSink<S> { + /// Attempts to put the two "halves" of a split `Stream + Sink` back + /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are + /// a matching pair originating from the same call to `Stream::split`. + pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> { + self.0.reunite(other.0).map_err(|err| { + ReuniteError(SplitSink(err.0), SplitStream(err.1)) + }) + } +} + +impl<S: Sink> Sink for SplitSink<S> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: S::SinkItem) + -> StartSend<S::SinkItem, S::SinkError> + { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.start_send(item), + Async::NotReady => Ok(AsyncSink::NotReady(item)), + } + } + + fn poll_complete(&mut self) -> Poll<(), S::SinkError> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.poll_complete(), + Async::NotReady => Ok(Async::NotReady), + } + } + + fn close(&mut self) -> Poll<(), S::SinkError> { + match self.0.poll_lock() { + Async::Ready(mut inner) => inner.close(), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) { + let (a, b) = BiLock::new(s); + let read = SplitStream(a); + let write = SplitSink(b); + (write, read) +} + +/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves +/// of a `Stream + Split`, and thus could not be `reunite`d. +pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>); + +impl<T> fmt::Debug for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("ReuniteError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair") + } +} + +impl<T: Any> Error for ReuniteError<T> { + fn description(&self) -> &str { + "tried to reunite a SplitStream and SplitSink that don't form a pair" + } +} |