diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/sink/buffer.rs | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/sink/buffer.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sink/buffer.rs | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/sink/buffer.rs b/third_party/rust/futures-0.1.31/src/sink/buffer.rs new file mode 100644 index 0000000000..419579d9a0 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sink/buffer.rs @@ -0,0 +1,108 @@ +use std::collections::VecDeque; + +use {Poll, Async}; +use {StartSend, AsyncSink}; +use sink::Sink; +use stream::Stream; + +/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed +/// number of values when the underlying sink is unable to accept them. +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Buffer<S: Sink> { + sink: S, + buf: VecDeque<S::SinkItem>, + + // Track capacity separately from the `VecDeque`, which may be rounded up + cap: usize, +} + +pub fn new<S: Sink>(sink: S, amt: usize) -> Buffer<S> { + Buffer { + sink: sink, + buf: VecDeque::with_capacity(amt), + cap: amt, + } +} + +impl<S: Sink> Buffer<S> { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } + + fn try_empty_buffer(&mut self) -> Poll<(), S::SinkError> { + while let Some(item) = self.buf.pop_front() { + if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { + self.buf.push_front(item); + + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } +} + +// Forwarding impl of Stream from the underlying sink +impl<S> Stream for Buffer<S> where S: Sink + Stream { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> { + self.sink.poll() + } +} + +impl<S: Sink> Sink for Buffer<S> { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + if self.cap == 0 { + return self.sink.start_send(item); + } + + self.try_empty_buffer()?; + if self.buf.len() == self.cap { + return Ok(AsyncSink::NotReady(item)); + } + self.buf.push_back(item); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.poll_complete(); + } + + try_ready!(self.try_empty_buffer()); + debug_assert!(self.buf.is_empty()); + self.sink.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.close(); + } + + if self.buf.len() > 0 { + try_ready!(self.try_empty_buffer()); + } + assert_eq!(self.buf.len(), 0); + self.sink.close() + } +} |