diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs new file mode 100644 index 0000000000..d3244946e5 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/catch_unwind.rs @@ -0,0 +1,71 @@ +use std::prelude::v1::*; +use std::any::Any; +use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; +use std::mem; + +use super::super::{Poll, Async}; +use super::Stream; + +/// Stream for the `catch_unwind` combinator. +/// +/// This is created by the `Stream::catch_unwind` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct CatchUnwind<S> where S: Stream { + state: CatchUnwindState<S>, +} + +pub fn new<S>(stream: S) -> CatchUnwind<S> + where S: Stream + UnwindSafe, +{ + CatchUnwind { + state: CatchUnwindState::Stream(stream), + } +} + +#[derive(Debug)] +enum CatchUnwindState<S> { + Stream(S), + Eof, + Done, +} + +impl<S> Stream for CatchUnwind<S> + where S: Stream + UnwindSafe, +{ + type Item = Result<S::Item, S::Error>; + type Error = Box<Any + Send>; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) { + CatchUnwindState::Done => panic!("cannot poll after eof"), + CatchUnwindState::Eof => { + self.state = CatchUnwindState::Done; + return Ok(Async::Ready(None)); + } + CatchUnwindState::Stream(stream) => stream, + }; + let res = catch_unwind(|| (stream.poll(), stream)); + match res { + Err(e) => Err(e), // and state is already Eof + Ok((poll, stream)) => { + self.state = CatchUnwindState::Stream(stream); + match poll { + Err(e) => Ok(Async::Ready(Some(Err(e)))), + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + } + } + } + } +} + +impl<S: Stream> Stream for AssertUnwindSafe<S> { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.0.poll() + } +} |