use std::any::Any; use std::error::Error; use std::fmt; use {StartSend, Sink, Stream, Poll, Async, AsyncSink}; use sync::BiLock; /// A `Stream` part of the split pair #[derive(Debug)] pub struct SplitStream(BiLock); impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `Stream::split`. pub fn reunite(self, other: SplitSink) -> Result> { other.reunite(self) } } impl Stream for SplitStream { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll, S::Error> { match self.0.poll_lock() { Async::Ready(mut inner) => inner.poll(), Async::NotReady => Ok(Async::NotReady), } } } /// A `Sink` part of the split pair #[derive(Debug)] pub struct SplitSink(BiLock); impl SplitSink { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are /// a matching pair originating from the same call to `Stream::split`. pub fn reunite(self, other: SplitStream) -> Result> { self.0.reunite(other.0).map_err(|err| { ReuniteError(SplitSink(err.0), SplitStream(err.1)) }) } } impl Sink for SplitSink { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: S::SinkItem) -> StartSend { match self.0.poll_lock() { Async::Ready(mut inner) => inner.start_send(item), Async::NotReady => Ok(AsyncSink::NotReady(item)), } } fn poll_complete(&mut self) -> Poll<(), S::SinkError> { match self.0.poll_lock() { Async::Ready(mut inner) => inner.poll_complete(), Async::NotReady => Ok(Async::NotReady), } } fn close(&mut self) -> Poll<(), S::SinkError> { match self.0.poll_lock() { Async::Ready(mut inner) => inner.close(), Async::NotReady => Ok(Async::NotReady), } } } pub fn split(s: S) -> (SplitSink, SplitStream) { let (a, b) = BiLock::new(s); let read = SplitStream(a); let write = SplitSink(b); (write, read) } /// Error indicating a `SplitSink` and `SplitStream` were not two halves /// of a `Stream + Split`, and thus could not be `reunite`d. pub struct ReuniteError(pub SplitSink, pub SplitStream); impl fmt::Debug for ReuniteError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_tuple("ReuniteError") .field(&"...") .finish() } } impl fmt::Display for ReuniteError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair") } } impl Error for ReuniteError { fn description(&self) -> &str { "tried to reunite a SplitStream and SplitSink that don't form a pair" } }