summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream/split.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/split.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/split.rs105
1 files changed, 105 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/split.rs b/third_party/rust/futures-0.1.31/src/stream/split.rs
new file mode 100644
index 0000000000..ddaa52997d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/split.rs
@@ -0,0 +1,105 @@
+use std::any::Any;
+use std::error::Error;
+use std::fmt;
+
+use {StartSend, Sink, Stream, Poll, Async, AsyncSink};
+use sync::BiLock;
+
+/// A `Stream` part of the split pair
+#[derive(Debug)]
+pub struct SplitStream<S>(BiLock<S>);
+
+impl<S> SplitStream<S> {
+ /// Attempts to put the two "halves" of a split `Stream + Sink` back
+ /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
+ /// a matching pair originating from the same call to `Stream::split`.
+ pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
+ other.reunite(self)
+ }
+}
+
+impl<S: Stream> Stream for SplitStream<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.poll(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+/// A `Sink` part of the split pair
+#[derive(Debug)]
+pub struct SplitSink<S>(BiLock<S>);
+
+impl<S> SplitSink<S> {
+ /// Attempts to put the two "halves" of a split `Stream + Sink` back
+ /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
+ /// a matching pair originating from the same call to `Stream::split`.
+ pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> {
+ self.0.reunite(other.0).map_err(|err| {
+ ReuniteError(SplitSink(err.0), SplitStream(err.1))
+ })
+ }
+}
+
+impl<S: Sink> Sink for SplitSink<S> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: S::SinkItem)
+ -> StartSend<S::SinkItem, S::SinkError>
+ {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.start_send(item),
+ Async::NotReady => Ok(AsyncSink::NotReady(item)),
+ }
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.poll_complete(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), S::SinkError> {
+ match self.0.poll_lock() {
+ Async::Ready(mut inner) => inner.close(),
+ Async::NotReady => Ok(Async::NotReady),
+ }
+ }
+}
+
+pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) {
+ let (a, b) = BiLock::new(s);
+ let read = SplitStream(a);
+ let write = SplitSink(b);
+ (write, read)
+}
+
+/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
+/// of a `Stream + Split`, and thus could not be `reunite`d.
+pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>);
+
+impl<T> fmt::Debug for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_tuple("ReuniteError")
+ .field(&"...")
+ .finish()
+ }
+}
+
+impl<T> fmt::Display for ReuniteError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair")
+ }
+}
+
+impl<T: Any> Error for ReuniteError<T> {
+ fn description(&self) -> &str {
+ "tried to reunite a SplitStream and SplitSink that don't form a pair"
+ }
+}