summaryrefslogtreecommitdiffstats
path: root/vendor/rayon-core/src/log.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--vendor/rayon-core/src/log.rs423
1 files changed, 423 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/log.rs b/vendor/rayon-core/src/log.rs
new file mode 100644
index 000000000..e1ff827df
--- /dev/null
+++ b/vendor/rayon-core/src/log.rs
@@ -0,0 +1,423 @@
+//! Debug Logging
+//!
+//! To use in a debug build, set the env var `RAYON_LOG` as
+//! described below. In a release build, logs are compiled out by
+//! default unless Rayon is built with `--cfg rayon_rs_log` (try
+//! `RUSTFLAGS="--cfg rayon_rs_log"`).
+//!
+//! Note that logs are an internally debugging tool and their format
+//! is considered unstable, as are the details of how to enable them.
+//!
+//! # Valid values for RAYON_LOG
+//!
+//! The `RAYON_LOG` variable can take on the following values:
+//!
+//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
+//! useful for tracking down deadlocks
+//! * `profile:<file>` -- dumps only those events needed to reconstruct how
+//! many workers are active at a given time
+//! * `all:<file>` -- dumps every event to the file; useful for debugging
+
+use crossbeam_channel::{self, Receiver, Sender};
+use std::collections::VecDeque;
+use std::env;
+use std::fs::File;
+use std::io::{self, BufWriter, Write};
+
+/// True if logs are compiled in.
+pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
+
+#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
+pub(super) enum Event {
+ /// Flushes events to disk, used to terminate benchmarking.
+ Flush,
+
+ /// Indicates that a worker thread started execution.
+ ThreadStart {
+ worker: usize,
+ terminate_addr: usize,
+ },
+
+ /// Indicates that a worker thread started execution.
+ ThreadTerminate { worker: usize },
+
+ /// Indicates that a worker thread became idle, blocked on `latch_addr`.
+ ThreadIdle { worker: usize, latch_addr: usize },
+
+ /// Indicates that an idle worker thread found work to do, after
+ /// yield rounds. It should no longer be considered idle.
+ ThreadFoundWork { worker: usize, yields: u32 },
+
+ /// Indicates that a worker blocked on a latch observed that it was set.
+ ///
+ /// Internal debugging event that does not affect the state
+ /// machine.
+ ThreadSawLatchSet { worker: usize, latch_addr: usize },
+
+ /// Indicates that an idle worker is getting sleepy. `sleepy_counter` is the internal
+ /// sleep state that we saw at the time.
+ ThreadSleepy { worker: usize, jobs_counter: usize },
+
+ /// Indicates that the thread's attempt to fall asleep was
+ /// interrupted because the latch was set. (This is not, in and of
+ /// itself, a change to the thread state.)
+ ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },
+
+ /// Indicates that the thread's attempt to fall asleep was
+ /// interrupted because a job was posted. (This is not, in and of
+ /// itself, a change to the thread state.)
+ ThreadSleepInterruptedByJob { worker: usize },
+
+ /// Indicates that an idle worker has gone to sleep.
+ ThreadSleeping { worker: usize, latch_addr: usize },
+
+ /// Indicates that a sleeping worker has awoken.
+ ThreadAwoken { worker: usize, latch_addr: usize },
+
+ /// Indicates that the given worker thread was notified it should
+ /// awaken.
+ ThreadNotify { worker: usize },
+
+ /// The given worker has pushed a job to its local deque.
+ JobPushed { worker: usize },
+
+ /// The given worker has popped a job from its local deque.
+ JobPopped { worker: usize },
+
+ /// The given worker has stolen a job from the deque of another.
+ JobStolen { worker: usize, victim: usize },
+
+ /// N jobs were injected into the global queue.
+ JobsInjected { count: usize },
+
+ /// A job was removed from the global queue.
+ JobUninjected { worker: usize },
+
+ /// When announcing a job, this was the value of the counters we observed.
+ ///
+ /// No effect on thread state, just a debugging event.
+ JobThreadCounts {
+ worker: usize,
+ num_idle: u16,
+ num_sleepers: u16,
+ },
+}
+
+/// Handle to the logging thread, if any. You can use this to deliver
+/// logs. You can also clone it freely.
+#[derive(Clone)]
+pub(super) struct Logger {
+ sender: Option<Sender<Event>>,
+}
+
+impl Logger {
+ pub(super) fn new(num_workers: usize) -> Logger {
+ if !LOG_ENABLED {
+ return Self::disabled();
+ }
+
+ // see the doc comment for the format
+ let env_log = match env::var("RAYON_LOG") {
+ Ok(s) => s,
+ Err(_) => return Self::disabled(),
+ };
+
+ let (sender, receiver) = crossbeam_channel::unbounded();
+
+ if env_log.starts_with("tail:") {
+ let filename = env_log["tail:".len()..].to_string();
+ ::std::thread::spawn(move || {
+ Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
+ });
+ } else if env_log == "all" {
+ ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
+ } else if env_log.starts_with("profile:") {
+ let filename = env_log["profile:".len()..].to_string();
+ ::std::thread::spawn(move || {
+ Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
+ });
+ } else {
+ panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
+ }
+
+ return Logger {
+ sender: Some(sender),
+ };
+ }
+
+ fn disabled() -> Logger {
+ Logger { sender: None }
+ }
+
+ #[inline]
+ pub(super) fn log(&self, event: impl FnOnce() -> Event) {
+ if !LOG_ENABLED {
+ return;
+ }
+
+ if let Some(sender) = &self.sender {
+ sender.send(event()).unwrap();
+ }
+ }
+
+ fn profile_logger_thread(
+ num_workers: usize,
+ log_filename: String,
+ capacity: usize,
+ receiver: Receiver<Event>,
+ ) {
+ let file = File::create(&log_filename)
+ .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
+
+ let mut writer = BufWriter::new(file);
+ let mut events = Vec::with_capacity(capacity);
+ let mut state = SimulatorState::new(num_workers);
+ let timeout = std::time::Duration::from_secs(30);
+
+ loop {
+ loop {
+ match receiver.recv_timeout(timeout) {
+ Ok(event) => {
+ if let Event::Flush = event {
+ break;
+ } else {
+ events.push(event);
+ }
+ }
+
+ Err(_) => break,
+ }
+
+ if events.len() == capacity {
+ break;
+ }
+ }
+
+ for event in events.drain(..) {
+ if state.simulate(&event) {
+ state.dump(&mut writer, &event).unwrap();
+ }
+ }
+
+ writer.flush().unwrap();
+ }
+ }
+
+ fn tail_logger_thread(
+ num_workers: usize,
+ log_filename: String,
+ capacity: usize,
+ receiver: Receiver<Event>,
+ ) {
+ let file = File::create(&log_filename)
+ .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
+
+ let mut writer = BufWriter::new(file);
+ let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
+ let mut state = SimulatorState::new(num_workers);
+ let timeout = std::time::Duration::from_secs(30);
+ let mut skipped = false;
+
+ loop {
+ loop {
+ match receiver.recv_timeout(timeout) {
+ Ok(event) => {
+ if let Event::Flush = event {
+ // We ignore Flush events in tail mode --
+ // we're really just looking for
+ // deadlocks.
+ continue;
+ } else {
+ if events.len() == capacity {
+ let event = events.pop_front().unwrap();
+ state.simulate(&event);
+ skipped = true;
+ }
+
+ events.push_back(event);
+ }
+ }
+
+ Err(_) => break,
+ }
+ }
+
+ if skipped {
+ write!(writer, "...\n").unwrap();
+ skipped = false;
+ }
+
+ for event in events.drain(..) {
+ // In tail mode, we dump *all* events out, whether or
+ // not they were 'interesting' to the state machine.
+ state.simulate(&event);
+ state.dump(&mut writer, &event).unwrap();
+ }
+
+ writer.flush().unwrap();
+ }
+ }
+
+ fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
+ let stderr = std::io::stderr();
+ let mut state = SimulatorState::new(num_workers);
+
+ for event in receiver {
+ let mut writer = BufWriter::new(stderr.lock());
+ state.simulate(&event);
+ state.dump(&mut writer, &event).unwrap();
+ writer.flush().unwrap();
+ }
+ }
+}
+
+#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
+enum State {
+ Working,
+ Idle,
+ Notified,
+ Sleeping,
+ Terminated,
+}
+
+impl State {
+ fn letter(&self) -> char {
+ match self {
+ State::Working => 'W',
+ State::Idle => 'I',
+ State::Notified => 'N',
+ State::Sleeping => 'S',
+ State::Terminated => 'T',
+ }
+ }
+}
+
+struct SimulatorState {
+ local_queue_size: Vec<usize>,
+ thread_states: Vec<State>,
+ injector_size: usize,
+}
+
+impl SimulatorState {
+ fn new(num_workers: usize) -> Self {
+ Self {
+ local_queue_size: (0..num_workers).map(|_| 0).collect(),
+ thread_states: (0..num_workers).map(|_| State::Working).collect(),
+ injector_size: 0,
+ }
+ }
+
+ fn simulate(&mut self, event: &Event) -> bool {
+ match *event {
+ Event::ThreadIdle { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Working);
+ self.thread_states[worker] = State::Idle;
+ true
+ }
+
+ Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
+ self.thread_states[worker] = State::Working;
+ true
+ }
+
+ Event::ThreadTerminate { worker, .. } => {
+ self.thread_states[worker] = State::Terminated;
+ true
+ }
+
+ Event::ThreadSleeping { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Idle);
+ self.thread_states[worker] = State::Sleeping;
+ true
+ }
+
+ Event::ThreadAwoken { worker, .. } => {
+ assert_eq!(self.thread_states[worker], State::Notified);
+ self.thread_states[worker] = State::Idle;
+ true
+ }
+
+ Event::JobPushed { worker } => {
+ self.local_queue_size[worker] += 1;
+ true
+ }
+
+ Event::JobPopped { worker } => {
+ self.local_queue_size[worker] -= 1;
+ true
+ }
+
+ Event::JobStolen { victim, .. } => {
+ self.local_queue_size[victim] -= 1;
+ true
+ }
+
+ Event::JobsInjected { count } => {
+ self.injector_size += count;
+ true
+ }
+
+ Event::JobUninjected { .. } => {
+ self.injector_size -= 1;
+ true
+ }
+
+ Event::ThreadNotify { worker } => {
+ // Currently, this log event occurs while holding the
+ // thread lock, so we should *always* see it before
+ // the worker awakens.
+ assert_eq!(self.thread_states[worker], State::Sleeping);
+ self.thread_states[worker] = State::Notified;
+ true
+ }
+
+ // remaining events are no-ops from pov of simulating the
+ // thread state
+ _ => false,
+ }
+ }
+
+ fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
+ let num_idle_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Idle)
+ .count();
+
+ let num_sleeping_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Sleeping)
+ .count();
+
+ let num_notified_threads = self
+ .thread_states
+ .iter()
+ .filter(|s| **s == State::Notified)
+ .count();
+
+ let num_pending_jobs: usize = self.local_queue_size.iter().sum();
+
+ write!(w, "{:2},", num_idle_threads)?;
+ write!(w, "{:2},", num_sleeping_threads)?;
+ write!(w, "{:2},", num_notified_threads)?;
+ write!(w, "{:4},", num_pending_jobs)?;
+ write!(w, "{:4},", self.injector_size)?;
+
+ let event_str = format!("{:?}", event);
+ write!(w, r#""{:60}","#, event_str)?;
+
+ for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
+ write!(w, " T{:02},{}", i, state.letter(),)?;
+
+ if *queue_size > 0 {
+ write!(w, ",{:03},", queue_size)?;
+ } else {
+ write!(w, ", ,")?;
+ }
+ }
+
+ write!(w, "\n")?;
+ Ok(())
+ }
+}