diff options
Diffstat (limited to 'vendor/gix/src/remote/connection/fetch/receive_pack.rs')
-rw-r--r-- | vendor/gix/src/remote/connection/fetch/receive_pack.rs | 296 |
1 files changed, 177 insertions, 119 deletions
diff --git a/vendor/gix/src/remote/connection/fetch/receive_pack.rs b/vendor/gix/src/remote/connection/fetch/receive_pack.rs index 99560fbca..7837a9d3a 100644 --- a/vendor/gix/src/remote/connection/fetch/receive_pack.rs +++ b/vendor/gix/src/remote/connection/fetch/receive_pack.rs @@ -1,18 +1,26 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::{ + ops::DerefMut, + sync::atomic::{AtomicBool, Ordering}, +}; -use gix_odb::FindExt; +use gix_odb::{store::RefreshMode, FindExt}; use gix_protocol::{ fetch::Arguments, transport::{client::Transport, packetline::read::ProgressAction}, }; use crate::{ - config::tree::Clone, + config::{ + cache::util::ApplyLeniency, + tree::{Clone, Fetch, Key}, + }, remote, remote::{ connection::fetch::config, fetch, - fetch::{negotiate, refs, Error, Outcome, Prepare, ProgressId, RefLogMessage, Shallow, Status}, + fetch::{ + negotiate, negotiate::Algorithm, refs, Error, Outcome, Prepare, ProgressId, RefLogMessage, Shallow, Status, + }, }, Progress, Repository, }; @@ -99,9 +107,6 @@ where } let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, &self.shallow, repo)?; - let mut previous_response = None::<gix_protocol::fetch::Response>; - let mut round = 1; - if self.ref_map.object_hash != repo.object_hash() { return Err(Error::IncompatibleObjectHash { local: repo.object_hash(), @@ -109,118 +114,161 @@ where }); } - let reader = 'negotiation: loop { - progress.step(); - progress.set_name(format!("negotiate (round {round})")); - - let is_done = match negotiate::one_round( - negotiate::Algorithm::Naive, - round, - repo, - &self.ref_map, - con.remote.fetch_tags, - &mut arguments, - previous_response.as_ref(), - (self.shallow != Shallow::NoChange).then_some(&self.shallow), - ) { - Ok(_) if arguments.is_empty() => { - gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); - let update_refs = refs::update( - repo, - self.reflog_message - .take() - .unwrap_or_else(|| RefLogMessage::Prefixed { action: "fetch".into() }), - &self.ref_map.mappings, - con.remote.refspecs(remote::Direction::Fetch), - &self.ref_map.extra_refspecs, - con.remote.fetch_tags, - self.dry_run, - self.write_packed_refs, - )?; - return Ok(Outcome { - ref_map: std::mem::take(&mut self.ref_map), - status: Status::NoPackReceived { update_refs }, - }); - } - Ok(is_done) => is_done, - Err(err) => { - gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); - return Err(err.into()); - } - }; - round += 1; - let mut reader = arguments.send(&mut con.transport, is_done).await?; - if sideband_all { - setup_remote_progress(progress, &mut reader, should_interrupt); - } - let response = gix_protocol::fetch::Response::from_line_reader(protocol_version, &mut reader).await?; - if response.has_pack() { - progress.step(); - progress.set_name("receiving pack"); - if !sideband_all { - setup_remote_progress(progress, &mut reader, should_interrupt); - } - previous_response = Some(response); - break 'negotiation reader; - } else { - previous_response = Some(response); - } + let mut negotiator = repo + .config + .resolved + .string_by_key(Fetch::NEGOTIATION_ALGORITHM.logical_name().as_str()) + .map(|n| Fetch::NEGOTIATION_ALGORITHM.try_into_negotiation_algorithm(n)) + .transpose() + .with_leniency(repo.config.lenient_config)? + .unwrap_or(Algorithm::Consecutive) + .into_negotiator(); + let graph_repo = { + let mut r = repo.clone(); + // assure that checking for unknown server refs doesn't trigger ODB refreshes. + r.objects.refresh = RefreshMode::Never; + // we cache everything of importance in the graph and thus don't need an object cache. + r.objects.unset_object_cache(); + r }; - let previous_response = previous_response.expect("knowledge of a pack means a response was received"); - if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() { - let reject_shallow_remote = repo - .config - .resolved - .boolean_filter_by_key("clone.rejectShallow", &mut repo.filter_config_section()) - .map(|val| Clone::REJECT_SHALLOW.enrich_error(val)) - .transpose()? - .unwrap_or(false); - if reject_shallow_remote { - return Err(Error::RejectShallowRemote); + let mut graph = graph_repo.commit_graph(); + let action = negotiate::mark_complete_and_common_ref( + &graph_repo, + negotiator.deref_mut(), + &mut graph, + &self.ref_map, + &self.shallow, + )?; + let mut previous_response = None::<gix_protocol::fetch::Response>; + let mut round = 1; + let mut write_pack_bundle = match &action { + negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => { + gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); + None } - shallow_lock = acquire_shallow_lock(repo).map(Some)?; - } + negotiate::Action::MustNegotiate { + remote_ref_target_known, + } => { + negotiate::add_wants( + repo, + &mut arguments, + &self.ref_map, + remote_ref_target_known, + &self.shallow, + con.remote.fetch_tags, + ); + let is_stateless = + arguments.is_stateless(!con.transport.connection_persists_across_multiple_requests()); + let mut haves_to_send = gix_negotiate::window_size(is_stateless, None); + let mut seen_ack = false; + let mut in_vain = 0; + let mut common = is_stateless.then(Vec::new); + let reader = 'negotiation: loop { + progress.step(); + progress.set_name(format!("negotiate (round {round})")); - let options = gix_pack::bundle::write::Options { - thread_limit: config::index_threads(repo)?, - index_version: config::pack_index_version(repo)?, - iteration_mode: gix_pack::data::input::Mode::Verify, - object_hash: con.remote.repo.object_hash(), - }; + let is_done = match negotiate::one_round( + negotiator.deref_mut(), + &mut graph, + haves_to_send, + &mut arguments, + previous_response.as_ref(), + common.as_mut(), + ) { + Ok((haves_sent, ack_seen)) => { + if ack_seen { + in_vain = 0; + } + seen_ack |= ack_seen; + in_vain += haves_sent; + let is_done = haves_sent != haves_to_send || (seen_ack && in_vain >= 256); + haves_to_send = gix_negotiate::window_size(is_stateless, haves_to_send); + is_done + } + Err(err) => { + gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); + return Err(err.into()); + } + }; + let mut reader = arguments.send(&mut con.transport, is_done).await?; + if sideband_all { + setup_remote_progress(progress, &mut reader, should_interrupt); + } + let response = + gix_protocol::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done).await?; + let has_pack = response.has_pack(); + previous_response = Some(response); + if has_pack { + progress.step(); + progress.set_name("receiving pack"); + if !sideband_all { + setup_remote_progress(progress, &mut reader, should_interrupt); + } + break 'negotiation reader; + } else { + round += 1; + } + }; + drop(graph); + drop(graph_repo); + let previous_response = previous_response.expect("knowledge of a pack means a response was received"); + if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() { + let reject_shallow_remote = repo + .config + .resolved + .boolean_filter_by_key("clone.rejectShallow", &mut repo.filter_config_section()) + .map(|val| Clone::REJECT_SHALLOW.enrich_error(val)) + .transpose()? + .unwrap_or(false); + if reject_shallow_remote { + return Err(Error::RejectShallowRemote); + } + shallow_lock = acquire_shallow_lock(repo).map(Some)?; + } - let mut write_pack_bundle = if matches!(self.dry_run, fetch::DryRun::No) { - Some(gix_pack::Bundle::write_to_directory( - #[cfg(feature = "async-network-client")] - { - gix_protocol::futures_lite::io::BlockOn::new(reader) - }, - #[cfg(not(feature = "async-network-client"))] - { - reader - }, - Some(repo.objects.store_ref().path().join("pack")), - progress, - should_interrupt, - Some(Box::new({ - let repo = repo.clone(); - move |oid, buf| repo.objects.find(oid, buf).ok() - })), - options, - )?) - } else { - drop(reader); - None - }; + let options = gix_pack::bundle::write::Options { + thread_limit: config::index_threads(repo)?, + index_version: config::pack_index_version(repo)?, + iteration_mode: gix_pack::data::input::Mode::Verify, + object_hash: con.remote.repo.object_hash(), + }; - if matches!(protocol_version, gix_protocol::transport::Protocol::V2) { - gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); - } + let write_pack_bundle = if matches!(self.dry_run, fetch::DryRun::No) { + Some(gix_pack::Bundle::write_to_directory( + #[cfg(feature = "async-network-client")] + { + gix_protocol::futures_lite::io::BlockOn::new(reader) + }, + #[cfg(not(feature = "async-network-client"))] + { + reader + }, + Some(repo.objects.store_ref().path().join("pack")), + progress, + should_interrupt, + Some(Box::new({ + let repo = repo.clone(); + move |oid, buf| repo.objects.find(oid, buf).ok() + })), + options, + )?) + } else { + drop(reader); + None + }; + + if matches!(protocol_version, gix_protocol::transport::Protocol::V2) { + gix_protocol::indicate_end_of_interaction(&mut con.transport).await.ok(); + } - if let Some(shallow_lock) = shallow_lock { - if !previous_response.shallow_updates().is_empty() { - crate::shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?; + if let Some(shallow_lock) = shallow_lock { + if !previous_response.shallow_updates().is_empty() { + crate::shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?; + } + } + write_pack_bundle } - } + }; let update_refs = refs::update( repo, @@ -243,16 +291,26 @@ where } } - Ok(Outcome { + let out = Outcome { ref_map: std::mem::take(&mut self.ref_map), - status: match write_pack_bundle { - Some(write_pack_bundle) => Status::Change { - write_pack_bundle, + status: if matches!(self.dry_run, fetch::DryRun::Yes) { + assert!(write_pack_bundle.is_none(), "in dry run we never read a bundle"); + Status::DryRun { update_refs, - }, - None => Status::DryRun { update_refs }, + negotiation_rounds: round, + } + } else { + match write_pack_bundle { + Some(write_pack_bundle) => Status::Change { + write_pack_bundle, + update_refs, + negotiation_rounds: round, + }, + None => Status::NoPackReceived { update_refs }, + } }, - }) + }; + Ok(out) } } |