aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go8
1 files changed, 6 insertions, 2 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index baabe919..b0b1fb9f 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -16,6 +16,7 @@ import (
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/derrors"
+ "golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
@@ -122,8 +123,9 @@ type InMemory struct {
sourceClient *source.Client
db *postgres.DB
- queue chan moduleVersion
- sem chan struct{}
+ queue chan moduleVersion
+ sem chan struct{}
+ experiments *experiment.Set
}
// NewInMemory creates a new InMemory that asynchronously fetches
@@ -137,6 +139,7 @@ func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *s
db: db,
queue: make(chan moduleVersion, 1000),
sem: make(chan struct{}, workerCount),
+ experiments: experiment.FromContext(ctx),
}
go q.process(ctx, processFunc)
return q
@@ -159,6 +162,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context
log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem))
fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ fetchCtx = experiment.NewContext(fetchCtx, q.experiments)
defer cancel()
if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil {