diff options
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/frontend/main.go | 33 | ||||
| -rw-r--r-- | cmd/worker/main.go | 37 |
2 files changed, 48 insertions, 22 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index 303b9616..1684981f 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -34,6 +34,7 @@ import ( "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" "golang.org/x/pkgsite/internal/queue/inmemqueue" + "golang.org/x/pkgsite/internal/queue/pgqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/static" "golang.org/x/pkgsite/internal/trace" @@ -54,6 +55,7 @@ var ( "as a direct backend, bypassing the database") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "display all information, even for non-redistributable paths") hostAddr = flag.String("host", "localhost:8080", "Host address for the server") + queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`) ) func main() { @@ -127,19 +129,30 @@ func main() { } fetchQueue = q } else { - experiments, err := expg(ctx) - if err != nil { - log.Fatalf(ctx, "error getting experiment: %v", err) + processFunc := func(ctx context.Context, modulePath, version string) (int, error) { + return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) + switch *queueType { + case "postgres": + q, err := pgqueue.New(ctx, db.Underlying()) + if err != nil { + log.Fatalf(ctx, "error creating postgres queue: %v", err) + } + go q.Poll(ctx, *workers, processFunc) + fetchQueue = q + default: + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc) } - fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { - return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) - }) } } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 8f178aa9..bf732bf7 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -31,6 +31,7 @@ import ( "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" "golang.org/x/pkgsite/internal/queue/inmemqueue" + "golang.org/x/pkgsite/internal/queue/pgqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/trace" "golang.org/x/pkgsite/internal/worker" @@ -43,6 +44,7 @@ var ( // flag used in call to safehtml/template.TrustedSourceFromFlag _ = flag.String("static", "static", "path to folder containing static files served") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths") + queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`) // Ordinarily, index polling is initiated by a separate scheduler that calls // /poll. But for convenience, you can instead have the worker periodically @@ -107,25 +109,36 @@ func main() { } fetchQueue = q } else { - experiments, err := expg(ctx) - if err != nil { - log.Fatalf(ctx, "error getting experiment: %v", err) - } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) - } - } f := &worker.Fetcher{ ProxyClient: proxyClient, SourceClient: sourceClient, DB: db, } - fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { + processFunc := func(ctx context.Context, modulePath, version string) (int, error) { code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel()) return code, err - }) + } + switch *queueType { + case "postgres": + q, err := pgqueue.New(ctx, db.Underlying()) + if err != nil { + log.Fatalf(ctx, "error creating postgres queue: %v", err) + } + go q.Poll(ctx, *workers, processFunc) + fetchQueue = q + default: + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc) + } } reporter := cmdconfig.Reporter(ctx, cfg) |
