//! Process-global interrupt handling //! //! This module contains facilities to globally request an interrupt, which will cause supporting computations to //! abort once it is observed. //! Such checks for interrupts are provided in custom implementations of various traits to transparently add interrupt //! support to methods who wouldn't otherwise by injecting it. see [`Read`]. #[cfg(feature = "interrupt")] mod init { use std::{ io, sync::atomic::{AtomicUsize, Ordering}, }; static DEREGISTER_COUNT: AtomicUsize = AtomicUsize::new(0); static REGISTERED_HOOKS: once_cell::sync::Lazy>> = once_cell::sync::Lazy::new(Default::default); static DEFAULT_BEHAVIOUR_HOOKS: once_cell::sync::Lazy>> = once_cell::sync::Lazy::new(Default::default); /// A type to help deregistering hooks registered with [`init_handler`](super::init_handler()); #[derive(Default)] pub struct Deregister { do_reset: bool, } pub struct AutoDeregister(Deregister); impl Deregister { /// Remove all previously registered handlers, and assure the default behaviour is reinstated, if this is the last available instance. /// /// Note that only the instantiation of the default behaviour can fail. pub fn deregister(self) -> std::io::Result<()> { let mut hooks = REGISTERED_HOOKS.lock(); let count = DEREGISTER_COUNT.fetch_sub(1, Ordering::SeqCst); if count > 1 || hooks.is_empty() { return Ok(()); } if self.do_reset { super::reset(); } for (_, hook_id) in hooks.iter() { signal_hook::low_level::unregister(*hook_id); } let hooks = hooks.drain(..); let mut default_hooks = DEFAULT_BEHAVIOUR_HOOKS.lock(); // Even if dropped, `drain(..)` clears the vec which is a must. for (sig, _) in hooks { // # SAFETY // * we only register a handler that is specifically designed to run in this environment. #[allow(unsafe_code)] unsafe { default_hooks.push(signal_hook::low_level::register(sig, move || { signal_hook::low_level::emulate_default_handler(sig).ok(); })?); } } Ok(()) } /// If called with `toggle` being `true`, when actually deregistering, we will also reset the trigger by /// calling [`reset()`](super::reset()). pub fn with_reset(mut self, toggle: bool) -> Self { self.do_reset = toggle; self } /// Return a type that deregisters all installed signal handlers on drop. pub fn auto_deregister(self) -> AutoDeregister { AutoDeregister(self) } } impl Drop for AutoDeregister { fn drop(&mut self) { std::mem::take(&mut self.0).deregister().ok(); } } /// Initialize a signal handler to listen to SIGINT and SIGTERM and trigger our [`trigger()`](super::trigger()) that way. /// Also trigger `interrupt()` which promises to never use a Mutex, allocate or deallocate, or do anything else that's blocking. /// Use `grace_count` to determine how often the termination signal can be received before it's terminal, e.g. 1 would only terminate /// the application the second time the signal is received. /// Note that only the `grace_count` and `interrupt` of the first call are effective, all others will be ignored. /// /// Use the returned `Deregister` type to explicitly deregister hooks, or to do so automatically. /// /// # Note /// /// It will abort the process on second press and won't inform the user about this behaviour either as we are unable to do so without /// deadlocking even when trying to write to stderr directly. pub fn init_handler( grace_count: usize, interrupt: impl Fn() + Send + Sync + Clone + 'static, ) -> io::Result { let prev_count = DEREGISTER_COUNT.fetch_add(1, Ordering::SeqCst); if prev_count != 0 { // Try to obtain the lock before we return just to wait for the signals to actually be registered. let _guard = REGISTERED_HOOKS.lock(); return Ok(Deregister::default()); } let mut guard = REGISTERED_HOOKS.lock(); if !guard.is_empty() { return Ok(Deregister::default()); } let mut hooks = Vec::with_capacity(signal_hook::consts::TERM_SIGNALS.len()); for sig in signal_hook::consts::TERM_SIGNALS { // # SAFETY // * we only set atomics or call functions that do // * there is no use of the heap let interrupt = interrupt.clone(); #[allow(unsafe_code)] unsafe { let hook_id = signal_hook::low_level::register(*sig, move || { static INTERRUPT_COUNT: AtomicUsize = AtomicUsize::new(0); if !super::is_triggered() { INTERRUPT_COUNT.store(0, Ordering::SeqCst); } let msg_idx = INTERRUPT_COUNT.fetch_add(1, Ordering::SeqCst); if msg_idx == grace_count { gix_tempfile::registry::cleanup_tempfiles_signal_safe(); signal_hook::low_level::emulate_default_handler(*sig).ok(); } interrupt(); super::trigger(); })?; hooks.push((*sig, hook_id)); } } for hook_id in DEFAULT_BEHAVIOUR_HOOKS.lock().drain(..) { signal_hook::low_level::unregister(hook_id); } // This means that they won't setup a handler allowing us to call them right before we actually abort. gix_tempfile::signal::setup(gix_tempfile::signal::handler::Mode::None); *guard = hooks; Ok(Deregister::default()) } } use std::{ io, sync::atomic::{AtomicBool, Ordering}, }; #[cfg(feature = "interrupt")] pub use init::{init_handler, Deregister}; /// A wrapper for an inner iterator which will check for interruptions on each iteration. pub struct Iter { /// The actual iterator to yield elements from. inner: gix_features::interrupt::IterWithErr<'static, I, EFN>, } impl Iter where I: Iterator, EFN: FnOnce() -> E, { /// Create a new iterator over `inner` which checks for interruptions on each iteration and calls `make_err()` to /// signal an interruption happened, causing no further items to be iterated from that point on. pub fn new(inner: I, make_err: EFN) -> Self { Iter { inner: gix_features::interrupt::IterWithErr::new(inner, make_err, &IS_INTERRUPTED), } } /// Return the inner iterator pub fn into_inner(self) -> I { self.inner.inner } /// Return the inner iterator as reference pub fn inner(&self) -> &I { &self.inner.inner } } impl Iterator for Iter where I: Iterator, EFN: FnOnce() -> E, { type Item = Result; fn next(&mut self) -> Option { self.inner.next() } } /// A wrapper for implementors of [`std::io::Read`] or [`std::io::BufRead`] with interrupt support. /// /// It fails a [read][`std::io::Read::read`] while an interrupt was requested. pub struct Read { /// The actual implementor of [`std::io::Read`] to which interrupt support will be added. inner: gix_features::interrupt::Read<'static, R>, } impl Read where R: io::Read, { /// Create a new interruptible reader from `read`. pub fn new(read: R) -> Self { Read { inner: gix_features::interrupt::Read { inner: read, should_interrupt: &IS_INTERRUPTED, }, } } /// Return the inner reader pub fn into_inner(self) -> R { self.inner.inner } } impl io::Read for Read where R: io::Read, { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } } impl io::BufRead for Read where R: io::BufRead, { fn fill_buf(&mut self) -> io::Result<&[u8]> { self.inner.fill_buf() } fn consume(&mut self, amt: usize) { self.inner.consume(amt) } } /// The flag behind all utility functions in this module. pub static IS_INTERRUPTED: AtomicBool = AtomicBool::new(false); /// Returns true if an interrupt is requested. pub fn is_triggered() -> bool { IS_INTERRUPTED.load(Ordering::Relaxed) } /// Trigger an interrupt, signalling to those checking for [`is_triggered()`] to stop what they are doing. pub fn trigger() { IS_INTERRUPTED.store(true, Ordering::SeqCst); } /// Sets the interrupt request to false, thus allowing those checking for [`is_triggered()`] to proceed. pub fn reset() { IS_INTERRUPTED.store(false, Ordering::SeqCst); }