diff options
| author | Anze Kolar <me@akolar.com> | 2020-07-08 14:09:57 +0200 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-07-08 16:06:55 +0000 |
| commit | 7a5705bf50a5bacd2060e3bdb6fb72cd48e2393f (patch) | |
| tree | b594188f3be053d61cb921e79105d23fb7d71462 /internal/queue/queue.go | |
| parent | 71c954f70eeee31a7c9f0700a90573895f2e327d (diff) | |
| download | go-x-pkgsite-7a5705bf50a5bacd2060e3bdb6fb72cd48e2393f.tar.xz | |
internal/queue: refactor existing queue creators into queue.New
Add a unified function for creating new queues so that separate
implementations of newQueue in cmd/frontend and cmd/worker can be
removed. Since queue.New contains the necessary logic for deciding
between GCP and InMemory, the two could be made unexported after
updating the corresponding tests.
Fixes golang/go#40097.
Change-Id: Ie509ba39ef293cca3ff95f2ce12833339c0542ea
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/241477
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 36 |
1 files changed, 34 insertions, 2 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index ef4810d3..0b042819 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -9,6 +9,7 @@ package queue import ( "context" "crypto/sha256" + "errors" "fmt" "time" @@ -17,6 +18,7 @@ import ( "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/postgres" taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -27,6 +29,34 @@ type Queue interface { ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error } +// New creates a new Queue with name queueName based on the configuration +// in cfg. When running locally, Queue uses numWorkers concurrent workers. +func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, db *postgres.DB, processFunc inMemoryProcessFunc) (Queue, error) { + if !cfg.OnAppEngine() { + experiments, err := db.GetExperiments(ctx) + if err != nil { + return nil, err + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + return NewInMemory(ctx, numWorkers, names, processFunc), nil + } + + client, err := cloudtasks.NewClient(ctx) + if err != nil { + return nil, err + } + + if queueName == "" { + return nil, errors.New("missing queue name: queueName cannot be empty") + } + return newGCP(cfg, client, queueName), nil +} + // GCP provides a Queue implementation backed by the Google Cloud Tasks // API. type GCP struct { @@ -38,7 +68,7 @@ type GCP struct { // NewGCP returns a new Queue that can be used to enqueue tasks using the // cloud tasks API. The given queueID should be the name of the queue in the // cloud tasks console. -func NewGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP { +func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP { return &GCP{ cfg: cfg, client: client, @@ -122,10 +152,12 @@ type InMemory struct { experiments []string } +type inMemoryProcessFunc func(context.Context, string, string) (int, error) + // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc func(context.Context, string, string) (int, error)) *InMemory { +func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory { q := &InMemory{ queue: make(chan moduleVersion, 1000), sem: make(chan struct{}, workerCount), |
