summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs')
-rw-r--r--third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs86
1 files changed, 86 insertions, 0 deletions
diff --git a/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
new file mode 100644
index 0000000000..48acd9328b
--- /dev/null
+++ b/third_party/rust/tokio-stream/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,86 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{sleep, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+ /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+ #[must_use = "streams do nothing unless polled"]
+ #[derive(Debug)]
+ pub struct ChunksTimeout<S: Stream> {
+ #[pin]
+ stream: Fuse<S>,
+ #[pin]
+ deadline: Option<Sleep>,
+ duration: Duration,
+ items: Vec<S::Item>,
+ cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+ }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+ pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+ ChunksTimeout {
+ stream: Fuse::new(stream),
+ deadline: None,
+ duration,
+ items: Vec::with_capacity(max_size),
+ cap: max_size,
+ }
+ }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+ type Item = Vec<S::Item>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut me = self.as_mut().project();
+ loop {
+ match me.stream.as_mut().poll_next(cx) {
+ Poll::Pending => break,
+ Poll::Ready(Some(item)) => {
+ if me.items.is_empty() {
+ me.deadline.set(Some(sleep(*me.duration)));
+ me.items.reserve_exact(*me.cap);
+ }
+ me.items.push(item);
+ if me.items.len() >= *me.cap {
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+ }
+ Poll::Ready(None) => {
+ // Returning Some here is only correct because we fuse the inner stream.
+ let last = if me.items.is_empty() {
+ None
+ } else {
+ Some(std::mem::take(me.items))
+ };
+
+ return Poll::Ready(last);
+ }
+ }
+ }
+
+ if !me.items.is_empty() {
+ if let Some(deadline) = me.deadline.as_pin_mut() {
+ ready!(deadline.poll(cx));
+ }
+ return Poll::Ready(Some(std::mem::take(me.items)));
+ }
+
+ Poll::Pending
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let (lower, upper) = self.stream.size_hint();
+ let lower = (lower / self.cap).saturating_add(chunk_len);
+ let upper = upper.and_then(|x| x.checked_add(chunk_len));
+ (lower, upper)
+ }
+}