summaryrefslogtreecommitdiffstats
path: root/third_party/rust/hyper/src/proto/h1/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/hyper/src/proto/h1/io.rs')
-rw-r--r--third_party/rust/hyper/src/proto/h1/io.rs1002
1 files changed, 1002 insertions, 0 deletions
diff --git a/third_party/rust/hyper/src/proto/h1/io.rs b/third_party/rust/hyper/src/proto/h1/io.rs
new file mode 100644
index 0000000000..1d251e2c84
--- /dev/null
+++ b/third_party/rust/hyper/src/proto/h1/io.rs
@@ -0,0 +1,1002 @@
+use std::cmp;
+use std::fmt;
+#[cfg(all(feature = "server", feature = "runtime"))]
+use std::future::Future;
+use std::io::{self, IoSlice};
+use std::marker::Unpin;
+use std::mem::MaybeUninit;
+#[cfg(all(feature = "server", feature = "runtime"))]
+use std::time::Duration;
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+#[cfg(all(feature = "server", feature = "runtime"))]
+use tokio::time::Instant;
+use tracing::{debug, trace};
+
+use super::{Http1Transaction, ParseContext, ParsedMessage};
+use crate::common::buf::BufList;
+use crate::common::{task, Pin, Poll};
+
+/// The initial buffer size allocated before trying to read from IO.
+pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
+
+/// The minimum value that can be set to max buffer size.
+pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;
+
+/// The default maximum read buffer size. If the buffer gets this big and
+/// a message is still not complete, a `TooLarge` error is triggered.
+// Note: if this changes, update server::conn::Http::max_buf_size docs.
+pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
+
+/// The maximum number of distinct `Buf`s to hold in a list before requiring
+/// a flush. Only affects when the buffer strategy is to queue buffers.
+///
+/// Note that a flush can happen before reaching the maximum. This simply
+/// forces a flush if the queue gets this big.
+const MAX_BUF_LIST_BUFFERS: usize = 16;
+
+pub(crate) struct Buffered<T, B> {
+ flush_pipeline: bool,
+ io: T,
+ read_blocked: bool,
+ read_buf: BytesMut,
+ read_buf_strategy: ReadStrategy,
+ write_buf: WriteBuf<B>,
+}
+
+impl<T, B> fmt::Debug for Buffered<T, B>
+where
+ B: Buf,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Buffered")
+ .field("read_buf", &self.read_buf)
+ .field("write_buf", &self.write_buf)
+ .finish()
+ }
+}
+
+impl<T, B> Buffered<T, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ B: Buf,
+{
+ pub(crate) fn new(io: T) -> Buffered<T, B> {
+ let strategy = if io.is_write_vectored() {
+ WriteStrategy::Queue
+ } else {
+ WriteStrategy::Flatten
+ };
+ let write_buf = WriteBuf::new(strategy);
+ Buffered {
+ flush_pipeline: false,
+ io,
+ read_blocked: false,
+ read_buf: BytesMut::with_capacity(0),
+ read_buf_strategy: ReadStrategy::default(),
+ write_buf,
+ }
+ }
+
+ #[cfg(feature = "server")]
+ pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
+ debug_assert!(!self.write_buf.has_remaining());
+ self.flush_pipeline = enabled;
+ if enabled {
+ self.set_write_strategy_flatten();
+ }
+ }
+
+ pub(crate) fn set_max_buf_size(&mut self, max: usize) {
+ assert!(
+ max >= MINIMUM_MAX_BUFFER_SIZE,
+ "The max_buf_size cannot be smaller than {}.",
+ MINIMUM_MAX_BUFFER_SIZE,
+ );
+ self.read_buf_strategy = ReadStrategy::with_max(max);
+ self.write_buf.max_buf_size = max;
+ }
+
+ #[cfg(feature = "client")]
+ pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
+ self.read_buf_strategy = ReadStrategy::Exact(sz);
+ }
+
+ pub(crate) fn set_write_strategy_flatten(&mut self) {
+ // this should always be called only at construction time,
+ // so this assert is here to catch myself
+ debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
+ self.write_buf.set_strategy(WriteStrategy::Flatten);
+ }
+
+ pub(crate) fn set_write_strategy_queue(&mut self) {
+ // this should always be called only at construction time,
+ // so this assert is here to catch myself
+ debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
+ self.write_buf.set_strategy(WriteStrategy::Queue);
+ }
+
+ pub(crate) fn read_buf(&self) -> &[u8] {
+ self.read_buf.as_ref()
+ }
+
+ #[cfg(test)]
+ #[cfg(feature = "nightly")]
+ pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
+ &mut self.read_buf
+ }
+
+ /// Return the "allocated" available space, not the potential space
+ /// that could be allocated in the future.
+ fn read_buf_remaining_mut(&self) -> usize {
+ self.read_buf.capacity() - self.read_buf.len()
+ }
+
+ /// Return whether we can append to the headers buffer.
+ ///
+ /// Reasons we can't:
+ /// - The write buf is in queue mode, and some of the past body is still
+ /// needing to be flushed.
+ pub(crate) fn can_headers_buf(&self) -> bool {
+ !self.write_buf.queue.has_remaining()
+ }
+
+ pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
+ let buf = self.write_buf.headers_mut();
+ &mut buf.bytes
+ }
+
+ pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
+ &mut self.write_buf
+ }
+
+ pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
+ self.write_buf.buffer(buf)
+ }
+
+ pub(crate) fn can_buffer(&self) -> bool {
+ self.flush_pipeline || self.write_buf.can_buffer()
+ }
+
+ pub(crate) fn consume_leading_lines(&mut self) {
+ if !self.read_buf.is_empty() {
+ let mut i = 0;
+ while i < self.read_buf.len() {
+ match self.read_buf[i] {
+ b'\r' | b'\n' => i += 1,
+ _ => break,
+ }
+ }
+ self.read_buf.advance(i);
+ }
+ }
+
+ pub(super) fn parse<S>(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ parse_ctx: ParseContext<'_>,
+ ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
+ where
+ S: Http1Transaction,
+ {
+ loop {
+ match super::role::parse_headers::<S>(
+ &mut self.read_buf,
+ ParseContext {
+ cached_headers: parse_ctx.cached_headers,
+ req_method: parse_ctx.req_method,
+ h1_parser_config: parse_ctx.h1_parser_config.clone(),
+ #[cfg(all(feature = "server", feature = "runtime"))]
+ h1_header_read_timeout: parse_ctx.h1_header_read_timeout,
+ #[cfg(all(feature = "server", feature = "runtime"))]
+ h1_header_read_timeout_fut: parse_ctx.h1_header_read_timeout_fut,
+ #[cfg(all(feature = "server", feature = "runtime"))]
+ h1_header_read_timeout_running: parse_ctx.h1_header_read_timeout_running,
+ preserve_header_case: parse_ctx.preserve_header_case,
+ #[cfg(feature = "ffi")]
+ preserve_header_order: parse_ctx.preserve_header_order,
+ h09_responses: parse_ctx.h09_responses,
+ #[cfg(feature = "ffi")]
+ on_informational: parse_ctx.on_informational,
+ #[cfg(feature = "ffi")]
+ raw_headers: parse_ctx.raw_headers,
+ },
+ )? {
+ Some(msg) => {
+ debug!("parsed {} headers", msg.head.headers.len());
+
+ #[cfg(all(feature = "server", feature = "runtime"))]
+ {
+ *parse_ctx.h1_header_read_timeout_running = false;
+
+ if let Some(h1_header_read_timeout_fut) =
+ parse_ctx.h1_header_read_timeout_fut
+ {
+ // Reset the timer in order to avoid woken up when the timeout finishes
+ h1_header_read_timeout_fut
+ .as_mut()
+ .reset(Instant::now() + Duration::from_secs(30 * 24 * 60 * 60));
+ }
+ }
+ return Poll::Ready(Ok(msg));
+ }
+ None => {
+ let max = self.read_buf_strategy.max();
+ if self.read_buf.len() >= max {
+ debug!("max_buf_size ({}) reached, closing", max);
+ return Poll::Ready(Err(crate::Error::new_too_large()));
+ }
+
+ #[cfg(all(feature = "server", feature = "runtime"))]
+ if *parse_ctx.h1_header_read_timeout_running {
+ if let Some(h1_header_read_timeout_fut) =
+ parse_ctx.h1_header_read_timeout_fut
+ {
+ if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
+ *parse_ctx.h1_header_read_timeout_running = false;
+
+ tracing::warn!("read header from client timeout");
+ return Poll::Ready(Err(crate::Error::new_header_timeout()));
+ }
+ }
+ }
+ }
+ }
+ if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
+ trace!("parse eof");
+ return Poll::Ready(Err(crate::Error::new_incomplete()));
+ }
+ }
+ }
+
+ pub(crate) fn poll_read_from_io(
+ &mut self,
+ cx: &mut task::Context<'_>,
+ ) -> Poll<io::Result<usize>> {
+ self.read_blocked = false;
+ let next = self.read_buf_strategy.next();
+ if self.read_buf_remaining_mut() < next {
+ self.read_buf.reserve(next);
+ }
+
+ let dst = self.read_buf.chunk_mut();
+ let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
+ let mut buf = ReadBuf::uninit(dst);
+ match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
+ Poll::Ready(Ok(_)) => {
+ let n = buf.filled().len();
+ trace!("received {} bytes", n);
+ unsafe {
+ // Safety: we just read that many bytes into the
+ // uninitialized part of the buffer, so this is okay.
+ // @tokio pls give me back `poll_read_buf` thanks
+ self.read_buf.advance_mut(n);
+ }
+ self.read_buf_strategy.record(n);
+ Poll::Ready(Ok(n))
+ }
+ Poll::Pending => {
+ self.read_blocked = true;
+ Poll::Pending
+ }
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ }
+ }
+
+ pub(crate) fn into_inner(self) -> (T, Bytes) {
+ (self.io, self.read_buf.freeze())
+ }
+
+ pub(crate) fn io_mut(&mut self) -> &mut T {
+ &mut self.io
+ }
+
+ pub(crate) fn is_read_blocked(&self) -> bool {
+ self.read_blocked
+ }
+
+ pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ if self.flush_pipeline && !self.read_buf.is_empty() {
+ Poll::Ready(Ok(()))
+ } else if self.write_buf.remaining() == 0 {
+ Pin::new(&mut self.io).poll_flush(cx)
+ } else {
+ if let WriteStrategy::Flatten = self.write_buf.strategy {
+ return self.poll_flush_flattened(cx);
+ }
+
+ const MAX_WRITEV_BUFS: usize = 64;
+ loop {
+ let n = {
+ let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
+ let len = self.write_buf.chunks_vectored(&mut iovs);
+ ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
+ };
+ // TODO(eliza): we have to do this manually because
+ // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
+ // `poll_write_buf` comes back, the manual advance will need to leave!
+ self.write_buf.advance(n);
+ debug!("flushed {} bytes", n);
+ if self.write_buf.remaining() == 0 {
+ break;
+ } else if n == 0 {
+ trace!(
+ "write returned zero, but {} bytes remaining",
+ self.write_buf.remaining()
+ );
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+ Pin::new(&mut self.io).poll_flush(cx)
+ }
+ }
+
+ /// Specialized version of `flush` when strategy is Flatten.
+ ///
+ /// Since all buffered bytes are flattened into the single headers buffer,
+ /// that skips some bookkeeping around using multiple buffers.
+ fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+ loop {
+ let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
+ debug!("flushed {} bytes", n);
+ self.write_buf.headers.advance(n);
+ if self.write_buf.headers.remaining() == 0 {
+ self.write_buf.headers.reset();
+ break;
+ } else if n == 0 {
+ trace!(
+ "write returned zero, but {} bytes remaining",
+ self.write_buf.remaining()
+ );
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+ Pin::new(&mut self.io).poll_flush(cx)
+ }
+
+ #[cfg(test)]
+ fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
+ futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
+ }
+}
+
+// The `B` is a `Buf`, we never project a pin to it
+impl<T: Unpin, B> Unpin for Buffered<T, B> {}
+
+// TODO: This trait is old... at least rename to PollBytes or something...
+pub(crate) trait MemRead {
+ fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
+}
+
+impl<T, B> MemRead for Buffered<T, B>
+where
+ T: AsyncRead + AsyncWrite + Unpin,
+ B: Buf,
+{
+ fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
+ if !self.read_buf.is_empty() {
+ let n = std::cmp::min(len, self.read_buf.len());
+ Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
+ } else {
+ let n = ready!(self.poll_read_from_io(cx))?;
+ Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
+ }
+ }
+}
+
+#[derive(Clone, Copy, Debug)]
+enum ReadStrategy {
+ Adaptive {
+ decrease_now: bool,
+ next: usize,
+ max: usize,
+ },
+ #[cfg(feature = "client")]
+ Exact(usize),
+}
+
+impl ReadStrategy {
+ fn with_max(max: usize) -> ReadStrategy {
+ ReadStrategy::Adaptive {
+ decrease_now: false,
+ next: INIT_BUFFER_SIZE,
+ max,
+ }
+ }
+
+ fn next(&self) -> usize {
+ match *self {
+ ReadStrategy::Adaptive { next, .. } => next,
+ #[cfg(feature = "client")]
+ ReadStrategy::Exact(exact) => exact,
+ }
+ }
+
+ fn max(&self) -> usize {
+ match *self {
+ ReadStrategy::Adaptive { max, .. } => max,
+ #[cfg(feature = "client")]
+ ReadStrategy::Exact(exact) => exact,
+ }
+ }
+
+ fn record(&mut self, bytes_read: usize) {
+ match *self {
+ ReadStrategy::Adaptive {
+ ref mut decrease_now,
+ ref mut next,
+ max,
+ ..
+ } => {
+ if bytes_read >= *next {
+ *next = cmp::min(incr_power_of_two(*next), max);
+ *decrease_now = false;
+ } else {
+ let decr_to = prev_power_of_two(*next);
+ if bytes_read < decr_to {
+ if *decrease_now {
+ *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
+ *decrease_now = false;
+ } else {
+ // Decreasing is a two "record" process.
+ *decrease_now = true;
+ }
+ } else {
+ // A read within the current range should cancel
+ // a potential decrease, since we just saw proof
+ // that we still need this size.
+ *decrease_now = false;
+ }
+ }
+ }
+ #[cfg(feature = "client")]
+ ReadStrategy::Exact(_) => (),
+ }
+ }
+}
+
+fn incr_power_of_two(n: usize) -> usize {
+ n.saturating_mul(2)
+}
+
+fn prev_power_of_two(n: usize) -> usize {
+ // Only way this shift can underflow is if n is less than 4.
+ // (Which would means `usize::MAX >> 64` and underflowed!)
+ debug_assert!(n >= 4);
+ (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
+}
+
+impl Default for ReadStrategy {
+ fn default() -> ReadStrategy {
+ ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
+ }
+}
+
+#[derive(Clone)]
+pub(crate) struct Cursor<T> {
+ bytes: T,
+ pos: usize,
+}
+
+impl<T: AsRef<[u8]>> Cursor<T> {
+ #[inline]
+ pub(crate) fn new(bytes: T) -> Cursor<T> {
+ Cursor { bytes, pos: 0 }
+ }
+}
+
+impl Cursor<Vec<u8>> {
+ /// If we've advanced the position a bit in this cursor, and wish to
+ /// extend the underlying vector, we may wish to unshift the "read" bytes
+ /// off, and move everything else over.
+ fn maybe_unshift(&mut self, additional: usize) {
+ if self.pos == 0 {
+ // nothing to do
+ return;
+ }
+
+ if self.bytes.capacity() - self.bytes.len() >= additional {
+ // there's room!
+ return;
+ }
+
+ self.bytes.drain(0..self.pos);
+ self.pos = 0;
+ }
+
+ fn reset(&mut self) {
+ self.pos = 0;
+ self.bytes.clear();
+ }
+}
+
+impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Cursor")
+ .field("pos", &self.pos)
+ .field("len", &self.bytes.as_ref().len())
+ .finish()
+ }
+}
+
+impl<T: AsRef<[u8]>> Buf for Cursor<T> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.bytes.as_ref().len() - self.pos
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ &self.bytes.as_ref()[self.pos..]
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
+ self.pos += cnt;
+ }
+}
+
+// an internal buffer to collect writes before flushes
+pub(super) struct WriteBuf<B> {
+ /// Re-usable buffer that holds message headers
+ headers: Cursor<Vec<u8>>,
+ max_buf_size: usize,
+ /// Deque of user buffers if strategy is Queue
+ queue: BufList<B>,
+ strategy: WriteStrategy,
+}
+
+impl<B: Buf> WriteBuf<B> {
+ fn new(strategy: WriteStrategy) -> WriteBuf<B> {
+ WriteBuf {
+ headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
+ max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
+ queue: BufList::new(),
+ strategy,
+ }
+ }
+}
+
+impl<B> WriteBuf<B>
+where
+ B: Buf,
+{
+ fn set_strategy(&mut self, strategy: WriteStrategy) {
+ self.strategy = strategy;
+ }
+
+ pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
+ debug_assert!(buf.has_remaining());
+ match self.strategy {
+ WriteStrategy::Flatten => {
+ let head = self.headers_mut();
+
+ head.maybe_unshift(buf.remaining());
+ trace!(
+ self.len = head.remaining(),
+ buf.len = buf.remaining(),
+ "buffer.flatten"
+ );
+ //perf: This is a little faster than <Vec as BufMut>>::put,
+ //but accomplishes the same result.
+ loop {
+ let adv = {
+ let slice = buf.chunk();
+ if slice.is_empty() {
+ return;
+ }
+ head.bytes.extend_from_slice(slice);
+ slice.len()
+ };
+ buf.advance(adv);
+ }
+ }
+ WriteStrategy::Queue => {
+ trace!(
+ self.len = self.remaining(),
+ buf.len = buf.remaining(),
+ "buffer.queue"
+ );
+ self.queue.push(buf.into());
+ }
+ }
+ }
+
+ fn can_buffer(&self) -> bool {
+ match self.strategy {
+ WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
+ WriteStrategy::Queue => {
+ self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
+ }
+ }
+ }
+
+ fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
+ debug_assert!(!self.queue.has_remaining());
+ &mut self.headers
+ }
+}
+
+impl<B: Buf> fmt::Debug for WriteBuf<B> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WriteBuf")
+ .field("remaining", &self.remaining())
+ .field("strategy", &self.strategy)
+ .finish()
+ }
+}
+
+impl<B: Buf> Buf for WriteBuf<B> {
+ #[inline]
+ fn remaining(&self) -> usize {
+ self.headers.remaining() + self.queue.remaining()
+ }
+
+ #[inline]
+ fn chunk(&self) -> &[u8] {
+ let headers = self.headers.chunk();
+ if !headers.is_empty() {
+ headers
+ } else {
+ self.queue.chunk()
+ }
+ }
+
+ #[inline]
+ fn advance(&mut self, cnt: usize) {
+ let hrem = self.headers.remaining();
+
+ match hrem.cmp(&cnt) {
+ cmp::Ordering::Equal => self.headers.reset(),
+ cmp::Ordering::Greater => self.headers.advance(cnt),
+ cmp::Ordering::Less => {
+ let qcnt = cnt - hrem;
+ self.headers.reset();
+ self.queue.advance(qcnt);
+ }
+ }
+ }
+
+ #[inline]
+ fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
+ let n = self.headers.chunks_vectored(dst);
+ self.queue.chunks_vectored(&mut dst[n..]) + n
+ }
+}
+
+#[derive(Debug)]
+enum WriteStrategy {
+ Flatten,
+ Queue,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::Duration;
+
+ use tokio_test::io::Builder as Mock;
+
+ // #[cfg(feature = "nightly")]
+ // use test::Bencher;
+
+ /*
+ impl<T: Read> MemRead for AsyncIo<T> {
+ fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
+ let mut v = vec![0; len];
+ let n = try_nb!(self.read(v.as_mut_slice()));
+ Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
+ }
+ }
+ */
+
+ #[tokio::test]
+ #[ignore]
+ async fn iobuf_write_empty_slice() {
+ // TODO(eliza): can i have writev back pls T_T
+ // // First, let's just check that the Mock would normally return an
+ // // error on an unexpected write, even if the buffer is empty...
+ // let mut mock = Mock::new().build();
+ // futures_util::future::poll_fn(|cx| {
+ // Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
+ // })
+ // .await
+ // .expect_err("should be a broken pipe");
+
+ // // underlying io will return the logic error upon write,
+ // // so we are testing that the io_buf does not trigger a write
+ // // when there is nothing to flush
+ // let mock = Mock::new().build();
+ // let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+ // io_buf.flush().await.expect("should short-circuit flush");
+ }
+
+ #[tokio::test]
+ async fn parse_reads_until_blocked() {
+ use crate::proto::h1::ClientTransaction;
+
+ let _ = pretty_env_logger::try_init();
+ let mock = Mock::new()
+ // Split over multiple reads will read all of it
+ .read(b"HTTP/1.1 200 OK\r\n")
+ .read(b"Server: hyper\r\n")
+ // missing last line ending
+ .wait(Duration::from_secs(1))
+ .build();
+
+ let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+
+ // We expect a `parse` to be not ready, and so can't await it directly.
+ // Rather, this `poll_fn` will wrap the `Poll` result.
+ futures_util::future::poll_fn(|cx| {
+ let parse_ctx = ParseContext {
+ cached_headers: &mut None,
+ req_method: &mut None,
+ h1_parser_config: Default::default(),
+ #[cfg(feature = "runtime")]
+ h1_header_read_timeout: None,
+ #[cfg(feature = "runtime")]
+ h1_header_read_timeout_fut: &mut None,
+ #[cfg(feature = "runtime")]
+ h1_header_read_timeout_running: &mut false,
+ preserve_header_case: false,
+ #[cfg(feature = "ffi")]
+ preserve_header_order: false,
+ h09_responses: false,
+ #[cfg(feature = "ffi")]
+ on_informational: &mut None,
+ #[cfg(feature = "ffi")]
+ raw_headers: false,
+ };
+ assert!(buffered
+ .parse::<ClientTransaction>(cx, parse_ctx)
+ .is_pending());
+ Poll::Ready(())
+ })
+ .await;
+
+ assert_eq!(
+ buffered.read_buf,
+ b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
+ );
+ }
+
+ #[test]
+ fn read_strategy_adaptive_increments() {
+ let mut strategy = ReadStrategy::default();
+ assert_eq!(strategy.next(), 8192);
+
+ // Grows if record == next
+ strategy.record(8192);
+ assert_eq!(strategy.next(), 16384);
+
+ strategy.record(16384);
+ assert_eq!(strategy.next(), 32768);
+
+ // Enormous records still increment at same rate
+ strategy.record(::std::usize::MAX);
+ assert_eq!(strategy.next(), 65536);
+
+ let max = strategy.max();
+ while strategy.next() < max {
+ strategy.record(max);
+ }
+
+ assert_eq!(strategy.next(), max, "never goes over max");
+ strategy.record(max + 1);
+ assert_eq!(strategy.next(), max, "never goes over max");
+ }
+
+ #[test]
+ fn read_strategy_adaptive_decrements() {
+ let mut strategy = ReadStrategy::default();
+ strategy.record(8192);
+ assert_eq!(strategy.next(), 16384);
+
+ strategy.record(1);
+ assert_eq!(
+ strategy.next(),
+ 16384,
+ "first smaller record doesn't decrement yet"
+ );
+ strategy.record(8192);
+ assert_eq!(strategy.next(), 16384, "record was with range");
+
+ strategy.record(1);
+ assert_eq!(
+ strategy.next(),
+ 16384,
+ "in-range record should make this the 'first' again"
+ );
+
+ strategy.record(1);
+ assert_eq!(strategy.next(), 8192, "second smaller record decrements");
+
+ strategy.record(1);
+ assert_eq!(strategy.next(), 8192, "first doesn't decrement");
+ strategy.record(1);
+ assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
+ }
+
+ #[test]
+ fn read_strategy_adaptive_stays_the_same() {
+ let mut strategy = ReadStrategy::default();
+ strategy.record(8192);
+ assert_eq!(strategy.next(), 16384);
+
+ strategy.record(8193);
+ assert_eq!(
+ strategy.next(),
+ 16384,
+ "first smaller record doesn't decrement yet"
+ );
+
+ strategy.record(8193);
+ assert_eq!(
+ strategy.next(),
+ 16384,
+ "with current step does not decrement"
+ );
+ }
+
+ #[test]
+ fn read_strategy_adaptive_max_fuzz() {
+ fn fuzz(max: usize) {
+ let mut strategy = ReadStrategy::with_max(max);
+ while strategy.next() < max {
+ strategy.record(::std::usize::MAX);
+ }
+ let mut next = strategy.next();
+ while next > 8192 {
+ strategy.record(1);
+ strategy.record(1);
+ next = strategy.next();
+ assert!(
+ next.is_power_of_two(),
+ "decrement should be powers of two: {} (max = {})",
+ next,
+ max,
+ );
+ }
+ }
+
+ let mut max = 8192;
+ while max < std::usize::MAX {
+ fuzz(max);
+ max = (max / 2).saturating_mul(3);
+ }
+ fuzz(::std::usize::MAX);
+ }
+
+ #[test]
+ #[should_panic]
+ #[cfg(debug_assertions)] // needs to trigger a debug_assert
+ fn write_buf_requires_non_empty_bufs() {
+ let mock = Mock::new().build();
+ let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+
+ buffered.buffer(Cursor::new(Vec::new()));
+ }
+
+ /*
+ TODO: needs tokio_test::io to allow configure write_buf calls
+ #[test]
+ fn write_buf_queue() {
+ let _ = pretty_env_logger::try_init();
+
+ let mock = AsyncIo::new_buf(vec![], 1024);
+ let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+
+
+ buffered.headers_buf().extend(b"hello ");
+ buffered.buffer(Cursor::new(b"world, ".to_vec()));
+ buffered.buffer(Cursor::new(b"it's ".to_vec()));
+ buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+ assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
+ buffered.flush().unwrap();
+
+ assert_eq!(buffered.io, b"hello world, it's hyper!");
+ assert_eq!(buffered.io.num_writes(), 1);
+ assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
+ }
+ */
+
+ #[tokio::test]
+ async fn write_buf_flatten() {
+ let _ = pretty_env_logger::try_init();
+
+ let mock = Mock::new().write(b"hello world, it's hyper!").build();
+
+ let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+ buffered.write_buf.set_strategy(WriteStrategy::Flatten);
+
+ buffered.headers_buf().extend(b"hello ");
+ buffered.buffer(Cursor::new(b"world, ".to_vec()));
+ buffered.buffer(Cursor::new(b"it's ".to_vec()));
+ buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+ assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
+
+ buffered.flush().await.expect("flush");
+ }
+
+ #[test]
+ fn write_buf_flatten_partially_flushed() {
+ let _ = pretty_env_logger::try_init();
+
+ let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
+
+ let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
+
+ write_buf.buffer(b("hello "));
+ write_buf.buffer(b("world, "));
+
+ assert_eq!(write_buf.chunk(), b"hello world, ");
+
+ // advance most of the way, but not all
+ write_buf.advance(11);
+
+ assert_eq!(write_buf.chunk(), b", ");
+ assert_eq!(write_buf.headers.pos, 11);
+ assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
+
+ // there's still room in the headers buffer, so just push on the end
+ write_buf.buffer(b("it's hyper!"));
+
+ assert_eq!(write_buf.chunk(), b", it's hyper!");
+ assert_eq!(write_buf.headers.pos, 11);
+
+ let rem1 = write_buf.remaining();
+ let cap = write_buf.headers.bytes.capacity();
+
+ // but when this would go over capacity, don't copy the old bytes
+ write_buf.buffer(Cursor::new(vec![b'X'; cap]));
+ assert_eq!(write_buf.remaining(), cap + rem1);
+ assert_eq!(write_buf.headers.pos, 0);
+ }
+
+ #[tokio::test]
+ async fn write_buf_queue_disable_auto() {
+ let _ = pretty_env_logger::try_init();
+
+ let mock = Mock::new()
+ .write(b"hello ")
+ .write(b"world, ")
+ .write(b"it's ")
+ .write(b"hyper!")
+ .build();
+
+ let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
+ buffered.write_buf.set_strategy(WriteStrategy::Queue);
+
+ // we have 4 buffers, and vec IO disabled, but explicitly said
+ // don't try to auto detect (via setting strategy above)
+
+ buffered.headers_buf().extend(b"hello ");
+ buffered.buffer(Cursor::new(b"world, ".to_vec()));
+ buffered.buffer(Cursor::new(b"it's ".to_vec()));
+ buffered.buffer(Cursor::new(b"hyper!".to_vec()));
+ assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
+
+ buffered.flush().await.expect("flush");
+
+ assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
+ }
+
+ // #[cfg(feature = "nightly")]
+ // #[bench]
+ // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
+ // let s = "Hello, World!";
+ // b.bytes = s.len() as u64;
+
+ // let mut write_buf = WriteBuf::<bytes::Bytes>::new();
+ // write_buf.set_strategy(WriteStrategy::Flatten);
+ // b.iter(|| {
+ // let chunk = bytes::Bytes::from(s);
+ // write_buf.buffer(chunk);
+ // ::test::black_box(&write_buf);
+ // write_buf.headers.bytes.clear();
+ // })
+ // }
+}