diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:41:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:41:41 +0000 |
commit | 10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87 (patch) | |
tree | bdffd5d80c26cf4a7a518281a204be1ace85b4c1 /vendor/gix-protocol/src/fetch | |
parent | Releasing progress-linux version 1.70.0+dfsg1-9~progress7.99u1. (diff) | |
download | rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.tar.xz rustc-10ee2acdd26a7f1298c6f6d6b7af9b469fe29b87.zip |
Merging upstream version 1.70.0+dfsg2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/gix-protocol/src/fetch')
-rw-r--r-- | vendor/gix-protocol/src/fetch/arguments/async_io.rs | 55 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/arguments/blocking_io.rs | 56 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/arguments/mod.rs | 252 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/delegate.rs | 313 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/error.rs | 21 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/handshake.rs | 27 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/mod.rs | 20 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/response/async_io.rs | 136 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/response/blocking_io.rs | 135 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/response/mod.rs | 244 | ||||
-rw-r--r-- | vendor/gix-protocol/src/fetch/tests.rs | 388 |
11 files changed, 1647 insertions, 0 deletions
diff --git a/vendor/gix-protocol/src/fetch/arguments/async_io.rs b/vendor/gix-protocol/src/fetch/arguments/async_io.rs new file mode 100644 index 000000000..3984ec610 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/arguments/async_io.rs @@ -0,0 +1,55 @@ +use futures_lite::io::AsyncWriteExt; +use gix_transport::{client, client::TransportV2Ext}; + +use crate::{fetch::Arguments, Command}; + +impl Arguments { + /// Send fetch arguments to the server, and indicate this is the end of negotiations only if `add_done_argument` is present. + pub async fn send<'a, T: client::Transport + 'a>( + &mut self, + transport: &'a mut T, + add_done_argument: bool, + ) -> Result<Box<dyn client::ExtendedBufRead + Unpin + 'a>, client::Error> { + if self.haves.is_empty() { + assert!(add_done_argument, "If there are no haves, is_done must be true."); + } + match self.version { + gix_transport::Protocol::V1 => { + let (on_into_read, retained_state) = self.prepare_v1( + transport.connection_persists_across_multiple_requests(), + add_done_argument, + )?; + let mut line_writer = + transport.request(client::WriteMode::OneLfTerminatedLinePerWriteCall, on_into_read)?; + let had_args = !self.args.is_empty(); + for arg in self.args.drain(..) { + line_writer.write_all(&arg).await?; + } + if had_args { + line_writer.write_message(client::MessageKind::Flush).await?; + } + for line in self.haves.drain(..) { + line_writer.write_all(&line).await?; + } + if let Some(next_args) = retained_state { + self.args = next_args; + } + Ok(line_writer.into_read().await?) + } + gix_transport::Protocol::V2 => { + let retained_state = self.args.clone(); + self.args.append(&mut self.haves); + if add_done_argument { + self.args.push("done".into()); + } + transport + .invoke( + Command::Fetch.as_str(), + self.features.iter().filter(|(_, v)| v.is_some()).cloned(), + Some(std::mem::replace(&mut self.args, retained_state).into_iter()), + ) + .await + } + } + } +} diff --git a/vendor/gix-protocol/src/fetch/arguments/blocking_io.rs b/vendor/gix-protocol/src/fetch/arguments/blocking_io.rs new file mode 100644 index 000000000..b49d1a1ba --- /dev/null +++ b/vendor/gix-protocol/src/fetch/arguments/blocking_io.rs @@ -0,0 +1,56 @@ +use std::io::Write; + +use gix_transport::{client, client::TransportV2Ext}; + +use crate::{fetch::Arguments, Command}; + +impl Arguments { + /// Send fetch arguments to the server, and indicate this is the end of negotiations only if `add_done_argument` is present. + pub fn send<'a, T: client::Transport + 'a>( + &mut self, + transport: &'a mut T, + add_done_argument: bool, + ) -> Result<Box<dyn client::ExtendedBufRead + Unpin + 'a>, client::Error> { + if self.haves.is_empty() { + assert!(add_done_argument, "If there are no haves, is_done must be true."); + } + match self.version { + gix_transport::Protocol::V1 => { + let (on_into_read, retained_state) = self.prepare_v1( + transport.connection_persists_across_multiple_requests(), + add_done_argument, + )?; + let mut line_writer = + transport.request(client::WriteMode::OneLfTerminatedLinePerWriteCall, on_into_read)?; + let had_args = !self.args.is_empty(); + for arg in self.args.drain(..) { + line_writer.write_all(&arg)?; + } + if had_args { + line_writer.write_message(client::MessageKind::Flush)?; + } + for line in self.haves.drain(..) { + line_writer.write_all(&line)?; + } + if let Some(next_args) = retained_state { + self.args = next_args; + } + Ok(line_writer.into_read()?) + } + gix_transport::Protocol::V2 => { + let retained_state = self.args.clone(); + self.args.append(&mut self.haves); + if add_done_argument { + self.args.push("done".into()); + } + transport.invoke( + Command::Fetch.as_str(), + self.features + .iter() + .filter_map(|(k, v)| v.as_ref().map(|v| (*k, Some(v.as_ref())))), + Some(std::mem::replace(&mut self.args, retained_state).into_iter()), + ) + } + } + } +} diff --git a/vendor/gix-protocol/src/fetch/arguments/mod.rs b/vendor/gix-protocol/src/fetch/arguments/mod.rs new file mode 100644 index 000000000..39c9eee3a --- /dev/null +++ b/vendor/gix-protocol/src/fetch/arguments/mod.rs @@ -0,0 +1,252 @@ +use std::fmt; + +use bstr::{BStr, BString, ByteSlice, ByteVec}; + +/// The arguments passed to a server command. +#[derive(Debug)] +pub struct Arguments { + /// The active features/capabilities of the fetch invocation + #[cfg(any(feature = "async-client", feature = "blocking-client"))] + features: Vec<crate::command::Feature>, + + args: Vec<BString>, + haves: Vec<BString>, + + filter: bool, + shallow: bool, + deepen_since: bool, + deepen_not: bool, + deepen_relative: bool, + ref_in_want: bool, + supports_include_tag: bool, + + features_for_first_want: Option<Vec<String>>, + #[cfg(any(feature = "async-client", feature = "blocking-client"))] + version: gix_transport::Protocol, +} + +impl Arguments { + /// Return true if there is no argument at all. + /// + /// This can happen if callers assure that they won't add 'wants' if their 'have' is the same, i.e. if the remote has nothing + /// new for them. + pub fn is_empty(&self) -> bool { + self.haves.is_empty() && !self.args.iter().rev().any(|arg| arg.starts_with_str("want ")) + } + /// Return true if ref filters is supported. + pub fn can_use_filter(&self) -> bool { + self.filter + } + /// Return true if shallow refs are supported. + /// + /// This is relevant for partial clones when using `--depth X`. + pub fn can_use_shallow(&self) -> bool { + self.shallow + } + /// Return true if the 'deepen' capability is supported. + /// + /// This is relevant for partial clones when using `--depth X` and retrieving additional history. + pub fn can_use_deepen(&self) -> bool { + self.shallow + } + /// Return true if the 'deepen_since' capability is supported. + /// + /// This is relevant for partial clones when using `--depth X` and retrieving additional history + /// based on a date beyond which all history should be present. + pub fn can_use_deepen_since(&self) -> bool { + self.deepen_since + } + /// Return true if the 'deepen_not' capability is supported. + /// + /// This is relevant for partial clones when using `--depth X`. + pub fn can_use_deepen_not(&self) -> bool { + self.deepen_not + } + /// Return true if the 'deepen_relative' capability is supported. + /// + /// This is relevant for partial clones when using `--depth X`. + pub fn can_use_deepen_relative(&self) -> bool { + self.deepen_relative + } + /// Return true if the 'ref-in-want' capability is supported. + /// + /// This can be used to bypass 'ls-refs' entirely in protocol v2. + pub fn can_use_ref_in_want(&self) -> bool { + self.ref_in_want + } + /// Return true if the 'include-tag' capability is supported. + pub fn can_use_include_tag(&self) -> bool { + self.supports_include_tag + } + + /// Add the given `id` pointing to a commit to the 'want' list. + /// + /// As such it should be included in the server response as it's not present on the client. + pub fn want(&mut self, id: impl AsRef<gix_hash::oid>) { + match self.features_for_first_want.take() { + Some(features) => self.prefixed("want ", format!("{} {}", id.as_ref(), features.join(" "))), + None => self.prefixed("want ", id.as_ref()), + } + } + /// Add the given ref to the 'want-ref' list. + /// + /// The server should respond with a corresponding 'wanted-refs' section if it will include the + /// wanted ref in the packfile response. + pub fn want_ref(&mut self, ref_path: &BStr) { + let mut arg = BString::from("want-ref "); + arg.push_str(ref_path); + self.args.push(arg); + } + /// Add the given `id` pointing to a commit to the 'have' list. + /// + /// As such it should _not_ be included in the server response as it's already present on the client. + pub fn have(&mut self, id: impl AsRef<gix_hash::oid>) { + self.haves.push(format!("have {}", id.as_ref()).into()); + } + /// Add the given `id` pointing to a commit to the 'shallow' list. + pub fn shallow(&mut self, id: impl AsRef<gix_hash::oid>) { + debug_assert!(self.shallow, "'shallow' feature required for 'shallow <id>'"); + if self.shallow { + self.prefixed("shallow ", id.as_ref()); + } + } + /// Deepen the commit history by `depth` amount of commits. + pub fn deepen(&mut self, depth: usize) { + debug_assert!(self.shallow, "'shallow' feature required for deepen"); + if self.shallow { + self.prefixed("deepen ", depth); + } + } + /// Deepen the commit history to include all commits from now to `seconds_since_unix_epoch`. + pub fn deepen_since(&mut self, seconds_since_unix_epoch: usize) { + debug_assert!(self.deepen_since, "'deepen-since' feature required"); + if self.deepen_since { + self.prefixed("deepen-since ", seconds_since_unix_epoch); + } + } + /// Deepen the commit history in a relative instead of absolute fashion. + pub fn deepen_relative(&mut self) { + debug_assert!(self.deepen_relative, "'deepen-relative' feature required"); + if self.deepen_relative { + self.args.push("deepen-relative".into()); + } + } + /// Do not include commits reachable by the given `ref_path` when deepening the history. + pub fn deepen_not(&mut self, ref_path: &BStr) { + debug_assert!(self.deepen_not, "'deepen-not' feature required"); + if self.deepen_not { + let mut line = BString::from("deepen-not "); + line.extend_from_slice(ref_path); + self.args.push(line); + } + } + /// Set the given filter `spec` when listing references. + pub fn filter(&mut self, spec: &str) { + debug_assert!(self.filter, "'filter' feature required"); + if self.filter { + self.prefixed("filter ", spec); + } + } + /// Permanently allow the server to include tags that point to commits or objects it would return. + /// + /// Needs to only be called once. + pub fn use_include_tag(&mut self) { + debug_assert!(self.supports_include_tag, "'include-tag' feature required"); + if self.supports_include_tag { + self.args.push("include-tag".into()); + } + } + fn prefixed(&mut self, prefix: &str, value: impl fmt::Display) { + self.args.push(format!("{prefix}{value}").into()); + } + /// Create a new instance to help setting up arguments to send to the server as part of a `fetch` operation + /// for which `features` are the available and configured features to use. + #[cfg(any(feature = "async-client", feature = "blocking-client"))] + pub fn new(version: gix_transport::Protocol, features: Vec<crate::command::Feature>) -> Self { + use crate::Command; + let has = |name: &str| features.iter().any(|f| f.0 == name); + let filter = has("filter"); + let shallow = has("shallow"); + let ref_in_want = has("ref-in-want"); + let mut deepen_since = shallow; + let mut deepen_not = shallow; + let mut deepen_relative = shallow; + let supports_include_tag; + let (initial_arguments, features_for_first_want) = match version { + gix_transport::Protocol::V1 => { + deepen_since = has("deepen-since"); + deepen_not = has("deepen-not"); + deepen_relative = has("deepen-relative"); + supports_include_tag = has("include-tag"); + let baked_features = features + .iter() + .map(|(n, v)| match v { + Some(v) => format!("{n}={v}"), + None => n.to_string(), + }) + .collect::<Vec<_>>(); + (Vec::new(), Some(baked_features)) + } + gix_transport::Protocol::V2 => { + supports_include_tag = true; + (Command::Fetch.initial_arguments(&features), None) + } + }; + + Arguments { + features, + version, + args: initial_arguments, + haves: Vec::new(), + filter, + shallow, + supports_include_tag, + deepen_not, + deepen_relative, + ref_in_want, + deepen_since, + features_for_first_want, + } + } +} + +#[cfg(any(feature = "blocking-client", feature = "async-client"))] +mod shared { + use bstr::{BString, ByteSlice}; + use gix_transport::{client, client::MessageKind}; + + use crate::fetch::Arguments; + + impl Arguments { + pub(in crate::fetch::arguments) fn prepare_v1( + &mut self, + transport_is_stateful: bool, + add_done_argument: bool, + ) -> Result<(MessageKind, Option<Vec<BString>>), client::Error> { + if self.haves.is_empty() { + assert!(add_done_argument, "If there are no haves, is_done must be true."); + } + let on_into_read = if add_done_argument { + client::MessageKind::Text(&b"done"[..]) + } else { + client::MessageKind::Flush + }; + let retained_state = if transport_is_stateful { + None + } else { + Some(self.args.clone()) + }; + + if let Some(first_arg_position) = self.args.iter().position(|l| l.starts_with_str("want ")) { + self.args.swap(first_arg_position, 0); + } + Ok((on_into_read, retained_state)) + } + } +} + +#[cfg(feature = "async-client")] +mod async_io; + +#[cfg(feature = "blocking-client")] +mod blocking_io; diff --git a/vendor/gix-protocol/src/fetch/delegate.rs b/vendor/gix-protocol/src/fetch/delegate.rs new file mode 100644 index 000000000..b0db2f833 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/delegate.rs @@ -0,0 +1,313 @@ +use std::{ + borrow::Cow, + io, + ops::{Deref, DerefMut}, +}; + +use bstr::BString; +use gix_transport::client::Capabilities; + +use crate::{ + fetch::{Arguments, Response}, + handshake::Ref, +}; + +/// Defines what to do next after certain [`Delegate`] operations. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +pub enum Action { + /// Continue the typical flow of operations in this flow. + Continue, + /// Return at the next possible opportunity without making further requests, possibly after closing the connection. + Cancel, +} + +/// The non-IO protocol delegate is the bare minimal interface needed to fully control the [`fetch`][crate::fetch()] operation, sparing +/// the IO parts. +/// Async implementations must treat it as blocking and unblock it by evaluating it elsewhere. +/// +/// See [Delegate] for the complete trait. +pub trait DelegateBlocking { + /// Return extra parameters to be provided during the handshake. + /// + /// Note that this method is only called once and the result is reused during subsequent handshakes which may happen + /// if there is an authentication failure. + fn handshake_extra_parameters(&self) -> Vec<(String, Option<String>)> { + Vec::new() + } + /// Called before invoking 'ls-refs' on the server to allow providing it with additional `arguments` and to enable `features`. + /// If the server `capabilities` don't match the requirements abort with an error to abort the entire fetch operation. + /// + /// Note that some arguments are preset based on typical use, and `features` are preset to maximize options. + /// The `server` capabilities can be used to see which additional capabilities the server supports as per the handshake which happened prior. + /// + /// If the delegate returns [`ls_refs::Action::Skip`], no 'ls-refs` command is sent to the server. + /// + /// Note that this is called only if we are using protocol version 2. + fn prepare_ls_refs( + &mut self, + _server: &Capabilities, + _arguments: &mut Vec<BString>, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + ) -> std::io::Result<ls_refs::Action> { + Ok(ls_refs::Action::Continue) + } + + /// Called before invoking the 'fetch' interaction with `features` pre-filled for typical use + /// and to maximize capabilities to allow aborting an interaction early. + /// + /// `refs` is a list of known references on the remote based on the handshake or a prior call to ls_refs. + /// These can be used to abort early in case the refs are already known here. + /// + /// As there will be another call allowing to post arguments conveniently in the correct format, i.e. `want hex-oid`, + /// there is no way to set arguments at this time. + /// + /// `version` is the actually supported version as reported by the server, which is relevant in case the server requested a downgrade. + /// `server` capabilities is a list of features the server supports for your information, along with enabled `features` that the server knows about. + fn prepare_fetch( + &mut self, + _version: gix_transport::Protocol, + _server: &Capabilities, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + _refs: &[Ref], + ) -> std::io::Result<Action> { + Ok(Action::Continue) + } + + /// A method called repeatedly to negotiate the objects to receive in [`receive_pack(…)`][Delegate::receive_pack()]. + /// + /// The first call has `previous_response` set to `None` as there was no previous response. Every call that follows `previous_response` + /// will be set to `Some`. + /// + /// ### If `previous_response` is `None`… + /// + /// Given a list of `arguments` to populate with wants, want-refs, shallows, filters and other contextual information to be + /// sent to the server. This method is called once. + /// Send the objects you `have` have afterwards based on the tips of your refs, in preparation to walk down their parents + /// with each call to `negotiate` to find the common base(s). + /// + /// Note that you should not `want` and object that you already have. + /// `refs` are the the tips of on the server side, effectively the latest objects _they_ have. + /// + /// Return `Action::Close` if you know that there are no `haves` on your end to allow the server to send all of its objects + /// as is the case during initial clones. + /// + /// ### If `previous_response` is `Some`… + /// + /// Populate `arguments` with the objects you `have` starting from the tips of _your_ refs, taking into consideration + /// the `previous_response` response of the server to see which objects they acknowledged to have. You have to maintain + /// enough state to be able to walk down from your tips on each call, if they are not in common, and keep setting `have` + /// for those which are in common if that helps teaching the server about our state and to acknowledge their existence on _their_ end. + /// This method is called until the other side signals they are ready to send a pack. + /// Return `Action::Close` if you want to give up before finding a common base. This can happen if the remote repository + /// has radically changed so there are no bases, or they are very far in the past, causing all objects to be sent. + fn negotiate( + &mut self, + refs: &[Ref], + arguments: &mut Arguments, + previous_response: Option<&Response>, + ) -> io::Result<Action>; +} + +impl<T: DelegateBlocking> DelegateBlocking for Box<T> { + fn handshake_extra_parameters(&self) -> Vec<(String, Option<String>)> { + self.deref().handshake_extra_parameters() + } + + fn prepare_ls_refs( + &mut self, + _server: &Capabilities, + _arguments: &mut Vec<BString>, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + ) -> io::Result<ls_refs::Action> { + self.deref_mut().prepare_ls_refs(_server, _arguments, _features) + } + + fn prepare_fetch( + &mut self, + _version: gix_transport::Protocol, + _server: &Capabilities, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + _refs: &[Ref], + ) -> io::Result<Action> { + self.deref_mut().prepare_fetch(_version, _server, _features, _refs) + } + + fn negotiate( + &mut self, + refs: &[Ref], + arguments: &mut Arguments, + previous_response: Option<&Response>, + ) -> io::Result<Action> { + self.deref_mut().negotiate(refs, arguments, previous_response) + } +} + +impl<T: DelegateBlocking> DelegateBlocking for &mut T { + fn handshake_extra_parameters(&self) -> Vec<(String, Option<String>)> { + self.deref().handshake_extra_parameters() + } + + fn prepare_ls_refs( + &mut self, + _server: &Capabilities, + _arguments: &mut Vec<BString>, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + ) -> io::Result<ls_refs::Action> { + self.deref_mut().prepare_ls_refs(_server, _arguments, _features) + } + + fn prepare_fetch( + &mut self, + _version: gix_transport::Protocol, + _server: &Capabilities, + _features: &mut Vec<(&str, Option<Cow<'_, str>>)>, + _refs: &[Ref], + ) -> io::Result<Action> { + self.deref_mut().prepare_fetch(_version, _server, _features, _refs) + } + + fn negotiate( + &mut self, + refs: &[Ref], + arguments: &mut Arguments, + previous_response: Option<&Response>, + ) -> io::Result<Action> { + self.deref_mut().negotiate(refs, arguments, previous_response) + } +} + +#[cfg(feature = "blocking-client")] +mod blocking_io { + use std::{ + io::{self, BufRead}, + ops::DerefMut, + }; + + use gix_features::progress::Progress; + + use crate::{ + fetch::{DelegateBlocking, Response}, + handshake::Ref, + }; + + /// The protocol delegate is the bare minimal interface needed to fully control the [`fetch`][crate::fetch()] operation. + /// + /// Implementations of this trait are controlled by code with intricate knowledge about how fetching works in protocol version V1 and V2, + /// so you don't have to. + /// Everything is tucked away behind type-safety so 'nothing can go wrong'©. Runtime assertions assure invalid + /// features or arguments don't make it to the server in the first place. + /// Please note that this trait mostly corresponds to what V2 would look like, even though V1 is supported as well. + pub trait Delegate: DelegateBlocking { + /// Receive a pack provided from the given `input`. + /// + /// Use `progress` to emit your own progress messages when decoding the pack. + /// + /// `refs` of the remote side are provided for convenience, along with the parsed `previous_response` response in case you want + /// to check additional acks. + fn receive_pack( + &mut self, + input: impl io::BufRead, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()>; + } + + impl<T: Delegate> Delegate for Box<T> { + fn receive_pack( + &mut self, + input: impl BufRead, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()> { + self.deref_mut().receive_pack(input, progress, refs, previous_response) + } + } + + impl<T: Delegate> Delegate for &mut T { + fn receive_pack( + &mut self, + input: impl BufRead, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()> { + self.deref_mut().receive_pack(input, progress, refs, previous_response) + } + } +} +#[cfg(feature = "blocking-client")] +pub use blocking_io::Delegate; + +#[cfg(feature = "async-client")] +mod async_io { + use std::{io, ops::DerefMut}; + + use async_trait::async_trait; + use futures_io::AsyncBufRead; + use gix_features::progress::Progress; + + use crate::{ + fetch::{DelegateBlocking, Response}, + handshake::Ref, + }; + + /// The protocol delegate is the bare minimal interface needed to fully control the [`fetch`][crate::fetch()] operation. + /// + /// Implementations of this trait are controlled by code with intricate knowledge about how fetching works in protocol version V1 and V2, + /// so you don't have to. + /// Everything is tucked away behind type-safety so 'nothing can go wrong'©. Runtime assertions assure invalid + /// features or arguments don't make it to the server in the first place. + /// Please note that this trait mostly corresponds to what V2 would look like, even though V1 is supported as well. + #[async_trait(?Send)] + pub trait Delegate: DelegateBlocking { + /// Receive a pack provided from the given `input`, and the caller should consider it to be blocking as + /// most operations on the received pack are implemented in a blocking fashion. + /// + /// Use `progress` to emit your own progress messages when decoding the pack. + /// + /// `refs` of the remote side are provided for convenience, along with the parsed `previous_response` response in case you want + /// to check additional acks. + async fn receive_pack( + &mut self, + input: impl AsyncBufRead + Unpin + 'async_trait, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()>; + } + #[async_trait(?Send)] + impl<T: Delegate> Delegate for Box<T> { + async fn receive_pack( + &mut self, + input: impl AsyncBufRead + Unpin + 'async_trait, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()> { + self.deref_mut() + .receive_pack(input, progress, refs, previous_response) + .await + } + } + + #[async_trait(?Send)] + impl<T: Delegate> Delegate for &mut T { + async fn receive_pack( + &mut self, + input: impl AsyncBufRead + Unpin + 'async_trait, + progress: impl Progress, + refs: &[Ref], + previous_response: &Response, + ) -> io::Result<()> { + self.deref_mut() + .receive_pack(input, progress, refs, previous_response) + .await + } + } +} +#[cfg(feature = "async-client")] +pub use async_io::Delegate; + +use crate::ls_refs; diff --git a/vendor/gix-protocol/src/fetch/error.rs b/vendor/gix-protocol/src/fetch/error.rs new file mode 100644 index 000000000..5646ce4ec --- /dev/null +++ b/vendor/gix-protocol/src/fetch/error.rs @@ -0,0 +1,21 @@ +use std::io; + +use gix_transport::client; + +use crate::{fetch::response, handshake, ls_refs}; + +/// The error used in [`fetch()`][crate::fetch()]. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error(transparent)] + Handshake(#[from] handshake::Error), + #[error("Could not access repository or failed to read streaming pack file")] + Io(#[from] io::Error), + #[error(transparent)] + Transport(#[from] client::Error), + #[error(transparent)] + LsRefs(#[from] ls_refs::Error), + #[error(transparent)] + Response(#[from] response::Error), +} diff --git a/vendor/gix-protocol/src/fetch/handshake.rs b/vendor/gix-protocol/src/fetch/handshake.rs new file mode 100644 index 000000000..9ffc184a8 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/handshake.rs @@ -0,0 +1,27 @@ +use gix_features::progress::Progress; +use gix_transport::{client, Service}; +use maybe_async::maybe_async; + +use crate::{ + credentials, + handshake::{Error, Outcome}, +}; + +/// Perform a handshake with the server on the other side of `transport`, with `authenticate` being used if authentication +/// turns out to be required. `extra_parameters` are the parameters `(name, optional value)` to add to the handshake, +/// each time it is performed in case authentication is required. +/// `progress` is used to inform about what's currently happening. +#[allow(clippy::result_large_err)] +#[maybe_async] +pub async fn upload_pack<AuthFn, T>( + transport: T, + authenticate: AuthFn, + extra_parameters: Vec<(String, Option<String>)>, + progress: &mut impl Progress, +) -> Result<Outcome, Error> +where + AuthFn: FnMut(credentials::helper::Action) -> credentials::protocol::Result, + T: client::Transport, +{ + crate::handshake(transport, Service::UploadPack, authenticate, extra_parameters, progress).await +} diff --git a/vendor/gix-protocol/src/fetch/mod.rs b/vendor/gix-protocol/src/fetch/mod.rs new file mode 100644 index 000000000..0828ea733 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/mod.rs @@ -0,0 +1,20 @@ +mod arguments; +pub use arguments::Arguments; + +/// +pub mod delegate; +#[cfg(any(feature = "async-client", feature = "blocking-client"))] +pub use delegate::Delegate; +pub use delegate::{Action, DelegateBlocking}; + +mod error; +pub use error::Error; +/// +pub mod response; +pub use response::Response; + +mod handshake; +pub use handshake::upload_pack as handshake; + +#[cfg(test)] +mod tests; diff --git a/vendor/gix-protocol/src/fetch/response/async_io.rs b/vendor/gix-protocol/src/fetch/response/async_io.rs new file mode 100644 index 000000000..7b00d843c --- /dev/null +++ b/vendor/gix-protocol/src/fetch/response/async_io.rs @@ -0,0 +1,136 @@ +use std::io; + +use futures_lite::AsyncBufReadExt; +use gix_transport::{client, Protocol}; + +use crate::fetch::{ + response, + response::{Acknowledgement, ShallowUpdate, WantedRef}, + Response, +}; + +async fn parse_v2_section<T>( + line: &mut String, + reader: &mut (impl client::ExtendedBufRead + Unpin), + res: &mut Vec<T>, + parse: impl Fn(&str) -> Result<T, response::Error>, +) -> Result<bool, response::Error> { + line.clear(); + while reader.read_line(line).await? != 0 { + res.push(parse(line)?); + line.clear(); + } + // End of message, or end of section? + Ok(if reader.stopped_at() == Some(client::MessageKind::Delimiter) { + // try reading more sections + reader.reset(Protocol::V2); + false + } else { + // we are done, there is no pack + true + }) +} + +impl Response { + /// Parse a response of the given `version` of the protocol from `reader`. + pub async fn from_line_reader( + version: Protocol, + reader: &mut (impl client::ExtendedBufRead + Unpin), + ) -> Result<Response, response::Error> { + match version { + Protocol::V1 => { + let mut line = String::new(); + let mut acks = Vec::<Acknowledgement>::new(); + let mut shallows = Vec::<ShallowUpdate>::new(); + let has_pack = 'lines: loop { + line.clear(); + let peeked_line = match reader.peek_data_line().await { + Some(Ok(Ok(line))) => String::from_utf8_lossy(line), + // This special case (hang/block forever) deals with a single NAK being a legitimate EOF sometimes + // Note that this might block forever in stateful connections as there it's not really clear + // if something will be following or not by just looking at the response. Instead you have to know + // the arguments sent to the server and count response lines based on intricate knowledge on how the + // server works. + // For now this is acceptable, as V2 can be used as a workaround, which also is the default. + Some(Err(err)) if err.kind() == io::ErrorKind::UnexpectedEof => break 'lines false, + Some(Err(err)) => return Err(err.into()), + Some(Ok(Err(err))) => return Err(err.into()), + None => { + // maybe we saw a shallow flush packet, let's reset and retry + debug_assert_eq!( + reader.stopped_at(), + Some(client::MessageKind::Flush), + "If this isn't a flush packet, we don't know what's going on" + ); + reader.read_line(&mut line).await?; + reader.reset(Protocol::V1); + match reader.peek_data_line().await { + Some(Ok(Ok(line))) => String::from_utf8_lossy(line), + Some(Err(err)) => return Err(err.into()), + Some(Ok(Err(err))) => return Err(err.into()), + None => break 'lines false, // EOF + } + } + }; + + if Response::parse_v1_ack_or_shallow_or_assume_pack(&mut acks, &mut shallows, &peeked_line) { + break 'lines true; + } + assert_ne!(reader.read_line(&mut line).await?, 0, "consuming a peeked line works"); + }; + Ok(Response { + acks, + shallows, + wanted_refs: vec![], + has_pack, + }) + } + Protocol::V2 => { + // NOTE: We only read acknowledgements and scrub to the pack file, until we have use for the other features + let mut line = String::new(); + reader.reset(Protocol::V2); + let mut acks = Vec::<Acknowledgement>::new(); + let mut shallows = Vec::<ShallowUpdate>::new(); + let mut wanted_refs = Vec::<WantedRef>::new(); + let has_pack = 'section: loop { + line.clear(); + if reader.read_line(&mut line).await? == 0 { + return Err(response::Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Could not read message headline", + ))); + }; + + match line.trim_end() { + "acknowledgments" => { + if parse_v2_section(&mut line, reader, &mut acks, Acknowledgement::from_line).await? { + break 'section false; + } + } + "shallow-info" => { + if parse_v2_section(&mut line, reader, &mut shallows, ShallowUpdate::from_line).await? { + break 'section false; + } + } + "wanted-refs" => { + if parse_v2_section(&mut line, reader, &mut wanted_refs, WantedRef::from_line).await? { + break 'section false; + } + } + "packfile" => { + // what follows is the packfile itself, which can be read with a sideband enabled reader + break 'section true; + } + _ => return Err(response::Error::UnknownSectionHeader { header: line }), + } + }; + Ok(Response { + acks, + shallows, + wanted_refs, + has_pack, + }) + } + } + } +} diff --git a/vendor/gix-protocol/src/fetch/response/blocking_io.rs b/vendor/gix-protocol/src/fetch/response/blocking_io.rs new file mode 100644 index 000000000..ca79724e2 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/response/blocking_io.rs @@ -0,0 +1,135 @@ +use std::io; + +use gix_transport::{client, Protocol}; + +use crate::fetch::{ + response, + response::{Acknowledgement, ShallowUpdate, WantedRef}, + Response, +}; + +fn parse_v2_section<T>( + line: &mut String, + reader: &mut impl client::ExtendedBufRead, + res: &mut Vec<T>, + parse: impl Fn(&str) -> Result<T, response::Error>, +) -> Result<bool, response::Error> { + line.clear(); + while reader.read_line(line)? != 0 { + res.push(parse(line)?); + line.clear(); + } + // End of message, or end of section? + Ok(if reader.stopped_at() == Some(client::MessageKind::Delimiter) { + // try reading more sections + reader.reset(Protocol::V2); + false + } else { + // we are done, there is no pack + true + }) +} + +impl Response { + /// Parse a response of the given `version` of the protocol from `reader`. + pub fn from_line_reader( + version: Protocol, + reader: &mut impl client::ExtendedBufRead, + ) -> Result<Response, response::Error> { + match version { + Protocol::V1 => { + let mut line = String::new(); + let mut acks = Vec::<Acknowledgement>::new(); + let mut shallows = Vec::<ShallowUpdate>::new(); + let has_pack = 'lines: loop { + line.clear(); + let peeked_line = match reader.peek_data_line() { + Some(Ok(Ok(line))) => String::from_utf8_lossy(line), + // This special case (hang/block forever) deals with a single NAK being a legitimate EOF sometimes + // Note that this might block forever in stateful connections as there it's not really clear + // if something will be following or not by just looking at the response. Instead you have to know + // the arguments sent to the server and count response lines based on intricate knowledge on how the + // server works. + // For now this is acceptable, as V2 can be used as a workaround, which also is the default. + Some(Err(err)) if err.kind() == io::ErrorKind::UnexpectedEof => break 'lines false, + Some(Err(err)) => return Err(err.into()), + Some(Ok(Err(err))) => return Err(err.into()), + None => { + // maybe we saw a shallow flush packet, let's reset and retry + debug_assert_eq!( + reader.stopped_at(), + Some(client::MessageKind::Flush), + "If this isn't a flush packet, we don't know what's going on" + ); + reader.read_line(&mut line)?; + reader.reset(Protocol::V1); + match reader.peek_data_line() { + Some(Ok(Ok(line))) => String::from_utf8_lossy(line), + Some(Err(err)) => return Err(err.into()), + Some(Ok(Err(err))) => return Err(err.into()), + None => break 'lines false, // EOF + } + } + }; + + if Response::parse_v1_ack_or_shallow_or_assume_pack(&mut acks, &mut shallows, &peeked_line) { + break 'lines true; + } + assert_ne!(reader.read_line(&mut line)?, 0, "consuming a peeked line works"); + }; + Ok(Response { + acks, + shallows, + wanted_refs: vec![], + has_pack, + }) + } + Protocol::V2 => { + // NOTE: We only read acknowledgements and scrub to the pack file, until we have use for the other features + let mut line = String::new(); + reader.reset(Protocol::V2); + let mut acks = Vec::<Acknowledgement>::new(); + let mut shallows = Vec::<ShallowUpdate>::new(); + let mut wanted_refs = Vec::<WantedRef>::new(); + let has_pack = 'section: loop { + line.clear(); + if reader.read_line(&mut line)? == 0 { + return Err(response::Error::Io(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Could not read message headline", + ))); + }; + + match line.trim_end() { + "acknowledgments" => { + if parse_v2_section(&mut line, reader, &mut acks, Acknowledgement::from_line)? { + break 'section false; + } + } + "shallow-info" => { + if parse_v2_section(&mut line, reader, &mut shallows, ShallowUpdate::from_line)? { + break 'section false; + } + } + "wanted-refs" => { + if parse_v2_section(&mut line, reader, &mut wanted_refs, WantedRef::from_line)? { + break 'section false; + } + } + "packfile" => { + // what follows is the packfile itself, which can be read with a sideband enabled reader + break 'section true; + } + _ => return Err(response::Error::UnknownSectionHeader { header: line }), + } + }; + Ok(Response { + acks, + shallows, + wanted_refs, + has_pack, + }) + } + } + } +} diff --git a/vendor/gix-protocol/src/fetch/response/mod.rs b/vendor/gix-protocol/src/fetch/response/mod.rs new file mode 100644 index 000000000..8c99cc872 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/response/mod.rs @@ -0,0 +1,244 @@ +use bstr::BString; +use gix_transport::{client, Protocol}; + +use crate::command::Feature; + +/// The error returned in the [response module][crate::fetch::response]. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum Error { + #[error("Failed to read from line reader")] + Io(#[source] std::io::Error), + #[error(transparent)] + UploadPack(#[from] gix_transport::packetline::read::Error), + #[error(transparent)] + Transport(#[from] client::Error), + #[error("Currently we require feature {feature:?}, which is not supported by the server")] + MissingServerCapability { feature: &'static str }, + #[error("Encountered an unknown line prefix in {line:?}")] + UnknownLineType { line: String }, + #[error("Unknown or unsupported header: {header:?}")] + UnknownSectionHeader { header: String }, +} + +impl From<std::io::Error> for Error { + fn from(err: std::io::Error) -> Self { + if err.kind() == std::io::ErrorKind::Other { + match err.into_inner() { + Some(err) => match err.downcast::<gix_transport::packetline::read::Error>() { + Ok(err) => Error::UploadPack(*err), + Err(err) => Error::Io(std::io::Error::new(std::io::ErrorKind::Other, err)), + }, + None => Error::Io(std::io::ErrorKind::Other.into()), + } + } else { + Error::Io(err) + } + } +} + +impl gix_transport::IsSpuriousError for Error { + fn is_spurious(&self) -> bool { + match self { + Error::Io(err) => err.is_spurious(), + Error::Transport(err) => err.is_spurious(), + _ => false, + } + } +} + +/// An 'ACK' line received from the server. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub enum Acknowledgement { + /// The contained `id` is in common. + Common(gix_hash::ObjectId), + /// The server is ready to receive more lines. + Ready, + /// The server isn't ready yet. + Nak, +} + +/// A shallow line received from the server. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub enum ShallowUpdate { + /// Shallow the given `id`. + Shallow(gix_hash::ObjectId), + /// Don't shallow the given `id` anymore. + Unshallow(gix_hash::ObjectId), +} + +/// A wanted-ref line received from the server. +#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)] +#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))] +pub struct WantedRef { + /// The object id of the wanted ref, as seen by the server. + pub id: gix_hash::ObjectId, + /// The name of the ref, as requested by the client as a `want-ref` argument. + pub path: BString, +} + +impl ShallowUpdate { + /// Parse a `ShallowUpdate` from a `line` as received to the server. + pub fn from_line(line: &str) -> Result<ShallowUpdate, Error> { + match line.trim_end().split_once(' ') { + Some((prefix, id)) => { + let id = gix_hash::ObjectId::from_hex(id.as_bytes()) + .map_err(|_| Error::UnknownLineType { line: line.to_owned() })?; + Ok(match prefix { + "shallow" => ShallowUpdate::Shallow(id), + "unshallow" => ShallowUpdate::Unshallow(id), + _ => return Err(Error::UnknownLineType { line: line.to_owned() }), + }) + } + None => Err(Error::UnknownLineType { line: line.to_owned() }), + } + } +} + +impl Acknowledgement { + /// Parse an `Acknowledgement` from a `line` as received to the server. + pub fn from_line(line: &str) -> Result<Acknowledgement, Error> { + let mut tokens = line.trim_end().splitn(3, ' '); + match (tokens.next(), tokens.next(), tokens.next()) { + (Some(first), id, description) => Ok(match first { + "ready" => Acknowledgement::Ready, // V2 + "NAK" => Acknowledgement::Nak, // V1 + "ACK" => { + let id = match id { + Some(id) => gix_hash::ObjectId::from_hex(id.as_bytes()) + .map_err(|_| Error::UnknownLineType { line: line.to_owned() })?, + None => return Err(Error::UnknownLineType { line: line.to_owned() }), + }; + if let Some(description) = description { + match description { + "common" => {} + "ready" => return Ok(Acknowledgement::Ready), + _ => return Err(Error::UnknownLineType { line: line.to_owned() }), + } + } + Acknowledgement::Common(id) + } + _ => return Err(Error::UnknownLineType { line: line.to_owned() }), + }), + (None, _, _) => Err(Error::UnknownLineType { line: line.to_owned() }), + } + } + /// Returns the hash of the acknowledged object if this instance acknowledges a common one. + pub fn id(&self) -> Option<&gix_hash::ObjectId> { + match self { + Acknowledgement::Common(id) => Some(id), + _ => None, + } + } +} + +impl WantedRef { + /// Parse a `WantedRef` from a `line` as received from the server. + pub fn from_line(line: &str) -> Result<WantedRef, Error> { + match line.trim_end().split_once(' ') { + Some((id, path)) => { + let id = gix_hash::ObjectId::from_hex(id.as_bytes()) + .map_err(|_| Error::UnknownLineType { line: line.to_owned() })?; + Ok(WantedRef { id, path: path.into() }) + } + None => Err(Error::UnknownLineType { line: line.to_owned() }), + } + } +} + +/// A representation of a complete fetch response +#[derive(Debug)] +pub struct Response { + acks: Vec<Acknowledgement>, + shallows: Vec<ShallowUpdate>, + wanted_refs: Vec<WantedRef>, + has_pack: bool, +} + +impl Response { + /// Return true if the response has a pack which can be read next. + pub fn has_pack(&self) -> bool { + self.has_pack + } + + /// Return an error if the given `features` don't contain the required ones (the ones this implementation needs) + /// for the given `version` of the protocol. + /// + /// Even though technically any set of features supported by the server could work, we only implement the ones that + /// make it easy to maintain all versions with a single code base that aims to be and remain maintainable. + pub fn check_required_features(version: Protocol, features: &[Feature]) -> Result<(), Error> { + match version { + Protocol::V1 => { + let has = |name: &str| features.iter().any(|f| f.0 == name); + // Let's focus on V2 standards, and simply not support old servers to keep our code simpler + if !has("multi_ack_detailed") { + return Err(Error::MissingServerCapability { + feature: "multi_ack_detailed", + }); + } + // It's easy to NOT do sideband for us, but then again, everyone supports it. + // CORRECTION: If side-band is off, it would send the packfile without packet line encoding, + // which is nothing we ever want to deal with (despite it being more efficient). In V2, this + // is not even an option anymore, sidebands are always present. + if !has("side-band") && !has("side-band-64k") { + return Err(Error::MissingServerCapability { + feature: "side-band OR side-band-64k", + }); + } + } + Protocol::V2 => {} + } + Ok(()) + } + + /// Return all acknowledgements [parsed previously][Response::from_line_reader()]. + pub fn acknowledgements(&self) -> &[Acknowledgement] { + &self.acks + } + + /// Return all shallow update lines [parsed previously][Response::from_line_reader()]. + pub fn shallow_updates(&self) -> &[ShallowUpdate] { + &self.shallows + } + + /// Return all wanted-refs [parsed previously][Response::from_line_reader()]. + pub fn wanted_refs(&self) -> &[WantedRef] { + &self.wanted_refs + } +} + +#[cfg(any(feature = "async-client", feature = "blocking-client"))] +impl Response { + /// with a friendly server, we just assume that a non-ack line is a pack line + /// which is our hint to stop here. + fn parse_v1_ack_or_shallow_or_assume_pack( + acks: &mut Vec<Acknowledgement>, + shallows: &mut Vec<ShallowUpdate>, + peeked_line: &str, + ) -> bool { + match Acknowledgement::from_line(peeked_line) { + Ok(ack) => match ack.id() { + Some(id) => { + if !acks.iter().any(|a| a.id() == Some(id)) { + acks.push(ack); + } + } + None => acks.push(ack), + }, + Err(_) => match ShallowUpdate::from_line(peeked_line) { + Ok(shallow) => { + shallows.push(shallow); + } + Err(_) => return true, + }, + }; + false + } +} + +#[cfg(feature = "async-client")] +mod async_io; +#[cfg(feature = "blocking-client")] +mod blocking_io; diff --git a/vendor/gix-protocol/src/fetch/tests.rs b/vendor/gix-protocol/src/fetch/tests.rs new file mode 100644 index 000000000..5a1902ad2 --- /dev/null +++ b/vendor/gix-protocol/src/fetch/tests.rs @@ -0,0 +1,388 @@ +#[cfg(any(feature = "async-client", feature = "blocking-client"))] +mod arguments { + use bstr::ByteSlice; + use gix_transport::Protocol; + + use crate::fetch; + + fn arguments_v1(features: impl IntoIterator<Item = &'static str>) -> fetch::Arguments { + fetch::Arguments::new(Protocol::V1, features.into_iter().map(|n| (n, None)).collect()) + } + + fn arguments_v2(features: impl IntoIterator<Item = &'static str>) -> fetch::Arguments { + fetch::Arguments::new(Protocol::V2, features.into_iter().map(|n| (n, None)).collect()) + } + + struct Transport<T> { + inner: T, + stateful: bool, + } + + #[cfg(feature = "blocking-client")] + mod impls { + use std::borrow::Cow; + + use bstr::BStr; + use gix_transport::{ + client, + client::{Error, MessageKind, RequestWriter, SetServiceResponse, WriteMode}, + Protocol, Service, + }; + + use crate::fetch::tests::arguments::Transport; + + impl<T: client::TransportWithoutIO> client::TransportWithoutIO for Transport<T> { + fn set_identity(&mut self, identity: client::Account) -> Result<(), Error> { + self.inner.set_identity(identity) + } + + fn request( + &mut self, + write_mode: WriteMode, + on_into_read: MessageKind, + ) -> Result<RequestWriter<'_>, Error> { + self.inner.request(write_mode, on_into_read) + } + + fn to_url(&self) -> Cow<'_, BStr> { + self.inner.to_url() + } + + fn supported_protocol_versions(&self) -> &[Protocol] { + self.inner.supported_protocol_versions() + } + + fn connection_persists_across_multiple_requests(&self) -> bool { + self.stateful + } + + fn configure( + &mut self, + config: &dyn std::any::Any, + ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { + self.inner.configure(config) + } + } + + impl<T: client::Transport> client::Transport for Transport<T> { + fn handshake<'a>( + &mut self, + service: Service, + extra_parameters: &'a [(&'a str, Option<&'a str>)], + ) -> Result<SetServiceResponse<'_>, Error> { + self.inner.handshake(service, extra_parameters) + } + } + } + + #[cfg(feature = "async-client")] + mod impls { + use std::borrow::Cow; + + use async_trait::async_trait; + use bstr::BStr; + use gix_transport::{ + client, + client::{Error, MessageKind, RequestWriter, SetServiceResponse, WriteMode}, + Protocol, Service, + }; + + use crate::fetch::tests::arguments::Transport; + impl<T: client::TransportWithoutIO + Send> client::TransportWithoutIO for Transport<T> { + fn set_identity(&mut self, identity: client::Account) -> Result<(), Error> { + self.inner.set_identity(identity) + } + + fn request( + &mut self, + write_mode: WriteMode, + on_into_read: MessageKind, + ) -> Result<RequestWriter<'_>, Error> { + self.inner.request(write_mode, on_into_read) + } + + fn to_url(&self) -> Cow<'_, BStr> { + self.inner.to_url() + } + + fn supported_protocol_versions(&self) -> &[Protocol] { + self.inner.supported_protocol_versions() + } + + fn connection_persists_across_multiple_requests(&self) -> bool { + self.stateful + } + + fn configure( + &mut self, + config: &dyn std::any::Any, + ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> { + self.inner.configure(config) + } + } + + #[async_trait(?Send)] + impl<T: client::Transport + Send> client::Transport for Transport<T> { + async fn handshake<'a>( + &mut self, + service: Service, + extra_parameters: &'a [(&'a str, Option<&'a str>)], + ) -> Result<SetServiceResponse<'_>, Error> { + self.inner.handshake(service, extra_parameters).await + } + } + } + + fn transport( + out: &mut Vec<u8>, + stateful: bool, + ) -> Transport<gix_transport::client::git::Connection<&'static [u8], &mut Vec<u8>>> { + Transport { + inner: gix_transport::client::git::Connection::new( + &[], + out, + Protocol::V1, // does not matter + b"does/not/matter".as_bstr().to_owned(), + None::<(&str, _)>, + gix_transport::client::git::ConnectMode::Process, // avoid header to be sent + ), + stateful, + } + } + + fn id(hex: &str) -> gix_hash::ObjectId { + gix_hash::ObjectId::from_hex(hex.as_bytes()).expect("expect valid hex id") + } + + mod v1 { + use bstr::ByteSlice; + + use crate::fetch::tests::arguments::{arguments_v1, id, transport}; + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn include_tag() { + let mut out = Vec::new(); + let mut t = transport(&mut out, true); + let mut arguments = arguments_v1(["include-tag", "feature-b"].iter().cloned()); + assert!(arguments.can_use_include_tag()); + + arguments.use_include_tag(); + arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0048want ff333369de1221f9bfbbe03a3a13e9a09bc1ffff include-tag feature-b +0010include-tag +00000009done +" + .as_bstr() + ); + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn haves_and_wants_for_clone() { + let mut out = Vec::new(); + let mut t = transport(&mut out, true); + let mut arguments = arguments_v1(["feature-a", "feature-b"].iter().cloned()); + assert!( + !arguments.can_use_include_tag(), + "needs to be enabled by features in V1" + ); + + arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907")); + arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0046want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a feature-b +0032want ff333369de1221f9bfbbe03a3a13e9a09bc1ffff +00000009done +" + .as_bstr() + ); + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn haves_and_wants_for_fetch_stateless() { + let mut out = Vec::new(); + let mut t = transport(&mut out, false); + let mut arguments = arguments_v1(["feature-a", "shallow", "deepen-since", "deepen-not"].iter().copied()); + + arguments.deepen(1); + arguments.shallow(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff")); + arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907")); + arguments.deepen_since(12345); + arguments.deepen_not("refs/heads/main".into()); + arguments.have(id("0000000000000000000000000000000000000000")); + arguments.send(&mut t, false).await.expect("sending to buffer to work"); + + arguments.have(id("1111111111111111111111111111111111111111")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"005cwant 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a shallow deepen-since deepen-not +0035shallow 7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff +000ddeepen 1 +0017deepen-since 12345 +001fdeepen-not refs/heads/main +00000032have 0000000000000000000000000000000000000000 +0000005cwant 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a shallow deepen-since deepen-not +0035shallow 7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff +000ddeepen 1 +0017deepen-since 12345 +001fdeepen-not refs/heads/main +00000032have 1111111111111111111111111111111111111111 +0009done +" + .as_bstr() + ); + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn haves_and_wants_for_fetch_stateful() { + let mut out = Vec::new(); + let mut t = transport(&mut out, true); + let mut arguments = arguments_v1(["feature-a", "shallow"].iter().copied()); + + arguments.deepen(1); + arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907")); + arguments.have(id("0000000000000000000000000000000000000000")); + arguments.send(&mut t, false).await.expect("sending to buffer to work"); + + arguments.have(id("1111111111111111111111111111111111111111")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0044want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 feature-a shallow +000ddeepen 1 +00000032have 0000000000000000000000000000000000000000 +00000032have 1111111111111111111111111111111111111111 +0009done +" + .as_bstr() + ); + } + } + + mod v2 { + use bstr::ByteSlice; + + use crate::fetch::tests::arguments::{arguments_v2, id, transport}; + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn include_tag() { + let mut out = Vec::new(); + let mut t = transport(&mut out, true); + let mut arguments = arguments_v2(["does not matter for us here"].iter().copied()); + assert!(arguments.can_use_include_tag(), "always on in V2"); + arguments.use_include_tag(); + + arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0012command=fetch +0001000ethin-pack +000eofs-delta +0010include-tag +0032want ff333369de1221f9bfbbe03a3a13e9a09bc1ffff +0009done +0000" + .as_bstr(), + "we filter features/capabilities without value as these apparently shouldn't be listed (remote dies otherwise)" + ); + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn haves_and_wants_for_clone_stateful() { + let mut out = Vec::new(); + let mut t = transport(&mut out, true); + let mut arguments = arguments_v2(["feature-a", "shallow"].iter().copied()); + + arguments.deepen(1); + arguments.deepen_relative(); + arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907")); + arguments.want(id("ff333369de1221f9bfbbe03a3a13e9a09bc1ffff")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0012command=fetch +0001000ethin-pack +000eofs-delta +000ddeepen 1 +0014deepen-relative +0032want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 +0032want ff333369de1221f9bfbbe03a3a13e9a09bc1ffff +0009done +0000" + .as_bstr(), + "we filter features/capabilities without value as these apparently shouldn't be listed (remote dies otherwise)" + ); + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn haves_and_wants_for_fetch_stateless_and_stateful() { + for is_stateful in &[false, true] { + let mut out = Vec::new(); + let mut t = transport(&mut out, *is_stateful); + let mut arguments = arguments_v2(Some("shallow")); + + arguments.deepen(1); + arguments.deepen_since(12345); + arguments.shallow(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff")); + arguments.want(id("7b333369de1221f9bfbbe03a3a13e9a09bc1c907")); + arguments.deepen_not("refs/heads/main".into()); + arguments.have(id("0000000000000000000000000000000000000000")); + arguments.send(&mut t, false).await.expect("sending to buffer to work"); + + arguments.have(id("1111111111111111111111111111111111111111")); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0012command=fetch +0001000ethin-pack +000eofs-delta +000ddeepen 1 +0017deepen-since 12345 +0035shallow 7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff +0032want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 +001fdeepen-not refs/heads/main +0032have 0000000000000000000000000000000000000000 +00000012command=fetch +0001000ethin-pack +000eofs-delta +000ddeepen 1 +0017deepen-since 12345 +0035shallow 7b333369de1221f9bfbbe03a3a13e9a09bc1c9ff +0032want 7b333369de1221f9bfbbe03a3a13e9a09bc1c907 +001fdeepen-not refs/heads/main +0032have 1111111111111111111111111111111111111111 +0009done +0000" + .as_bstr(), + "V2 is stateless by default, so it repeats all but 'haves' in each request" + ); + } + } + + #[maybe_async::test(feature = "blocking-client", async(feature = "async-client", async_std::test))] + async fn ref_in_want() { + let mut out = Vec::new(); + let mut t = transport(&mut out, false); + let mut arguments = arguments_v2(["ref-in-want"].iter().copied()); + + arguments.want_ref(b"refs/heads/main".as_bstr()); + arguments.send(&mut t, true).await.expect("sending to buffer to work"); + assert_eq!( + out.as_bstr(), + b"0012command=fetch +0001000ethin-pack +000eofs-delta +001dwant-ref refs/heads/main +0009done +0000" + .as_bstr() + ) + } + } +} |