diff options
| author | Michael Anthony Knyszek <mknyszek@google.com> | 2025-08-05 21:37:07 +0000 |
|---|---|---|
| committer | Michael Knyszek <mknyszek@google.com> | 2025-08-15 14:01:30 -0700 |
| commit | 4a7fde922ff12dce9d4e00f1b1e658fa907e488d (patch) | |
| tree | 74b130e4a531bc6bce41cb34e66431d3966a80b8 /src/runtime/trace | |
| parent | cb814bd5bc3f0575e8d0e26370c05456770cb3da (diff) | |
| download | go-4a7fde922ff12dce9d4e00f1b1e658fa907e488d.tar.xz | |
internal/trace: add end-of-generation signal to trace
This change takes the EvEndOfGeneration event and promotes it to a real
event that appears in the trace.
This allows the trace parser to unambiguously identify truncated traces
vs. broken traces. It also simplifies much of the parsing logic,
because batch spilling is no longer necessary.
Fixes #73904.
Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595
Reviewed-on: https://go-review.googlesource.com/c/go/+/693398
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/runtime/trace')
| -rw-r--r-- | src/runtime/trace/batch.go | 57 | ||||
| -rw-r--r-- | src/runtime/trace/flightrecorder.go | 4 | ||||
| -rw-r--r-- | src/runtime/trace/recorder.go | 12 | ||||
| -rw-r--r-- | src/runtime/trace/subscribe.go | 17 |
4 files changed, 47 insertions, 43 deletions
diff --git a/src/runtime/trace/batch.go b/src/runtime/trace/batch.go index d726a3d375..f8b0a96b3f 100644 --- a/src/runtime/trace/batch.go +++ b/src/runtime/trace/batch.go @@ -12,72 +12,77 @@ import ( // timestamp is an unprocessed timestamp. type timestamp uint64 -// batch represents a batch of trace events. -// It is unparsed except for its header. type batch struct { - m threadID time timestamp + gen uint64 data []byte } -// threadID is the runtime-internal M structure's ID. This is unique -// for each OS thread. -type threadID int64 - // readBatch copies b and parses the trace batch header inside. -// Returns the batch, the generation, bytes read, and an error. -func readBatch(b []byte) (batch, uint64, uint64, error) { +// Returns the batch, bytes read, and an error. +func readBatch(b []byte) (batch, uint64, error) { if len(b) == 0 { - return batch{}, 0, 0, fmt.Errorf("batch is empty") + return batch{}, 0, fmt.Errorf("batch is empty") } data := make([]byte, len(b)) - if nw := copy(data, b); nw != len(b) { - return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch") - } + copy(data, b) + // Read batch header byte. + if typ := tracev2.EventType(b[0]); typ == tracev2.EvEndOfGeneration { + if len(b) != 1 { + return batch{}, 1, fmt.Errorf("unexpected end of generation in batch of size >1") + } + return batch{data: data}, 1, nil + } if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch { - return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ) + return batch{}, 1, fmt.Errorf("expected batch event, got event %d", typ) } - - // Read the batch header: gen (generation), thread (M) ID, base timestamp - // for the batch. 
total := 1 b = b[1:] + + // Read the generation gen, n, err := readUvarint(b) if err != nil { - return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err) + return batch{}, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err) } total += n b = b[n:] - m, n, err := readUvarint(b) + + // Read the M (discard it). + _, n, err = readUvarint(b) if err != nil { - return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err) + return batch{}, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err) } total += n b = b[n:] + + // Read the timestamp. ts, n, err := readUvarint(b) if err != nil { - return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err) + return batch{}, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err) } total += n b = b[n:] - // Read in the size of the batch to follow. + // Read the size of the batch to follow. size, n, err := readUvarint(b) if err != nil { - return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err) + return batch{}, uint64(total + n), fmt.Errorf("error reading batch size: %w", err) } if size > tracev2.MaxBatchSize { - return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize) + return batch{}, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize) } total += n total += int(size) + if total != len(data) { + return batch{}, uint64(total), fmt.Errorf("expected complete batch") + } data = data[:total] // Return the batch. 
return batch{ - m: threadID(m), + gen: gen, time: timestamp(ts), data: data, - }, gen, uint64(total), nil + }, uint64(total), nil } diff --git a/src/runtime/trace/flightrecorder.go b/src/runtime/trace/flightrecorder.go index b0b75ceb60..99ee4d060d 100644 --- a/src/runtime/trace/flightrecorder.go +++ b/src/runtime/trace/flightrecorder.go @@ -141,9 +141,9 @@ func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) { // Write all the data. for _, gen := range gens { - for _, batch := range gen.batches { + for _, data := range gen.batches { // Write batch data. - nw, err = w.Write(batch.data) + nw, err = w.Write(data) n += int64(nw) if err != nil { return n, err diff --git a/src/runtime/trace/recorder.go b/src/runtime/trace/recorder.go index bf8d7ce647..4f2d3aa92a 100644 --- a/src/runtime/trace/recorder.go +++ b/src/runtime/trace/recorder.go @@ -41,21 +41,21 @@ func (w *recorder) Write(b []byte) (n int, err error) { if len(b) == n { return 0, nil } - ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch. + ba, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch. if err != nil { return len(b) - int(nb) - n, err } n += int(nb) // Append the batch to the current generation. 
- if r.active.gen == 0 { - r.active.gen = gen + if ba.gen != 0 && r.active.gen == 0 { + r.active.gen = ba.gen } - if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) { + if ba.time != 0 && (r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time)) { r.active.minTime = r.freq.mul(ba.time) } r.active.size += len(ba.data) - r.active.batches = append(r.active.batches, ba) + r.active.batches = append(r.active.batches, ba.data) return len(b), nil } @@ -99,7 +99,7 @@ type rawGeneration struct { gen uint64 size int minTime eventTime - batches []batch + batches [][]byte } func traceTimeNow(freq frequency) eventTime { diff --git a/src/runtime/trace/subscribe.go b/src/runtime/trace/subscribe.go index 7e22b6abdb..a4d653dcae 100644 --- a/src/runtime/trace/subscribe.go +++ b/src/runtime/trace/subscribe.go @@ -155,7 +155,7 @@ func (t *traceMultiplexer) startLocked() error { t.subscribersMu.Unlock() go func() { - header := runtime_readTrace() + header := runtime.ReadTrace() if traceStartWriter != nil { traceStartWriter.Write(header) } @@ -164,10 +164,16 @@ func (t *traceMultiplexer) startLocked() error { } for { - data := runtime_readTrace() + data := runtime.ReadTrace() if data == nil { break } + if traceStartWriter != nil { + traceStartWriter.Write(data) + } + if flightRecorder != nil { + flightRecorder.Write(data) + } if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration { if flightRecorder != nil { flightRecorder.endGeneration() @@ -187,13 +193,6 @@ func (t *traceMultiplexer) startLocked() error { if frIsNew { flightRecorder.Write(header) } - } else { - if traceStartWriter != nil { - traceStartWriter.Write(data) - } - if flightRecorder != nil { - flightRecorder.Write(data) - } } } }() |
