Diffstat (limited to 'src/internal/trace/v2/generation.go')
-rw-r--r-- | src/internal/trace/v2/generation.go | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/src/internal/trace/v2/generation.go b/src/internal/trace/v2/generation.go
new file mode 100644
index 0000000..4cdf76e
--- /dev/null
+++ b/src/internal/trace/v2/generation.go
@@ -0,0 +1,399 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package trace
+
+import (
+	"bufio"
+	"bytes"
+	"cmp"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"slices"
+	"strings"
+
+	"internal/trace/v2/event"
+	"internal/trace/v2/event/go122"
+)
+
+// generation contains all the trace data for a single
+// trace generation. It is purely data: it does not
+// track any parse state nor does it contain a cursor
+// into the generation.
+type generation struct {
+	gen        uint64
+	batches    map[ThreadID][]batch
+	cpuSamples []cpuSample
+	*evTable
+}
+
+// spilledBatch represents a batch that was read out for the next generation,
+// while reading the previous one. It's passed on when parsing the next
+// generation.
+type spilledBatch struct {
+	gen uint64
+	*batch
+}
+
+// readGeneration buffers and decodes the structural elements of a trace generation
+// out of r. spill is the first batch of the new generation (already buffered and
+// parsed from reading the last generation). Returns the generation and the first
+// batch read of the next generation, if any.
+func readGeneration(r *bufio.Reader, spill *spilledBatch) (*generation, *spilledBatch, error) {
+	g := &generation{
+		evTable: new(evTable),
+		batches: make(map[ThreadID][]batch),
+	}
+	// Process the spilled batch.
+	if spill != nil {
+		g.gen = spill.gen
+		if err := processBatch(g, *spill.batch); err != nil {
+			return nil, nil, err
+		}
+		spill = nil
+	}
+	// Read batches one at a time until we either hit EOF or
+	// the next generation.
+	for {
+		b, gen, err := readBatch(r)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, nil, err
+		}
+		if gen == 0 {
+			// 0 is a sentinel used by the runtime, so we'll never see it.
+			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
+		}
+		if g.gen == 0 {
+			// Initialize gen.
+			g.gen = gen
+		}
+		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
+			spill = &spilledBatch{gen: gen, batch: &b}
+			break
+		}
+		if gen != g.gen {
+			// N.B. Fail as fast as possible if we see this. At first it
+			// may seem prudent to be fault-tolerant and assume we have a
+			// complete generation, parsing and returning that first. However,
+			// if the batches are mixed across generations then it's likely
+			// we won't be able to parse this generation correctly at all.
+			// Rather than return a cryptic error in that case, indicate the
+			// problem as soon as we see it.
+			return nil, nil, fmt.Errorf("generations out of order")
+		}
+		if err := processBatch(g, b); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	// Check some invariants.
+	if g.freq == 0 {
+		return nil, nil, fmt.Errorf("no frequency event found")
+	}
+	// N.B. Trust that the batch order is correct. We can't validate the batch order
+	// by timestamp because the timestamps could just be plain wrong. The source of
+	// truth is the order things appear in the trace and the partial order sequence
+	// numbers on certain events. If it turns out the batch order is actually incorrect
+	// we'll very likely fail to advance a partial order from the frontier.
+
+	// Compactify stacks and strings for better lookup performance later.
+	g.stacks.compactify()
+	g.strings.compactify()
+
+	// Validate stacks.
+	if err := validateStackStrings(&g.stacks, &g.strings); err != nil {
+		return nil, nil, err
+	}
+
+	// Fix up the CPU sample timestamps, now that we have freq.
+	for i := range g.cpuSamples {
+		s := &g.cpuSamples[i]
+		s.time = g.freq.mul(timestamp(s.time))
+	}
+	// Sort the CPU samples.
+	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
+		return cmp.Compare(a.time, b.time)
+	})
+	return g, spill, nil
+}
+
+// processBatch adds the batch to the generation.
+func processBatch(g *generation, b batch) error {
+	switch {
+	case b.isStringsBatch():
+		if err := addStrings(&g.strings, b); err != nil {
+			return err
+		}
+	case b.isStacksBatch():
+		if err := addStacks(&g.stacks, b); err != nil {
+			return err
+		}
+	case b.isCPUSamplesBatch():
+		samples, err := addCPUSamples(g.cpuSamples, b)
+		if err != nil {
+			return err
+		}
+		g.cpuSamples = samples
+	case b.isFreqBatch():
+		freq, err := parseFreq(b)
+		if err != nil {
+			return err
+		}
+		if g.freq != 0 {
+			return fmt.Errorf("found multiple frequency events")
+		}
+		g.freq = freq
+	default:
+		g.batches[b.m] = append(g.batches[b.m], b)
+	}
+	return nil
+}
+
+// validateStackStrings makes sure all the string references in
+// the stack table are present in the string table.
+func validateStackStrings(stacks *dataTable[stackID, stack], strings *dataTable[stringID, string]) error {
+	var err error
+	stacks.forEach(func(id stackID, stk stack) bool {
+		for _, frame := range stk.frames {
+			_, ok := strings.get(frame.funcID)
+			if !ok {
+				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
+				return false
+			}
+			_, ok = strings.get(frame.fileID)
+			if !ok {
+				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
+				return false
+			}
+		}
+		return true
+	})
+	return err
+}
+
+// addStrings takes a batch whose first byte is an EvStrings event
+// (indicating that the batch contains only strings) and adds each
+// string contained therein to the provided strings map.
+func addStrings(stringTable *dataTable[stringID, string], b batch) error {
+	if !b.isStringsBatch() {
+		return fmt.Errorf("internal error: addStrings called on non-string batch")
+	}
+	r := bytes.NewReader(b.data)
+	hdr, err := r.ReadByte() // Consume the EvStrings byte.
+	if err != nil || event.Type(hdr) != go122.EvStrings {
+		return fmt.Errorf("missing strings batch header")
+	}
+
+	var sb strings.Builder
+	for r.Len() != 0 {
+		// Read the header.
+		ev, err := r.ReadByte()
+		if err != nil {
+			return err
+		}
+		if event.Type(ev) != go122.EvString {
+			return fmt.Errorf("expected string event, got %d", ev)
+		}
+
+		// Read the string's ID.
+		id, err := binary.ReadUvarint(r)
+		if err != nil {
+			return err
+		}
+
+		// Read the string's length.
+		len, err := binary.ReadUvarint(r)
+		if err != nil {
+			return err
+		}
+		if len > go122.MaxStringSize {
+			return fmt.Errorf("invalid string size %d, maximum is %d", len, go122.MaxStringSize)
+		}
+
+		// Copy out the string.
+		n, err := io.CopyN(&sb, r, int64(len))
+		if n != int64(len) {
+			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
+		}
+		if err != nil {
+			return fmt.Errorf("copying string data: %w", err)
+		}
+
+		// Add the string to the map.
+		s := sb.String()
+		sb.Reset()
+		if err := stringTable.insert(stringID(id), s); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// addStacks takes a batch whose first byte is an EvStacks event
+// (indicating that the batch contains only stacks) and adds each
+// stack contained therein to the provided stacks map.
+func addStacks(stackTable *dataTable[stackID, stack], b batch) error {
+	if !b.isStacksBatch() {
+		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
+	}
+	r := bytes.NewReader(b.data)
+	hdr, err := r.ReadByte() // Consume the EvStacks byte.
+	if err != nil || event.Type(hdr) != go122.EvStacks {
+		return fmt.Errorf("missing stacks batch header")
+	}
+
+	for r.Len() != 0 {
+		// Read the header.
+		ev, err := r.ReadByte()
+		if err != nil {
+			return err
+		}
+		if event.Type(ev) != go122.EvStack {
+			return fmt.Errorf("expected stack event, got %d", ev)
+		}
+
+		// Read the stack's ID.
+		id, err := binary.ReadUvarint(r)
+		if err != nil {
+			return err
+		}
+
+		// Read how many frames are in the stack.
+		nFrames, err := binary.ReadUvarint(r)
+		if err != nil {
+			return err
+		}
+		if nFrames > go122.MaxFramesPerStack {
+			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, go122.MaxFramesPerStack)
+		}
+
+		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
+		frames := make([]frame, 0, nFrames)
+		for i := uint64(0); i < nFrames; i++ {
+			// Read the frame data.
+			pc, err := binary.ReadUvarint(r)
+			if err != nil {
+				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
+			}
+			funcID, err := binary.ReadUvarint(r)
+			if err != nil {
+				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
+			}
+			fileID, err := binary.ReadUvarint(r)
+			if err != nil {
+				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
+			}
+			line, err := binary.ReadUvarint(r)
+			if err != nil {
+				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
+			}
+			frames = append(frames, frame{
+				pc:     pc,
+				funcID: stringID(funcID),
+				fileID: stringID(fileID),
+				line:   line,
+			})
+		}
+
+		// Add the stack to the map.
+		if err := stackTable.insert(stackID(id), stack{frames: frames}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// addCPUSamples takes a batch whose first byte is an EvCPUSamples event
+// (indicating that the batch contains only CPU samples) and adds each
+// sample contained therein to the provided samples list.
+func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
+	if !b.isCPUSamplesBatch() {
+		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-samples batch")
+	}
+	r := bytes.NewReader(b.data)
+	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
+	if err != nil || event.Type(hdr) != go122.EvCPUSamples {
+		return nil, fmt.Errorf("missing CPU samples batch header")
+	}
+
+	for r.Len() != 0 {
+		// Read the header.
+		ev, err := r.ReadByte()
+		if err != nil {
+			return nil, err
+		}
+		if event.Type(ev) != go122.EvCPUSample {
+			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
+		}
+
+		// Read the sample's timestamp.
+		ts, err := binary.ReadUvarint(r)
+		if err != nil {
+			return nil, err
+		}
+
+		// Read the sample's M.
+		m, err := binary.ReadUvarint(r)
+		if err != nil {
+			return nil, err
+		}
+		mid := ThreadID(m)
+
+		// Read the sample's P.
+		p, err := binary.ReadUvarint(r)
+		if err != nil {
+			return nil, err
+		}
+		pid := ProcID(p)
+
+		// Read the sample's G.
+		g, err := binary.ReadUvarint(r)
+		if err != nil {
+			return nil, err
+		}
+		goid := GoID(g)
+		if g == 0 {
+			goid = NoGoroutine
+		}
+
+		// Read the sample's stack.
+		s, err := binary.ReadUvarint(r)
+		if err != nil {
+			return nil, err
+		}
+
+		// Add the sample to the slice.
+		samples = append(samples, cpuSample{
+			schedCtx: schedCtx{
+				M: mid,
+				P: pid,
+				G: goid,
+			},
+			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
+			stack: stackID(s),
+		})
+	}
+	return samples, nil
+}
+
+// parseFreq parses out a lone EvFrequency from a batch.
+func parseFreq(b batch) (frequency, error) {
+	if !b.isFreqBatch() {
+		return 0, fmt.Errorf("internal error: parseFreq called on non-frequency batch")
+	}
+	r := bytes.NewReader(b.data)
+	r.ReadByte() // Consume the EvFrequency byte.
+
+	// Read the frequency. It'll come out as timestamp units per second.
+	f, err := binary.ReadUvarint(r)
+	if err != nil {
+		return 0, err
+	}
+	// Convert to nanoseconds per timestamp unit.
+	return frequency(1.0 / (float64(f) / 1e9)), nil
+}
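As a usage sketch only: readGeneration is designed to be called in a loop, with each call receiving the spilledBatch returned by the previous one. The helper below (readAllGenerations is a hypothetical name, not part of this change) shows that threading. It would live in the same package as the code above, and it assumes the trace file header has already been consumed from r.

// readAllGenerations is a hypothetical driver for readGeneration,
// shown here for illustration; it is not part of this CL.
func readAllGenerations(r *bufio.Reader) ([]*generation, error) {
	var (
		gens  []*generation
		spill *spilledBatch
	)
	for {
		// A clean end of stream with no batch spilled over from the
		// previous generation means we're done; otherwise let
		// readGeneration report what went wrong.
		if _, err := r.Peek(1); err == io.EOF && spill == nil {
			return gens, nil
		}
		g, s, err := readGeneration(r, spill)
		if err != nil {
			return nil, err
		}
		gens = append(gens, g)
		spill = s
	}
}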
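The string table wire format consumed by addStrings is simple enough to sanity-check by hand: one EvStrings byte, then a sequence of [EvString][uvarint id][uvarint length][bytes] records. The standalone program below builds such a payload and decodes it the same way addStrings does. The evStrings and evString byte values are illustrative stand-ins; the real constants are defined in internal/trace/v2/event/go122.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// Illustrative stand-ins for go122.EvStrings and go122.EvString.
const (
	evStrings byte = 4
	evString  byte = 5
)

// putUvarint appends the uvarint encoding of v to buf.
func putUvarint(buf *bytes.Buffer, v uint64) {
	var tmp [binary.MaxVarintLen64]byte
	buf.Write(tmp[:binary.PutUvarint(tmp[:], v)])
}

func main() {
	// Encode: [EvStrings] then repeated [EvString][id][len][bytes].
	var buf bytes.Buffer
	buf.WriteByte(evStrings)
	for i, s := range []string{"main.main", "/src/main.go"} {
		buf.WriteByte(evString)
		putUvarint(&buf, uint64(i+1))      // string ID
		putUvarint(&buf, uint64(len(s)))   // string length
		buf.WriteString(s)                 // string data
	}

	// Decode it back, mirroring the loop in addStrings.
	r := bytes.NewReader(buf.Bytes())
	r.ReadByte() // Consume the EvStrings byte.
	for r.Len() != 0 {
		r.ReadByte() // Consume the EvString byte.
		id, _ := binary.ReadUvarint(r)
		n, _ := binary.ReadUvarint(r)
		s := make([]byte, n)
		io.ReadFull(r, s)
		fmt.Printf("string %d = %q\n", id, s)
	}
}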
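Stack table entries follow the same varint scheme with more fields: after the EvStack byte come the stack ID, the frame count, and then four uvarints (pc, funcID, fileID, line) per frame. A minimal decoder for one such record is sketched below; the frame type and the maxFrames limit are simplified stand-ins for the parser's unexported frame and go122.MaxFramesPerStack.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// frame mirrors the shape of the parser's unexported frame type.
type frame struct {
	pc, funcID, fileID, line uint64
}

// maxFrames stands in for go122.MaxFramesPerStack.
const maxFrames = 128

// decodeStack reads one stack record (after the EvStack byte):
// [uvarint id][uvarint nFrames] then nFrames * [pc][funcID][fileID][line].
func decodeStack(r *bytes.Reader) (uint64, []frame, error) {
	id, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, nil, err
	}
	nFrames, err := binary.ReadUvarint(r)
	if err != nil {
		return 0, nil, err
	}
	if nFrames > maxFrames {
		return 0, nil, fmt.Errorf("invalid stack size %d", nFrames)
	}
	frames := make([]frame, 0, nFrames)
	for i := uint64(0); i < nFrames; i++ {
		var f frame
		for _, p := range []*uint64{&f.pc, &f.funcID, &f.fileID, &f.line} {
			if *p, err = binary.ReadUvarint(r); err != nil {
				return 0, nil, err
			}
		}
		frames = append(frames, f)
	}
	return id, frames, nil
}

func main() {
	// One record: id=7, one frame with pc=0x42, funcID=1, fileID=2, line=10.
	// All values are below 128, so each uvarint is a single byte.
	r := bytes.NewReader([]byte{7, 1, 0x42, 1, 2, 10})
	id, frames, err := decodeStack(r)
	fmt.Println(id, frames, err) // 7 [{66 1 2 10}] <nil>
}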
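The unit conversion in parseFreq is easy to misread: the EvFrequency event stores timestamp units per second, while the parser wants nanoseconds per unit, i.e. 1e9/f. Below is a worked example with an assumed 25 MHz timestamp clock; the frequency type and mul method are simplified stand-ins for the package's unexported ones (the real mul takes a timestamp and returns a Time).

package main

import "fmt"

// frequency mirrors the parser's conversion result: nanoseconds per
// timestamp unit. Simplified stand-in for the unexported type.
type frequency float64

// mul converts a raw timestamp into nanoseconds, approximating the
// fix-up applied to CPU sample timestamps in readGeneration.
func (f frequency) mul(ts uint64) uint64 {
	return uint64(float64(ts) * float64(f))
}

func main() {
	// Suppose EvFrequency reports 25,000,000 timestamp units per second
	// (a 25 MHz clock). parseFreq's formula yields 40 ns per unit.
	const f = 25_000_000
	freq := frequency(1.0 / (float64(f) / 1e9))
	fmt.Println(freq)           // 40
	fmt.Println(freq.mul(1000)) // a raw timestamp of 1000 units = 40000 ns
}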