diff options
Diffstat (limited to 'vendor/gix-packetline-blocking/src/read')
6 files changed, 1082 insertions, 0 deletions
diff --git a/vendor/gix-packetline-blocking/src/read/async_io.rs b/vendor/gix-packetline-blocking/src/read/async_io.rs new file mode 100644 index 000000000..402c2434b --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/async_io.rs @@ -0,0 +1,178 @@ +use std::io; + +use bstr::ByteSlice; +use futures_io::AsyncRead; +use futures_lite::AsyncReadExt; + +use crate::{ + decode, + read::{ExhaustiveOutcome, ProgressAction, WithSidebands}, + PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES, +}; + +/// Non-IO methods +impl<T> StreamingPeekableIter<T> +where + T: AsyncRead + Unpin, +{ + #[allow(clippy::needless_lifetimes)] // TODO: remove once this is clippy false positive is fixed + async fn read_line_inner<'a>( + reader: &mut T, + buf: &'a mut [u8], + ) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> { + let (hex_bytes, data_bytes) = buf.split_at_mut(4); + reader.read_exact(hex_bytes).await?; + let num_data_bytes = match decode::hex_prefix(hex_bytes) { + Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)), + Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize, + Err(err) => return Ok(Err(err)), + }; + + let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes); + reader.read_exact(data_bytes).await?; + match decode::to_data_line(data_bytes) { + Ok(line) => Ok(Ok(line)), + Err(err) => Ok(Err(err)), + } + } + + /// This function is needed to help the borrow checker allow us to return references all the time + /// It contains a bunch of logic shared between peek and `read_line` invocations. + async fn read_line_inner_exhaustive<'a>( + reader: &mut T, + buf: &'a mut Vec<u8>, + delimiters: &[PacketLineRef<'static>], + fail_on_err_lines: bool, + buf_resize: bool, + ) -> ExhaustiveOutcome<'a> { + ( + false, + None, + Some(match Self::read_line_inner(reader, buf).await { + Ok(Ok(line)) => { + if delimiters.contains(&line) { + let stopped_at = delimiters.iter().find(|l| **l == line).copied(); + buf.clear(); + return (true, stopped_at, None); + } else if fail_on_err_lines { + if let Some(err) = line.check_error() { + let err = err.0.as_bstr().to_owned(); + buf.clear(); + return ( + true, + None, + Some(Err(io::Error::new( + io::ErrorKind::Other, + crate::read::Error { message: err }, + ))), + ); + } + } + let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES); + if buf_resize { + buf.resize(len, 0); + } + Ok(Ok(crate::decode(buf).expect("only valid data here"))) + } + Ok(Err(err)) => { + buf.clear(); + Ok(Err(err)) + } + Err(err) => { + buf.clear(); + Err(err) + } + }), + ) + } + + /// Read a packet line into the internal buffer and return it. + /// + /// Returns `None` if the end of iteration is reached because of one of the following: + /// + /// * natural EOF + /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true. + /// * A `delimiter` packet line encountered + pub async fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { + if self.is_done { + return None; + } + if !self.peek_buf.is_empty() { + std::mem::swap(&mut self.peek_buf, &mut self.buf); + self.peek_buf.clear(); + Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf")))) + } else { + if self.buf.len() != MAX_LINE_LEN { + self.buf.resize(MAX_LINE_LEN, 0); + } + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.buf, + self.delimiters, + self.fail_on_err_lines, + false, + ) + .await; + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } + } + + /// Peek the next packet line without consuming it. + /// + /// Multiple calls to peek will return the same packet line, if there is one. + pub async fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { + if self.is_done { + return None; + } + if self.peek_buf.is_empty() { + self.peek_buf.resize(MAX_LINE_LEN, 0); + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.peek_buf, + self.delimiters, + self.fail_on_err_lines, + true, + ) + .await; + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } else { + Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here")))) + } + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// Due to the preconfigured function type this method can be called without 'turbofish'. + #[allow(clippy::type_complexity)] + pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> { + WithSidebands::new(self) + } + + /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines. + /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + /// + /// _Please note_ that side bands need to be negotiated with the server. + pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>( + &mut self, + handle_progress: F, + ) -> WithSidebands<'_, T, F> { + WithSidebands::with_progress_handler(self, handle_progress) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator. + /// Use [`as_read()`][StreamingPeekableIter::as_read()]. + pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction + Unpin>( + &mut self, + ) -> WithSidebands<'_, T, F> { + WithSidebands::without_progress_handler(self) + } +} diff --git a/vendor/gix-packetline-blocking/src/read/blocking_io.rs b/vendor/gix-packetline-blocking/src/read/blocking_io.rs new file mode 100644 index 000000000..50c634c4c --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/blocking_io.rs @@ -0,0 +1,168 @@ +use std::io; + +use bstr::ByteSlice; + +use crate::{ + decode, + read::{ExhaustiveOutcome, ProgressAction, WithSidebands}, + PacketLineRef, StreamingPeekableIter, MAX_LINE_LEN, U16_HEX_BYTES, +}; + +/// Non-IO methods +impl<T> StreamingPeekableIter<T> +where + T: io::Read, +{ + fn read_line_inner<'a>(reader: &mut T, buf: &'a mut [u8]) -> io::Result<Result<PacketLineRef<'a>, decode::Error>> { + let (hex_bytes, data_bytes) = buf.split_at_mut(4); + reader.read_exact(hex_bytes)?; + let num_data_bytes = match decode::hex_prefix(hex_bytes) { + Ok(decode::PacketLineOrWantedSize::Line(line)) => return Ok(Ok(line)), + Ok(decode::PacketLineOrWantedSize::Wanted(additional_bytes)) => additional_bytes as usize, + Err(err) => return Ok(Err(err)), + }; + + let (data_bytes, _) = data_bytes.split_at_mut(num_data_bytes); + reader.read_exact(data_bytes)?; + match decode::to_data_line(data_bytes) { + Ok(line) => Ok(Ok(line)), + Err(err) => Ok(Err(err)), + } + } + + /// This function is needed to help the borrow checker allow us to return references all the time + /// It contains a bunch of logic shared between peek and `read_line` invocations. + fn read_line_inner_exhaustive<'a>( + reader: &mut T, + buf: &'a mut Vec<u8>, + delimiters: &[PacketLineRef<'static>], + fail_on_err_lines: bool, + buf_resize: bool, + ) -> ExhaustiveOutcome<'a> { + ( + false, + None, + Some(match Self::read_line_inner(reader, buf) { + Ok(Ok(line)) => { + if delimiters.contains(&line) { + let stopped_at = delimiters.iter().find(|l| **l == line).copied(); + buf.clear(); + return (true, stopped_at, None); + } else if fail_on_err_lines { + if let Some(err) = line.check_error() { + let err = err.0.as_bstr().to_owned(); + buf.clear(); + return ( + true, + None, + Some(Err(io::Error::new( + io::ErrorKind::Other, + crate::read::Error { message: err }, + ))), + ); + } + } + let len = line.as_slice().map_or(U16_HEX_BYTES, |s| s.len() + U16_HEX_BYTES); + if buf_resize { + buf.resize(len, 0); + } + Ok(Ok(crate::decode(buf).expect("only valid data here"))) + } + Ok(Err(err)) => { + buf.clear(); + Ok(Err(err)) + } + Err(err) => { + buf.clear(); + Err(err) + } + }), + ) + } + + /// Read a packet line into the internal buffer and return it. + /// + /// Returns `None` if the end of iteration is reached because of one of the following: + /// + /// * natural EOF + /// * ERR packet line encountered if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] is true. + /// * A `delimiter` packet line encountered + pub fn read_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { + if self.is_done { + return None; + } + if !self.peek_buf.is_empty() { + std::mem::swap(&mut self.peek_buf, &mut self.buf); + self.peek_buf.clear(); + Some(Ok(Ok(crate::decode(&self.buf).expect("only valid data in peek buf")))) + } else { + if self.buf.len() != MAX_LINE_LEN { + self.buf.resize(MAX_LINE_LEN, 0); + } + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.buf, + self.delimiters, + self.fail_on_err_lines, + false, + ); + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } + } + + /// Peek the next packet line without consuming it. + /// + /// Multiple calls to peek will return the same packet line, if there is one. + pub fn peek_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, decode::Error>>> { + if self.is_done { + return None; + } + if self.peek_buf.is_empty() { + self.peek_buf.resize(MAX_LINE_LEN, 0); + let (is_done, stopped_at, res) = Self::read_line_inner_exhaustive( + &mut self.read, + &mut self.peek_buf, + self.delimiters, + self.fail_on_err_lines, + true, + ); + self.is_done = is_done; + self.stopped_at = stopped_at; + res + } else { + Some(Ok(Ok(crate::decode(&self.peek_buf).expect("only valid data here")))) + } + } + + /// Return this instance as implementor of [`Read`][io::Read] assuming side bands to be used in all received packet lines. + /// Each invocation of [`read_line()`][io::BufRead::read_line()] returns a packet line. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + /// + /// _Please note_ that side bands need to be negotiated with the server. + pub fn as_read_with_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>( + &mut self, + handle_progress: F, + ) -> WithSidebands<'_, T, F> { + WithSidebands::with_progress_handler(self, handle_progress) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// The type parameter `F` needs to be configured for this method to be callable using the 'turbofish' operator. + /// Use [`as_read()`][StreamingPeekableIter::as_read()]. + pub fn as_read_without_sidebands<F: FnMut(bool, &[u8]) -> ProgressAction>(&mut self) -> WithSidebands<'_, T, F> { + WithSidebands::without_progress_handler(self) + } + + /// Same as [`as_read_with_sidebands(…)`][StreamingPeekableIter::as_read_with_sidebands()], but for channels without side band support. + /// + /// Due to the preconfigured function type this method can be called without 'turbofish'. + #[allow(clippy::type_complexity)] + pub fn as_read(&mut self) -> WithSidebands<'_, T, fn(bool, &[u8]) -> ProgressAction> { + WithSidebands::new(self) + } +} diff --git a/vendor/gix-packetline-blocking/src/read/mod.rs b/vendor/gix-packetline-blocking/src/read/mod.rs new file mode 100644 index 000000000..0fd30c892 --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/mod.rs @@ -0,0 +1,126 @@ +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +use crate::MAX_LINE_LEN; +use crate::{PacketLineRef, StreamingPeekableIter, U16_HEX_BYTES}; + +/// Allow the read-progress handler to determine how to continue. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum ProgressAction { + /// Continue reading the next progress if available. + Continue, + /// Abort all IO even if more would be available, claiming the operation was interrupted. + Interrupt, +} + +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +type ExhaustiveOutcome<'a> = ( + bool, // is_done + Option<PacketLineRef<'static>>, // stopped_at + Option<std::io::Result<Result<PacketLineRef<'a>, crate::decode::Error>>>, // actual method result +); + +mod error { + use std::fmt::{Debug, Display, Formatter}; + + use bstr::BString; + + /// The error representing an ERR packet line, as possibly wrapped into an `std::io::Error` in + /// [`read_line(…)`][super::StreamingPeekableIter::read_line()]. + #[derive(Debug)] + pub struct Error { + /// The contents of the ERR line, with `ERR` portion stripped. + pub message: BString, + } + + impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.message, f) + } + } + + impl std::error::Error for Error {} +} +pub use error::Error; + +impl<T> StreamingPeekableIter<T> { + /// Return a new instance from `read` which will stop decoding packet lines when receiving one of the given `delimiters`. + pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>]) -> Self { + StreamingPeekableIter { + read, + #[cfg(any(feature = "blocking-io", feature = "async-io"))] + buf: vec![0; MAX_LINE_LEN], + peek_buf: Vec::new(), + delimiters, + fail_on_err_lines: false, + is_done: false, + stopped_at: None, + } + } + + /// Modify the peek buffer, overwriting the byte at `position` with the given byte to `replace_with` while truncating + /// it to contain only bytes until the newly replaced `position`. + /// + /// This is useful if you would want to remove 'special bytes' hidden behind, say a NULL byte to disappear and allow + /// standard line readers to read the next line as usual. + /// + /// **Note** that `position` does not include the 4 bytes prefix (they are invisible outside the reader) + pub fn peek_buffer_replace_and_truncate(&mut self, position: usize, replace_with: u8) { + let position = position + U16_HEX_BYTES; + self.peek_buf[position] = replace_with; + + let new_len = position + 1; + self.peek_buf.truncate(new_len); + self.peek_buf[..4].copy_from_slice(&crate::encode::u16_to_hex((new_len) as u16)); + } + + /// Returns the packet line that stopped the iteration, or + /// `None` if the end wasn't reached yet, on EOF, or if [`fail_on_err_lines()`][StreamingPeekableIter::fail_on_err_lines()] was true. + pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> { + self.stopped_at + } + + /// Reset all iteration state allowing to continue a stopped iteration that is not yet at EOF. + /// + /// This can happen once a delimiter is reached. + pub fn reset(&mut self) { + let delimiters = std::mem::take(&mut self.delimiters); + self.reset_with(delimiters); + } + + /// Similar to [`reset()`][StreamingPeekableIter::reset()] with support to changing the `delimiters`. + pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { + self.delimiters = delimiters; + self.is_done = false; + self.stopped_at = None; + } + + /// If `value` is `true` the provider will check for special `ERR` packet lines and stop iteration when one is encountered. + /// + /// Use [`stopped_at()]`[`StreamingPeekableIter::stopped_at()`] to inspect the cause of the end of the iteration. + /// ne + pub fn fail_on_err_lines(&mut self, value: bool) { + self.fail_on_err_lines = value; + } + + /// Replace the reader used with the given `read`, resetting all other iteration state as well. + pub fn replace(&mut self, read: T) -> T { + let prev = std::mem::replace(&mut self.read, read); + self.reset(); + self.fail_on_err_lines = false; + prev + } + + /// Return the inner read + pub fn into_inner(self) -> T { + self.read + } +} + +#[cfg(feature = "blocking-io")] +mod blocking_io; + +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +mod async_io; + +mod sidebands; +#[cfg(any(feature = "blocking-io", feature = "async-io"))] +pub use sidebands::WithSidebands; diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs b/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs new file mode 100644 index 000000000..37f93bca9 --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/sidebands/async_io.rs @@ -0,0 +1,383 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_io::{AsyncBufRead, AsyncRead}; +use futures_lite::ready; + +use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}; + +type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>; +/// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()]. +/// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful +/// if they represent binary data, like the one of a pack file. +pub struct WithSidebands<'a, T, F> +where + T: AsyncRead, +{ + state: State<'a, T>, + handle_progress: Option<F>, + pos: usize, + cap: usize, +} + +impl<'a, T, F> Drop for WithSidebands<'a, T, F> +where + T: AsyncRead, +{ + fn drop(&mut self) { + if let State::Idle { ref mut parent } = self.state { + parent + .as_mut() + .expect("parent is always available if we are idle") + .reset(); + } + } +} + +impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction> +where + T: AsyncRead, +{ + /// Create a new instance with the given provider as `parent`. + pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: None, + pos: 0, + cap: 0, + } + } +} + +enum State<'a, T> { + Idle { + parent: Option<&'a mut StreamingPeekableIter<T>>, + }, + ReadLine { + read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>, + parent_inactive: Option<*mut StreamingPeekableIter<T>>, + }, +} + +/// # SAFETY +/// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well, +/// hence the `*mut _` is `Send`. +/// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall). +/// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send +/// to a thread possibly. +// TODO: Is it possible to declare it as it should be? +#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)] +unsafe impl<'a, T> Send for State<'a, T> where T: Send {} + +#[cfg(test)] +mod tests { + use super::*; + fn receiver<T: Send>(_i: T) {} + + /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself. + #[test] + fn streaming_peekable_iter_is_send() { + receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[])); + } + + #[test] + fn state_is_send() { + let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[]); + receiver(State::Idle { parent: Some(&mut s) }); + } +} + +impl<'a, T, F> WithSidebands<'a, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + /// Create a new instance with the given `parent` provider and the `handle_progress` function. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: Some(handle_progress), + pos: 0, + cap: 0, + } + } + + /// Create a new instance without a progress handler. + pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self { + WithSidebands { + state: State::Idle { parent: Some(parent) }, + handle_progress: None, + pos: 0, + cap: 0, + } + } + + /// Forwards to the parent [`StreamingPeekableIter::reset_with()`] + pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { + if let State::Idle { ref mut parent } = self.state { + parent + .as_mut() + .expect("parent is always available if we are idle") + .reset_with(delimiters) + } + } + + /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`] + pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> { + match self.state { + State::Idle { ref parent } => { + parent + .as_ref() + .expect("parent is always available if we are idle") + .stopped_at + } + _ => None, + } + } + + /// Set or unset the progress handler. + pub fn set_progress_handler(&mut self, handle_progress: Option<F>) { + self.handle_progress = handle_progress; + } + + /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned + /// next on a call to [`read_line()`][io::BufRead::read_line()]. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. + pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> { + match self.state { + State::Idle { ref mut parent } => match parent + .as_mut() + .expect("parent is always available if we are idle") + .peek_line() + .await + { + Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))), + Some(Ok(Err(err))) => Some(Ok(Err(err))), + Some(Err(err)) => Some(Err(err)), + _ => None, + }, + _ => None, + } + } + + /// Read a packet line as string line. + pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> { + ReadLineFuture { parent: self, buf } + } + + /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. + pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> { + match &mut self.state { + State::Idle { parent: Some(parent) } => { + assert_eq!( + self.cap, 0, + "we don't support partial buffers right now - read-line must be used consistently" + ); + parent.read_line().await + } + _ => None, + } + } +} + +pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> { + parent: &'b mut WithSidebands<'a, T, F>, + buf: &'b mut Vec<u8>, +} + +impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + type Output = std::io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + assert_eq!( + self.parent.cap, 0, + "we don't support partial buffers right now - read-line must be used consistently" + ); + let Self { buf, parent } = &mut *self; + let line = ready!(Pin::new(parent).poll_fill_buf(cx))?; + buf.clear(); + buf.extend_from_slice(line); + let bytes = line.len(); + self.parent.cap = 0; + Poll::Ready(Ok(bytes)) + } +} + +pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> { + parent: &'b mut WithSidebands<'a, T, F>, + buf: &'b mut String, +} + +impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + type Output = std::io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + assert_eq!( + self.parent.cap, 0, + "we don't support partial buffers right now - read-line must be used consistently" + ); + let Self { buf, parent } = &mut *self; + let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; + buf.clear(); + buf.push_str(line); + let bytes = line.len(); + self.parent.cap = 0; + Poll::Ready(Ok(bytes)) + } +} + +impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> { + use std::io; + + use futures_lite::FutureExt; + { + let this = self.as_mut().get_mut(); + if this.pos >= this.cap { + let (ofs, cap) = loop { + match this.state { + State::Idle { ref mut parent } => { + let parent = parent.take().expect("parent to be present here"); + let inactive = parent as *mut _; + this.state = State::ReadLine { + read_line: parent.read_line().boxed_local(), + parent_inactive: Some(inactive), + } + } + State::ReadLine { + ref mut read_line, + ref mut parent_inactive, + } => { + let line = ready!(read_line.poll(cx)); + + this.state = { + let parent = parent_inactive.take().expect("parent pointer always set"); + // SAFETY: It's safe to recover the original mutable reference (from which + // the `read_line` future was created as the latter isn't accessible anymore + // once the state is set to Idle. In other words, either one or the other are + // accessible, never both at the same time. + // Also: We keep a pointer around which is protected by borrowcheck since it's created + // from a legal mutable reference which is moved into the read_line future - if it was manually + // implemented we would be able to re-obtain it from there. + #[allow(unsafe_code)] + let parent = unsafe { &mut *parent }; + State::Idle { parent: Some(parent) } + }; + + let line = match line { + Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, + None => break (0, 0), + }; + + match this.handle_progress.as_mut() { + Some(handle_progress) => { + let band = line + .decode_band() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + const ENCODED_BAND: usize = 1; + match band { + BandRef::Data(d) => { + if d.is_empty() { + continue; + } + break (U16_HEX_BYTES + ENCODED_BAND, d.len()); + } + BandRef::Progress(d) => { + let text = TextRef::from(d).0; + match handle_progress(false, text) { + ProgressAction::Continue => {} + ProgressAction::Interrupt => { + return Poll::Ready(Err(io::Error::new( + std::io::ErrorKind::Other, + "interrupted by user", + ))) + } + }; + } + BandRef::Error(d) => { + let text = TextRef::from(d).0; + match handle_progress(true, text) { + ProgressAction::Continue => {} + ProgressAction::Interrupt => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::Other, + "interrupted by user", + ))) + } + }; + } + }; + } + None => { + break match line.as_slice() { + Some(d) => (U16_HEX_BYTES, d.len()), + None => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "encountered non-data line in a data-line only context", + ))) + } + } + } + } + } + } + }; + this.cap = cap + ofs; + this.pos = ofs; + } + } + let range = self.pos..self.cap; + match &self.get_mut().state { + State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])), + State::ReadLine { .. } => unreachable!("at least in theory"), + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + let this = self.get_mut(); + this.pos = std::cmp::min(this.pos + amt, this.cap); + } +} + +impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F> +where + T: AsyncRead + Unpin, + F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, +{ + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> { + let nread = { + use std::io::Read; + let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; + rem.read(buf)? + }; + self.consume(nread); + Poll::Ready(Ok(nread)) + } +} diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs b/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs new file mode 100644 index 000000000..f5c87aeb8 --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/sidebands/blocking_io.rs @@ -0,0 +1,218 @@ +use std::{io, io::BufRead}; + +use crate::{read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}; + +/// An implementor of [`BufRead`][io::BufRead] yielding packet lines on each call to [`read_line()`][io::BufRead::read_line()]. +/// It's also possible to hide the underlying packet lines using the [`Read`][io::Read] implementation which is useful +/// if they represent binary data, like the one of a pack file. +pub struct WithSidebands<'a, T, F> +where + T: io::Read, +{ + parent: &'a mut StreamingPeekableIter<T>, + handle_progress: Option<F>, + pos: usize, + cap: usize, +} + +impl<'a, T, F> Drop for WithSidebands<'a, T, F> +where + T: io::Read, +{ + fn drop(&mut self) { + self.parent.reset(); + } +} + +impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction> +where + T: io::Read, +{ + /// Create a new instance with the given provider as `parent`. + pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self { + WithSidebands { + parent, + handle_progress: None, + pos: 0, + cap: 0, + } + } +} + +impl<'a, T, F> WithSidebands<'a, T, F> +where + T: io::Read, + F: FnMut(bool, &[u8]) -> ProgressAction, +{ + /// Create a new instance with the given `parent` provider and the `handle_progress` function. + /// + /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` + /// being true in case the `text` is to be interpreted as error. + pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self { + WithSidebands { + parent, + handle_progress: Some(handle_progress), + pos: 0, + cap: 0, + } + } + + /// Create a new instance without a progress handler. + pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self { + WithSidebands { + parent, + handle_progress: None, + pos: 0, + cap: 0, + } + } + + /// Forwards to the parent [`StreamingPeekableIter::reset_with()`] + pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { + self.parent.reset_with(delimiters) + } + + /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`] + pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> { + self.parent.stopped_at + } + + /// Set or unset the progress handler. + pub fn set_progress_handler(&mut self, handle_progress: Option<F>) { + self.handle_progress = handle_progress; + } + + /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned + /// next on a call to [`read_line()`][io::BufRead::read_line()]. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. + pub fn peek_data_line(&mut self) -> Option<io::Result<Result<&[u8], crate::decode::Error>>> { + match self.parent.peek_line() { + Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))), + Some(Ok(Err(err))) => Some(Ok(Err(err))), + Some(Err(err)) => Some(Err(err)), + _ => None, + } + } + + /// Read a whole packetline from the underlying reader, with empty lines indicating a stop packetline. + /// + /// # Warning + /// + /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. + pub fn read_data_line(&mut self) -> Option<io::Result<Result<PacketLineRef<'_>, crate::decode::Error>>> { + assert_eq!( + self.cap, 0, + "we don't support partial buffers right now - read-line must be used consistently" + ); + self.parent.read_line() + } + + /// Like `BufRead::read_line()`, but will only read one packetline at a time. + /// + /// It will also be easier to call as sometimes it's unclear which implementation we get on a type like this with + /// plenty of generic parameters. + pub fn read_line_to_string(&mut self, buf: &mut String) -> io::Result<usize> { + assert_eq!( + self.cap, 0, + "we don't support partial buffers right now - read-line must be used consistently" + ); + let line = std::str::from_utf8(self.fill_buf()?).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + buf.push_str(line); + let bytes = line.len(); + self.cap = 0; + Ok(bytes) + } +} + +impl<'a, T, F> BufRead for WithSidebands<'a, T, F> +where + T: io::Read, + F: FnMut(bool, &[u8]) -> ProgressAction, +{ + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.pos >= self.cap { + let (ofs, cap) = loop { + let line = match self.parent.read_line() { + Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, + None => break (0, 0), + }; + match self.handle_progress.as_mut() { + Some(handle_progress) => { + let band = line + .decode_band() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + const ENCODED_BAND: usize = 1; + match band { + BandRef::Data(d) => { + if d.is_empty() { + continue; + } + break (U16_HEX_BYTES + ENCODED_BAND, d.len()); + } + BandRef::Progress(d) => { + let text = TextRef::from(d).0; + match handle_progress(false, text) { + ProgressAction::Continue => {} + ProgressAction::Interrupt => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "interrupted by user", + )) + } + }; + } + BandRef::Error(d) => { + let text = TextRef::from(d).0; + match handle_progress(true, text) { + ProgressAction::Continue => {} + ProgressAction::Interrupt => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "interrupted by user", + )) + } + }; + } + }; + } + None => { + break match line.as_slice() { + Some(d) => (U16_HEX_BYTES, d.len()), + None => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "encountered non-data line in a data-line only context", + )) + } + } + } + } + }; + self.cap = cap + ofs; + self.pos = ofs; + } + Ok(&self.parent.buf[self.pos..self.cap]) + } + + fn consume(&mut self, amt: usize) { + self.pos = std::cmp::min(self.pos + amt, self.cap); + } +} + +impl<'a, T, F> io::Read for WithSidebands<'a, T, F> +where + T: io::Read, + F: FnMut(bool, &[u8]) -> ProgressAction, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let nread = { + let mut rem = self.fill_buf()?; + rem.read(buf)? + }; + self.consume(nread); + Ok(nread) + } +} diff --git a/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs b/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs new file mode 100644 index 000000000..a1b6628e1 --- /dev/null +++ b/vendor/gix-packetline-blocking/src/read/sidebands/mod.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "blocking-io")] +mod blocking_io; +#[cfg(feature = "blocking-io")] +pub use blocking_io::WithSidebands; + +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +mod async_io; +#[cfg(all(not(feature = "blocking-io"), feature = "async-io"))] +pub use async_io::WithSidebands; |