From 43a97878ce14b72f0981164f87f2e35e14151312 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 11:22:09 +0200 Subject: Adding upstream version 110.0.1. Signed-off-by: Daniel Baumann --- third_party/rust/futures-0.1.31/src/sink/buffer.rs | 108 +++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 third_party/rust/futures-0.1.31/src/sink/buffer.rs (limited to 'third_party/rust/futures-0.1.31/src/sink/buffer.rs') diff --git a/third_party/rust/futures-0.1.31/src/sink/buffer.rs b/third_party/rust/futures-0.1.31/src/sink/buffer.rs new file mode 100644 index 0000000000..419579d9a0 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sink/buffer.rs @@ -0,0 +1,108 @@ +use std::collections::VecDeque; + +use {Poll, Async}; +use {StartSend, AsyncSink}; +use sink::Sink; +use stream::Stream; + +/// Sink for the `Sink::buffer` combinator, which buffers up to some fixed +/// number of values when the underlying sink is unable to accept them. +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Buffer { + sink: S, + buf: VecDeque, + + // Track capacity separately from the `VecDeque`, which may be rounded up + cap: usize, +} + +pub fn new(sink: S, amt: usize) -> Buffer { + Buffer { + sink: sink, + buf: VecDeque::with_capacity(amt), + cap: amt, + } +} + +impl Buffer { + /// Get a shared reference to the inner sink. + pub fn get_ref(&self) -> &S { + &self.sink + } + + /// Get a mutable reference to the inner sink. + pub fn get_mut(&mut self) -> &mut S { + &mut self.sink + } + + /// Consumes this combinator, returning the underlying sink. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> S { + self.sink + } + + fn try_empty_buffer(&mut self) -> Poll<(), S::SinkError> { + while let Some(item) = self.buf.pop_front() { + if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { + self.buf.push_front(item); + + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } +} + +// Forwarding impl of Stream from the underlying sink +impl Stream for Buffer where S: Sink + Stream { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Poll, S::Error> { + self.sink.poll() + } +} + +impl Sink for Buffer { + type SinkItem = S::SinkItem; + type SinkError = S::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + if self.cap == 0 { + return self.sink.start_send(item); + } + + self.try_empty_buffer()?; + if self.buf.len() == self.cap { + return Ok(AsyncSink::NotReady(item)); + } + self.buf.push_back(item); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.poll_complete(); + } + + try_ready!(self.try_empty_buffer()); + debug_assert!(self.buf.is_empty()); + self.sink.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + if self.cap == 0 { + return self.sink.close(); + } + + if self.buf.len() > 0 { + try_ready!(self.try_empty_buffer()); + } + assert_eq!(self.buf.len(), 0); + self.sink.close() + } +} -- cgit v1.2.3