diff options
Diffstat (limited to 'third_party/rust/tokio/src/signal')
-rw-r--r-- | third_party/rust/tokio/src/signal/ctrl_c.rs | 62 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/mod.rs | 100 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/registry.rs | 279 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/reusable_box.rs | 228 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/unix.rs | 477 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/unix/driver.rs | 207 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows.rs | 223 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows/stub.rs | 13 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows/sys.rs | 153 |
9 files changed, 1742 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/signal/ctrl_c.rs b/third_party/rust/tokio/src/signal/ctrl_c.rs new file mode 100644 index 0000000000..b26ab7ead6 --- /dev/null +++ b/third_party/rust/tokio/src/signal/ctrl_c.rs @@ -0,0 +1,62 @@ +#[cfg(unix)] +use super::unix::{self as os_impl}; +#[cfg(windows)] +use super::windows::{self as os_impl}; + +use std::io; + +/// Completes when a "ctrl-c" notification is sent to the process. +/// +/// While signals are handled very differently between Unix and Windows, both +/// platforms support receiving a signal on "ctrl-c". This function provides a +/// portable API for receiving this notification. +/// +/// Once the returned future is polled, a listener is registered. The future +/// will complete on the first received `ctrl-c` **after** the initial call to +/// either `Future::poll` or `.await`. +/// +/// # Caveats +/// +/// On Unix platforms, the first time that a `Signal` instance is registered for a +/// particular signal kind, an OS signal-handler is installed which replaces the +/// default platform behavior when that signal is received, **for the duration of +/// the entire process**. +/// +/// For example, Unix systems will terminate a process by default when it +/// receives a signal generated by "CTRL+C" on the terminal. But, when a +/// `ctrl_c` stream is created to listen for this signal, the time it arrives, +/// it will be translated to a stream event, and the process will continue to +/// execute. **Even if this `Signal` instance is dropped, subsequent SIGINT +/// deliveries will end up captured by Tokio, and the default platform behavior +/// will NOT be reset**. +/// +/// Thus, applications should take care to ensure the expected signal behavior +/// occurs as expected after listening for specific signals. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal; +/// +/// #[tokio::main] +/// async fn main() { +/// println!("waiting for ctrl-c"); +/// +/// signal::ctrl_c().await.expect("failed to listen for event"); +/// +/// println!("received ctrl-c event"); +/// } +/// ``` +/// +/// Listen in the background: +/// +/// ```rust,no_run +/// tokio::spawn(async move { +/// tokio::signal::ctrl_c().await.unwrap(); +/// // Your handler here +/// }); +/// ``` +pub async fn ctrl_c() -> io::Result<()> { + os_impl::ctrl_c()?.recv().await; + Ok(()) +} diff --git a/third_party/rust/tokio/src/signal/mod.rs b/third_party/rust/tokio/src/signal/mod.rs new file mode 100644 index 0000000000..882218a0f6 --- /dev/null +++ b/third_party/rust/tokio/src/signal/mod.rs @@ -0,0 +1,100 @@ +//! Asynchronous signal handling for Tokio. +//! +//! Note that signal handling is in general a very tricky topic and should be +//! used with great care. This crate attempts to implement 'best practice' for +//! signal handling, but it should be evaluated for your own applications' needs +//! to see if it's suitable. +//! +//! There are some fundamental limitations of this crate documented on the OS +//! specific structures, as well. +//! +//! # Examples +//! +//! Print on "ctrl-c" notification. +//! +//! ```rust,no_run +//! use tokio::signal; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! signal::ctrl_c().await?; +//! println!("ctrl-c received!"); +//! Ok(()) +//! } +//! ``` +//! +//! Wait for SIGHUP on Unix +//! +//! ```rust,no_run +//! # #[cfg(unix)] { +//! use tokio::signal::unix::{signal, SignalKind}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // An infinite stream of hangup signals. +//! let mut stream = signal(SignalKind::hangup())?; +//! +//! // Print whenever a HUP signal is received +//! loop { +//! stream.recv().await; +//! println!("got signal HUP"); +//! } +//! } +//! # } +//! ``` +use crate::sync::watch::Receiver; +use std::task::{Context, Poll}; + +mod ctrl_c; +pub use ctrl_c::ctrl_c; + +mod registry; + +mod os { + #[cfg(unix)] + pub(crate) use super::unix::{OsExtraData, OsStorage}; + + #[cfg(windows)] + pub(crate) use super::windows::{OsExtraData, OsStorage}; +} + +pub mod unix; +pub mod windows; + +mod reusable_box; +use self::reusable_box::ReusableBoxFuture; + +#[derive(Debug)] +struct RxFuture { + inner: ReusableBoxFuture<Receiver<()>>, +} + +async fn make_future(mut rx: Receiver<()>) -> Receiver<()> { + match rx.changed().await { + Ok(()) => rx, + Err(_) => panic!("signal sender went away"), + } +} + +impl RxFuture { + fn new(rx: Receiver<()>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } + + async fn recv(&mut self) -> Option<()> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_recv(cx)).await + } + + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + match self.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(rx) => { + self.inner.set(make_future(rx)); + Poll::Ready(Some(())) + } + } + } +} diff --git a/third_party/rust/tokio/src/signal/registry.rs b/third_party/rust/tokio/src/signal/registry.rs new file mode 100644 index 0000000000..6d8eb9e748 --- /dev/null +++ b/third_party/rust/tokio/src/signal/registry.rs @@ -0,0 +1,279 @@ +#![allow(clippy::unit_arg)] + +use crate::signal::os::{OsExtraData, OsStorage}; + +use crate::sync::watch; + +use once_cell::sync::Lazy; +use std::ops; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub(crate) type EventId = usize; + +/// State for a specific event, whether a notification is pending delivery, +/// and what listeners are registered. +#[derive(Debug)] +pub(crate) struct EventInfo { + pending: AtomicBool, + tx: watch::Sender<()>, +} + +impl Default for EventInfo { + fn default() -> Self { + let (tx, _rx) = watch::channel(()); + + Self { + pending: AtomicBool::new(false), + tx, + } + } +} + +/// An interface for retrieving the `EventInfo` for a particular eventId. +pub(crate) trait Storage { + /// Gets the `EventInfo` for `id` if it exists. + fn event_info(&self, id: EventId) -> Option<&EventInfo>; + + /// Invokes `f` once for each defined `EventInfo` in this storage. + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo); +} + +impl Storage for Vec<EventInfo> { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().for_each(f) + } +} + +/// An interface for initializing a type. Useful for situations where we cannot +/// inject a configured instance in the constructor of another type. +pub(crate) trait Init { + fn init() -> Self; +} + +/// Manages and distributes event notifications to any registered listeners. +/// +/// Generic over the underlying storage to allow for domain specific +/// optimizations (e.g. eventIds may or may not be contiguous). +#[derive(Debug)] +pub(crate) struct Registry<S> { + storage: S, +} + +impl<S> Registry<S> { + fn new(storage: S) -> Self { + Self { storage } + } +} + +impl<S: Storage> Registry<S> { + /// Registers a new listener for `event_id`. + fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { + self.storage + .event_info(event_id) + .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) + .tx + .subscribe() + } + + /// Marks `event_id` as having been delivered, without broadcasting it to + /// any listeners. + fn record_event(&self, event_id: EventId) { + if let Some(event_info) = self.storage.event_info(event_id) { + event_info.pending.store(true, Ordering::SeqCst) + } + } + + /// Broadcasts all previously recorded events to their respective listeners. + /// + /// Returns `true` if an event was delivered to at least one listener. + fn broadcast(&self) -> bool { + let mut did_notify = false; + self.storage.for_each(|event_info| { + // Any signal of this kind arrived since we checked last? + if !event_info.pending.swap(false, Ordering::SeqCst) { + return; + } + + // Ignore errors if there are no listeners + if event_info.tx.send(()).is_ok() { + did_notify = true; + } + }); + + did_notify + } +} + +pub(crate) struct Globals { + extra: OsExtraData, + registry: Registry<OsStorage>, +} + +impl ops::Deref for Globals { + type Target = OsExtraData; + + fn deref(&self) -> &Self::Target { + &self.extra + } +} + +impl Globals { + /// Registers a new listener for `event_id`. + pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { + self.registry.register_listener(event_id) + } + + /// Marks `event_id` as having been delivered, without broadcasting it to + /// any listeners. + pub(crate) fn record_event(&self, event_id: EventId) { + self.registry.record_event(event_id); + } + + /// Broadcasts all previously recorded events to their respective listeners. + /// + /// Returns `true` if an event was delivered to at least one listener. + pub(crate) fn broadcast(&self) -> bool { + self.registry.broadcast() + } + + #[cfg(unix)] + pub(crate) fn storage(&self) -> &OsStorage { + &self.registry.storage + } +} + +pub(crate) fn globals() -> Pin<&'static Globals> +where + OsExtraData: 'static + Send + Sync + Init, + OsStorage: 'static + Send + Sync + Init, +{ + static GLOBALS: Lazy<Pin<Box<Globals>>> = Lazy::new(|| { + Box::pin(Globals { + extra: OsExtraData::init(), + registry: Registry::new(OsStorage::init()), + }) + }); + + GLOBALS.as_ref() +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + use crate::runtime::{self, Runtime}; + use crate::sync::{oneshot, watch}; + + use futures::future; + + #[test] + fn smoke() { + let rt = rt(); + rt.block_on(async move { + let registry = Registry::new(vec![ + EventInfo::default(), + EventInfo::default(), + EventInfo::default(), + ]); + + let first = registry.register_listener(0); + let second = registry.register_listener(1); + let third = registry.register_listener(2); + + let (fire, wait) = oneshot::channel(); + + crate::spawn(async { + wait.await.expect("wait failed"); + + // Record some events which should get coalesced + registry.record_event(0); + registry.record_event(0); + registry.record_event(1); + registry.record_event(1); + registry.broadcast(); + + // Yield so the previous broadcast can get received + // + // This yields many times since the block_on task is only polled every 61 + // ticks. + for _ in 0..100 { + crate::task::yield_now().await; + } + + // Send subsequent signal + registry.record_event(0); + registry.broadcast(); + + drop(registry); + }); + + let _ = fire.send(()); + let all = future::join3(collect(first), collect(second), collect(third)); + + let (first_results, second_results, third_results) = all.await; + assert_eq!(2, first_results.len()); + assert_eq!(1, second_results.len()); + assert_eq!(0, third_results.len()); + }); + } + + #[test] + #[should_panic = "invalid event_id: 1"] + fn register_panics_on_invalid_input() { + let registry = Registry::new(vec![EventInfo::default()]); + + registry.register_listener(1); + } + + #[test] + fn record_invalid_event_does_nothing() { + let registry = Registry::new(vec![EventInfo::default()]); + registry.record_event(42); + } + + #[test] + fn broadcast_returns_if_at_least_one_event_fired() { + let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); + + registry.record_event(0); + assert!(!registry.broadcast()); + + let first = registry.register_listener(0); + let second = registry.register_listener(1); + + registry.record_event(0); + assert!(registry.broadcast()); + + drop(first); + registry.record_event(0); + assert!(!registry.broadcast()); + + drop(second); + } + + fn rt() -> Runtime { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + } + + async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { + let mut ret = vec![]; + + while let Ok(v) = rx.changed().await { + ret.push(v); + } + + ret + } +} diff --git a/third_party/rust/tokio/src/signal/reusable_box.rs b/third_party/rust/tokio/src/signal/reusable_box.rs new file mode 100644 index 0000000000..02f32474b1 --- /dev/null +++ b/third_party/rust/tokio/src/signal/reusable_box.rs @@ -0,0 +1,228 @@ +use std::alloc::Layout; +use std::future::Future; +use std::panic::AssertUnwindSafe; +use std::pin::Pin; +use std::ptr::{self, NonNull}; +use std::task::{Context, Poll}; +use std::{fmt, panic}; + +/// A reusable `Pin<Box<dyn Future<Output = T> + Send>>`. +/// +/// This type lets you replace the future stored in the box without +/// reallocating when the size and alignment permits this. +pub(crate) struct ReusableBoxFuture<T> { + boxed: NonNull<dyn Future<Output = T> + Send>, +} + +impl<T> ReusableBoxFuture<T> { + /// Create a new `ReusableBoxFuture<T>` containing the provided future. + pub(crate) fn new<F>(future: F) -> Self + where + F: Future<Output = T> + Send + 'static, + { + let boxed: Box<dyn Future<Output = T> + Send> = Box::new(future); + + let boxed = Box::into_raw(boxed); + + // SAFETY: Box::into_raw does not return null pointers. + let boxed = unsafe { NonNull::new_unchecked(boxed) }; + + Self { boxed } + } + + /// Replaces the future currently stored in this box. + /// + /// This reallocates if and only if the layout of the provided future is + /// different from the layout of the currently stored future. + pub(crate) fn set<F>(&mut self, future: F) + where + F: Future<Output = T> + Send + 'static, + { + if let Err(future) = self.try_set(future) { + *self = Self::new(future); + } + } + + /// Replaces the future currently stored in this box. + /// + /// This function never reallocates, but returns an error if the provided + /// future has a different size or alignment from the currently stored + /// future. + pub(crate) fn try_set<F>(&mut self, future: F) -> Result<(), F> + where + F: Future<Output = T> + Send + 'static, + { + // SAFETY: The pointer is not dangling. + let self_layout = { + let dyn_future: &(dyn Future<Output = T> + Send) = unsafe { self.boxed.as_ref() }; + Layout::for_value(dyn_future) + }; + + if Layout::new::<F>() == self_layout { + // SAFETY: We just checked that the layout of F is correct. + unsafe { + self.set_same_layout(future); + } + + Ok(()) + } else { + Err(future) + } + } + + /// Sets the current future. + /// + /// # Safety + /// + /// This function requires that the layout of the provided future is the + /// same as `self.layout`. + unsafe fn set_same_layout<F>(&mut self, future: F) + where + F: Future<Output = T> + Send + 'static, + { + // Drop the existing future, catching any panics. + let result = panic::catch_unwind(AssertUnwindSafe(|| { + ptr::drop_in_place(self.boxed.as_ptr()); + })); + + // Overwrite the future behind the pointer. This is safe because the + // allocation was allocated with the same size and alignment as the type F. + let self_ptr: *mut F = self.boxed.as_ptr() as *mut F; + ptr::write(self_ptr, future); + + // Update the vtable of self.boxed. The pointer is not null because we + // just got it from self.boxed, which is not null. + self.boxed = NonNull::new_unchecked(self_ptr); + + // If the old future's destructor panicked, resume unwinding. + match result { + Ok(()) => {} + Err(payload) => { + panic::resume_unwind(payload); + } + } + } + + /// Gets a pinned reference to the underlying future. + pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future<Output = T> + Send)> { + // SAFETY: The user of this box cannot move the box, and we do not move it + // either. + unsafe { Pin::new_unchecked(self.boxed.as_mut()) } + } + + /// Polls the future stored inside this box. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<T> { + self.get_pin().poll(cx) + } +} + +impl<T> Future for ReusableBoxFuture<T> { + type Output = T; + + /// Polls the future stored inside this box. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + Pin::into_inner(self).get_pin().poll(cx) + } +} + +// The future stored inside ReusableBoxFuture<T> must be Send. +unsafe impl<T> Send for ReusableBoxFuture<T> {} + +// The only method called on self.boxed is poll, which takes &mut self, so this +// struct being Sync does not permit any invalid access to the Future, even if +// the future is not Sync. +unsafe impl<T> Sync for ReusableBoxFuture<T> {} + +// Just like a Pin<Box<dyn Future>> is always Unpin, so is this type. +impl<T> Unpin for ReusableBoxFuture<T> {} + +impl<T> Drop for ReusableBoxFuture<T> { + fn drop(&mut self) { + unsafe { + drop(Box::from_raw(self.boxed.as_ptr())); + } + } +} + +impl<T> fmt::Debug for ReusableBoxFuture<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxFuture").finish() + } +} + +#[cfg(test)] +#[cfg(not(miri))] // Miri breaks when you use Pin<&mut dyn Future> +mod test { + use super::ReusableBoxFuture; + use futures::future::FutureExt; + use std::alloc::Layout; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + #[test] + fn test_different_futures() { + let fut = async move { 10 }; + // Not zero sized! + assert_eq!(Layout::for_value(&fut).size(), 1); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(10)); + + b.try_set(async move { 20 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(20)); + + b.try_set(async move { 30 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(30)); + } + + #[test] + fn test_different_sizes() { + let fut1 = async move { 10 }; + let val = [0u32; 1000]; + let fut2 = async move { val[0] }; + let fut3 = ZeroSizedFuture {}; + + assert_eq!(Layout::for_value(&fut1).size(), 1); + assert_eq!(Layout::for_value(&fut2).size(), 4004); + assert_eq!(Layout::for_value(&fut3).size(), 0); + + let mut b = ReusableBoxFuture::new(fut1); + assert_eq!(b.get_pin().now_or_never(), Some(10)); + b.set(fut2); + assert_eq!(b.get_pin().now_or_never(), Some(0)); + b.set(fut3); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } + + struct ZeroSizedFuture {} + impl Future for ZeroSizedFuture { + type Output = u32; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> { + Poll::Ready(5) + } + } + + #[test] + fn test_zero_sized() { + let fut = ZeroSizedFuture {}; + // Zero sized! + assert_eq!(Layout::for_value(&fut).size(), 0); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + + b.try_set(ZeroSizedFuture {}) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } +} diff --git a/third_party/rust/tokio/src/signal/unix.rs b/third_party/rust/tokio/src/signal/unix.rs new file mode 100644 index 0000000000..86ea9a93ee --- /dev/null +++ b/third_party/rust/tokio/src/signal/unix.rs @@ -0,0 +1,477 @@ +//! Unix-specific types for signal handling. +//! +//! This module is only defined on Unix platforms and contains the primary +//! `Signal` type for receiving notifications of signals. + +#![cfg(unix)] +#![cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))] + +use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::signal::RxFuture; +use crate::sync::watch; + +use mio::net::UnixStream; +use std::io::{self, Error, ErrorKind, Write}; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; +use std::task::{Context, Poll}; + +pub(crate) mod driver; +use self::driver::Handle; + +pub(crate) type OsStorage = Vec<SignalInfo>; + +// Number of different unix signals +// (FreeBSD has 33) +const SIGNUM: usize = 33; + +impl Init for OsStorage { + fn init() -> Self { + (0..SIGNUM).map(|_| SignalInfo::default()).collect() + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id).map(|si| &si.event_info) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().map(|si| &si.event_info).for_each(f) + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData { + sender: UnixStream, + receiver: UnixStream, +} + +impl Init for OsExtraData { + fn init() -> Self { + let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream"); + + Self { sender, receiver } + } +} + +/// Represents the specific kind of signal to listen for. +#[derive(Debug, Clone, Copy)] +pub struct SignalKind(libc::c_int); + +impl SignalKind { + /// Allows for listening to any valid OS signal. + /// + /// For example, this can be used for listening for platform-specific + /// signals. + /// ```rust,no_run + /// # use tokio::signal::unix::SignalKind; + /// # let signum = -1; + /// // let signum = libc::OS_SPECIFIC_SIGNAL; + /// let kind = SignalKind::from_raw(signum); + /// ``` + // Use `std::os::raw::c_int` on public API to prevent leaking a non-stable + // type alias from libc. + // `libc::c_int` and `std::os::raw::c_int` are currently the same type, and are + // unlikely to change to other types, but technically libc can change this + // in the future minor version. + // See https://github.com/tokio-rs/tokio/issues/3767 for more. + pub fn from_raw(signum: std::os::raw::c_int) -> Self { + Self(signum as libc::c_int) + } + + /// Represents the SIGALRM signal. + /// + /// On Unix systems this signal is sent when a real-time timer has expired. + /// By default, the process is terminated by this signal. + pub fn alarm() -> Self { + Self(libc::SIGALRM) + } + + /// Represents the SIGCHLD signal. + /// + /// On Unix systems this signal is sent when the status of a child process + /// has changed. By default, this signal is ignored. + pub fn child() -> Self { + Self(libc::SIGCHLD) + } + + /// Represents the SIGHUP signal. + /// + /// On Unix systems this signal is sent when the terminal is disconnected. + /// By default, the process is terminated by this signal. + pub fn hangup() -> Self { + Self(libc::SIGHUP) + } + + /// Represents the SIGINFO signal. + /// + /// On Unix systems this signal is sent to request a status update from the + /// process. By default, this signal is ignored. + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + pub fn info() -> Self { + Self(libc::SIGINFO) + } + + /// Represents the SIGINT signal. + /// + /// On Unix systems this signal is sent to interrupt a program. + /// By default, the process is terminated by this signal. + pub fn interrupt() -> Self { + Self(libc::SIGINT) + } + + /// Represents the SIGIO signal. + /// + /// On Unix systems this signal is sent when I/O operations are possible + /// on some file descriptor. By default, this signal is ignored. + pub fn io() -> Self { + Self(libc::SIGIO) + } + + /// Represents the SIGPIPE signal. + /// + /// On Unix systems this signal is sent when the process attempts to write + /// to a pipe which has no reader. By default, the process is terminated by + /// this signal. + pub fn pipe() -> Self { + Self(libc::SIGPIPE) + } + + /// Represents the SIGQUIT signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process, after which the OS will dump the process core. + /// By default, the process is terminated by this signal. + pub fn quit() -> Self { + Self(libc::SIGQUIT) + } + + /// Represents the SIGTERM signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process. By default, the process is terminated by this signal. + pub fn terminate() -> Self { + Self(libc::SIGTERM) + } + + /// Represents the SIGUSR1 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub fn user_defined1() -> Self { + Self(libc::SIGUSR1) + } + + /// Represents the SIGUSR2 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub fn user_defined2() -> Self { + Self(libc::SIGUSR2) + } + + /// Represents the SIGWINCH signal. + /// + /// On Unix systems this signal is sent when the terminal window is resized. + /// By default, this signal is ignored. + pub fn window_change() -> Self { + Self(libc::SIGWINCH) + } +} + +pub(crate) struct SignalInfo { + event_info: EventInfo, + init: Once, + initialized: AtomicBool, +} + +impl Default for SignalInfo { + fn default() -> SignalInfo { + SignalInfo { + event_info: Default::default(), + init: Once::new(), + initialized: AtomicBool::new(false), + } + } +} + +/// Our global signal handler for all signals registered by this module. +/// +/// The purpose of this signal handler is to primarily: +/// +/// 1. Flag that our specific signal was received (e.g. store an atomic flag) +/// 2. Wake up the driver by writing a byte to a pipe +/// +/// Those two operations should both be async-signal safe. +fn action(globals: Pin<&'static Globals>, signal: libc::c_int) { + globals.record_event(signal as EventId); + + // Send a wakeup, ignore any errors (anything reasonably possible is + // full pipe and then it will wake up anyway). + let mut sender = &globals.sender; + drop(sender.write(&[1])); +} + +/// Enables this module to receive signal notifications for the `signal` +/// provided. +/// +/// This will register the signal handler if it hasn't already been registered, +/// returning any error along the way if that fails. +fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { + let signal = signal.0; + if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { + return Err(Error::new( + ErrorKind::Other, + format!("Refusing to register signal {}", signal), + )); + } + + // Check that we have a signal driver running + handle.check_inner()?; + + let globals = globals(); + let siginfo = match globals.storage().get(signal as EventId) { + Some(slot) => slot, + None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")), + }; + let mut registered = Ok(()); + siginfo.init.call_once(|| { + registered = unsafe { + signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ()) + }; + if registered.is_ok() { + siginfo.initialized.store(true, Ordering::Relaxed); + } + }); + registered?; + // If the call_once failed, it won't be retried on the next attempt to register the signal. In + // such case it is not run, registered is still `Ok(())`, initialized is still `false`. + if siginfo.initialized.load(Ordering::Relaxed) { + Ok(()) + } else { + Err(Error::new( + ErrorKind::Other, + "Failed to register signal handler", + )) + } +} + +/// A stream of events for receiving a particular type of OS signal. +/// +/// In general signal handling on Unix is a pretty tricky topic, and this +/// structure is no exception! There are some important limitations to keep in +/// mind when using `Signal` streams: +/// +/// * Signals handling in Unix already necessitates coalescing signals +/// together sometimes. This `Signal` stream is also no exception here in +/// that it will also coalesce signals. That is, even if the signal handler +/// for this process runs multiple times, the `Signal` stream may only return +/// one signal notification. Specifically, before `poll` is called, all +/// signal notifications are coalesced into one item returned from `poll`. +/// Once `poll` has been called, however, a further signal is guaranteed to +/// be yielded as an item. +/// +/// Put another way, any element pulled off the returned stream corresponds to +/// *at least one* signal, but possibly more. +/// +/// * Signal handling in general is relatively inefficient. Although some +/// improvements are possible in this crate, it's recommended to not plan on +/// having millions of signal channels open. +/// +/// If you've got any questions about this feel free to open an issue on the +/// repo! New approaches to alleviate some of these limitations are always +/// appreciated! +/// +/// # Caveats +/// +/// The first time that a `Signal` instance is registered for a particular +/// signal kind, an OS signal-handler is installed which replaces the default +/// platform behavior when that signal is received, **for the duration of the +/// entire process**. +/// +/// For example, Unix systems will terminate a process by default when it +/// receives SIGINT. But, when a `Signal` instance is created to listen for +/// this signal, the next SIGINT that arrives will be translated to a stream +/// event, and the process will continue to execute. **Even if this `Signal` +/// instance is dropped, subsequent SIGINT deliveries will end up captured by +/// Tokio, and the default platform behavior will NOT be reset**. +/// +/// Thus, applications should take care to ensure the expected signal behavior +/// occurs as expected after listening for specific signals. +/// +/// # Examples +/// +/// Wait for SIGHUP +/// +/// ```rust,no_run +/// use tokio::signal::unix::{signal, SignalKind}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // An infinite stream of hangup signals. +/// let mut stream = signal(SignalKind::hangup())?; +/// +/// // Print whenever a HUP signal is received +/// loop { +/// stream.recv().await; +/// println!("got signal HUP"); +/// } +/// } +/// ``` +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Signal { + inner: RxFuture, +} + +/// Creates a new stream which will receive notifications when the current +/// process receives the specified signal `kind`. +/// +/// This function will create a new stream which binds to the default reactor. +/// The `Signal` stream is an infinite stream which will receive +/// notifications whenever a signal is received. More documentation can be +/// found on `Signal` itself, but to reiterate: +/// +/// * Signals may be coalesced beyond what the kernel already does. +/// * Once a signal handler is registered with the process the underlying +/// libc signal handler is never unregistered. +/// +/// A `Signal` stream can be created for a particular signal number +/// multiple times. When a signal is received then all the associated +/// channels will receive the signal notification. +/// +/// # Errors +/// +/// * If the lower-level C functions fail for some reason. +/// * If the previous initialization of this specific signal failed. +/// * If the signal is one of +/// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) +pub fn signal(kind: SignalKind) -> io::Result<Signal> { + let rx = signal_with_handle(kind, &Handle::current())?; + + Ok(Signal { + inner: RxFuture::new(rx), + }) +} + +pub(crate) fn signal_with_handle( + kind: SignalKind, + handle: &Handle, +) -> io::Result<watch::Receiver<()>> { + // Turn the signal delivery on once we are ready for it + signal_enable(kind, handle)?; + + Ok(globals().register_listener(kind.0 as EventId)) +} + +impl Signal { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Examples + /// + /// Wait for SIGHUP + /// + /// ```rust,no_run + /// use tokio::signal::unix::{signal, SignalKind}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // An infinite stream of hangup signals. + /// let mut stream = signal(SignalKind::hangup())?; + /// + /// // Print whenever a HUP signal is received + /// loop { + /// stream.recv().await; + /// println!("got signal HUP"); + /// } + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no signals are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(()))` if a signal is available. + /// * `Poll::Ready(None)` if the channel has been closed and all signals + /// sent before it was closed have been received. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::unix::Signal; + /// + /// struct MyFuture { + /// signal: Signal, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.signal.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +// Work around for abstracting streams internally +pub(crate) trait InternalStream { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; +} + +impl InternalStream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.poll_recv(cx) + } +} + +pub(crate) fn ctrl_c() -> io::Result<Signal> { + signal(SignalKind::interrupt()) +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + + #[test] + fn signal_enable_error_on_invalid_input() { + signal_enable(SignalKind::from_raw(-1), &Handle::default()).unwrap_err(); + } + + #[test] + fn signal_enable_error_on_forbidden_input() { + signal_enable( + SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]), + &Handle::default(), + ) + .unwrap_err(); + } +} diff --git a/third_party/rust/tokio/src/signal/unix/driver.rs b/third_party/rust/tokio/src/signal/unix/driver.rs new file mode 100644 index 0000000000..5fe7c354c5 --- /dev/null +++ b/third_party/rust/tokio/src/signal/unix/driver.rs @@ -0,0 +1,207 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Signal driver + +use crate::io::driver::{Driver as IoDriver, Interest}; +use crate::io::PollEvented; +use crate::park::Park; +use crate::signal::registry::globals; + +use mio::net::UnixStream; +use std::io::{self, Read}; +use std::ptr; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use std::time::Duration; + +/// Responsible for registering wakeups when an OS signal is received, and +/// subsequently dispatching notifications to any signal listeners as appropriate. +/// +/// Note: this driver relies on having an enabled IO driver in order to listen to +/// pipe write wakeups. +#[derive(Debug)] +pub(crate) struct Driver { + /// Thread parker. The `Driver` park implementation delegates to this. + park: IoDriver, + + /// A pipe for receiving wake events from the signal handler + receiver: PollEvented<UnixStream>, + + /// Shared state + inner: Arc<Inner>, +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct Handle { + inner: Weak<Inner>, +} + +#[derive(Debug)] +pub(super) struct Inner(()); + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: IoDriver) -> io::Result<Self> { + use std::mem::ManuallyDrop; + use std::os::unix::io::{AsRawFd, FromRawFd}; + + // NB: We give each driver a "fresh" receiver file descriptor to avoid + // the issues described in alexcrichton/tokio-process#42. + // + // In the past we would reuse the actual receiver file descriptor and + // swallow any errors around double registration of the same descriptor. + // I'm not sure if the second (failed) registration simply doesn't end + // up receiving wake up notifications, or there could be some race + // condition when consuming readiness events, but having distinct + // descriptors for distinct PollEvented instances appears to mitigate + // this. + // + // Unfortunately we cannot just use a single global PollEvented instance + // either, since we can't compare Handles or assume they will always + // point to the exact same reactor. + // + // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior + // with registering dups with the same reactor. In this case, duping is + // safe as each dup is registered with separate reactors **and** we + // only expect at least one dup to receive the notification. + + // Manually drop as we don't actually own this instance of UnixStream. + let receiver_fd = globals().receiver.as_raw_fd(); + + // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. + let original = + ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); + let receiver = UnixStream::from_std(original.try_clone()?); + let receiver = PollEvented::new_with_interest_and_handle( + receiver, + Interest::READABLE | Interest::WRITABLE, + park.handle(), + )?; + + Ok(Self { + park, + receiver, + inner: Arc::new(Inner(())), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn process(&self) { + // Check if the pipe is ready to read and therefore has "woken" us up + // + // To do so, we will `poll_read_ready` with a noop waker, since we don't + // need to actually be notified when read ready... + let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; + let mut cx = Context::from_waker(&waker); + + let ev = match self.receiver.registration().poll_read_ready(&mut cx) { + Poll::Ready(Ok(ev)) => ev, + Poll::Ready(Err(e)) => panic!("reactor gone: {}", e), + Poll::Pending => return, // No wake has arrived, bail + }; + + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + loop { + match (&*self.receiver).read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {}", e), + } + } + + self.receiver.registration().clear_readiness(ev); + + // Broadcast any signals which were received + globals().broadcast(); + } +} + +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop); + +unsafe fn noop_clone(_data: *const ()) -> RawWaker { + RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE) +} + +unsafe fn noop(_data: *const ()) {} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = <IoDriver as Park>::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +// ===== impl Handle ===== + +impl Handle { + pub(super) fn check_inner(&self) -> io::Result<()> { + if self.inner.strong_count() > 0 { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) + } + } +} + +cfg_rt! { + impl Handle { + /// Returns a handle to the current driver + /// + /// # Panics + /// + /// This function panics if there is no current signal driver set. + pub(super) fn current() -> Self { + crate::runtime::context::signal_handle().expect( + "there is no signal driver running, must be called from the context of Tokio runtime", + ) + } + } +} + +cfg_not_rt! { + impl Handle { + /// Returns a handle to the current driver + /// + /// # Panics + /// + /// This function panics if there is no current signal driver set. + pub(super) fn current() -> Self { + panic!( + "there is no signal driver running, must be called from the context of Tokio runtime or with\ + `rt` enabled.", + ) + } + } +} diff --git a/third_party/rust/tokio/src/signal/windows.rs b/third_party/rust/tokio/src/signal/windows.rs new file mode 100644 index 0000000000..11ec6cb08c --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows.rs @@ -0,0 +1,223 @@ +//! Windows-specific types for signal handling. +//! +//! This module is only defined on Windows and allows receiving "ctrl-c" +//! and "ctrl-break" notifications. These events are listened for via the +//! `SetConsoleCtrlHandler` function which receives events of the type +//! `CTRL_C_EVENT` and `CTRL_BREAK_EVENT`. + +#![cfg(any(windows, docsrs))] +#![cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] + +use crate::signal::RxFuture; +use std::io; +use std::task::{Context, Poll}; + +#[cfg(not(docsrs))] +#[path = "windows/sys.rs"] +mod imp; +#[cfg(not(docsrs))] +pub(crate) use self::imp::{OsExtraData, OsStorage}; + +#[cfg(docsrs)] +#[path = "windows/stub.rs"] +mod imp; + +/// Creates a new stream which receives "ctrl-c" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_c; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // An infinite stream of CTRL-C events. +/// let mut stream = ctrl_c()?; +/// +/// // Print whenever a CTRL-C event is received. +/// for countdown in (0..3).rev() { +/// stream.recv().await; +/// println!("got CTRL-C. {} more to exit", countdown); +/// } +/// +/// Ok(()) +/// } +/// ``` +pub fn ctrl_c() -> io::Result<CtrlC> { + Ok(CtrlC { + inner: self::imp::ctrl_c()?, + }) +} + +/// Represents a stream which receives "ctrl-c" notifications sent to the process +/// via `SetConsoleCtrlHandler`. +/// +/// A notification to this process notifies *all* streams listening for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the stream may only receive one item about the two notifications. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlC { + inner: RxFuture, +} + +impl CtrlC { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_c; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // An infinite stream of CTRL-C events. + /// let mut stream = ctrl_c()?; + /// + /// // Print whenever a CTRL-C event is received. + /// for countdown in (0..3).rev() { + /// stream.recv().await; + /// println!("got CTRL-C. {} more to exit", countdown); + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlC; + /// + /// struct MyFuture { + /// ctrl_c: CtrlC, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_c.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Represents a stream which receives "ctrl-break" notifications sent to the process +/// via `SetConsoleCtrlHandler`. +/// +/// A notification to this process notifies *all* streams listening for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the stream may only receive one item about the two notifications. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlBreak { + inner: RxFuture, +} + +impl CtrlBreak { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_break; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // An infinite stream of CTRL-BREAK events. + /// let mut stream = ctrl_break()?; + /// + /// // Print whenever a CTRL-BREAK event is received. + /// loop { + /// stream.recv().await; + /// println!("got signal CTRL-BREAK"); + /// } + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlBreak; + /// + /// struct MyFuture { + /// ctrl_break: CtrlBreak, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_break.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Creates a new stream which receives "ctrl-break" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_break; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // An infinite stream of CTRL-BREAK events. +/// let mut stream = ctrl_break()?; +/// +/// // Print whenever a CTRL-BREAK event is received. +/// loop { +/// stream.recv().await; +/// println!("got signal CTRL-BREAK"); +/// } +/// } +/// ``` +pub fn ctrl_break() -> io::Result<CtrlBreak> { + Ok(CtrlBreak { + inner: self::imp::ctrl_break()?, + }) +} diff --git a/third_party/rust/tokio/src/signal/windows/stub.rs b/third_party/rust/tokio/src/signal/windows/stub.rs new file mode 100644 index 0000000000..88630543da --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows/stub.rs @@ -0,0 +1,13 @@ +//! Stub implementations for the platform API so that rustdoc can build linkable +//! documentation on non-windows platforms. + +use crate::signal::RxFuture; +use std::io; + +pub(super) fn ctrl_c() -> io::Result<RxFuture> { + panic!() +} + +pub(super) fn ctrl_break() -> io::Result<RxFuture> { + panic!() +} diff --git a/third_party/rust/tokio/src/signal/windows/sys.rs b/third_party/rust/tokio/src/signal/windows/sys.rs new file mode 100644 index 0000000000..8d29c357b6 --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows/sys.rs @@ -0,0 +1,153 @@ +use std::convert::TryFrom; +use std::io; +use std::sync::Once; + +use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; +use crate::signal::RxFuture; + +use winapi::shared::minwindef::{BOOL, DWORD, FALSE, TRUE}; +use winapi::um::consoleapi::SetConsoleCtrlHandler; +use winapi::um::wincon::{CTRL_BREAK_EVENT, CTRL_C_EVENT}; + +pub(super) fn ctrl_c() -> io::Result<RxFuture> { + new(CTRL_C_EVENT) +} + +pub(super) fn ctrl_break() -> io::Result<RxFuture> { + new(CTRL_BREAK_EVENT) +} + +fn new(signum: DWORD) -> io::Result<RxFuture> { + global_init()?; + let rx = globals().register_listener(signum as EventId); + Ok(RxFuture::new(rx)) +} + +#[derive(Debug)] +pub(crate) struct OsStorage { + ctrl_c: EventInfo, + ctrl_break: EventInfo, +} + +impl Init for OsStorage { + fn init() -> Self { + Self { + ctrl_c: EventInfo::default(), + ctrl_break: EventInfo::default(), + } + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + match DWORD::try_from(id) { + Ok(CTRL_C_EVENT) => Some(&self.ctrl_c), + Ok(CTRL_BREAK_EVENT) => Some(&self.ctrl_break), + _ => None, + } + } + + fn for_each<'a, F>(&'a self, mut f: F) + where + F: FnMut(&'a EventInfo), + { + f(&self.ctrl_c); + f(&self.ctrl_break); + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData {} + +impl Init for OsExtraData { + fn init() -> Self { + Self {} + } +} + +fn global_init() -> io::Result<()> { + static INIT: Once = Once::new(); + + let mut init = None; + + INIT.call_once(|| unsafe { + let rc = SetConsoleCtrlHandler(Some(handler), TRUE); + let ret = if rc == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + }; + + init = Some(ret); + }); + + init.unwrap_or_else(|| Ok(())) +} + +unsafe extern "system" fn handler(ty: DWORD) -> BOOL { + let globals = globals(); + globals.record_event(ty as EventId); + + // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine + // the handler routine is always invoked in a new thread, thus we don't + // have the same restrictions as in Unix signal handlers, meaning we can + // go ahead and perform the broadcast here. + if globals.broadcast() { + TRUE + } else { + // No one is listening for this notification any more + // let the OS fire the next (possibly the default) handler. + FALSE + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + use crate::runtime::Runtime; + + use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; + + #[test] + fn ctrl_c() { + let rt = rt(); + let _enter = rt.enter(); + + let mut ctrl_c = task::spawn(crate::signal::ctrl_c()); + + assert_pending!(ctrl_c.poll()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(CTRL_C_EVENT); + } + + assert_ready_ok!(ctrl_c.poll()); + } + + #[test] + fn ctrl_break() { + let rt = rt(); + + rt.block_on(async { + let mut ctrl_break = assert_ok!(crate::signal::windows::ctrl_break()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(CTRL_BREAK_EVENT); + } + + ctrl_break.recv().await.unwrap(); + }); + } + + fn rt() -> Runtime { + crate::runtime::Builder::new_current_thread() + .build() + .unwrap() + } +} |