From 7a5705bf50a5bacd2060e3bdb6fb72cd48e2393f Mon Sep 17 00:00:00 2001 From: Anze Kolar Date: Wed, 8 Jul 2020 14:09:57 +0200 Subject: internal/queue: refactor existing queue creators into queue.New Add a unified function for creating new queues so that separate implementations of newQueue in cmd/frontend and cmd/worker can be removed. Since queue.New contains the necessary logic for deciding between GCP and InMemory, the two could be made unexported after updating the corresponding tests. Fixes golang/go#40097. Change-Id: Ie509ba39ef293cca3ff95f2ce12833339c0542ea Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/241477 Reviewed-by: Jonathan Amsterdam --- internal/queue/queue.go | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) (limited to 'internal/queue/queue.go') diff --git a/internal/queue/queue.go b/internal/queue/queue.go index ef4810d3..0b042819 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -9,6 +9,7 @@ package queue import ( "context" "crypto/sha256" + "errors" "fmt" "time" @@ -17,6 +18,7 @@ import ( "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/postgres" taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -27,6 +29,34 @@ type Queue interface { ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error } +// New creates a new Queue with name queueName based on the configuration +// in cfg. When running locally, Queue uses numWorkers concurrent workers. +func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, db *postgres.DB, processFunc inMemoryProcessFunc) (Queue, error) { + if !cfg.OnAppEngine() { + experiments, err := db.GetExperiments(ctx) + if err != nil { + return nil, err + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + return NewInMemory(ctx, numWorkers, names, processFunc), nil + } + + client, err := cloudtasks.NewClient(ctx) + if err != nil { + return nil, err + } + + if queueName == "" { + return nil, errors.New("missing queue name: queueName cannot be empty") + } + return newGCP(cfg, client, queueName), nil +} + // GCP provides a Queue implementation backed by the Google Cloud Tasks // API. type GCP struct { @@ -38,7 +68,7 @@ type GCP struct { // NewGCP returns a new Queue that can be used to enqueue tasks using the // cloud tasks API. The given queueID should be the name of the queue in the // cloud tasks console. -func NewGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP { +func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP { return &GCP{ cfg: cfg, client: client, @@ -122,10 +152,12 @@ type InMemory struct { experiments []string } +type inMemoryProcessFunc func(context.Context, string, string) (int, error) + // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc func(context.Context, string, string) (int, error)) *InMemory { +func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory { q := &InMemory{ queue: make(chan moduleVersion, 1000), sem: make(chan struct{}, workerCount), -- cgit v1.3-5-g9baa