diff options
| author | Michael Anthony Knyszek <mknyszek@google.com> | 2025-08-05 21:37:07 +0000 |
|---|---|---|
| committer | Michael Knyszek <mknyszek@google.com> | 2025-08-15 14:01:30 -0700 |
| commit | 4a7fde922ff12dce9d4e00f1b1e658fa907e488d (patch) | |
| tree | 74b130e4a531bc6bce41cb34e66431d3966a80b8 /src/internal/trace/reader.go | |
| parent | cb814bd5bc3f0575e8d0e26370c05456770cb3da (diff) | |
| download | go-4a7fde922ff12dce9d4e00f1b1e658fa907e488d.tar.xz | |
internal/trace: add end-of-generation signal to trace
This change takes the EvEndOfGeneration event and promotes it to a real
event that appears in the trace.
This allows the trace parser to unambiguously identify truncated traces
vs. broken traces. It also makes a lot of the logic around parsing
simpler, because there's no more batch spilling necessary.
Fixes #73904.
Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595
Reviewed-on: https://go-review.googlesource.com/c/go/+/693398
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/internal/trace/reader.go')
| -rw-r--r-- | src/internal/trace/reader.go | 153 |
1 files changed, 99 insertions, 54 deletions
diff --git a/src/internal/trace/reader.go b/src/internal/trace/reader.go index 83b5a2f123..5a094277fb 100644 --- a/src/internal/trace/reader.go +++ b/src/internal/trace/reader.go @@ -6,6 +6,7 @@ package trace import ( "bufio" + "errors" "fmt" "io" "slices" @@ -22,18 +23,28 @@ import ( // event as the first event, and a Sync event as the last event. // (There may also be any number of Sync events in the middle, too.) type Reader struct { - version version.Version - r *bufio.Reader - lastTs Time - gen *generation + version version.Version + r *bufio.Reader + lastTs Time + gen *generation + frontier []*batchCursor + cpuSamples []cpuSample + order ordering + syncs int + done bool + + // Spill state. + // + // Traces before Go 1.26 had no explicit end-of-generation signal, and + // so the first batch of the next generation needed to be parsed to identify + // a new generation. This batch is the "spilled" so we don't lose track + // of it when parsing the next generation. + // + // This is unnecessary after Go 1.26 because of an explicit end-of-generation + // signal. spill *spilledBatch spillErr error // error from reading spill spillErrSync bool // whether we emitted a Sync before reporting spillErr - frontier []*batchCursor - cpuSamples []cpuSample - order ordering - syncs int - done bool v1Events *traceV1Converter } @@ -54,7 +65,7 @@ func NewReader(r io.Reader) (*Reader, error) { return &Reader{ v1Events: convertV1Trace(tr), }, nil - case version.Go122, version.Go123, version.Go125: + case version.Go122, version.Go123, version.Go125, version.Go126: return &Reader{ version: v, r: br, @@ -139,52 +150,14 @@ func (r *Reader) ReadEvent() (e Event, err error) { // Check if we need to refresh the generation. if len(r.frontier) == 0 && len(r.cpuSamples) == 0 { - if r.spillErr != nil { - if r.spillErrSync { - return Event{}, r.spillErr - } - r.spillErrSync = true - r.syncs++ - return syncEvent(nil, r.lastTs, r.syncs), nil + if r.version < version.Go126 { + return r.nextGenWithSpill() } - if r.gen != nil && r.spill == nil { - // If we have a generation from the last read, - // and there's nothing left in the frontier, and - // there's no spilled batch, indicating that there's - // no further generation, it means we're done. - // Emit the final sync event. - r.done = true - r.syncs++ - return syncEvent(nil, r.lastTs, r.syncs), nil - } - // Read the next generation. - r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version) - if r.gen == nil { - r.spillErrSync = true - r.syncs++ - return syncEvent(nil, r.lastTs, r.syncs), nil - } - - // Reset CPU samples cursor. - r.cpuSamples = r.gen.cpuSamples - - // Reset frontier. - for _, m := range r.gen.batchMs { - batches := r.gen.batches[m] - bc := &batchCursor{m: m} - ok, err := bc.nextEvent(batches, r.gen.freq) - if err != nil { - return Event{}, err - } - if !ok { - // Turns out there aren't actually any events in these batches. - continue - } - r.frontier = heapInsert(r.frontier, bc) + gen, err := readGeneration(r.r, r.version) + if err != nil { + return Event{}, err } - r.syncs++ - // Always emit a sync event at the beginning of the generation. - return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil + return r.installGen(gen) } tryAdvance := func(i int) (bool, error) { bc := r.frontier[i] @@ -251,6 +224,78 @@ func (r *Reader) ReadEvent() (e Event, err error) { return ev, nil } +// nextGenWithSpill reads the generation and calls nextGen while +// also handling any spilled batches. +func (r *Reader) nextGenWithSpill() (Event, error) { + if r.version >= version.Go126 { + return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace") + } + if r.spillErr != nil { + if r.spillErrSync { + return Event{}, r.spillErr + } + r.spillErrSync = true + r.syncs++ + return syncEvent(nil, r.lastTs, r.syncs), nil + } + if r.gen != nil && r.spill == nil { + // If we have a generation from the last read, + // and there's nothing left in the frontier, and + // there's no spilled batch, indicating that there's + // no further generation, it means we're done. + // Emit the final sync event. + r.done = true + r.syncs++ + return syncEvent(nil, r.lastTs, r.syncs), nil + } + + // Read the next generation. + var gen *generation + gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version) + if gen == nil { + r.gen = nil + r.spillErrSync = true + r.syncs++ + return syncEvent(nil, r.lastTs, r.syncs), nil + } + return r.installGen(gen) +} + +// installGen installs the new generation into the Reader and returns +// a Sync event for the new generation. +func (r *Reader) installGen(gen *generation) (Event, error) { + if gen == nil { + // Emit the final sync event. + r.gen = nil + r.done = true + r.syncs++ + return syncEvent(nil, r.lastTs, r.syncs), nil + } + r.gen = gen + + // Reset CPU samples cursor. + r.cpuSamples = r.gen.cpuSamples + + // Reset frontier. + for _, m := range r.gen.batchMs { + batches := r.gen.batches[m] + bc := &batchCursor{m: m} + ok, err := bc.nextEvent(batches, r.gen.freq) + if err != nil { + return Event{}, err + } + if !ok { + // Turns out there aren't actually any events in these batches. + continue + } + r.frontier = heapInsert(r.frontier, bc) + } + r.syncs++ + + // Always emit a sync event at the beginning of the generation. + return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil +} + func dumpFrontier(frontier []*batchCursor) string { var sb strings.Builder for _, bc := range frontier { |
