diff options
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext/throttle.rs')
-rw-r--r-- | third_party/rust/tokio-stream/src/stream_ext/throttle.rs | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/throttle.rs b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs new file mode 100644 index 0000000000..50001392ee --- /dev/null +++ b/third_party/rust/tokio-stream/src/stream_ext/throttle.rs @@ -0,0 +1,96 @@ +//! Slow down a stream by enforcing a delay between items. + +use crate::Stream; +use tokio::time::{Duration, Instant, Sleep}; + +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +use pin_project_lite::pin_project; + +pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T> +where + T: Stream, +{ + Throttle { + delay: tokio::time::sleep_until(Instant::now() + duration), + duration, + has_delayed: true, + stream, + } +} + +pin_project! { + /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to + /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct Throttle<T> { + #[pin] + delay: Sleep, + duration: Duration, + + // Set to true when `delay` has returned ready, but `stream` hasn't. + has_delayed: bool, + + // The stream to throttle + #[pin] + stream: T, + } +} + +impl<T> Throttle<T> { + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.stream + } + + /// Acquires a mutable reference to the underlying stream that this combinator + /// is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the stream + /// which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.stream + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so care + /// should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.stream + } +} + +impl<T: Stream> Stream for Throttle<T> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { + let mut me = self.project(); + let dur = *me.duration; + + if !*me.has_delayed && !is_zero(dur) { + ready!(me.delay.as_mut().poll(cx)); + *me.has_delayed = true; + } + + let value = ready!(me.stream.poll_next(cx)); + + if value.is_some() { + if !is_zero(dur) { + me.delay.reset(Instant::now() + dur); + } + + *me.has_delayed = false; + } + + Poll::Ready(value) + } +} + +fn is_zero(dur: Duration) -> bool { + dur == Duration::from_millis(0) +} |