diff options
Diffstat (limited to 'third_party/rust/tokio-timer/src/throttle.rs')
-rw-r--r-- | third_party/rust/tokio-timer/src/throttle.rs | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/third_party/rust/tokio-timer/src/throttle.rs b/third_party/rust/tokio-timer/src/throttle.rs new file mode 100644 index 0000000000..97d313313e --- /dev/null +++ b/third_party/rust/tokio-timer/src/throttle.rs @@ -0,0 +1,167 @@ +//! Slow down a stream by enforcing a delay between items. + +use {clock, Delay, Error}; + +use futures::future::Either; +use futures::{Async, Future, Poll, Stream}; + +use std::{ + error::Error as StdError, + fmt::{Display, Formatter, Result as FmtResult}, + time::Duration, +}; + +/// Slow down a stream by enforcing a delay between items. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Throttle<T> { + delay: Option<Delay>, + duration: Duration, + stream: T, +} + +/// Either the error of the underlying stream, or an error within +/// tokio's timing machinery. +#[derive(Debug)] +pub struct ThrottleError<T>(Either<T, Error>); + +impl<T> Throttle<T> { + /// Slow down a stream by enforcing a delay between items. + pub fn new(stream: T, duration: Duration) -> Self { + Self { + delay: None, + duration: duration, + stream: stream, + } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl<T: Stream> Stream for Throttle<T> { + type Item = T::Item; + type Error = ThrottleError<T::Error>; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + if let Some(ref mut delay) = self.delay { + try_ready!({ delay.poll().map_err(ThrottleError::from_timer_err) }); + } + + self.delay = None; + let value = try_ready!({ self.stream.poll().map_err(ThrottleError::from_stream_err) }); + + if value.is_some() { + self.delay = Some(Delay::new(clock::now() + self.duration)); + } + + Ok(Async::Ready(value)) + } +} + +impl<T> ThrottleError<T> { + /// Creates a new `ThrottleError` from the given stream error. + pub fn from_stream_err(err: T) -> Self { + ThrottleError(Either::A(err)) + } + + /// Creates a new `ThrottleError` from the given tokio timer error. + pub fn from_timer_err(err: Error) -> Self { + ThrottleError(Either::B(err)) + } + + /// Attempts to get the underlying stream error, if it is present. + pub fn get_stream_error(&self) -> Option<&T> { + match self.0 { + Either::A(ref x) => Some(x), + _ => None, + } + } + + /// Attempts to get the underlying timer error, if it is present. + pub fn get_timer_error(&self) -> Option<&Error> { + match self.0 { + Either::B(ref x) => Some(x), + _ => None, + } + } + + /// Attempts to extract the underlying stream error, if it is present. + pub fn into_stream_error(self) -> Option<T> { + match self.0 { + Either::A(x) => Some(x), + _ => None, + } + } + + /// Attempts to extract the underlying timer error, if it is present. + pub fn into_timer_error(self) -> Option<Error> { + match self.0 { + Either::B(x) => Some(x), + _ => None, + } + } + + /// Returns whether the throttle error has occured because of an error + /// in the underlying stream. + pub fn is_stream_error(&self) -> bool { + !self.is_timer_error() + } + + /// Returns whether the throttle error has occured because of an error + /// in tokio's timer system. + pub fn is_timer_error(&self) -> bool { + match self.0 { + Either::A(_) => false, + Either::B(_) => true, + } + } +} + +impl<T: StdError> Display for ThrottleError<T> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match self.0 { + Either::A(ref err) => write!(f, "stream error: {}", err), + Either::B(ref err) => write!(f, "timer error: {}", err), + } + } +} + +impl<T: StdError + 'static> StdError for ThrottleError<T> { + fn description(&self) -> &str { + match self.0 { + Either::A(_) => "stream error", + Either::B(_) => "timer error", + } + } + + // FIXME(taiki-e): When the minimum support version of tokio reaches Rust 1.30, + // replace this with Error::source. + #[allow(deprecated)] + fn cause(&self) -> Option<&dyn StdError> { + match self.0 { + Either::A(ref err) => Some(err), + Either::B(ref err) => Some(err), + } + } +} |