summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs
blob: d310d9f6d350a771435f896eb6d68b9cefaae8bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use super::{Core, Handle, Shared};

use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::Stats;
use crate::runtime::task::trace::trace_multi_thread;
use crate::runtime::{dump, WorkerMetrics};

use std::time::Duration;

impl Handle {
    pub(super) fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
        core.is_traced = false;

        if core.is_shutdown {
            return core;
        }

        // wait for other workers, or timeout without tracing
        let timeout = Duration::from_millis(250); // a _very_ generous timeout
        let barrier =
            if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) {
                barrier
            } else {
                // don't attempt to trace
                return core;
            };

        if !barrier.is_leader() {
            // wait for leader to finish tracing
            self.shared.trace_status.trace_end.wait();
            return core;
        }

        // trace

        let owned = &self.shared.owned;
        let mut local = self.shared.steal_all();
        let synced = &self.shared.synced;
        let injection = &self.shared.inject;

        // safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
        // was created with.
        let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
            .into_iter()
            .map(dump::Task::new)
            .collect();

        let result = dump::Dump::new(traces);

        // stash the result
        self.shared.trace_status.stash_result(result);

        // allow other workers to proceed
        self.shared.trace_status.trace_end.wait();

        core
    }
}

impl Shared {
    /// Steal all tasks from remotes into a single local queue.
    pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
        let (_steal, mut local) = super::queue::local();

        let worker_metrics = WorkerMetrics::new();
        let mut stats = Stats::new(&worker_metrics);

        for remote in self.remotes.iter() {
            let steal = &remote.steal;
            while !steal.is_empty() {
                if let Some(task) = steal.steal_into(&mut local, &mut stats) {
                    local.push_back([task].into_iter());
                }
            }
        }

        local
    }
}