aboutsummaryrefslogtreecommitdiff
path: root/src/internal/trace/reader.go
diff options
context:
space:
mode:
authorMichael Anthony Knyszek <mknyszek@google.com>2025-08-05 21:37:07 +0000
committerMichael Knyszek <mknyszek@google.com>2025-08-15 14:01:30 -0700
commit4a7fde922ff12dce9d4e00f1b1e658fa907e488d (patch)
tree74b130e4a531bc6bce41cb34e66431d3966a80b8 /src/internal/trace/reader.go
parentcb814bd5bc3f0575e8d0e26370c05456770cb3da (diff)
downloadgo-4a7fde922ff12dce9d4e00f1b1e658fa907e488d.tar.xz
internal/trace: add end-of-generation signal to trace
This change takes the EvEndOfGeneration event and promotes it to a real event that appears in the trace. This allows the trace parser to unambiguously identify truncated traces vs. broken traces. It also makes a lot of the logic around parsing simpler, because there's no more batch spilling necessary. Fixes #73904. Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595 Reviewed-on: https://go-review.googlesource.com/c/go/+/693398 Reviewed-by: Carlos Amedee <carlos@golang.org> Reviewed-by: Michael Pratt <mpratt@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/internal/trace/reader.go')
-rw-r--r--src/internal/trace/reader.go153
1 files changed, 99 insertions, 54 deletions
diff --git a/src/internal/trace/reader.go b/src/internal/trace/reader.go
index 83b5a2f123..5a094277fb 100644
--- a/src/internal/trace/reader.go
+++ b/src/internal/trace/reader.go
@@ -6,6 +6,7 @@ package trace
import (
"bufio"
+ "errors"
"fmt"
"io"
"slices"
@@ -22,18 +23,28 @@ import (
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
type Reader struct {
- version version.Version
- r *bufio.Reader
- lastTs Time
- gen *generation
+ version version.Version
+ r *bufio.Reader
+ lastTs Time
+ gen *generation
+ frontier []*batchCursor
+ cpuSamples []cpuSample
+ order ordering
+ syncs int
+ done bool
+
+ // Spill state.
+ //
+ // Traces before Go 1.26 had no explicit end-of-generation signal, and
+ // so the first batch of the next generation needed to be parsed to identify
+ // a new generation. This batch is the "spilled" so we don't lose track
+ // of it when parsing the next generation.
+ //
+ // This is unnecessary after Go 1.26 because of an explicit end-of-generation
+ // signal.
spill *spilledBatch
spillErr error // error from reading spill
spillErrSync bool // whether we emitted a Sync before reporting spillErr
- frontier []*batchCursor
- cpuSamples []cpuSample
- order ordering
- syncs int
- done bool
v1Events *traceV1Converter
}
@@ -54,7 +65,7 @@ func NewReader(r io.Reader) (*Reader, error) {
return &Reader{
v1Events: convertV1Trace(tr),
}, nil
- case version.Go122, version.Go123, version.Go125:
+ case version.Go122, version.Go123, version.Go125, version.Go126:
return &Reader{
version: v,
r: br,
@@ -139,52 +150,14 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// Check if we need to refresh the generation.
if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
- if r.spillErr != nil {
- if r.spillErrSync {
- return Event{}, r.spillErr
- }
- r.spillErrSync = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
+ if r.version < version.Go126 {
+ return r.nextGenWithSpill()
}
- if r.gen != nil && r.spill == nil {
- // If we have a generation from the last read,
- // and there's nothing left in the frontier, and
- // there's no spilled batch, indicating that there's
- // no further generation, it means we're done.
- // Emit the final sync event.
- r.done = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
- }
- // Read the next generation.
- r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
- if r.gen == nil {
- r.spillErrSync = true
- r.syncs++
- return syncEvent(nil, r.lastTs, r.syncs), nil
- }
-
- // Reset CPU samples cursor.
- r.cpuSamples = r.gen.cpuSamples
-
- // Reset frontier.
- for _, m := range r.gen.batchMs {
- batches := r.gen.batches[m]
- bc := &batchCursor{m: m}
- ok, err := bc.nextEvent(batches, r.gen.freq)
- if err != nil {
- return Event{}, err
- }
- if !ok {
- // Turns out there aren't actually any events in these batches.
- continue
- }
- r.frontier = heapInsert(r.frontier, bc)
+ gen, err := readGeneration(r.r, r.version)
+ if err != nil {
+ return Event{}, err
}
- r.syncs++
- // Always emit a sync event at the beginning of the generation.
- return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+ return r.installGen(gen)
}
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]
@@ -251,6 +224,78 @@ func (r *Reader) ReadEvent() (e Event, err error) {
return ev, nil
}
+// nextGenWithSpill reads the generation and calls nextGen while
+// also handling any spilled batches.
+func (r *Reader) nextGenWithSpill() (Event, error) {
+ if r.version >= version.Go126 {
+ return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
+ }
+ if r.spillErr != nil {
+ if r.spillErrSync {
+ return Event{}, r.spillErr
+ }
+ r.spillErrSync = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ if r.gen != nil && r.spill == nil {
+ // If we have a generation from the last read,
+ // and there's nothing left in the frontier, and
+ // there's no spilled batch, indicating that there's
+ // no further generation, it means we're done.
+ // Emit the final sync event.
+ r.done = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+
+ // Read the next generation.
+ var gen *generation
+ gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
+ if gen == nil {
+ r.gen = nil
+ r.spillErrSync = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ return r.installGen(gen)
+}
+
+// installGen installs the new generation into the Reader and returns
+// a Sync event for the new generation.
+func (r *Reader) installGen(gen *generation) (Event, error) {
+ if gen == nil {
+ // Emit the final sync event.
+ r.gen = nil
+ r.done = true
+ r.syncs++
+ return syncEvent(nil, r.lastTs, r.syncs), nil
+ }
+ r.gen = gen
+
+ // Reset CPU samples cursor.
+ r.cpuSamples = r.gen.cpuSamples
+
+ // Reset frontier.
+ for _, m := range r.gen.batchMs {
+ batches := r.gen.batches[m]
+ bc := &batchCursor{m: m}
+ ok, err := bc.nextEvent(batches, r.gen.freq)
+ if err != nil {
+ return Event{}, err
+ }
+ if !ok {
+ // Turns out there aren't actually any events in these batches.
+ continue
+ }
+ r.frontier = heapInsert(r.frontier, bc)
+ }
+ r.syncs++
+
+ // Always emit a sync event at the beginning of the generation.
+ return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
+}
+
func dumpFrontier(frontier []*batchCursor) string {
var sb strings.Builder
for _, bc := range frontier {