summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/sink/buffer.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/sink/buffer.rs
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/sink/buffer.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/sink/buffer.rs108
1 files changed, 108 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/sink/buffer.rs b/third_party/rust/futures-0.1.31/src/sink/buffer.rs
new file mode 100644
index 0000000000..419579d9a0
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/sink/buffer.rs
@@ -0,0 +1,108 @@
+use std::collections::VecDeque;
+
+use {Poll, Async};
+use {StartSend, AsyncSink};
+use sink::Sink;
+use stream::Stream;
+
+/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed
+/// number of values when the underlying sink is unable to accept them.
+#[derive(Debug)]
+#[must_use = "sinks do nothing unless polled"]
+pub struct Buffer<S: Sink> {
+ sink: S,
+ buf: VecDeque<S::SinkItem>,
+
+ // Track capacity separately from the `VecDeque`, which may be rounded up
+ cap: usize,
+}
+
+pub fn new<S: Sink>(sink: S, amt: usize) -> Buffer<S> {
+ Buffer {
+ sink: sink,
+ buf: VecDeque::with_capacity(amt),
+ cap: amt,
+ }
+}
+
+impl<S: Sink> Buffer<S> {
+ /// Get a shared reference to the inner sink.
+ pub fn get_ref(&self) -> &S {
+ &self.sink
+ }
+
+ /// Get a mutable reference to the inner sink.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.sink
+ }
+
+ /// Consumes this combinator, returning the underlying sink.
+ ///
+ /// Note that this may discard intermediate state of this combinator, so
+ /// care should be taken to avoid losing resources when this is called.
+ pub fn into_inner(self) -> S {
+ self.sink
+ }
+
+ fn try_empty_buffer(&mut self) -> Poll<(), S::SinkError> {
+ while let Some(item) = self.buf.pop_front() {
+ if let AsyncSink::NotReady(item) = self.sink.start_send(item)? {
+ self.buf.push_front(item);
+
+ return Ok(Async::NotReady);
+ }
+ }
+
+ Ok(Async::Ready(()))
+ }
+}
+
+// Forwarding impl of Stream from the underlying sink
+impl<S> Stream for Buffer<S> where S: Sink + Stream {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
+ self.sink.poll()
+ }
+}
+
+impl<S: Sink> Sink for Buffer<S> {
+ type SinkItem = S::SinkItem;
+ type SinkError = S::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.start_send(item);
+ }
+
+ self.try_empty_buffer()?;
+ if self.buf.len() == self.cap {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ self.buf.push_back(item);
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.poll_complete();
+ }
+
+ try_ready!(self.try_empty_buffer());
+ debug_assert!(self.buf.is_empty());
+ self.sink.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ if self.cap == 0 {
+ return self.sink.close();
+ }
+
+ if self.buf.len() > 0 {
+ try_ready!(self.try_empty_buffer());
+ }
+ assert_eq!(self.buf.len(), 0);
+ self.sink.close()
+ }
+}