aboutsummaryrefslogtreecommitdiff
path: root/cmd/worker
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/worker')
-rw-r--r--cmd/worker/main.go34
1 files changed, 26 insertions, 8 deletions
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 06990b2b..712301ec 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -28,7 +28,9 @@ import (
"golang.org/x/pkgsite/internal/middleware"
mtimeout "golang.org/x/pkgsite/internal/middleware/timeout"
"golang.org/x/pkgsite/internal/proxy"
+ "golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -96,18 +98,34 @@ func main() {
Timeout: config.SourceTimeout,
})
expg := cmdconfig.ExperimentGetter(ctx, cfg)
- fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
- f := &worker.Fetcher{
- ProxyClient: proxyClient,
- SourceClient: sourceClient,
- DB: db,
+
+ var fetchQueue queue.Queue
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
}
+ }
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: sourceClient,
+ DB: db,
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
reporter := cmdconfig.Reporter(ctx, cfg)