aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2021-06-09 07:28:03 -0400
committerJonathan Amsterdam <jba@google.com>2021-06-09 17:56:31 +0000
commit3f6ba65acd671c661c4cc4ec7396f9216a5a656d (patch)
treedb99aa5ae7c743fe6955345884909c9aaab58785
parent6606008756175243019de712757dc9ce8499a276 (diff)
downloadgo-x-pkgsite-3f6ba65acd671c661c4cc4ec7396f9216a5a656d.tar.xz
internal/worker: add metric for new unprocessed modules
We have a metric for total unprocessed modules, including new ones and those we are reprocessing. Add a metric that tracks only the new ones: those with status 0 or 500 (the latter to count the number of failed new modules). This metric is a better choice for an alert, since during reprocessing we expect a very large total backlog, but we never want a large backlog of new modules. Change-Id: Ibc3cbee1d867f6a454748237352a70cf9eb500c0 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/326290 Trust: Jonathan Amsterdam <jba@google.com> Run-TryBot: Jonathan Amsterdam <jba@google.com> TryBot-Result: kokoro <noreply+kokoro@google.com> Reviewed-by: Julie Qiu <julie@golang.org>
-rw-r--r--internal/postgres/postgres.go19
-rw-r--r--internal/worker/metrics.go18
-rw-r--r--internal/worker/server.go6
3 files changed, 32 insertions, 11 deletions
diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go
index 4e1dafe7..b8ef1925 100644
--- a/internal/postgres/postgres.go
+++ b/internal/postgres/postgres.go
@@ -113,15 +113,22 @@ func (db *DB) StalenessTimestamp(ctx context.Context) (time.Time, error) {
}
// NumUnprocessedModules returns the number of modules that need to be processed.
-func (db *DB) NumUnprocessedModules(ctx context.Context) (int, error) {
- var n int
- err := db.db.QueryRow(ctx, `
+func (db *DB) NumUnprocessedModules(ctx context.Context) (total, new int, err error) {
+ defer derrors.Wrap(&err, "NumUnprocessedModules()")
+
+ err = db.db.QueryRow(ctx, `
SELECT COUNT(*) FROM module_version_states WHERE status = 0 OR status >= 500
- `).Scan(&n)
+ `).Scan(&total)
+ if err != nil {
+ return 0, 0, err
+ }
+ err = db.db.QueryRow(ctx, `
+ SELECT COUNT(*) FROM module_version_states WHERE status = 0 OR status = 500
+ `).Scan(&new)
if err != nil {
- return 0, err
+ return 0, 0, err
}
- return n, nil
+ return total, new, nil
}
// collectStrings runs the query, which must select for a single string column, and returns
diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go
index bba0bc53..ff155df7 100644
--- a/internal/worker/metrics.go
+++ b/internal/worker/metrics.go
@@ -56,6 +56,19 @@ var (
Aggregation: view.LastValue(),
Description: "number of unprocessed modules",
}
+
+ unprocessedNewModules = stats.Int64(
+ "go-discovery/unprocessed_new_modules_count",
+ "Number of new unprocessed modules (status = 0 or 500).",
+ stats.UnitDimensionless,
+ )
+
+ UnprocessedNewModules = &view.View{
+ Name: "go-discovery/unprocessed_new_modules/count",
+ Measure: unprocessedNewModules,
+ Aggregation: view.LastValue(),
+ Description: "number of unprocessed new modules",
+ }
)
func recordEnqueue(ctx context.Context, status int) {
@@ -68,6 +81,7 @@ func recordProcessingLag(ctx context.Context, d time.Duration) {
stats.Record(ctx, processingLag.M(d.Milliseconds()/1000))
}
-func recordUnprocessedModules(ctx context.Context, n int) {
- stats.Record(ctx, unprocessedModules.M(int64(n)))
+func recordUnprocessedModules(ctx context.Context, total, new int) {
+ stats.Record(ctx, unprocessedModules.M(int64(total)))
+ stats.Record(ctx, unprocessedNewModules.M(int64(new)))
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 88da9676..9f299a5e 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -401,12 +401,12 @@ func (s *Server) computeProcessingLag(ctx context.Context) {
}
func (s *Server) computeUnprocessedModules(ctx context.Context) {
- n, err := s.db.NumUnprocessedModules(ctx)
+ total, new, err := s.db.NumUnprocessedModules(ctx)
if err != nil {
- log.Warningf(ctx, "NumUnprocessedModules: %v", err)
+ log.Warningf(ctx, "%v", err)
return
}
- recordUnprocessedModules(ctx, n)
+ recordUnprocessedModules(ctx, total, new)
}
// handleEnqueue queries the module_version_states table for the next batch of