diff options
Diffstat (limited to 'vendor/gix-filter/src/driver/process/client.rs')
-rw-r--r-- | vendor/gix-filter/src/driver/process/client.rs | 309 |
1 files changed, 309 insertions, 0 deletions
diff --git a/vendor/gix-filter/src/driver/process/client.rs b/vendor/gix-filter/src/driver/process/client.rs new file mode 100644 index 000000000..25635ea0d --- /dev/null +++ b/vendor/gix-filter/src/driver/process/client.rs @@ -0,0 +1,309 @@ +use std::{collections::HashSet, io::Write, str::FromStr}; + +use bstr::{BStr, BString, ByteVec}; + +use crate::driver::{ + process, + process::{Capabilities, Client, PacketlineReader}, +}; + +/// +pub mod handshake { + /// The error returned by [Client::handshake()][super::Client::handshake()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the process")] + Io(#[from] std::io::Error), + #[error("{msg} '{actual}'")] + Protocol { msg: String, actual: String }, + #[error("The server sent the '{name}' capability which isn't among the ones we desire can support")] + UnsupportedCapability { name: String }, + } +} + +/// +pub mod invoke { + /// The error returned by [Client::invoke()][super::Client::invoke()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the process")] + Io(#[from] std::io::Error), + } + + /// + pub mod without_content { + /// The error returned by [Client::invoke_without_content()][super::super::Client::invoke_without_content()]. + #[derive(Debug, thiserror::Error)] + #[allow(missing_docs)] + pub enum Error { + #[error("Failed to read or write to the process")] + Io(#[from] std::io::Error), + #[error(transparent)] + PacketlineDecode(#[from] gix_packetline::decode::Error), + } + + impl From<super::Error> for Error { + fn from(value: super::Error) -> Self { + match value { + super::Error::Io(err) => Error::Io(err), + } + } + } + } +} + +/// Protocol implementation +impl Client { + /// Given a spawned `process` as created from `cmd`, use the 'long-running-process' protocol to send `welcome-prefix` and supported + /// `versions`, along with the `desired_capabilities`, and perform the handshake to negotiate a version to use along with + /// obtaining supported capabilities, which may be a sub-set of the desired capabilities. + pub fn handshake( + mut process: std::process::Child, + welcome_prefix: &str, + versions: &[usize], + desired_capabilities: &[&str], + ) -> Result<Self, handshake::Error> { + let mut out = gix_packetline::Writer::new(process.stdin.take().expect("configured stdin when spawning")); + out.write_all(format!("{welcome_prefix}-client").as_bytes())?; + for version in versions { + out.write_all(format!("version={version}").as_bytes())?; + } + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + let mut input = gix_packetline::StreamingPeekableIter::new( + process.stdout.take().expect("configured stdout when spawning"), + &[gix_packetline::PacketLineRef::Flush], + ); + let mut read = input.as_read(); + let mut buf = String::new(); + read.read_line_to_string(&mut buf)?; + if buf + .strip_prefix(welcome_prefix) + .map_or(true, |rest| rest.trim_end() != "-server") + { + return Err(handshake::Error::Protocol { + msg: format!("Wanted '{welcome_prefix}-server, got "), + actual: buf, + }); + } + + let chosen_version; + buf.clear(); + read.read_line_to_string(&mut buf)?; + match buf + .strip_prefix("version=") + .and_then(|version| usize::from_str(version.trim_end()).ok()) + { + Some(version) => { + chosen_version = version; + } + None => { + return Err(handshake::Error::Protocol { + msg: "Needed 'version=<integer>', got ".into(), + actual: buf, + }) + } + } + + if !versions.contains(&chosen_version) { + return Err(handshake::Error::Protocol { + msg: format!("Server offered {chosen_version}, we only support "), + actual: versions.iter().map(ToString::to_string).collect::<Vec<_>>().join(", "), + }); + } + + if read.read_line_to_string(&mut buf)? != 0 { + return Err(handshake::Error::Protocol { + msg: "expected flush packet, got".into(), + actual: buf, + }); + } + for capability in desired_capabilities { + out.write_all(format!("capability={capability}").as_bytes())?; + } + gix_packetline::encode::flush_to_write(out.inner_mut())?; + out.flush()?; + + read.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let mut capabilities = HashSet::new(); + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + match buf.strip_prefix("capability=") { + Some(cap) => { + let cap = cap.trim_end(); + if !desired_capabilities.contains(&cap) { + return Err(handshake::Error::UnsupportedCapability { name: cap.into() }); + } + capabilities.insert(cap.to_owned()); + } + None => continue, + } + } + + drop(read); + Ok(Client { + child: process, + out: input, + input: out, + capabilities, + version: chosen_version, + }) + } + + /// Invoke `command` and send all `meta` data before sending all `content` in full. + pub fn invoke( + &mut self, + command: &str, + meta: &mut dyn Iterator<Item = (&str, BString)>, + content: &mut dyn std::io::Read, + ) -> Result<process::Status, invoke::Error> { + self.send_command_and_meta(command, meta)?; + std::io::copy(content, &mut self.input)?; + gix_packetline::encode::flush_to_write(self.input.inner_mut())?; + self.input.flush()?; + Ok(self.read_status()?) + } + + /// Invoke `command` while passing `meta` data, but don't send any content, and return their status. + /// Call `inspect_line` for each line that we see as command response. + /// + /// This is for commands that don't expect a content stream. + pub fn invoke_without_content<'a>( + &mut self, + command: &str, + meta: &mut dyn Iterator<Item = (&'a str, BString)>, + inspect_line: &mut dyn FnMut(&BStr), + ) -> Result<process::Status, invoke::without_content::Error> { + self.send_command_and_meta(command, meta)?; + while let Some(data) = self.out.read_line() { + let line = data??; + if let Some(line) = line.as_bstr() { + inspect_line(line); + } + } + self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let status = self.read_status()?; + Ok(status) + } + + /// Return a `Read` implementation that reads the server process output until the next flush package, and validates + /// the status. If the status indicates failure, the last read will also fail. + pub fn as_read(&mut self) -> impl std::io::Read + '_ { + self.out.reset_with(&[gix_packetline::PacketLineRef::Flush]); + ReadProcessOutputAndStatus { + inner: self.out.as_read(), + } + } + + /// Read a `status=` line from the process output until it is exhausted. + /// Note that the last sent status line wins and no status line means that the `Previous` still counts. + pub fn read_status(&mut self) -> std::io::Result<process::Status> { + read_status(&mut self.out.as_read()) + } +} + +impl Client { + fn send_command_and_meta( + &mut self, + command: &str, + meta: &mut dyn Iterator<Item = (&str, BString)>, + ) -> Result<(), invoke::Error> { + self.input.write_all(format!("command={command}").as_bytes())?; + let mut buf = BString::default(); + for (key, value) in meta { + buf.clear(); + buf.push_str(key); + buf.push(b'='); + buf.push_str(&value); + self.input.write_all(&buf)?; + } + gix_packetline::encode::flush_to_write(self.input.inner_mut())?; + Ok(()) + } +} + +fn read_status(read: &mut PacketlineReader<'_>) -> std::io::Result<process::Status> { + let mut status = process::Status::Previous; + let mut buf = String::new(); + let mut count = 0; + loop { + buf.clear(); + let num_read = read.read_line_to_string(&mut buf)?; + if num_read == 0 { + break; + } + if let Some(name) = buf.strip_prefix("status=") { + status = process::Status::Named(name.trim_end().into()); + } + count += 1; + } + if count > 0 && matches!(status, process::Status::Previous) { + status = process::Status::Unset; + } + read.reset_with(&[gix_packetline::PacketLineRef::Flush]); + Ok(status) +} + +struct ReadProcessOutputAndStatus<'a> { + inner: PacketlineReader<'a>, +} + +impl<'a> std::io::Read for ReadProcessOutputAndStatus<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + let num_read = self.inner.read(buf)?; + if num_read == 0 { + self.inner.reset_with(&[gix_packetline::PacketLineRef::Flush]); + let status = read_status(&mut self.inner)?; + if status.is_success() { + Ok(0) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "Process indicated error after reading: {}", + status.message().unwrap_or_default() + ), + )) + } + } else { + Ok(num_read) + } + } +} + +/// Access +impl Client { + /// Return the list of capabilities reported by the serving process. + pub fn capabilities(&self) -> &Capabilities { + &self.capabilities + } + + /// Return the mutable list of capabilities reported by the serving process. + pub fn capabilities_mut(&mut self) -> &mut Capabilities { + &mut self.capabilities + } + + /// Return the negotiated version of the protocol. + /// + /// Note that it is the highest one that both the client and the server support. + pub fn version(&self) -> usize { + self.version + } +} + +/// Lifecycle +impl Client { + /// Return the child handle of the running process. + /// + /// Note that this will naturally close input and output handles, which is a signal for the child process to shutdown. + pub fn into_child(self) -> std::process::Child { + self.child + } +} |