aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
Diffstat (limited to 'cmd')
-rw-r--r--cmd/frontend/main.go23
-rw-r--r--cmd/worker/main.go34
2 files changed, 45 insertions, 12 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go
index b438c8be..303b9616 100644
--- a/cmd/frontend/main.go
+++ b/cmd/frontend/main.go
@@ -33,6 +33,7 @@ import (
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/static"
"golang.org/x/pkgsite/internal/trace"
@@ -119,12 +120,26 @@ func main() {
// The closure passed to queue.New is only used for testing and local
// execution, not in production. So it's okay that it doesn't use a
// per-request connection.
- fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
}
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 06990b2b..712301ec 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -28,7 +28,9 @@ import (
"golang.org/x/pkgsite/internal/middleware"
mtimeout "golang.org/x/pkgsite/internal/middleware/timeout"
"golang.org/x/pkgsite/internal/proxy"
+ "golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -96,18 +98,34 @@ func main() {
Timeout: config.SourceTimeout,
})
expg := cmdconfig.ExperimentGetter(ctx, cfg)
- fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
- f := &worker.Fetcher{
- ProxyClient: proxyClient,
- SourceClient: sourceClient,
- DB: db,
+
+ var fetchQueue queue.Queue
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
}
+ }
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: sourceClient,
+ DB: db,
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
reporter := cmdconfig.Reporter(ctx, cfg)