aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJulie Qiu <julie@golang.org>2020-05-24 10:37:48 -0400
committerJulie Qiu <julieqiu@google.com>2020-05-26 22:51:16 +0000
commit2cfca4659c70faac28f57f9d8de76562f7165df7 (patch)
tree210614a41d16225196232127f5c0df24223c11d9 /internal/queue/queue.go
parent79aa726c0e0167797c009d689500024e954b7d23 (diff)
downloadgo-x-pkgsite-2cfca4659c70faac28f57f9d8de76562f7165df7.tar.xz
cmd,internal/queue: pass experiments to InMemoryQueue
At the moment, experiment flags won't be set after they are passed from a service to the InMemoryQueue. Experiments with a rollout > 0 are now set when initiating the InMemoryQueue for local development with cmd/frontend and cmd/worker. Change-Id: I209024e8281b735d37b4e76d81b4b05ff500e7c3 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/753601 CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go8
1 files changed, 6 insertions, 2 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index baabe919..b0b1fb9f 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -16,6 +16,7 @@ import (
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/derrors"
+ "golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
@@ -122,8 +123,9 @@ type InMemory struct {
sourceClient *source.Client
db *postgres.DB
- queue chan moduleVersion
- sem chan struct{}
+ queue chan moduleVersion
+ sem chan struct{}
+ experiments *experiment.Set
}
// NewInMemory creates a new InMemory that asynchronously fetches
@@ -137,6 +139,7 @@ func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *s
db: db,
queue: make(chan moduleVersion, 1000),
sem: make(chan struct{}, workerCount),
+ experiments: experiment.FromContext(ctx),
}
go q.process(ctx, processFunc)
return q
@@ -159,6 +162,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context
log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem))
fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ fetchCtx = experiment.NewContext(fetchCtx, q.experiments)
defer cancel()
if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil {