summaryrefslogtreecommitdiffstats
path: root/vendor/gix/src/remote/connection/fetch/receive_pack.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gix/src/remote/connection/fetch/receive_pack.rs')
-rw-r--r--vendor/gix/src/remote/connection/fetch/receive_pack.rs296
1 files changed, 177 insertions, 119 deletions
diff --git a/vendor/gix/src/remote/connection/fetch/receive_pack.rs b/vendor/gix/src/remote/connection/fetch/receive_pack.rs
index 99560fbca..7837a9d3a 100644
--- a/vendor/gix/src/remote/connection/fetch/receive_pack.rs
+++ b/vendor/gix/src/remote/connection/fetch/receive_pack.rs
@@ -1,18 +1,26 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::{
+ ops::DerefMut,
+ sync::atomic::{AtomicBool, Ordering},
+};
-use gix_odb::FindExt;
+use gix_odb::{store::RefreshMode, FindExt};
use gix_protocol::{
fetch::Arguments,
transport::{client::Transport, packetline::read::ProgressAction},
};
use crate::{
- config::tree::Clone,
+ config::{
+ cache::util::ApplyLeniency,
+ tree::{Clone, Fetch, Key},
+ },
remote,
remote::{
connection::fetch::config,
fetch,
- fetch::{negotiate, refs, Error, Outcome, Prepare, ProgressId, RefLogMessage, Shallow, Status},
+ fetch::{
+ negotiate, negotiate::Algorithm, refs, Error, Outcome, Prepare, ProgressId, RefLogMessage, Shallow, Status,
+ },
},
Progress, Repository,
};
@@ -99,9 +107,6 @@ where
}
let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, &self.shallow, repo)?;
- let mut previous_response = None::<gix_protocol::fetch::Response>;
- let mut round = 1;
-
if self.ref_map.object_hash != repo.object_hash() {
return Err(Error::IncompatibleObjectHash {
local: repo.object_hash(),
@@ -109,118 +114,161 @@ where
});
}
- let reader = 'negotiation: loop {
- progress.step();
- progress.set_name(format!("negotiate (round {round})"));
-
- let is_done = match negotiate::one_round(
- negotiate::Algorithm::Naive,
- round,
- repo,
- &self.ref_map,
- con.remote.fetch_tags,
- &mut arguments,
- previous_response.as_ref(),
- (self.shallow != Shallow::NoChange).then_some(&self.shallow),
- ) {
- Ok(_) if arguments.is_empty() => {
- gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
- let update_refs = refs::update(
- repo,
- self.reflog_message
- .take()
- .unwrap_or_else(|| RefLogMessage::Prefixed { action: "fetch".into() }),
- &self.ref_map.mappings,
- con.remote.refspecs(remote::Direction::Fetch),
- &self.ref_map.extra_refspecs,
- con.remote.fetch_tags,
- self.dry_run,
- self.write_packed_refs,
- )?;
- return Ok(Outcome {
- ref_map: std::mem::take(&mut self.ref_map),
- status: Status::NoPackReceived { update_refs },
- });
- }
- Ok(is_done) => is_done,
- Err(err) => {
- gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
- return Err(err.into());
- }
- };
- round += 1;
- let mut reader = arguments.send(&mut con.transport, is_done).await?;
- if sideband_all {
- setup_remote_progress(progress, &mut reader, should_interrupt);
- }
- let response = gix_protocol::fetch::Response::from_line_reader(protocol_version, &mut reader).await?;
- if response.has_pack() {
- progress.step();
- progress.set_name("receiving pack");
- if !sideband_all {
- setup_remote_progress(progress, &mut reader, should_interrupt);
- }
- previous_response = Some(response);
- break 'negotiation reader;
- } else {
- previous_response = Some(response);
- }
+ let mut negotiator = repo
+ .config
+ .resolved
+ .string_by_key(Fetch::NEGOTIATION_ALGORITHM.logical_name().as_str())
+ .map(|n| Fetch::NEGOTIATION_ALGORITHM.try_into_negotiation_algorithm(n))
+ .transpose()
+ .with_leniency(repo.config.lenient_config)?
+ .unwrap_or(Algorithm::Consecutive)
+ .into_negotiator();
+ let graph_repo = {
+ let mut r = repo.clone();
+ // assure that checking for unknown server refs doesn't trigger ODB refreshes.
+ r.objects.refresh = RefreshMode::Never;
+ // we cache everything of importance in the graph and thus don't need an object cache.
+ r.objects.unset_object_cache();
+ r
};
- let previous_response = previous_response.expect("knowledge of a pack means a response was received");
- if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
- let reject_shallow_remote = repo
- .config
- .resolved
- .boolean_filter_by_key("clone.rejectShallow", &mut repo.filter_config_section())
- .map(|val| Clone::REJECT_SHALLOW.enrich_error(val))
- .transpose()?
- .unwrap_or(false);
- if reject_shallow_remote {
- return Err(Error::RejectShallowRemote);
+ let mut graph = graph_repo.commit_graph();
+ let action = negotiate::mark_complete_and_common_ref(
+ &graph_repo,
+ negotiator.deref_mut(),
+ &mut graph,
+ &self.ref_map,
+ &self.shallow,
+ )?;
+ let mut previous_response = None::<gix_protocol::fetch::Response>;
+ let mut round = 1;
+ let mut write_pack_bundle = match &action {
+ negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => {
+ gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
+ None
}
- shallow_lock = acquire_shallow_lock(repo).map(Some)?;
- }
+ negotiate::Action::MustNegotiate {
+ remote_ref_target_known,
+ } => {
+ negotiate::add_wants(
+ repo,
+ &mut arguments,
+ &self.ref_map,
+ remote_ref_target_known,
+ &self.shallow,
+ con.remote.fetch_tags,
+ );
+ let is_stateless =
+ arguments.is_stateless(!con.transport.connection_persists_across_multiple_requests());
+ let mut haves_to_send = gix_negotiate::window_size(is_stateless, None);
+ let mut seen_ack = false;
+ let mut in_vain = 0;
+ let mut common = is_stateless.then(Vec::new);
+ let reader = 'negotiation: loop {
+ progress.step();
+ progress.set_name(format!("negotiate (round {round})"));
- let options = gix_pack::bundle::write::Options {
- thread_limit: config::index_threads(repo)?,
- index_version: config::pack_index_version(repo)?,
- iteration_mode: gix_pack::data::input::Mode::Verify,
- object_hash: con.remote.repo.object_hash(),
- };
+ let is_done = match negotiate::one_round(
+ negotiator.deref_mut(),
+ &mut graph,
+ haves_to_send,
+ &mut arguments,
+ previous_response.as_ref(),
+ common.as_mut(),
+ ) {
+ Ok((haves_sent, ack_seen)) => {
+ if ack_seen {
+ in_vain = 0;
+ }
+ seen_ack |= ack_seen;
+ in_vain += haves_sent;
+ let is_done = haves_sent != haves_to_send || (seen_ack && in_vain >= 256);
+ haves_to_send = gix_negotiate::window_size(is_stateless, haves_to_send);
+ is_done
+ }
+ Err(err) => {
+ gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
+ return Err(err.into());
+ }
+ };
+ let mut reader = arguments.send(&mut con.transport, is_done).await?;
+ if sideband_all {
+ setup_remote_progress(progress, &mut reader, should_interrupt);
+ }
+ let response =
+ gix_protocol::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done).await?;
+ let has_pack = response.has_pack();
+ previous_response = Some(response);
+ if has_pack {
+ progress.step();
+ progress.set_name("receiving pack");
+ if !sideband_all {
+ setup_remote_progress(progress, &mut reader, should_interrupt);
+ }
+ break 'negotiation reader;
+ } else {
+ round += 1;
+ }
+ };
+ drop(graph);
+ drop(graph_repo);
+ let previous_response = previous_response.expect("knowledge of a pack means a response was received");
+ if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
+ let reject_shallow_remote = repo
+ .config
+ .resolved
+ .boolean_filter_by_key("clone.rejectShallow", &mut repo.filter_config_section())
+ .map(|val| Clone::REJECT_SHALLOW.enrich_error(val))
+ .transpose()?
+ .unwrap_or(false);
+ if reject_shallow_remote {
+ return Err(Error::RejectShallowRemote);
+ }
+ shallow_lock = acquire_shallow_lock(repo).map(Some)?;
+ }
- let mut write_pack_bundle = if matches!(self.dry_run, fetch::DryRun::No) {
- Some(gix_pack::Bundle::write_to_directory(
- #[cfg(feature = "async-network-client")]
- {
- gix_protocol::futures_lite::io::BlockOn::new(reader)
- },
- #[cfg(not(feature = "async-network-client"))]
- {
- reader
- },
- Some(repo.objects.store_ref().path().join("pack")),
- progress,
- should_interrupt,
- Some(Box::new({
- let repo = repo.clone();
- move |oid, buf| repo.objects.find(oid, buf).ok()
- })),
- options,
- )?)
- } else {
- drop(reader);
- None
- };
+ let options = gix_pack::bundle::write::Options {
+ thread_limit: config::index_threads(repo)?,
+ index_version: config::pack_index_version(repo)?,
+ iteration_mode: gix_pack::data::input::Mode::Verify,
+ object_hash: con.remote.repo.object_hash(),
+ };
- if matches!(protocol_version, gix_protocol::transport::Protocol::V2) {
- gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
- }
+ let write_pack_bundle = if matches!(self.dry_run, fetch::DryRun::No) {
+ Some(gix_pack::Bundle::write_to_directory(
+ #[cfg(feature = "async-network-client")]
+ {
+ gix_protocol::futures_lite::io::BlockOn::new(reader)
+ },
+ #[cfg(not(feature = "async-network-client"))]
+ {
+ reader
+ },
+ Some(repo.objects.store_ref().path().join("pack")),
+ progress,
+ should_interrupt,
+ Some(Box::new({
+ let repo = repo.clone();
+ move |oid, buf| repo.objects.find(oid, buf).ok()
+ })),
+ options,
+ )?)
+ } else {
+ drop(reader);
+ None
+ };
+
+ if matches!(protocol_version, gix_protocol::transport::Protocol::V2) {
+ gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok();
+ }
- if let Some(shallow_lock) = shallow_lock {
- if !previous_response.shallow_updates().is_empty() {
- crate::shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
+ if let Some(shallow_lock) = shallow_lock {
+ if !previous_response.shallow_updates().is_empty() {
+ crate::shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
+ }
+ }
+ write_pack_bundle
}
- }
+ };
let update_refs = refs::update(
repo,
@@ -243,16 +291,26 @@ where
}
}
- Ok(Outcome {
+ let out = Outcome {
ref_map: std::mem::take(&mut self.ref_map),
- status: match write_pack_bundle {
- Some(write_pack_bundle) => Status::Change {
- write_pack_bundle,
+ status: if matches!(self.dry_run, fetch::DryRun::Yes) {
+ assert!(write_pack_bundle.is_none(), "in dry run we never read a bundle");
+ Status::DryRun {
update_refs,
- },
- None => Status::DryRun { update_refs },
+ negotiation_rounds: round,
+ }
+ } else {
+ match write_pack_bundle {
+ Some(write_pack_bundle) => Status::Change {
+ write_pack_bundle,
+ update_refs,
+ negotiation_rounds: round,
+ },
+ None => Status::NoPackReceived { update_refs },
+ }
},
- })
+ };
+ Ok(out)
}
}