diff options
Diffstat (limited to 'third_party/rust/jobserver/src/unix.rs')
-rw-r--r-- | third_party/rust/jobserver/src/unix.rs | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/third_party/rust/jobserver/src/unix.rs b/third_party/rust/jobserver/src/unix.rs new file mode 100644 index 0000000000..2474deefc5 --- /dev/null +++ b/third_party/rust/jobserver/src/unix.rs @@ -0,0 +1,368 @@ +use libc::c_int; + +use std::fs::File; +use std::io::{self, Read, Write}; +use std::mem; +use std::mem::MaybeUninit; +use std::os::unix::prelude::*; +use std::process::Command; +use std::ptr; +use std::sync::{Arc, Once}; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +#[derive(Debug)] +pub struct Client { + read: File, + write: File, +} + +#[derive(Debug)] +pub struct Acquired { + byte: u8, +} + +impl Client { + pub fn new(mut limit: usize) -> io::Result<Client> { + let client = unsafe { Client::mk()? }; + + // I don't think the character written here matters, but I could be + // wrong! + const BUFFER: [u8; 128] = [b'|'; 128]; + + set_nonblocking(client.write.as_raw_fd(), true)?; + + while limit > 0 { + let n = limit.min(BUFFER.len()); + + (&client.write).write_all(&BUFFER[..n])?; + limit -= n; + } + + set_nonblocking(client.write.as_raw_fd(), false)?; + + Ok(client) + } + + unsafe fn mk() -> io::Result<Client> { + let mut pipes = [0; 2]; + + // Attempt atomically-create-with-cloexec if we can on Linux, + // detected by using the `syscall` function in `libc` to try to work + // with as many kernels/glibc implementations as possible. + #[cfg(target_os = "linux")] + { + use std::sync::atomic::{AtomicBool, Ordering}; + + static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true); + if PIPE2_AVAILABLE.load(Ordering::SeqCst) { + match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) { + -1 => { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ENOSYS) { + PIPE2_AVAILABLE.store(false, Ordering::SeqCst); + } else { + return Err(err); + } + } + _ => return Ok(Client::from_fds(pipes[0], pipes[1])), + } + } + } + + cvt(libc::pipe(pipes.as_mut_ptr()))?; + drop(set_cloexec(pipes[0], true)); + drop(set_cloexec(pipes[1], true)); + Ok(Client::from_fds(pipes[0], pipes[1])) + } + + pub unsafe fn open(s: &str) -> Option<Client> { + let mut parts = s.splitn(2, ','); + let read = parts.next().unwrap(); + let write = match parts.next() { + Some(s) => s, + None => return None, + }; + + let read = match read.parse() { + Ok(n) => n, + Err(_) => return None, + }; + let write = match write.parse() { + Ok(n) => n, + Err(_) => return None, + }; + + // Ok so we've got two integers that look like file descriptors, but + // for extra sanity checking let's see if they actually look like + // instances of a pipe before we return the client. + // + // If we're called from `make` *without* the leading + on our rule + // then we'll have `MAKEFLAGS` env vars but won't actually have + // access to the file descriptors. + if is_valid_fd(read) && is_valid_fd(write) { + drop(set_cloexec(read, true)); + drop(set_cloexec(write, true)); + Some(Client::from_fds(read, write)) + } else { + None + } + } + + unsafe fn from_fds(read: c_int, write: c_int) -> Client { + Client { + read: File::from_raw_fd(read), + write: File::from_raw_fd(write), + } + } + + pub fn acquire(&self) -> io::Result<Acquired> { + // Ignore interrupts and keep trying if that happens + loop { + if let Some(token) = self.acquire_allow_interrupts()? { + return Ok(token); + } + } + } + + /// Block waiting for a token, returning `None` if we're interrupted with + /// EINTR. + fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> { + // We don't actually know if the file descriptor here is set in + // blocking or nonblocking mode. AFAIK all released versions of + // `make` use blocking fds for the jobserver, but the unreleased + // version of `make` doesn't. In the unreleased version jobserver + // fds are set to nonblocking and combined with `pselect` + // internally. + // + // Here we try to be compatible with both strategies. We optimistically + // try to read from the file descriptor which then may block, return + // a token or indicate that polling is needed. + // Blocking reads (if possible) allows the kernel to be more selective + // about which readers to wake up when a token is written to the pipe. + // + // We use `poll` here to block this thread waiting for read + // readiness, and then afterwards we perform the `read` itself. If + // the `read` returns that it would block then we start over and try + // again. + // + // Also note that we explicitly don't handle EINTR here. That's used + // to shut us down, so we otherwise punt all errors upwards. + unsafe { + let mut fd: libc::pollfd = mem::zeroed(); + fd.fd = self.read.as_raw_fd(); + fd.events = libc::POLLIN; + loop { + let mut buf = [0]; + match (&self.read).read(&mut buf) { + Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), + Ok(_) => { + return Err(io::Error::new( + io::ErrorKind::Other, + "early EOF on jobserver pipe", + )) + } + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => { /* fall through to polling */ } + io::ErrorKind::Interrupted => return Ok(None), + _ => return Err(e), + }, + } + + loop { + fd.revents = 0; + if libc::poll(&mut fd, 1, -1) == -1 { + let e = io::Error::last_os_error(); + return match e.kind() { + io::ErrorKind::Interrupted => Ok(None), + _ => Err(e), + }; + } + if fd.revents != 0 { + break; + } + } + } + } + } + + pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> { + // Note that the fd may be nonblocking but we're going to go ahead + // and assume that the writes here are always nonblocking (we can + // always quickly release a token). If that turns out to not be the + // case we'll get an error anyway! + let byte = data.map(|d| d.byte).unwrap_or(b'+'); + match (&self.write).write(&[byte])? { + 1 => Ok(()), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "failed to write token back to jobserver", + )), + } + } + + pub fn string_arg(&self) -> String { + format!("{},{}", self.read.as_raw_fd(), self.write.as_raw_fd()) + } + + pub fn available(&self) -> io::Result<usize> { + let mut len = MaybeUninit::<c_int>::uninit(); + cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?; + Ok(unsafe { len.assume_init() } as usize) + } + + pub fn configure(&self, cmd: &mut Command) { + // Here we basically just want to say that in the child process + // we'll configure the read/write file descriptors to *not* be + // cloexec, so they're inherited across the exec and specified as + // integers through `string_arg` above. + let read = self.read.as_raw_fd(); + let write = self.write.as_raw_fd(); + unsafe { + cmd.pre_exec(move || { + set_cloexec(read, false)?; + set_cloexec(write, false)?; + Ok(()) + }); + } + } +} + +#[derive(Debug)] +pub struct Helper { + thread: JoinHandle<()>, + state: Arc<super::HelperState>, +} + +pub(crate) fn spawn_helper( + client: crate::Client, + state: Arc<super::HelperState>, + mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>, +) -> io::Result<Helper> { + static USR1_INIT: Once = Once::new(); + let mut err = None; + USR1_INIT.call_once(|| unsafe { + let mut new: libc::sigaction = mem::zeroed(); + new.sa_sigaction = sigusr1_handler as usize; + new.sa_flags = libc::SA_SIGINFO as _; + if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { + err = Some(io::Error::last_os_error()); + } + }); + + if let Some(e) = err.take() { + return Err(e); + } + + let state2 = state.clone(); + let thread = Builder::new().spawn(move || { + state2.for_each_request(|helper| loop { + match client.inner.acquire_allow_interrupts() { + Ok(Some(data)) => { + break f(Ok(crate::Acquired { + client: client.inner.clone(), + data, + disabled: false, + })) + } + Err(e) => break f(Err(e)), + Ok(None) if helper.producer_done() => break, + Ok(None) => {} + } + }); + })?; + + Ok(Helper { thread, state }) +} + +impl Helper { + pub fn join(self) { + let dur = Duration::from_millis(10); + let mut state = self.state.lock(); + debug_assert!(state.producer_done); + + // We need to join our helper thread, and it could be blocked in one + // of two locations. First is the wait for a request, but the + // initial drop of `HelperState` will take care of that. Otherwise + // it may be blocked in `client.acquire()`. We actually have no way + // of interrupting that, so resort to `pthread_kill` as a fallback. + // This signal should interrupt any blocking `read` call with + // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. + // + // Note that we don't do this forever though since there's a chance + // of bugs, so only do this opportunistically to make a best effort + // at clearing ourselves up. + for _ in 0..100 { + if state.consumer_done { + break; + } + unsafe { + // Ignore the return value here of `pthread_kill`, + // apparently on OSX if you kill a dead thread it will + // return an error, but on other platforms it may not. In + // that sense we don't actually know if this will succeed or + // not! + libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1); + } + state = self + .state + .cvar + .wait_timeout(state, dur) + .unwrap_or_else(|e| e.into_inner()) + .0; + thread::yield_now(); // we really want the other thread to run + } + + // If we managed to actually see the consumer get done, then we can + // definitely wait for the thread. Otherwise it's... off in the ether + // I guess? + if state.consumer_done { + drop(self.thread.join()); + } + } +} + +fn is_valid_fd(fd: c_int) -> bool { + unsafe { libc::fcntl(fd, libc::F_GETFD) != -1 } +} + +fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { + unsafe { + let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; + let new = if set { + previous | libc::FD_CLOEXEC + } else { + previous & !libc::FD_CLOEXEC + }; + if new != previous { + cvt(libc::fcntl(fd, libc::F_SETFD, new))?; + } + Ok(()) + } +} + +fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> { + let status_flag = if set { libc::O_NONBLOCK } else { 0 }; + + unsafe { + cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?; + } + + Ok(()) +} + +fn cvt(t: c_int) -> io::Result<c_int> { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} + +extern "C" fn sigusr1_handler( + _signum: c_int, + _info: *mut libc::siginfo_t, + _ptr: *mut libc::c_void, +) { + // nothing to do +} |