summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs71
1 files changed, 71 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs
new file mode 100644
index 0000000000..d3244946e5
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs
@@ -0,0 +1,71 @@
+use std::prelude::v1::*;
+use std::any::Any;
+use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
+use std::mem;
+
+use super::super::{Poll, Async};
+use super::Stream;
+
+/// Stream for the `catch_unwind` combinator.
+///
+/// This is created by the `Stream::catch_unwind` method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct CatchUnwind<S> where S: Stream {
+ state: CatchUnwindState<S>,
+}
+
+pub fn new<S>(stream: S) -> CatchUnwind<S>
+ where S: Stream + UnwindSafe,
+{
+ CatchUnwind {
+ state: CatchUnwindState::Stream(stream),
+ }
+}
+
+#[derive(Debug)]
+enum CatchUnwindState<S> {
+ Stream(S),
+ Eof,
+ Done,
+}
+
+impl<S> Stream for CatchUnwind<S>
+ where S: Stream + UnwindSafe,
+{
+ type Item = Result<S::Item, S::Error>;
+ type Error = Box<Any + Send>;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) {
+ CatchUnwindState::Done => panic!("cannot poll after eof"),
+ CatchUnwindState::Eof => {
+ self.state = CatchUnwindState::Done;
+ return Ok(Async::Ready(None));
+ }
+ CatchUnwindState::Stream(stream) => stream,
+ };
+ let res = catch_unwind(|| (stream.poll(), stream));
+ match res {
+ Err(e) => Err(e), // and state is already Eof
+ Ok((poll, stream)) => {
+ self.state = CatchUnwindState::Stream(stream);
+ match poll {
+ Err(e) => Ok(Async::Ready(Some(Err(e)))),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
+ Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))),
+ Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
+ }
+ }
+ }
+ }
+}
+
+impl<S: Stream> Stream for AssertUnwindSafe<S> {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.0.poll()
+ }
+}