diff options
| author | Julie Qiu <julie@golang.org> | 2020-05-24 10:37:48 -0400 |
|---|---|---|
| committer | Julie Qiu <julieqiu@google.com> | 2020-05-26 22:51:16 +0000 |
| commit | 2cfca4659c70faac28f57f9d8de76562f7165df7 (patch) | |
| tree | 210614a41d16225196232127f5c0df24223c11d9 /internal/queue/queue.go | |
| parent | 79aa726c0e0167797c009d689500024e954b7d23 (diff) | |
| download | go-x-pkgsite-2cfca4659c70faac28f57f9d8de76562f7165df7.tar.xz | |
cmd,internal/queue: pass experiments to InMemoryQueue
At the moment, experiment flags won't be set after they are passed from
a service to the InMemoryQueue. Experiments with a rollout > 0 are now
set when initiating the InMemoryQueue for local development with
cmd/frontend and cmd/worker.
Change-Id: I209024e8281b735d37b4e76d81b4b05ff500e7c3
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/753601
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index baabe919..b0b1fb9f 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -16,6 +16,7 @@ import ( cloudtasks "cloud.google.com/go/cloudtasks/apiv2" "golang.org/x/pkgsite/internal/config" "golang.org/x/pkgsite/internal/derrors" + "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" "golang.org/x/pkgsite/internal/postgres" "golang.org/x/pkgsite/internal/proxy" @@ -122,8 +123,9 @@ type InMemory struct { sourceClient *source.Client db *postgres.DB - queue chan moduleVersion - sem chan struct{} + queue chan moduleVersion + sem chan struct{} + experiments *experiment.Set } // NewInMemory creates a new InMemory that asynchronously fetches @@ -137,6 +139,7 @@ func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *s db: db, queue: make(chan moduleVersion, 1000), sem: make(chan struct{}, workerCount), + experiments: experiment.FromContext(ctx), } go q.process(ctx, processFunc) return q @@ -159,6 +162,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + fetchCtx = experiment.NewContext(fetchCtx, q.experiments) defer cancel() if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil { |
