aboutsummaryrefslogtreecommitdiff
path: root/cmd/worker/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/worker/main.go')
-rw-r--r--cmd/worker/main.go37
1 files changed, 25 insertions, 12 deletions
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 8f178aa9..bf732bf7 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -31,6 +31,7 @@ import (
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
"golang.org/x/pkgsite/internal/queue/inmemqueue"
+ "golang.org/x/pkgsite/internal/queue/pgqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -43,6 +44,7 @@ var (
// flag used in call to safehtml/template.TrustedSourceFromFlag
_ = flag.String("static", "static", "path to folder containing static files served")
bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths")
+ queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`)
// Ordinarily, index polling is initiated by a separate scheduler that calls
// /poll. But for convenience, you can instead have the worker periodically
@@ -107,25 +109,36 @@ func main() {
}
fetchQueue = q
} else {
- experiments, err := expg(ctx)
- if err != nil {
- log.Fatalf(ctx, "error getting experiment: %v", err)
- }
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
- }
- }
f := &worker.Fetcher{
ProxyClient: proxyClient,
SourceClient: sourceClient,
DB: db,
}
- fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
+ processFunc := func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
- })
+ }
+ switch *queueType {
+ case "postgres":
+ q, err := pgqueue.New(ctx, db.Underlying())
+ if err != nil {
+ log.Fatalf(ctx, "error creating postgres queue: %v", err)
+ }
+ go q.Poll(ctx, *workers, processFunc)
+ fetchQueue = q
+ default:
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc)
+ }
}
reporter := cmdconfig.Reporter(ctx, cfg)