diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
tree | f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/tokio/src/signal | |
parent | Initial commit. (diff) | |
download | firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz firefox-26a029d407be480d791972afb5975cf62c9360a6.zip |
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/signal')
-rw-r--r-- | third_party/rust/tokio/src/signal/ctrl_c.rs | 62 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/mod.rs | 100 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/registry.rs | 283 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/reusable_box.rs | 227 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/unix.rs | 530 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows.rs | 524 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows/stub.rs | 25 | ||||
-rw-r--r-- | third_party/rust/tokio/src/signal/windows/sys.rs | 229 |
8 files changed, 1980 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/signal/ctrl_c.rs b/third_party/rust/tokio/src/signal/ctrl_c.rs new file mode 100644 index 0000000000..b26ab7ead6 --- /dev/null +++ b/third_party/rust/tokio/src/signal/ctrl_c.rs @@ -0,0 +1,62 @@ +#[cfg(unix)] +use super::unix::{self as os_impl}; +#[cfg(windows)] +use super::windows::{self as os_impl}; + +use std::io; + +/// Completes when a "ctrl-c" notification is sent to the process. +/// +/// While signals are handled very differently between Unix and Windows, both +/// platforms support receiving a signal on "ctrl-c". This function provides a +/// portable API for receiving this notification. +/// +/// Once the returned future is polled, a listener is registered. The future +/// will complete on the first received `ctrl-c` **after** the initial call to +/// either `Future::poll` or `.await`. +/// +/// # Caveats +/// +/// On Unix platforms, the first time that a `Signal` instance is registered for a +/// particular signal kind, an OS signal-handler is installed which replaces the +/// default platform behavior when that signal is received, **for the duration of +/// the entire process**. +/// +/// For example, Unix systems will terminate a process by default when it +/// receives a signal generated by "CTRL+C" on the terminal. But, when a +/// `ctrl_c` stream is created to listen for this signal, the time it arrives, +/// it will be translated to a stream event, and the process will continue to +/// execute. **Even if this `Signal` instance is dropped, subsequent SIGINT +/// deliveries will end up captured by Tokio, and the default platform behavior +/// will NOT be reset**. +/// +/// Thus, applications should take care to ensure the expected signal behavior +/// occurs as expected after listening for specific signals. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal; +/// +/// #[tokio::main] +/// async fn main() { +/// println!("waiting for ctrl-c"); +/// +/// signal::ctrl_c().await.expect("failed to listen for event"); +/// +/// println!("received ctrl-c event"); +/// } +/// ``` +/// +/// Listen in the background: +/// +/// ```rust,no_run +/// tokio::spawn(async move { +/// tokio::signal::ctrl_c().await.unwrap(); +/// // Your handler here +/// }); +/// ``` +pub async fn ctrl_c() -> io::Result<()> { + os_impl::ctrl_c()?.recv().await; + Ok(()) +} diff --git a/third_party/rust/tokio/src/signal/mod.rs b/third_party/rust/tokio/src/signal/mod.rs new file mode 100644 index 0000000000..3aacc60efc --- /dev/null +++ b/third_party/rust/tokio/src/signal/mod.rs @@ -0,0 +1,100 @@ +//! Asynchronous signal handling for Tokio. +//! +//! Note that signal handling is in general a very tricky topic and should be +//! used with great care. This crate attempts to implement 'best practice' for +//! signal handling, but it should be evaluated for your own applications' needs +//! to see if it's suitable. +//! +//! There are some fundamental limitations of this crate documented on the OS +//! specific structures, as well. +//! +//! # Examples +//! +//! Print on "ctrl-c" notification. +//! +//! ```rust,no_run +//! use tokio::signal; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! signal::ctrl_c().await?; +//! println!("ctrl-c received!"); +//! Ok(()) +//! } +//! ``` +//! +//! Wait for SIGHUP on Unix +//! +//! ```rust,no_run +//! # #[cfg(unix)] { +//! use tokio::signal::unix::{signal, SignalKind}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // An infinite stream of hangup signals. +//! let mut stream = signal(SignalKind::hangup())?; +//! +//! // Print whenever a HUP signal is received +//! loop { +//! stream.recv().await; +//! println!("got signal HUP"); +//! } +//! } +//! # } +//! ``` +use crate::sync::watch::Receiver; +use std::task::{Context, Poll}; + +mod ctrl_c; +pub use ctrl_c::ctrl_c; + +pub(crate) mod registry; + +mod os { + #[cfg(unix)] + pub(crate) use super::unix::{OsExtraData, OsStorage}; + + #[cfg(windows)] + pub(crate) use super::windows::{OsExtraData, OsStorage}; +} + +pub mod unix; +pub mod windows; + +mod reusable_box; +use self::reusable_box::ReusableBoxFuture; + +#[derive(Debug)] +struct RxFuture { + inner: ReusableBoxFuture<Receiver<()>>, +} + +async fn make_future(mut rx: Receiver<()>) -> Receiver<()> { + match rx.changed().await { + Ok(()) => rx, + Err(_) => panic!("signal sender went away"), + } +} + +impl RxFuture { + fn new(rx: Receiver<()>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } + + async fn recv(&mut self) -> Option<()> { + use crate::future::poll_fn; + poll_fn(|cx| self.poll_recv(cx)).await + } + + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + match self.inner.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(rx) => { + self.inner.set(make_future(rx)); + Poll::Ready(Some(())) + } + } + } +} diff --git a/third_party/rust/tokio/src/signal/registry.rs b/third_party/rust/tokio/src/signal/registry.rs new file mode 100644 index 0000000000..48e98c832f --- /dev/null +++ b/third_party/rust/tokio/src/signal/registry.rs @@ -0,0 +1,283 @@ +#![allow(clippy::unit_arg)] + +use crate::signal::os::{OsExtraData, OsStorage}; +use crate::sync::watch; +use crate::util::once_cell::OnceCell; + +use std::ops; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub(crate) type EventId = usize; + +/// State for a specific event, whether a notification is pending delivery, +/// and what listeners are registered. +#[derive(Debug)] +pub(crate) struct EventInfo { + pending: AtomicBool, + tx: watch::Sender<()>, +} + +impl Default for EventInfo { + fn default() -> Self { + let (tx, _rx) = watch::channel(()); + + Self { + pending: AtomicBool::new(false), + tx, + } + } +} + +/// An interface for retrieving the `EventInfo` for a particular eventId. +pub(crate) trait Storage { + /// Gets the `EventInfo` for `id` if it exists. + fn event_info(&self, id: EventId) -> Option<&EventInfo>; + + /// Invokes `f` once for each defined `EventInfo` in this storage. + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo); +} + +impl Storage for Vec<EventInfo> { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().for_each(f) + } +} + +/// An interface for initializing a type. Useful for situations where we cannot +/// inject a configured instance in the constructor of another type. +pub(crate) trait Init { + fn init() -> Self; +} + +/// Manages and distributes event notifications to any registered listeners. +/// +/// Generic over the underlying storage to allow for domain specific +/// optimizations (e.g. eventIds may or may not be contiguous). +#[derive(Debug)] +pub(crate) struct Registry<S> { + storage: S, +} + +impl<S> Registry<S> { + fn new(storage: S) -> Self { + Self { storage } + } +} + +impl<S: Storage> Registry<S> { + /// Registers a new listener for `event_id`. + fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { + self.storage + .event_info(event_id) + .unwrap_or_else(|| panic!("invalid event_id: {}", event_id)) + .tx + .subscribe() + } + + /// Marks `event_id` as having been delivered, without broadcasting it to + /// any listeners. + fn record_event(&self, event_id: EventId) { + if let Some(event_info) = self.storage.event_info(event_id) { + event_info.pending.store(true, Ordering::SeqCst) + } + } + + /// Broadcasts all previously recorded events to their respective listeners. + /// + /// Returns `true` if an event was delivered to at least one listener. + fn broadcast(&self) -> bool { + let mut did_notify = false; + self.storage.for_each(|event_info| { + // Any signal of this kind arrived since we checked last? + if !event_info.pending.swap(false, Ordering::SeqCst) { + return; + } + + // Ignore errors if there are no listeners + if event_info.tx.send(()).is_ok() { + did_notify = true; + } + }); + + did_notify + } +} + +pub(crate) struct Globals { + extra: OsExtraData, + registry: Registry<OsStorage>, +} + +impl ops::Deref for Globals { + type Target = OsExtraData; + + fn deref(&self) -> &Self::Target { + &self.extra + } +} + +impl Globals { + /// Registers a new listener for `event_id`. + pub(crate) fn register_listener(&self, event_id: EventId) -> watch::Receiver<()> { + self.registry.register_listener(event_id) + } + + /// Marks `event_id` as having been delivered, without broadcasting it to + /// any listeners. + pub(crate) fn record_event(&self, event_id: EventId) { + self.registry.record_event(event_id); + } + + /// Broadcasts all previously recorded events to their respective listeners. + /// + /// Returns `true` if an event was delivered to at least one listener. + pub(crate) fn broadcast(&self) -> bool { + self.registry.broadcast() + } + + #[cfg(unix)] + pub(crate) fn storage(&self) -> &OsStorage { + &self.registry.storage + } +} + +fn globals_init() -> Globals +where + OsExtraData: 'static + Send + Sync + Init, + OsStorage: 'static + Send + Sync + Init, +{ + Globals { + extra: OsExtraData::init(), + registry: Registry::new(OsStorage::init()), + } +} + +pub(crate) fn globals() -> &'static Globals +where + OsExtraData: 'static + Send + Sync + Init, + OsStorage: 'static + Send + Sync + Init, +{ + static GLOBALS: OnceCell<Globals> = OnceCell::new(); + + GLOBALS.get(globals_init) +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + use crate::runtime::{self, Runtime}; + use crate::sync::{oneshot, watch}; + + use futures::future; + + #[test] + fn smoke() { + let rt = rt(); + rt.block_on(async move { + let registry = Registry::new(vec![ + EventInfo::default(), + EventInfo::default(), + EventInfo::default(), + ]); + + let first = registry.register_listener(0); + let second = registry.register_listener(1); + let third = registry.register_listener(2); + + let (fire, wait) = oneshot::channel(); + + crate::spawn(async { + wait.await.expect("wait failed"); + + // Record some events which should get coalesced + registry.record_event(0); + registry.record_event(0); + registry.record_event(1); + registry.record_event(1); + registry.broadcast(); + + // Yield so the previous broadcast can get received + // + // This yields many times since the block_on task is only polled every 61 + // ticks. + for _ in 0..100 { + crate::task::yield_now().await; + } + + // Send subsequent signal + registry.record_event(0); + registry.broadcast(); + + drop(registry); + }); + + let _ = fire.send(()); + let all = future::join3(collect(first), collect(second), collect(third)); + + let (first_results, second_results, third_results) = all.await; + assert_eq!(2, first_results.len()); + assert_eq!(1, second_results.len()); + assert_eq!(0, third_results.len()); + }); + } + + #[test] + #[should_panic = "invalid event_id: 1"] + fn register_panics_on_invalid_input() { + let registry = Registry::new(vec![EventInfo::default()]); + + registry.register_listener(1); + } + + #[test] + fn record_invalid_event_does_nothing() { + let registry = Registry::new(vec![EventInfo::default()]); + registry.record_event(1302); + } + + #[test] + fn broadcast_returns_if_at_least_one_event_fired() { + let registry = Registry::new(vec![EventInfo::default(), EventInfo::default()]); + + registry.record_event(0); + assert!(!registry.broadcast()); + + let first = registry.register_listener(0); + let second = registry.register_listener(1); + + registry.record_event(0); + assert!(registry.broadcast()); + + drop(first); + registry.record_event(0); + assert!(!registry.broadcast()); + + drop(second); + } + + fn rt() -> Runtime { + runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap() + } + + async fn collect(mut rx: watch::Receiver<()>) -> Vec<()> { + let mut ret = vec![]; + + while let Ok(v) = rx.changed().await { + ret.push(v); + } + + ret + } +} diff --git a/third_party/rust/tokio/src/signal/reusable_box.rs b/third_party/rust/tokio/src/signal/reusable_box.rs new file mode 100644 index 0000000000..796fa210b0 --- /dev/null +++ b/third_party/rust/tokio/src/signal/reusable_box.rs @@ -0,0 +1,227 @@ +use std::alloc::Layout; +use std::future::Future; +use std::panic::AssertUnwindSafe; +use std::pin::Pin; +use std::ptr::{self, NonNull}; +use std::task::{Context, Poll}; +use std::{fmt, panic}; + +/// A reusable `Pin<Box<dyn Future<Output = T> + Send>>`. +/// +/// This type lets you replace the future stored in the box without +/// reallocating when the size and alignment permits this. +pub(crate) struct ReusableBoxFuture<T> { + boxed: NonNull<dyn Future<Output = T> + Send>, +} + +impl<T> ReusableBoxFuture<T> { + /// Create a new `ReusableBoxFuture<T>` containing the provided future. + pub(crate) fn new<F>(future: F) -> Self + where + F: Future<Output = T> + Send + 'static, + { + let boxed: Box<dyn Future<Output = T> + Send> = Box::new(future); + + let boxed = Box::into_raw(boxed); + + // SAFETY: Box::into_raw does not return null pointers. + let boxed = unsafe { NonNull::new_unchecked(boxed) }; + + Self { boxed } + } + + /// Replaces the future currently stored in this box. + /// + /// This reallocates if and only if the layout of the provided future is + /// different from the layout of the currently stored future. + pub(crate) fn set<F>(&mut self, future: F) + where + F: Future<Output = T> + Send + 'static, + { + if let Err(future) = self.try_set(future) { + *self = Self::new(future); + } + } + + /// Replaces the future currently stored in this box. + /// + /// This function never reallocates, but returns an error if the provided + /// future has a different size or alignment from the currently stored + /// future. + pub(crate) fn try_set<F>(&mut self, future: F) -> Result<(), F> + where + F: Future<Output = T> + Send + 'static, + { + // SAFETY: The pointer is not dangling. + let self_layout = { + let dyn_future: &(dyn Future<Output = T> + Send) = unsafe { self.boxed.as_ref() }; + Layout::for_value(dyn_future) + }; + + if Layout::new::<F>() == self_layout { + // SAFETY: We just checked that the layout of F is correct. + unsafe { + self.set_same_layout(future); + } + + Ok(()) + } else { + Err(future) + } + } + + /// Sets the current future. + /// + /// # Safety + /// + /// This function requires that the layout of the provided future is the + /// same as `self.layout`. + unsafe fn set_same_layout<F>(&mut self, future: F) + where + F: Future<Output = T> + Send + 'static, + { + // Drop the existing future, catching any panics. + let result = panic::catch_unwind(AssertUnwindSafe(|| { + ptr::drop_in_place(self.boxed.as_ptr()); + })); + + // Overwrite the future behind the pointer. This is safe because the + // allocation was allocated with the same size and alignment as the type F. + let self_ptr: *mut F = self.boxed.as_ptr() as *mut F; + ptr::write(self_ptr, future); + + // Update the vtable of self.boxed. The pointer is not null because we + // just got it from self.boxed, which is not null. + self.boxed = NonNull::new_unchecked(self_ptr); + + // If the old future's destructor panicked, resume unwinding. + match result { + Ok(()) => {} + Err(payload) => { + panic::resume_unwind(payload); + } + } + } + + /// Gets a pinned reference to the underlying future. + pub(crate) fn get_pin(&mut self) -> Pin<&mut (dyn Future<Output = T> + Send)> { + // SAFETY: The user of this box cannot move the box, and we do not move it + // either. + unsafe { Pin::new_unchecked(self.boxed.as_mut()) } + } + + /// Polls the future stored inside this box. + pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<T> { + self.get_pin().poll(cx) + } +} + +impl<T> Future for ReusableBoxFuture<T> { + type Output = T; + + /// Polls the future stored inside this box. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { + Pin::into_inner(self).get_pin().poll(cx) + } +} + +// The future stored inside ReusableBoxFuture<T> must be Send. +unsafe impl<T> Send for ReusableBoxFuture<T> {} + +// The only method called on self.boxed is poll, which takes &mut self, so this +// struct being Sync does not permit any invalid access to the Future, even if +// the future is not Sync. +unsafe impl<T> Sync for ReusableBoxFuture<T> {} + +// Just like a Pin<Box<dyn Future>> is always Unpin, so is this type. +impl<T> Unpin for ReusableBoxFuture<T> {} + +impl<T> Drop for ReusableBoxFuture<T> { + fn drop(&mut self) { + unsafe { + drop(Box::from_raw(self.boxed.as_ptr())); + } + } +} + +impl<T> fmt::Debug for ReusableBoxFuture<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ReusableBoxFuture").finish() + } +} + +#[cfg(test)] +mod test { + use super::ReusableBoxFuture; + use futures::future::FutureExt; + use std::alloc::Layout; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + #[test] + fn test_different_futures() { + let fut = async move { 10 }; + // Not zero sized! + assert_eq!(Layout::for_value(&fut).size(), 1); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(10)); + + b.try_set(async move { 20 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(20)); + + b.try_set(async move { 30 }) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(30)); + } + + #[test] + fn test_different_sizes() { + let fut1 = async move { 10 }; + let val = [0u32; 1000]; + let fut2 = async move { val[0] }; + let fut3 = ZeroSizedFuture {}; + + assert_eq!(Layout::for_value(&fut1).size(), 1); + assert_eq!(Layout::for_value(&fut2).size(), 4004); + assert_eq!(Layout::for_value(&fut3).size(), 0); + + let mut b = ReusableBoxFuture::new(fut1); + assert_eq!(b.get_pin().now_or_never(), Some(10)); + b.set(fut2); + assert_eq!(b.get_pin().now_or_never(), Some(0)); + b.set(fut3); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } + + struct ZeroSizedFuture {} + impl Future for ZeroSizedFuture { + type Output = u32; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> { + Poll::Ready(5) + } + } + + #[test] + fn test_zero_sized() { + let fut = ZeroSizedFuture {}; + // Zero sized! + assert_eq!(Layout::for_value(&fut).size(), 0); + + let mut b = ReusableBoxFuture::new(fut); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + + b.try_set(ZeroSizedFuture {}) + .unwrap_or_else(|_| panic!("incorrect size")); + + assert_eq!(b.get_pin().now_or_never(), Some(5)); + assert_eq!(b.get_pin().now_or_never(), Some(5)); + } +} diff --git a/third_party/rust/tokio/src/signal/unix.rs b/third_party/rust/tokio/src/signal/unix.rs new file mode 100644 index 0000000000..ae5c13085e --- /dev/null +++ b/third_party/rust/tokio/src/signal/unix.rs @@ -0,0 +1,530 @@ +//! Unix-specific types for signal handling. +//! +//! This module is only defined on Unix platforms and contains the primary +//! `Signal` type for receiving notifications of signals. + +#![cfg(unix)] +#![cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))] + +use crate::runtime::scheduler; +use crate::runtime::signal::Handle; +use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; +use crate::signal::RxFuture; +use crate::sync::watch; + +use mio::net::UnixStream; +use std::io::{self, Error, ErrorKind, Write}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Once; +use std::task::{Context, Poll}; + +pub(crate) type OsStorage = Vec<SignalInfo>; + +impl Init for OsStorage { + fn init() -> Self { + // There are reliable signals ranging from 1 to 33 available on every Unix platform. + #[cfg(not(target_os = "linux"))] + let possible = 0..=33; + + // On Linux, there are additional real-time signals available. + #[cfg(target_os = "linux")] + let possible = 0..=libc::SIGRTMAX(); + + possible.map(|_| SignalInfo::default()).collect() + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + self.get(id).map(|si| &si.event_info) + } + + fn for_each<'a, F>(&'a self, f: F) + where + F: FnMut(&'a EventInfo), + { + self.iter().map(|si| &si.event_info).for_each(f) + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData { + sender: UnixStream, + pub(crate) receiver: UnixStream, +} + +impl Init for OsExtraData { + fn init() -> Self { + let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream"); + + Self { sender, receiver } + } +} + +/// Represents the specific kind of signal to listen for. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct SignalKind(libc::c_int); + +impl SignalKind { + /// Allows for listening to any valid OS signal. + /// + /// For example, this can be used for listening for platform-specific + /// signals. + /// ```rust,no_run + /// # use tokio::signal::unix::SignalKind; + /// # let signum = -1; + /// // let signum = libc::OS_SPECIFIC_SIGNAL; + /// let kind = SignalKind::from_raw(signum); + /// ``` + // Use `std::os::raw::c_int` on public API to prevent leaking a non-stable + // type alias from libc. + // `libc::c_int` and `std::os::raw::c_int` are currently the same type, and are + // unlikely to change to other types, but technically libc can change this + // in the future minor version. + // See https://github.com/tokio-rs/tokio/issues/3767 for more. + pub const fn from_raw(signum: std::os::raw::c_int) -> Self { + Self(signum as libc::c_int) + } + + /// Get the signal's numeric value. + /// + /// ```rust + /// # use tokio::signal::unix::SignalKind; + /// let kind = SignalKind::interrupt(); + /// assert_eq!(kind.as_raw_value(), libc::SIGINT); + /// ``` + pub const fn as_raw_value(&self) -> std::os::raw::c_int { + self.0 + } + + /// Represents the SIGALRM signal. + /// + /// On Unix systems this signal is sent when a real-time timer has expired. + /// By default, the process is terminated by this signal. + pub const fn alarm() -> Self { + Self(libc::SIGALRM) + } + + /// Represents the SIGCHLD signal. + /// + /// On Unix systems this signal is sent when the status of a child process + /// has changed. By default, this signal is ignored. + pub const fn child() -> Self { + Self(libc::SIGCHLD) + } + + /// Represents the SIGHUP signal. + /// + /// On Unix systems this signal is sent when the terminal is disconnected. + /// By default, the process is terminated by this signal. + pub const fn hangup() -> Self { + Self(libc::SIGHUP) + } + + /// Represents the SIGINFO signal. + /// + /// On Unix systems this signal is sent to request a status update from the + /// process. By default, this signal is ignored. + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + pub const fn info() -> Self { + Self(libc::SIGINFO) + } + + /// Represents the SIGINT signal. + /// + /// On Unix systems this signal is sent to interrupt a program. + /// By default, the process is terminated by this signal. + pub const fn interrupt() -> Self { + Self(libc::SIGINT) + } + + /// Represents the SIGIO signal. + /// + /// On Unix systems this signal is sent when I/O operations are possible + /// on some file descriptor. By default, this signal is ignored. + pub const fn io() -> Self { + Self(libc::SIGIO) + } + + /// Represents the SIGPIPE signal. + /// + /// On Unix systems this signal is sent when the process attempts to write + /// to a pipe which has no reader. By default, the process is terminated by + /// this signal. + pub const fn pipe() -> Self { + Self(libc::SIGPIPE) + } + + /// Represents the SIGQUIT signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process, after which the OS will dump the process core. + /// By default, the process is terminated by this signal. + pub const fn quit() -> Self { + Self(libc::SIGQUIT) + } + + /// Represents the SIGTERM signal. + /// + /// On Unix systems this signal is sent to issue a shutdown of the + /// process. By default, the process is terminated by this signal. + pub const fn terminate() -> Self { + Self(libc::SIGTERM) + } + + /// Represents the SIGUSR1 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub const fn user_defined1() -> Self { + Self(libc::SIGUSR1) + } + + /// Represents the SIGUSR2 signal. + /// + /// On Unix systems this is a user defined signal. + /// By default, the process is terminated by this signal. + pub const fn user_defined2() -> Self { + Self(libc::SIGUSR2) + } + + /// Represents the SIGWINCH signal. + /// + /// On Unix systems this signal is sent when the terminal window is resized. + /// By default, this signal is ignored. + pub const fn window_change() -> Self { + Self(libc::SIGWINCH) + } +} + +impl From<std::os::raw::c_int> for SignalKind { + fn from(signum: std::os::raw::c_int) -> Self { + Self::from_raw(signum as libc::c_int) + } +} + +impl From<SignalKind> for std::os::raw::c_int { + fn from(kind: SignalKind) -> Self { + kind.as_raw_value() + } +} + +pub(crate) struct SignalInfo { + event_info: EventInfo, + init: Once, + initialized: AtomicBool, +} + +impl Default for SignalInfo { + fn default() -> SignalInfo { + SignalInfo { + event_info: Default::default(), + init: Once::new(), + initialized: AtomicBool::new(false), + } + } +} + +/// Our global signal handler for all signals registered by this module. +/// +/// The purpose of this signal handler is to primarily: +/// +/// 1. Flag that our specific signal was received (e.g. store an atomic flag) +/// 2. Wake up the driver by writing a byte to a pipe +/// +/// Those two operations should both be async-signal safe. +fn action(globals: &'static Globals, signal: libc::c_int) { + globals.record_event(signal as EventId); + + // Send a wakeup, ignore any errors (anything reasonably possible is + // full pipe and then it will wake up anyway). + let mut sender = &globals.sender; + drop(sender.write(&[1])); +} + +/// Enables this module to receive signal notifications for the `signal` +/// provided. +/// +/// This will register the signal handler if it hasn't already been registered, +/// returning any error along the way if that fails. +fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { + let signal = signal.0; + if signal < 0 || signal_hook_registry::FORBIDDEN.contains(&signal) { + return Err(Error::new( + ErrorKind::Other, + format!("Refusing to register signal {}", signal), + )); + } + + // Check that we have a signal driver running + handle.check_inner()?; + + let globals = globals(); + let siginfo = match globals.storage().get(signal as EventId) { + Some(slot) => slot, + None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")), + }; + let mut registered = Ok(()); + siginfo.init.call_once(|| { + registered = unsafe { + signal_hook_registry::register(signal, move || action(globals, signal)).map(|_| ()) + }; + if registered.is_ok() { + siginfo.initialized.store(true, Ordering::Relaxed); + } + }); + registered?; + // If the call_once failed, it won't be retried on the next attempt to register the signal. In + // such case it is not run, registered is still `Ok(())`, initialized is still `false`. + if siginfo.initialized.load(Ordering::Relaxed) { + Ok(()) + } else { + Err(Error::new( + ErrorKind::Other, + "Failed to register signal handler", + )) + } +} + +/// An listener for receiving a particular type of OS signal. +/// +/// The listener can be turned into a `Stream` using [`SignalStream`]. +/// +/// [`SignalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.SignalStream.html +/// +/// In general signal handling on Unix is a pretty tricky topic, and this +/// structure is no exception! There are some important limitations to keep in +/// mind when using `Signal` streams: +/// +/// * Signals handling in Unix already necessitates coalescing signals +/// together sometimes. This `Signal` stream is also no exception here in +/// that it will also coalesce signals. That is, even if the signal handler +/// for this process runs multiple times, the `Signal` stream may only return +/// one signal notification. Specifically, before `poll` is called, all +/// signal notifications are coalesced into one item returned from `poll`. +/// Once `poll` has been called, however, a further signal is guaranteed to +/// be yielded as an item. +/// +/// Put another way, any element pulled off the returned listener corresponds to +/// *at least one* signal, but possibly more. +/// +/// * Signal handling in general is relatively inefficient. Although some +/// improvements are possible in this crate, it's recommended to not plan on +/// having millions of signal channels open. +/// +/// If you've got any questions about this feel free to open an issue on the +/// repo! New approaches to alleviate some of these limitations are always +/// appreciated! +/// +/// # Caveats +/// +/// The first time that a `Signal` instance is registered for a particular +/// signal kind, an OS signal-handler is installed which replaces the default +/// platform behavior when that signal is received, **for the duration of the +/// entire process**. +/// +/// For example, Unix systems will terminate a process by default when it +/// receives SIGINT. But, when a `Signal` instance is created to listen for +/// this signal, the next SIGINT that arrives will be translated to a stream +/// event, and the process will continue to execute. **Even if this `Signal` +/// instance is dropped, subsequent SIGINT deliveries will end up captured by +/// Tokio, and the default platform behavior will NOT be reset**. +/// +/// Thus, applications should take care to ensure the expected signal behavior +/// occurs as expected after listening for specific signals. +/// +/// # Examples +/// +/// Wait for SIGHUP +/// +/// ```rust,no_run +/// use tokio::signal::unix::{signal, SignalKind}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // An infinite stream of hangup signals. +/// let mut sig = signal(SignalKind::hangup())?; +/// +/// // Print whenever a HUP signal is received +/// loop { +/// sig.recv().await; +/// println!("got signal HUP"); +/// } +/// } +/// ``` +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Signal { + inner: RxFuture, +} + +/// Creates a new listener which will receive notifications when the current +/// process receives the specified signal `kind`. +/// +/// This function will create a new stream which binds to the default reactor. +/// The `Signal` stream is an infinite stream which will receive +/// notifications whenever a signal is received. More documentation can be +/// found on `Signal` itself, but to reiterate: +/// +/// * Signals may be coalesced beyond what the kernel already does. +/// * Once a signal handler is registered with the process the underlying +/// libc signal handler is never unregistered. +/// +/// A `Signal` stream can be created for a particular signal number +/// multiple times. When a signal is received then all the associated +/// channels will receive the signal notification. +/// +/// # Errors +/// +/// * If the lower-level C functions fail for some reason. +/// * If the previous initialization of this specific signal failed. +/// * If the signal is one of +/// [`signal_hook::FORBIDDEN`](fn@signal_hook_registry::register#panics) +/// +/// # Panics +/// +/// This function panics if there is no current reactor set, or if the `rt` +/// feature flag is not enabled. +#[track_caller] +pub fn signal(kind: SignalKind) -> io::Result<Signal> { + let handle = scheduler::Handle::current(); + let rx = signal_with_handle(kind, handle.driver().signal())?; + + Ok(Signal { + inner: RxFuture::new(rx), + }) +} + +pub(crate) fn signal_with_handle( + kind: SignalKind, + handle: &Handle, +) -> io::Result<watch::Receiver<()>> { + // Turn the signal delivery on once we are ready for it + signal_enable(kind, handle)?; + + Ok(globals().register_listener(kind.0 as EventId)) +} + +impl Signal { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this stream. + /// + /// # Cancel safety + /// + /// This method is cancel safe. If you use it as the event in a + /// [`tokio::select!`](crate::select) statement and some other branch + /// completes first, then it is guaranteed that no signal is lost. + /// + /// # Examples + /// + /// Wait for SIGHUP + /// + /// ```rust,no_run + /// use tokio::signal::unix::{signal, SignalKind}; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // An infinite stream of hangup signals. + /// let mut stream = signal(SignalKind::hangup())?; + /// + /// // Print whenever a HUP signal is received + /// loop { + /// stream.recv().await; + /// println!("got signal HUP"); + /// } + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// This method returns: + /// + /// * `Poll::Pending` if no signals are available but the channel is not + /// closed. + /// * `Poll::Ready(Some(()))` if a signal is available. + /// * `Poll::Ready(None)` if the channel has been closed and all signals + /// sent before it was closed have been received. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::unix::Signal; + /// + /// struct MyFuture { + /// signal: Signal, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.signal.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +// Work around for abstracting streams internally +pub(crate) trait InternalStream { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>; +} + +impl InternalStream for Signal { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.poll_recv(cx) + } +} + +pub(crate) fn ctrl_c() -> io::Result<Signal> { + signal(SignalKind::interrupt()) +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + + #[test] + fn signal_enable_error_on_invalid_input() { + signal_enable(SignalKind::from_raw(-1), &Handle::default()).unwrap_err(); + } + + #[test] + fn signal_enable_error_on_forbidden_input() { + signal_enable( + SignalKind::from_raw(signal_hook_registry::FORBIDDEN[0]), + &Handle::default(), + ) + .unwrap_err(); + } + + #[test] + fn from_c_int() { + assert_eq!(SignalKind::from(2), SignalKind::interrupt()); + } + + #[test] + fn into_c_int() { + let value: std::os::raw::c_int = SignalKind::interrupt().into(); + assert_eq!(value, libc::SIGINT as _); + } +} diff --git a/third_party/rust/tokio/src/signal/windows.rs b/third_party/rust/tokio/src/signal/windows.rs new file mode 100644 index 0000000000..2f70f98b15 --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows.rs @@ -0,0 +1,524 @@ +//! Windows-specific types for signal handling. +//! +//! This module is only defined on Windows and allows receiving "ctrl-c", +//! "ctrl-break", "ctrl-logoff", "ctrl-shutdown", and "ctrl-close" +//! notifications. These events are listened for via the `SetConsoleCtrlHandler` +//! function which receives the corresponding windows_sys event type. + +#![cfg(any(windows, docsrs))] +#![cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))] + +use crate::signal::RxFuture; +use std::io; +use std::task::{Context, Poll}; + +#[cfg(not(docsrs))] +#[path = "windows/sys.rs"] +mod imp; +#[cfg(not(docsrs))] +pub(crate) use self::imp::{OsExtraData, OsStorage}; + +#[cfg(docsrs)] +#[path = "windows/stub.rs"] +mod imp; + +/// Creates a new listener which receives "ctrl-c" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_c; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // A listener of CTRL-C events. +/// let mut signal = ctrl_c()?; +/// +/// // Print whenever a CTRL-C event is received. +/// for countdown in (0..3).rev() { +/// signal.recv().await; +/// println!("got CTRL-C. {} more to exit", countdown); +/// } +/// +/// Ok(()) +/// } +/// ``` +pub fn ctrl_c() -> io::Result<CtrlC> { + Ok(CtrlC { + inner: self::imp::ctrl_c()?, + }) +} + +/// Represents a listener which receives "ctrl-c" notifications sent to the process +/// via `SetConsoleCtrlHandler`. +/// +/// This event can be turned into a `Stream` using [`CtrlCStream`]. +/// +/// [`CtrlCStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.CtrlCStream.html +/// +/// A notification to this process notifies *all* receivers for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the listener may only receive one item about the two notifications. +#[must_use = "listeners do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlC { + inner: RxFuture, +} + +impl CtrlC { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by the listener. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_c; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// let mut signal = ctrl_c()?; + /// + /// // Print whenever a CTRL-C event is received. + /// for countdown in (0..3).rev() { + /// signal.recv().await; + /// println!("got CTRL-C. {} more to exit", countdown); + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlC; + /// + /// struct MyFuture { + /// ctrl_c: CtrlC, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_c.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Represents a listener which receives "ctrl-break" notifications sent to the process +/// via `SetConsoleCtrlHandler`. +/// +/// This listener can be turned into a `Stream` using [`CtrlBreakStream`]. +/// +/// [`CtrlBreakStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.CtrlBreakStream.html +/// +/// A notification to this process notifies *all* receivers for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the listener may only receive one item about the two notifications. +#[must_use = "listeners do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlBreak { + inner: RxFuture, +} + +impl CtrlBreak { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_break; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // A listener of CTRL-BREAK events. + /// let mut signal = ctrl_break()?; + /// + /// // Print whenever a CTRL-BREAK event is received. + /// loop { + /// signal.recv().await; + /// println!("got signal CTRL-BREAK"); + /// } + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlBreak; + /// + /// struct MyFuture { + /// ctrl_break: CtrlBreak, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_break.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Creates a new listener which receives "ctrl-break" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_break; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // A listener of CTRL-BREAK events. +/// let mut signal = ctrl_break()?; +/// +/// // Print whenever a CTRL-BREAK event is received. +/// loop { +/// signal.recv().await; +/// println!("got signal CTRL-BREAK"); +/// } +/// } +/// ``` +pub fn ctrl_break() -> io::Result<CtrlBreak> { + Ok(CtrlBreak { + inner: self::imp::ctrl_break()?, + }) +} + +/// Creates a new listener which receives "ctrl-close" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_close; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // A listener of CTRL-CLOSE events. +/// let mut signal = ctrl_close()?; +/// +/// // Print whenever a CTRL-CLOSE event is received. +/// for countdown in (0..3).rev() { +/// signal.recv().await; +/// println!("got CTRL-CLOSE. {} more to exit", countdown); +/// } +/// +/// Ok(()) +/// } +/// ``` +pub fn ctrl_close() -> io::Result<CtrlClose> { + Ok(CtrlClose { + inner: self::imp::ctrl_close()?, + }) +} + +/// Represents a listener which receives "ctrl-close" notitifications sent to the process +/// via 'SetConsoleCtrlHandler'. +/// +/// A notification to this process notifies *all* listeners listening for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the listener may only receive one item about the two notifications. +#[must_use = "listeners do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlClose { + inner: RxFuture, +} + +impl CtrlClose { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_close; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // A listener of CTRL-CLOSE events. + /// let mut signal = ctrl_close()?; + /// + /// // Print whenever a CTRL-CLOSE event is received. + /// signal.recv().await; + /// println!("got CTRL-CLOSE. Cleaning up before exiting"); + /// + /// Ok(()) + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlClose; + /// + /// struct MyFuture { + /// ctrl_close: CtrlClose, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_close.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Creates a new listener which receives "ctrl-shutdown" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_shutdown; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // A listener of CTRL-SHUTDOWN events. +/// let mut signal = ctrl_shutdown()?; +/// +/// signal.recv().await; +/// println!("got CTRL-SHUTDOWN. Cleaning up before exiting"); +/// +/// Ok(()) +/// } +/// ``` +pub fn ctrl_shutdown() -> io::Result<CtrlShutdown> { + Ok(CtrlShutdown { + inner: self::imp::ctrl_shutdown()?, + }) +} + +/// Represents a listener which receives "ctrl-shutdown" notitifications sent to the process +/// via 'SetConsoleCtrlHandler'. +/// +/// A notification to this process notifies *all* listeners listening for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the listener may only receive one item about the two notifications. +#[must_use = "listeners do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlShutdown { + inner: RxFuture, +} + +impl CtrlShutdown { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_shutdown; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // A listener of CTRL-SHUTDOWN events. + /// let mut signal = ctrl_shutdown()?; + /// + /// // Print whenever a CTRL-SHUTDOWN event is received. + /// signal.recv().await; + /// println!("got CTRL-SHUTDOWN. Cleaning up before exiting"); + /// + /// Ok(()) + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlShutdown; + /// + /// struct MyFuture { + /// ctrl_shutdown: CtrlShutdown, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_shutdown.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} + +/// Creates a new listener which receives "ctrl-logoff" notifications sent to the +/// process. +/// +/// # Examples +/// +/// ```rust,no_run +/// use tokio::signal::windows::ctrl_logoff; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// // A listener of CTRL-LOGOFF events. +/// let mut signal = ctrl_logoff()?; +/// +/// signal.recv().await; +/// println!("got CTRL-LOGOFF. Cleaning up before exiting"); +/// +/// Ok(()) +/// } +/// ``` +pub fn ctrl_logoff() -> io::Result<CtrlLogoff> { + Ok(CtrlLogoff { + inner: self::imp::ctrl_logoff()?, + }) +} + +/// Represents a listener which receives "ctrl-logoff" notitifications sent to the process +/// via 'SetConsoleCtrlHandler'. +/// +/// A notification to this process notifies *all* listeners listening for +/// this event. Moreover, the notifications **are coalesced** if they aren't processed +/// quickly enough. This means that if two notifications are received back-to-back, +/// then the listener may only receive one item about the two notifications. +#[must_use = "listeners do nothing unless polled"] +#[derive(Debug)] +pub struct CtrlLogoff { + inner: RxFuture, +} + +impl CtrlLogoff { + /// Receives the next signal notification event. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::signal::windows::ctrl_logoff; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // An listener of CTRL-LOGOFF events. + /// let mut signal = ctrl_logoff()?; + /// + /// // Print whenever a CTRL-LOGOFF event is received. + /// signal.recv().await; + /// println!("got CTRL-LOGOFF. Cleaning up before exiting"); + /// + /// Ok(()) + /// } + /// ``` + pub async fn recv(&mut self) -> Option<()> { + self.inner.recv().await + } + + /// Polls to receive the next signal notification event, outside of an + /// `async` context. + /// + /// `None` is returned if no more events can be received by this listener. + /// + /// # Examples + /// + /// Polling from a manually implemented future + /// + /// ```rust,no_run + /// use std::pin::Pin; + /// use std::future::Future; + /// use std::task::{Context, Poll}; + /// use tokio::signal::windows::CtrlLogoff; + /// + /// struct MyFuture { + /// ctrl_logoff: CtrlLogoff, + /// } + /// + /// impl Future for MyFuture { + /// type Output = Option<()>; + /// + /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + /// println!("polling MyFuture"); + /// self.ctrl_logoff.poll_recv(cx) + /// } + /// } + /// ``` + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> { + self.inner.poll_recv(cx) + } +} diff --git a/third_party/rust/tokio/src/signal/windows/stub.rs b/third_party/rust/tokio/src/signal/windows/stub.rs new file mode 100644 index 0000000000..61df30979b --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows/stub.rs @@ -0,0 +1,25 @@ +//! Stub implementations for the platform API so that rustdoc can build linkable +//! documentation on non-windows platforms. + +use crate::signal::RxFuture; +use std::io; + +pub(super) fn ctrl_break() -> io::Result<RxFuture> { + panic!() +} + +pub(super) fn ctrl_close() -> io::Result<RxFuture> { + panic!() +} + +pub(super) fn ctrl_c() -> io::Result<RxFuture> { + panic!() +} + +pub(super) fn ctrl_logoff() -> io::Result<RxFuture> { + panic!() +} + +pub(super) fn ctrl_shutdown() -> io::Result<RxFuture> { + panic!() +} diff --git a/third_party/rust/tokio/src/signal/windows/sys.rs b/third_party/rust/tokio/src/signal/windows/sys.rs new file mode 100644 index 0000000000..26e6bdf818 --- /dev/null +++ b/third_party/rust/tokio/src/signal/windows/sys.rs @@ -0,0 +1,229 @@ +use std::io; +use std::sync::Once; + +use crate::signal::registry::{globals, EventId, EventInfo, Init, Storage}; +use crate::signal::RxFuture; + +use windows_sys::Win32::Foundation::BOOL; +use windows_sys::Win32::System::Console as console; + +pub(super) fn ctrl_break() -> io::Result<RxFuture> { + new(console::CTRL_BREAK_EVENT) +} + +pub(super) fn ctrl_close() -> io::Result<RxFuture> { + new(console::CTRL_CLOSE_EVENT) +} + +pub(super) fn ctrl_c() -> io::Result<RxFuture> { + new(console::CTRL_C_EVENT) +} + +pub(super) fn ctrl_logoff() -> io::Result<RxFuture> { + new(console::CTRL_LOGOFF_EVENT) +} + +pub(super) fn ctrl_shutdown() -> io::Result<RxFuture> { + new(console::CTRL_SHUTDOWN_EVENT) +} + +fn new(signum: u32) -> io::Result<RxFuture> { + global_init()?; + let rx = globals().register_listener(signum as EventId); + Ok(RxFuture::new(rx)) +} + +#[derive(Debug)] +pub(crate) struct OsStorage { + ctrl_break: EventInfo, + ctrl_close: EventInfo, + ctrl_c: EventInfo, + ctrl_logoff: EventInfo, + ctrl_shutdown: EventInfo, +} + +impl Init for OsStorage { + fn init() -> Self { + Self { + ctrl_break: Default::default(), + ctrl_close: Default::default(), + ctrl_c: Default::default(), + ctrl_logoff: Default::default(), + ctrl_shutdown: Default::default(), + } + } +} + +impl Storage for OsStorage { + fn event_info(&self, id: EventId) -> Option<&EventInfo> { + match u32::try_from(id) { + Ok(console::CTRL_BREAK_EVENT) => Some(&self.ctrl_break), + Ok(console::CTRL_CLOSE_EVENT) => Some(&self.ctrl_close), + Ok(console::CTRL_C_EVENT) => Some(&self.ctrl_c), + Ok(console::CTRL_LOGOFF_EVENT) => Some(&self.ctrl_logoff), + Ok(console::CTRL_SHUTDOWN_EVENT) => Some(&self.ctrl_shutdown), + _ => None, + } + } + + fn for_each<'a, F>(&'a self, mut f: F) + where + F: FnMut(&'a EventInfo), + { + f(&self.ctrl_break); + f(&self.ctrl_close); + f(&self.ctrl_c); + f(&self.ctrl_logoff); + f(&self.ctrl_shutdown); + } +} + +#[derive(Debug)] +pub(crate) struct OsExtraData {} + +impl Init for OsExtraData { + fn init() -> Self { + Self {} + } +} + +fn global_init() -> io::Result<()> { + static INIT: Once = Once::new(); + + let mut init = None; + + INIT.call_once(|| unsafe { + let rc = console::SetConsoleCtrlHandler(Some(handler), 1); + let ret = if rc == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + }; + + init = Some(ret); + }); + + init.unwrap_or_else(|| Ok(())) +} + +unsafe extern "system" fn handler(ty: u32) -> BOOL { + let globals = globals(); + globals.record_event(ty as EventId); + + // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine + // the handler routine is always invoked in a new thread, thus we don't + // have the same restrictions as in Unix signal handlers, meaning we can + // go ahead and perform the broadcast here. + if globals.broadcast() { + 1 + } else { + // No one is listening for this notification any more + // let the OS fire the next (possibly the default) handler. + 0 + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use super::*; + use crate::runtime::Runtime; + + use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; + + #[test] + fn ctrl_c() { + let rt = rt(); + let _enter = rt.enter(); + + let mut ctrl_c = task::spawn(crate::signal::ctrl_c()); + + assert_pending!(ctrl_c.poll()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(console::CTRL_C_EVENT); + } + + assert_ready_ok!(ctrl_c.poll()); + } + + #[test] + fn ctrl_break() { + let rt = rt(); + + rt.block_on(async { + let mut ctrl_break = assert_ok!(crate::signal::windows::ctrl_break()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(console::CTRL_BREAK_EVENT); + } + + ctrl_break.recv().await.unwrap(); + }); + } + + #[test] + fn ctrl_close() { + let rt = rt(); + + rt.block_on(async { + let mut ctrl_close = assert_ok!(crate::signal::windows::ctrl_close()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(console::CTRL_CLOSE_EVENT); + } + + ctrl_close.recv().await.unwrap(); + }); + } + + #[test] + fn ctrl_shutdown() { + let rt = rt(); + + rt.block_on(async { + let mut ctrl_shutdown = assert_ok!(crate::signal::windows::ctrl_shutdown()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(console::CTRL_SHUTDOWN_EVENT); + } + + ctrl_shutdown.recv().await.unwrap(); + }); + } + + #[test] + fn ctrl_logoff() { + let rt = rt(); + + rt.block_on(async { + let mut ctrl_logoff = assert_ok!(crate::signal::windows::ctrl_logoff()); + + // Windows doesn't have a good programmatic way of sending events + // like sending signals on Unix, so we'll stub out the actual OS + // integration and test that our handling works. + unsafe { + super::handler(console::CTRL_LOGOFF_EVENT); + } + + ctrl_logoff.recv().await.unwrap(); + }); + } + + fn rt() -> Runtime { + crate::runtime::Builder::new_current_thread() + .build() + .unwrap() + } +} |