summaryrefslogtreecommitdiffstats
path: root/vendor/gix-packetline-blocking/src/read
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 18:31:44 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 18:31:44 +0000
commitc23a457e72abe608715ac76f076f47dc42af07a5 (patch)
tree2772049aaf84b5c9d0ed12ec8d86812f7a7904b6 /vendor/gix-packetline-blocking/src/read
parentReleasing progress-linux version 1.73.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-c23a457e72abe608715ac76f076f47dc42af07a5.tar.xz
rustc-c23a457e72abe608715ac76f076f47dc42af07a5.zip
Merging upstream version 1.74.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-packetline-blocking/src/read')
-rw-r--r--vendor/gix-packetline-blocking/src/read/async_io.rs178
-rw-r--r--vendor/gix-packetline-blocking/src/read/blocking_io.rs168
-rw-r--r--vendor/gix-packetline-blocking/src/read/mod.rs126
-rw-r--r--vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs383
-rw-r--r--vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs218
-rw-r--r--vendor/gix-packetline-blocking/src/read/sidebands/mod.rs9
6 files changed, 1082 insertions, 0 deletions
diff --git a/vendor/gix-packetline-blocking/src/read/async_io.rs b/vendor/gix-packetline-blocking/src/read/async_io.rs
new file mode 100644
index 000000000..402c2434b
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/async_io.rs
@@ -0,0 +1,178 @@
+use std::io;
+
+use bstr::ByteSlice;
+use futures_io::AsyncRead;
+use futures_lite::AsyncReadExt;
+
+use crate::{
+ decode,
+ read::{ExhaustiveOutcome, ProgressAction, WithSidebands},
+ PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES,
+};
+
+/// Non-IO methods
+impl<T> StreamingPeekableIter<T>
+where
+ T: AsyncRead + Unpin,
+{
+ #[allow(clippy::needless_lifetimes)] // TODO: remove once this is clippy false positive is fixed
+ async fn read_line_inner<'a>(
+ reader: &mut T,
+ buf: &'a mut [u8],
+ ) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> {
+ let (hex_bytes, data_bytes) = buf.split_at_mut(4);
+ reader.read_exact(hex_bytes).await?;
+ let num_data_bytes = match decode::hex_prefix(hex_bytes) {
+ Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)),
+ Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize,
+ Err(err) => return Ok(Err(err)),
+ };
+
+ let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes);
+ reader.read_exact(data_bytes).await?;
+ match decode::to_data_line(data_bytes) {
+ Ok(line) => Ok(Ok(line)),
+ Err(err) => Ok(Err(err)),
+ }
+ }
+
+ /// This function is needed to help the borrow checker allow us to return references all the time
+ /// It contains a bunch of logic shared between peek and `read_line` invocations.
+ async fn read_line_inner_exhaustive<'a>(
+ reader: &mut T,
+ buf: &'a mut Vec<u8>,
+ delimiters: &[PacketLineRef<'static>],
+ fail_on_err_lines: bool,
+ buf_resize: bool,
+ ) -> ExhaustiveOutcome<'a> {
+ (
+ false,
+ None,
+ Some(match Self::read_line_inner(reader, buf).await {
+ Ok(Ok(line)) => {
+ if delimiters.contains(&line) {
+ let stopped_at = delimiters.iter().find(|l| **l == line).copied();
+ buf.clear();
+ return (true, stopped_at, None);
+ } else if fail_on_err_lines {
+ if let Some(err) = line.check_error() {
+ let err = err.0.as_bstr().to_owned();
+ buf.clear();
+ return (
+ true,
+ None,
+ Some(Err(io::Error::new(
+ io::ErrorKind::Other,
+ crate::read::Error { message: err },
+ ))),
+ );
+ }
+ }
+ let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES);
+ if buf_resize {
+ buf.resize(len, 0);
+ }
+ Ok(Ok(crate::decode(buf).expect("only valid data here")))
+ }
+ Ok(Err(err)) => {
+ buf.clear();
+ Ok(Err(err))
+ }
+ Err(err) => {
+ buf.clear();
+ Err(err)
+ }
+ }),
+ )
+ }
+
+ /// Read a packet line into the internal buffer and return it.
+ ///
+ /// Returns `None` if the end of iteration is reached because of one of the following:
+ ///
+ /// * natural EOF
+ /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true.
+ /// * A `delimiter` packet line encountered
+ pub async fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+ if self.is_done {
+ return None;
+ }
+ if !self.peek_buf.is_empty() {
+ std::mem::swap(&mut self.peek_buf, &mut self.buf);
+ self.peek_buf.clear();
+ Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf"))))
+ } else {
+ if self.buf.len() != MAX_LINE_LEN {
+ self.buf.resize(MAX_LINE_LEN, 0);
+ }
+ let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
+ &mut self.read,
+ &mut self.buf,
+ self.delimiters,
+ self.fail_on_err_lines,
+ false,
+ )
+ .await;
+ self.is_done = is_done;
+ self.stopped_at = stopped_at;
+ res
+ }
+ }
+
+ /// Peek the next packet line without consuming it.
+ ///
+ /// Multiple calls to peek will return the same packet line, if there is one.
+ pub async fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+ if self.is_done {
+ return None;
+ }
+ if self.peek_buf.is_empty() {
+ self.peek_buf.resize(MAX_LINE_LEN, 0);
+ let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
+ &mut self.read,
+ &mut self.peek_buf,
+ self.delimiters,
+ self.fail_on_err_lines,
+ true,
+ )
+ .await;
+ self.is_done = is_done;
+ self.stopped_at = stopped_at;
+ res
+ } else {
+ Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here"))))
+ }
+ }
+
+ /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
+ ///
+ /// Due to the preconfigured function type this method can be called without 'turbofish'.
+ #[allow(clippy::type_complexity)]
+ pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
+ WithSidebands::new(self)
+ }
+
+ /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines.
+ /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line.
+ ///
+ /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
+ /// being true in case the `text` is to be interpreted as error.
+ ///
+ /// _Please note_ that side bands need to be negotiated with the server.
+ pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
+ &mut self,
+ handle_progress: F,
+ ) -> WithSidebands<'_, T, F> {
+ WithSidebands::with_progress_handler(self, handle_progress)
+ }
+
+ /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
+ ///
+ /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
+ /// Use [`as_read()`][StreamingPeekableIter::as_read()].
+ pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>(
+ &mut self,
+ ) -> WithSidebands<'_, T, F> {
+ WithSidebands::without_progress_handler(self)
+ }
+}
diff --git a/vendor/gix-packetline-blocking/src/read/blocking_io.rs b/vendor/gix-packetline-blocking/src/read/blocking_io.rs
new file mode 100644
index 000000000..50c634c4c
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/blocking_io.rs
@@ -0,0 +1,168 @@
+use std::io;
+
+use bstr::ByteSlice;
+
+use crate::{
+ decode,
+ read::{ExhaustiveOutcome, ProgressAction, WithSidebands},
+ PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES,
+};
+
+/// Non-IO methods
+impl<T> StreamingPeekableIter<T>
+where
+ T: io::Read,
+{
+ fn read_line_inner<'a>(reader: &mut T, buf: &'a mut [u8]) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> {
+ let (hex_bytes, data_bytes) = buf.split_at_mut(4);
+ reader.read_exact(hex_bytes)?;
+ let num_data_bytes = match decode::hex_prefix(hex_bytes) {
+ Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)),
+ Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize,
+ Err(err) => return Ok(Err(err)),
+ };
+
+ let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes);
+ reader.read_exact(data_bytes)?;
+ match decode::to_data_line(data_bytes) {
+ Ok(line) => Ok(Ok(line)),
+ Err(err) => Ok(Err(err)),
+ }
+ }
+
+ /// This function is needed to help the borrow checker allow us to return references all the time
+ /// It contains a bunch of logic shared between peek and `read_line` invocations.
+ fn read_line_inner_exhaustive<'a>(
+ reader: &mut T,
+ buf: &'a mut Vec<u8>,
+ delimiters: &[PacketLineRef<'static>],
+ fail_on_err_lines: bool,
+ buf_resize: bool,
+ ) -> ExhaustiveOutcome<'a> {
+ (
+ false,
+ None,
+ Some(match Self::read_line_inner(reader, buf) {
+ Ok(Ok(line)) => {
+ if delimiters.contains(&line) {
+ let stopped_at = delimiters.iter().find(|l| **l == line).copied();
+ buf.clear();
+ return (true, stopped_at, None);
+ } else if fail_on_err_lines {
+ if let Some(err) = line.check_error() {
+ let err = err.0.as_bstr().to_owned();
+ buf.clear();
+ return (
+ true,
+ None,
+ Some(Err(io::Error::new(
+ io::ErrorKind::Other,
+ crate::read::Error { message: err },
+ ))),
+ );
+ }
+ }
+ let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES);
+ if buf_resize {
+ buf.resize(len, 0);
+ }
+ Ok(Ok(crate::decode(buf).expect("only valid data here")))
+ }
+ Ok(Err(err)) => {
+ buf.clear();
+ Ok(Err(err))
+ }
+ Err(err) => {
+ buf.clear();
+ Err(err)
+ }
+ }),
+ )
+ }
+
+ /// Read a packet line into the internal buffer and return it.
+ ///
+ /// Returns `None` if the end of iteration is reached because of one of the following:
+ ///
+ /// * natural EOF
+ /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true.
+ /// * A `delimiter` packet line encountered
+ pub fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+ if self.is_done {
+ return None;
+ }
+ if !self.peek_buf.is_empty() {
+ std::mem::swap(&mut self.peek_buf, &mut self.buf);
+ self.peek_buf.clear();
+ Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf"))))
+ } else {
+ if self.buf.len() != MAX_LINE_LEN {
+ self.buf.resize(MAX_LINE_LEN, 0);
+ }
+ let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
+ &mut self.read,
+ &mut self.buf,
+ self.delimiters,
+ self.fail_on_err_lines,
+ false,
+ );
+ self.is_done = is_done;
+ self.stopped_at = stopped_at;
+ res
+ }
+ }
+
+ /// Peek the next packet line without consuming it.
+ ///
+ /// Multiple calls to peek will return the same packet line, if there is one.
+ pub fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+ if self.is_done {
+ return None;
+ }
+ if self.peek_buf.is_empty() {
+ self.peek_buf.resize(MAX_LINE_LEN, 0);
+ let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive(
+ &mut self.read,
+ &mut self.peek_buf,
+ self.delimiters,
+ self.fail_on_err_lines,
+ true,
+ );
+ self.is_done = is_done;
+ self.stopped_at = stopped_at;
+ res
+ } else {
+ Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here"))))
+ }
+ }
+
+ /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines.
+ /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line.
+ ///
+ /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
+ /// being true in case the `text` is to be interpreted as error.
+ ///
+ /// _Please note_ that side bands need to be negotiated with the server.
+ pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(
+ &mut self,
+ handle_progress: F,
+ ) -> WithSidebands<'_, T, F> {
+ WithSidebands::with_progress_handler(self, handle_progress)
+ }
+
+ /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
+ ///
+ /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator.
+ /// Use [`as_read()`][StreamingPeekableIter::as_read()].
+ pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> {
+ WithSidebands::without_progress_handler(self)
+ }
+
+ /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support.
+ ///
+ /// Due to the preconfigured function type this method can be called without 'turbofish'.
+ #[allow(clippy::type_complexity)]
+ pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> {
+ WithSidebands::new(self)
+ }
+}
diff --git a/vendor/gix-packetline-blocking/src/read/mod.rs b/vendor/gix-packetline-blocking/src/read/mod.rs
new file mode 100644
index 000000000..0fd30c892
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/mod.rs
@@ -0,0 +1,126 @@
+#[cfg(any(feature = "blocking-io", feature = "async-io"))]
+use crate::MAX_LINE_LEN;
+use crate::{PacketLineRef, StreamingPeekableIter, U16_HEX_BYTES};
+
+/// Allow the read-progress handler to determine how to continue.
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub enum ProgressAction {
+ /// Continue reading the next progress if available.
+ Continue,
+ /// Abort all IO even if more would be available, claiming the operation was interrupted.
+ Interrupt,
+}
+
+#[cfg(any(feature = "blocking-io", feature = "async-io"))]
+type ExhaustiveOutcome<'a> = (
+ bool, // is_done
+ Option<PacketLineRef<'static>>, // stopped_at
+ Option<std::io::Result<Result<PacketLineRef<'a>, crate::decode::Error>>>, // actual method result
+);
+
+mod error {
+ use std::fmt::{Debug, Display, Formatter};
+
+ use bstr::BString;
+
+ /// The error representing an ERR packet line, as possibly wrapped into an `std::io::Error` in
+ /// [`read_line(…)`][super::StreamingPeekableIter::read_line()].
+ #[derive(Debug)]
+ pub struct Error {
+ /// The contents of the ERR line, with `ERR` portion stripped.
+ pub message: BString,
+ }
+
+ impl Display for Error {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ Display::fmt(&self.message, f)
+ }
+ }
+
+ impl std::error::Error for Error {}
+}
+pub use error::Error;
+
+impl<T> StreamingPeekableIter<T> {
+ /// Return a new instance from `read` which will stop decoding packet lines when receiving one of the given `delimiters`.
+ pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>]) -> Self {
+ StreamingPeekableIter {
+ read,
+ #[cfg(any(feature = "blocking-io", feature = "async-io"))]
+ buf: vec![0; MAX_LINE_LEN],
+ peek_buf: Vec::new(),
+ delimiters,
+ fail_on_err_lines: false,
+ is_done: false,
+ stopped_at: None,
+ }
+ }
+
+ /// Modify the peek buffer, overwriting the byte at `position` with the given byte to `replace_with` while truncating
+ /// it to contain only bytes until the newly replaced `position`.
+ ///
+ /// This is useful if you would want to remove 'special bytes' hidden behind, say a NULL byte to disappear and allow
+ /// standard line readers to read the next line as usual.
+ ///
+ /// **Note** that `position` does not include the 4 bytes prefix (they are invisible outside the reader)
+ pub fn peek_buffer_replace_and_truncate(&mut self, position: usize, replace_with: u8) {
+ let position = position + U16_HEX_BYTES;
+ self.peek_buf[position] = replace_with;
+
+ let new_len = position + 1;
+ self.peek_buf.truncate(new_len);
+ self.peek_buf[..4].copy_from_slice(&crate::encode::u16_to_hex((new_len) as u16));
+ }
+
+ /// Returns the packet line that stopped the iteration, or
+ /// `None` if the end wasn't reached yet, on EOF, or if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] was true.
+ pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
+ self.stopped_at
+ }
+
+ /// Reset all iteration state allowing to continue a stopped iteration that is not yet at EOF.
+ ///
+ /// This can happen once a delimiter is reached.
+ pub fn reset(&mut self) {
+ let delimiters = std::mem::take(&mut self.delimiters);
+ self.reset_with(delimiters);
+ }
+
+ /// Similar to [`reset()`][StreamingPeekableIter::reset()] with support to changing the `delimiters`.
+ pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
+ self.delimiters = delimiters;
+ self.is_done = false;
+ self.stopped_at = None;
+ }
+
+ /// If `value` is `true` the provider will check for special `ERR` packet lines and stop iteration when one is encountered.
+ ///
+ /// Use [`stopped_at()]`[`StreamingPeekableIter::stopped_at()`] to inspect the cause of the end of the iteration.
+ /// ne
+ pub fn fail_on_err_lines(&mut self, value: bool) {
+ self.fail_on_err_lines = value;
+ }
+
+ /// Replace the reader used with the given `read`, resetting all other iteration state as well.
+ pub fn replace(&mut self, read: T) -> T {
+ let prev = std::mem::replace(&mut self.read, read);
+ self.reset();
+ self.fail_on_err_lines = false;
+ prev
+ }
+
+ /// Return the inner read
+ pub fn into_inner(self) -> T {
+ self.read
+ }
+}
+
+#[cfg(feature = "blocking-io")]
+mod blocking_io;
+
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+mod async_io;
+
+mod sidebands;
+#[cfg(any(feature = "blocking-io", feature = "async-io"))]
+pub use sidebands::WithSidebands;
diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs b/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs
new file mode 100644
index 000000000..37f93bca9
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs
@@ -0,0 +1,383 @@
+use std::{
+ future::Future,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+use futures_io::{AsyncBufRead, AsyncRead};
+use futures_lite::ready;
+
+use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
+
+type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
+/// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()].
+/// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful
+/// if they represent binary data, like the one of a pack file.
+pub struct WithSidebands<'a, T, F>
+where
+ T: AsyncRead,
+{
+ state: State<'a, T>,
+ handle_progress: Option<F>,
+ pos: usize,
+ cap: usize,
+}
+
+impl<'a, T, F> Drop for WithSidebands<'a, T, F>
+where
+ T: AsyncRead,
+{
+ fn drop(&mut self) {
+ if let State::Idle { ref mut parent } = self.state {
+ parent
+ .as_mut()
+ .expect("parent is always available if we are idle")
+ .reset();
+ }
+ }
+}
+
+impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
+where
+ T: AsyncRead,
+{
+ /// Create a new instance with the given provider as `parent`.
+ pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+ WithSidebands {
+ state: State::Idle { parent: Some(parent) },
+ handle_progress: None,
+ pos: 0,
+ cap: 0,
+ }
+ }
+}
+
+enum State<'a, T> {
+ Idle {
+ parent: Option<&'a mut StreamingPeekableIter<T>>,
+ },
+ ReadLine {
+ read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>,
+ parent_inactive: Option<*mut StreamingPeekableIter<T>>,
+ },
+}
+
+/// # SAFETY
+/// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well,
+/// hence the `*mut _` is `Send`.
+/// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall).
+/// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send
+/// to a thread possibly.
+// TODO: Is it possible to declare it as it should be?
+#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
+unsafe impl<'a, T> Send for State<'a, T> where T: Send {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ fn receiver<T: Send>(_i: T) {}
+
+ /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself.
+ #[test]
+ fn streaming_peekable_iter_is_send() {
+ receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[]));
+ }
+
+ #[test]
+ fn state_is_send() {
+ let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
+ receiver(State::Idle { parent: Some(&mut s) });
+ }
+}
+
+impl<'a, T, F> WithSidebands<'a, T, F>
+where
+ T: AsyncRead + Unpin,
+ F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+ /// Create a new instance with the given `parent` provider and the `handle_progress` function.
+ ///
+ /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
+ /// being true in case the `text` is to be interpreted as error.
+ pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
+ WithSidebands {
+ state: State::Idle { parent: Some(parent) },
+ handle_progress: Some(handle_progress),
+ pos: 0,
+ cap: 0,
+ }
+ }
+
+ /// Create a new instance without a progress handler.
+ pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+ WithSidebands {
+ state: State::Idle { parent: Some(parent) },
+ handle_progress: None,
+ pos: 0,
+ cap: 0,
+ }
+ }
+
+ /// Forwards to the parent [`StreamingPeekableIter::reset_with()`]
+ pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
+ if let State::Idle { ref mut parent } = self.state {
+ parent
+ .as_mut()
+ .expect("parent is always available if we are idle")
+ .reset_with(delimiters)
+ }
+ }
+
+ /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`]
+ pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
+ match self.state {
+ State::Idle { ref parent } => {
+ parent
+ .as_ref()
+ .expect("parent is always available if we are idle")
+ .stopped_at
+ }
+ _ => None,
+ }
+ }
+
+ /// Set or unset the progress handler.
+ pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
+ self.handle_progress = handle_progress;
+ }
+
+ /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned
+ /// next on a call to [`read_line()`][io::BufRead::read_line()].
+ ///
+ /// # Warning
+ ///
+ /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+ pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> {
+ match self.state {
+ State::Idle { ref mut parent } => match parent
+ .as_mut()
+ .expect("parent is always available if we are idle")
+ .peek_line()
+ .await
+ {
+ Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
+ Some(Ok(Err(err))) => Some(Ok(Err(err))),
+ Some(Err(err)) => Some(Err(err)),
+ _ => None,
+ },
+ _ => None,
+ }
+ }
+
+ /// Read a packet line as string line.
+ pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> {
+ ReadLineFuture { parent: self, buf }
+ }
+
+ /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached.
+ ///
+ /// # Warning
+ ///
+ /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+ pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
+ match &mut self.state {
+ State::Idle { parent: Some(parent) } => {
+ assert_eq!(
+ self.cap, 0,
+ "we don't support partial buffers right now - read-line must be used consistently"
+ );
+ parent.read_line().await
+ }
+ _ => None,
+ }
+ }
+}
+
+pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
+ parent: &'b mut WithSidebands<'a, T, F>,
+ buf: &'b mut Vec<u8>,
+}
+
+impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
+where
+ T: AsyncRead + Unpin,
+ F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+ type Output = std::io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ assert_eq!(
+ self.parent.cap, 0,
+ "we don't support partial buffers right now - read-line must be used consistently"
+ );
+ let Self { buf, parent } = &mut *self;
+ let line = ready!(Pin::new(parent).poll_fill_buf(cx))?;
+ buf.clear();
+ buf.extend_from_slice(line);
+ let bytes = line.len();
+ self.parent.cap = 0;
+ Poll::Ready(Ok(bytes))
+ }
+}
+
+pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
+ parent: &'b mut WithSidebands<'a, T, F>,
+ buf: &'b mut String,
+}
+
+impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
+where
+ T: AsyncRead + Unpin,
+ F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+ type Output = std::io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ assert_eq!(
+ self.parent.cap, 0,
+ "we don't support partial buffers right now - read-line must be used consistently"
+ );
+ let Self { buf, parent } = &mut *self;
+ let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?)
+ .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
+ buf.clear();
+ buf.push_str(line);
+ let bytes = line.len();
+ self.parent.cap = 0;
+ Poll::Ready(Ok(bytes))
+ }
+}
+
+impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
+where
+ T: AsyncRead + Unpin,
+ F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+ fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
+ use std::io;
+
+ use futures_lite::FutureExt;
+ {
+ let this = self.as_mut().get_mut();
+ if this.pos >= this.cap {
+ let (ofs, cap) = loop {
+ match this.state {
+ State::Idle { ref mut parent } => {
+ let parent = parent.take().expect("parent to be present here");
+ let inactive = parent as *mut _;
+ this.state = State::ReadLine {
+ read_line: parent.read_line().boxed_local(),
+ parent_inactive: Some(inactive),
+ }
+ }
+ State::ReadLine {
+ ref mut read_line,
+ ref mut parent_inactive,
+ } => {
+ let line = ready!(read_line.poll(cx));
+
+ this.state = {
+ let parent = parent_inactive.take().expect("parent pointer always set");
+ // SAFETY: It's safe to recover the original mutable reference (from which
+ // the `read_line` future was created as the latter isn't accessible anymore
+ // once the state is set to Idle. In other words, either one or the other are
+ // accessible, never both at the same time.
+ // Also: We keep a pointer around which is protected by borrowcheck since it's created
+ // from a legal mutable reference which is moved into the read_line future - if it was manually
+ // implemented we would be able to re-obtain it from there.
+ #[allow(unsafe_code)]
+ let parent = unsafe { &mut *parent };
+ State::Idle { parent: Some(parent) }
+ };
+
+ let line = match line {
+ Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
+ None => break (0, 0),
+ };
+
+ match this.handle_progress.as_mut() {
+ Some(handle_progress) => {
+ let band = line
+ .decode_band()
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+ const ENCODED_BAND: usize = 1;
+ match band {
+ BandRef::Data(d) => {
+ if d.is_empty() {
+ continue;
+ }
+ break (U16_HEX_BYTES + ENCODED_BAND, d.len());
+ }
+ BandRef::Progress(d) => {
+ let text = TextRef::from(d).0;
+ match handle_progress(false, text) {
+ ProgressAction::Continue => {}
+ ProgressAction::Interrupt => {
+ return Poll::Ready(Err(io::Error::new(
+ std::io::ErrorKind::Other,
+ "interrupted by user",
+ )))
+ }
+ };
+ }
+ BandRef::Error(d) => {
+ let text = TextRef::from(d).0;
+ match handle_progress(true, text) {
+ ProgressAction::Continue => {}
+ ProgressAction::Interrupt => {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::Other,
+ "interrupted by user",
+ )))
+ }
+ };
+ }
+ };
+ }
+ None => {
+ break match line.as_slice() {
+ Some(d) => (U16_HEX_BYTES, d.len()),
+ None => {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "encountered non-data line in a data-line only context",
+ )))
+ }
+ }
+ }
+ }
+ }
+ }
+ };
+ this.cap = cap + ofs;
+ this.pos = ofs;
+ }
+ }
+ let range = self.pos..self.cap;
+ match &self.get_mut().state {
+ State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])),
+ State::ReadLine { .. } => unreachable!("at least in theory"),
+ }
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ let this = self.get_mut();
+ this.pos = std::cmp::min(this.pos + amt, this.cap);
+ }
+}
+
+impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
+where
+ T: AsyncRead + Unpin,
+ F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
+{
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
+ let nread = {
+ use std::io::Read;
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ rem.read(buf)?
+ };
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+}
diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs b/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs
new file mode 100644
index 000000000..f5c87aeb8
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs
@@ -0,0 +1,218 @@
+use std::{io, io::BufRead};
+
+use crate::{read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
+
+/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()].
+/// It's also possible to hide the underlying packet lines using the [`Read`][io::Read] implementation which is useful
+/// if they represent binary data, like the one of a pack file.
+pub struct WithSidebands<'a, T, F>
+where
+ T: io::Read,
+{
+ parent: &'a mut StreamingPeekableIter<T>,
+ handle_progress: Option<F>,
+ pos: usize,
+ cap: usize,
+}
+
+impl<'a, T, F> Drop for WithSidebands<'a, T, F>
+where
+ T: io::Read,
+{
+ fn drop(&mut self) {
+ self.parent.reset();
+ }
+}
+
+impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
+where
+ T: io::Read,
+{
+ /// Create a new instance with the given provider as `parent`.
+ pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+ WithSidebands {
+ parent,
+ handle_progress: None,
+ pos: 0,
+ cap: 0,
+ }
+ }
+}
+
+impl<'a, T, F> WithSidebands<'a, T, F>
+where
+ T: io::Read,
+ F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+ /// Create a new instance with the given `parent` provider and the `handle_progress` function.
+ ///
+ /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool`
+ /// being true in case the `text` is to be interpreted as error.
+ pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
+ WithSidebands {
+ parent,
+ handle_progress: Some(handle_progress),
+ pos: 0,
+ cap: 0,
+ }
+ }
+
+ /// Create a new instance without a progress handler.
+ pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
+ WithSidebands {
+ parent,
+ handle_progress: None,
+ pos: 0,
+ cap: 0,
+ }
+ }
+
+ /// Forwards to the parent [`StreamingPeekableIter::reset_with()`]
+ pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
+ self.parent.reset_with(delimiters)
+ }
+
+ /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`]
+ pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
+ self.parent.stopped_at
+ }
+
+ /// Set or unset the progress handler.
+ pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
+ self.handle_progress = handle_progress;
+ }
+
+ /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned
+ /// next on a call to [`read_line()`][io::BufRead::read_line()].
+ ///
+ /// # Warning
+ ///
+ /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+ pub fn peek_data_line(&mut self) -> Option<io::Result<Result<&[u8], crate::decode::Error>>> {
+ match self.parent.peek_line() {
+ Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
+ Some(Ok(Err(err))) => Some(Ok(Err(err))),
+ Some(Err(err)) => Some(Err(err)),
+ _ => None,
+ }
+ }
+
+ /// Read a whole packetline from the underlying reader, with empty lines indicating a stop packetline.
+ ///
+ /// # Warning
+ ///
+ /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
+ pub fn read_data_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, crate::decode::Error>>> {
+ assert_eq!(
+ self.cap, 0,
+ "we don't support partial buffers right now - read-line must be used consistently"
+ );
+ self.parent.read_line()
+ }
+
+ /// Like `BufRead::read_line()`, but will only read one packetline at a time.
+ ///
+ /// It will also be easier to call as sometimes it's unclear which implementation we get on a type like this with
+ /// plenty of generic parameters.
+ pub fn read_line_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+ assert_eq!(
+ self.cap, 0,
+ "we don't support partial buffers right now - read-line must be used consistently"
+ );
+ let line = std::str::from_utf8(self.fill_buf()?).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+ buf.push_str(line);
+ let bytes = line.len();
+ self.cap = 0;
+ Ok(bytes)
+ }
+}
+
+impl<'a, T, F> BufRead for WithSidebands<'a, T, F>
+where
+ T: io::Read,
+ F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ if self.pos >= self.cap {
+ let (ofs, cap) = loop {
+ let line = match self.parent.read_line() {
+ Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
+ None => break (0, 0),
+ };
+ match self.handle_progress.as_mut() {
+ Some(handle_progress) => {
+ let band = line
+ .decode_band()
+ .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
+ const ENCODED_BAND: usize = 1;
+ match band {
+ BandRef::Data(d) => {
+ if d.is_empty() {
+ continue;
+ }
+ break (U16_HEX_BYTES + ENCODED_BAND, d.len());
+ }
+ BandRef::Progress(d) => {
+ let text = TextRef::from(d).0;
+ match handle_progress(false, text) {
+ ProgressAction::Continue => {}
+ ProgressAction::Interrupt => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "interrupted by user",
+ ))
+ }
+ };
+ }
+ BandRef::Error(d) => {
+ let text = TextRef::from(d).0;
+ match handle_progress(true, text) {
+ ProgressAction::Continue => {}
+ ProgressAction::Interrupt => {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "interrupted by user",
+ ))
+ }
+ };
+ }
+ };
+ }
+ None => {
+ break match line.as_slice() {
+ Some(d) => (U16_HEX_BYTES, d.len()),
+ None => {
+ return Err(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "encountered non-data line in a data-line only context",
+ ))
+ }
+ }
+ }
+ }
+ };
+ self.cap = cap + ofs;
+ self.pos = ofs;
+ }
+ Ok(&self.parent.buf[self.pos..self.cap])
+ }
+
+ fn consume(&mut self, amt: usize) {
+ self.pos = std::cmp::min(self.pos + amt, self.cap);
+ }
+}
+
+impl<'a, T, F> io::Read for WithSidebands<'a, T, F>
+where
+ T: io::Read,
+ F: FnMut(bool, &[u8]) -> ProgressAction,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let nread = {
+ let mut rem = self.fill_buf()?;
+ rem.read(buf)?
+ };
+ self.consume(nread);
+ Ok(nread)
+ }
+}
diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs b/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs
new file mode 100644
index 000000000..a1b6628e1
--- /dev/null
+++ b/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs
@@ -0,0 +1,9 @@
+#[cfg(feature = "blocking-io")]
+mod blocking_io;
+#[cfg(feature = "blocking-io")]
+pub use blocking_io::WithSidebands;
+
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+mod async_io;
+#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))]
+pub use async_io::WithSidebands;