diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) | |
tree | 173a775858bd501c378080a10dca74132f05bc50 /vendor/rayon-core/src/log.rs | |
parent | Initial commit. (diff) | |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src/log.rs')
-rw-r--r-- | vendor/rayon-core/src/log.rs | 423 |
1 files changed, 423 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/log.rs b/vendor/rayon-core/src/log.rs new file mode 100644 index 000000000..e1ff827df --- /dev/null +++ b/vendor/rayon-core/src/log.rs @@ -0,0 +1,423 @@ +//! Debug Logging +//! +//! To use in a debug build, set the env var `RAYON_LOG` as +//! described below. In a release build, logs are compiled out by +//! default unless Rayon is built with `--cfg rayon_rs_log` (try +//! `RUSTFLAGS="--cfg rayon_rs_log"`). +//! +//! Note that logs are an internally debugging tool and their format +//! is considered unstable, as are the details of how to enable them. +//! +//! # Valid values for RAYON_LOG +//! +//! The `RAYON_LOG` variable can take on the following values: +//! +//! * `tail:<file>` -- dumps the last 10,000 events into the given file; +//! useful for tracking down deadlocks +//! * `profile:<file>` -- dumps only those events needed to reconstruct how +//! many workers are active at a given time +//! * `all:<file>` -- dumps every event to the file; useful for debugging + +use crossbeam_channel::{self, Receiver, Sender}; +use std::collections::VecDeque; +use std::env; +use std::fs::File; +use std::io::{self, BufWriter, Write}; + +/// True if logs are compiled in. +pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions)); + +#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] +pub(super) enum Event { + /// Flushes events to disk, used to terminate benchmarking. + Flush, + + /// Indicates that a worker thread started execution. + ThreadStart { + worker: usize, + terminate_addr: usize, + }, + + /// Indicates that a worker thread started execution. + ThreadTerminate { worker: usize }, + + /// Indicates that a worker thread became idle, blocked on `latch_addr`. + ThreadIdle { worker: usize, latch_addr: usize }, + + /// Indicates that an idle worker thread found work to do, after + /// yield rounds. It should no longer be considered idle. + ThreadFoundWork { worker: usize, yields: u32 }, + + /// Indicates that a worker blocked on a latch observed that it was set. + /// + /// Internal debugging event that does not affect the state + /// machine. + ThreadSawLatchSet { worker: usize, latch_addr: usize }, + + /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal + /// sleep state that we saw at the time. + ThreadSleepy { worker: usize, jobs_counter: usize }, + + /// Indicates that the thread's attempt to fall asleep was + /// interrupted because the latch was set. (This is not, in and of + /// itself, a change to the thread state.) + ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize }, + + /// Indicates that the thread's attempt to fall asleep was + /// interrupted because a job was posted. (This is not, in and of + /// itself, a change to the thread state.) + ThreadSleepInterruptedByJob { worker: usize }, + + /// Indicates that an idle worker has gone to sleep. + ThreadSleeping { worker: usize, latch_addr: usize }, + + /// Indicates that a sleeping worker has awoken. + ThreadAwoken { worker: usize, latch_addr: usize }, + + /// Indicates that the given worker thread was notified it should + /// awaken. + ThreadNotify { worker: usize }, + + /// The given worker has pushed a job to its local deque. + JobPushed { worker: usize }, + + /// The given worker has popped a job from its local deque. + JobPopped { worker: usize }, + + /// The given worker has stolen a job from the deque of another. + JobStolen { worker: usize, victim: usize }, + + /// N jobs were injected into the global queue. + JobsInjected { count: usize }, + + /// A job was removed from the global queue. + JobUninjected { worker: usize }, + + /// When announcing a job, this was the value of the counters we observed. + /// + /// No effect on thread state, just a debugging event. + JobThreadCounts { + worker: usize, + num_idle: u16, + num_sleepers: u16, + }, +} + +/// Handle to the logging thread, if any. You can use this to deliver +/// logs. You can also clone it freely. +#[derive(Clone)] +pub(super) struct Logger { + sender: Option<Sender<Event>>, +} + +impl Logger { + pub(super) fn new(num_workers: usize) -> Logger { + if !LOG_ENABLED { + return Self::disabled(); + } + + // see the doc comment for the format + let env_log = match env::var("RAYON_LOG") { + Ok(s) => s, + Err(_) => return Self::disabled(), + }; + + let (sender, receiver) = crossbeam_channel::unbounded(); + + if env_log.starts_with("tail:") { + let filename = env_log["tail:".len()..].to_string(); + ::std::thread::spawn(move || { + Self::tail_logger_thread(num_workers, filename, 10_000, receiver) + }); + } else if env_log == "all" { + ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver)); + } else if env_log.starts_with("profile:") { + let filename = env_log["profile:".len()..].to_string(); + ::std::thread::spawn(move || { + Self::profile_logger_thread(num_workers, filename, 10_000, receiver) + }); + } else { + panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'"); + } + + return Logger { + sender: Some(sender), + }; + } + + fn disabled() -> Logger { + Logger { sender: None } + } + + #[inline] + pub(super) fn log(&self, event: impl FnOnce() -> Event) { + if !LOG_ENABLED { + return; + } + + if let Some(sender) = &self.sender { + sender.send(event()).unwrap(); + } + } + + fn profile_logger_thread( + num_workers: usize, + log_filename: String, + capacity: usize, + receiver: Receiver<Event>, + ) { + let file = File::create(&log_filename) + .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); + + let mut writer = BufWriter::new(file); + let mut events = Vec::with_capacity(capacity); + let mut state = SimulatorState::new(num_workers); + let timeout = std::time::Duration::from_secs(30); + + loop { + loop { + match receiver.recv_timeout(timeout) { + Ok(event) => { + if let Event::Flush = event { + break; + } else { + events.push(event); + } + } + + Err(_) => break, + } + + if events.len() == capacity { + break; + } + } + + for event in events.drain(..) { + if state.simulate(&event) { + state.dump(&mut writer, &event).unwrap(); + } + } + + writer.flush().unwrap(); + } + } + + fn tail_logger_thread( + num_workers: usize, + log_filename: String, + capacity: usize, + receiver: Receiver<Event>, + ) { + let file = File::create(&log_filename) + .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err)); + + let mut writer = BufWriter::new(file); + let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity); + let mut state = SimulatorState::new(num_workers); + let timeout = std::time::Duration::from_secs(30); + let mut skipped = false; + + loop { + loop { + match receiver.recv_timeout(timeout) { + Ok(event) => { + if let Event::Flush = event { + // We ignore Flush events in tail mode -- + // we're really just looking for + // deadlocks. + continue; + } else { + if events.len() == capacity { + let event = events.pop_front().unwrap(); + state.simulate(&event); + skipped = true; + } + + events.push_back(event); + } + } + + Err(_) => break, + } + } + + if skipped { + write!(writer, "...\n").unwrap(); + skipped = false; + } + + for event in events.drain(..) { + // In tail mode, we dump *all* events out, whether or + // not they were 'interesting' to the state machine. + state.simulate(&event); + state.dump(&mut writer, &event).unwrap(); + } + + writer.flush().unwrap(); + } + } + + fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) { + let stderr = std::io::stderr(); + let mut state = SimulatorState::new(num_workers); + + for event in receiver { + let mut writer = BufWriter::new(stderr.lock()); + state.simulate(&event); + state.dump(&mut writer, &event).unwrap(); + writer.flush().unwrap(); + } + } +} + +#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)] +enum State { + Working, + Idle, + Notified, + Sleeping, + Terminated, +} + +impl State { + fn letter(&self) -> char { + match self { + State::Working => 'W', + State::Idle => 'I', + State::Notified => 'N', + State::Sleeping => 'S', + State::Terminated => 'T', + } + } +} + +struct SimulatorState { + local_queue_size: Vec<usize>, + thread_states: Vec<State>, + injector_size: usize, +} + +impl SimulatorState { + fn new(num_workers: usize) -> Self { + Self { + local_queue_size: (0..num_workers).map(|_| 0).collect(), + thread_states: (0..num_workers).map(|_| State::Working).collect(), + injector_size: 0, + } + } + + fn simulate(&mut self, event: &Event) -> bool { + match *event { + Event::ThreadIdle { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Working); + self.thread_states[worker] = State::Idle; + true + } + + Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => { + self.thread_states[worker] = State::Working; + true + } + + Event::ThreadTerminate { worker, .. } => { + self.thread_states[worker] = State::Terminated; + true + } + + Event::ThreadSleeping { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Idle); + self.thread_states[worker] = State::Sleeping; + true + } + + Event::ThreadAwoken { worker, .. } => { + assert_eq!(self.thread_states[worker], State::Notified); + self.thread_states[worker] = State::Idle; + true + } + + Event::JobPushed { worker } => { + self.local_queue_size[worker] += 1; + true + } + + Event::JobPopped { worker } => { + self.local_queue_size[worker] -= 1; + true + } + + Event::JobStolen { victim, .. } => { + self.local_queue_size[victim] -= 1; + true + } + + Event::JobsInjected { count } => { + self.injector_size += count; + true + } + + Event::JobUninjected { .. } => { + self.injector_size -= 1; + true + } + + Event::ThreadNotify { worker } => { + // Currently, this log event occurs while holding the + // thread lock, so we should *always* see it before + // the worker awakens. + assert_eq!(self.thread_states[worker], State::Sleeping); + self.thread_states[worker] = State::Notified; + true + } + + // remaining events are no-ops from pov of simulating the + // thread state + _ => false, + } + } + + fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> { + let num_idle_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Idle) + .count(); + + let num_sleeping_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Sleeping) + .count(); + + let num_notified_threads = self + .thread_states + .iter() + .filter(|s| **s == State::Notified) + .count(); + + let num_pending_jobs: usize = self.local_queue_size.iter().sum(); + + write!(w, "{:2},", num_idle_threads)?; + write!(w, "{:2},", num_sleeping_threads)?; + write!(w, "{:2},", num_notified_threads)?; + write!(w, "{:4},", num_pending_jobs)?; + write!(w, "{:4},", self.injector_size)?; + + let event_str = format!("{:?}", event); + write!(w, r#""{:60}","#, event_str)?; + + for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) { + write!(w, " T{:02},{}", i, state.letter(),)?; + + if *queue_size > 0 { + write!(w, ",{:03},", queue_size)?; + } else { + write!(w, ", ,")?; + } + } + + write!(w, "\n")?; + Ok(()) + } +} |