aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorAnze Kolar <me@akolar.com>2020-07-08 14:09:57 +0200
committerJonathan Amsterdam <jba@google.com>2020-07-08 16:06:55 +0000
commit7a5705bf50a5bacd2060e3bdb6fb72cd48e2393f (patch)
treeb594188f3be053d61cb921e79105d23fb7d71462 /internal/queue/queue.go
parent71c954f70eeee31a7c9f0700a90573895f2e327d (diff)
downloadgo-x-pkgsite-7a5705bf50a5bacd2060e3bdb6fb72cd48e2393f.tar.xz
internal/queue: refactor existing queue creators into queue.New
Add a unified function for creating new queues so that separate implementations of newQueue in cmd/frontend and cmd/worker can be removed. Since queue.New contains the necessary logic for deciding between GCP and InMemory, the two could be made unexported after updating the corresponding tests. Fixes golang/go#40097. Change-Id: Ie509ba39ef293cca3ff95f2ce12833339c0542ea Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/241477 Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go36
1 files changed, 34 insertions, 2 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index ef4810d3..0b042819 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -9,6 +9,7 @@ package queue
import (
"context"
"crypto/sha256"
+ "errors"
"fmt"
"time"
@@ -17,6 +18,7 @@ import (
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
+ "golang.org/x/pkgsite/internal/postgres"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -27,6 +29,34 @@ type Queue interface {
ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error
}
+// New creates a new Queue with name queueName based on the configuration
+// in cfg. When running locally, Queue uses numWorkers concurrent workers.
+func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, db *postgres.DB, processFunc inMemoryProcessFunc) (Queue, error) {
+ if !cfg.OnAppEngine() {
+ experiments, err := db.GetExperiments(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ return NewInMemory(ctx, numWorkers, names, processFunc), nil
+ }
+
+ client, err := cloudtasks.NewClient(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if queueName == "" {
+ return nil, errors.New("missing queue name: queueName cannot be empty")
+ }
+ return newGCP(cfg, client, queueName), nil
+}
+
// GCP provides a Queue implementation backed by the Google Cloud Tasks
// API.
type GCP struct {
@@ -38,7 +68,7 @@ type GCP struct {
// NewGCP returns a new Queue that can be used to enqueue tasks using the
// cloud tasks API. The given queueID should be the name of the queue in the
// cloud tasks console.
-func NewGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP {
+func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP {
return &GCP{
cfg: cfg,
client: client,
@@ -122,10 +152,12 @@ type InMemory struct {
experiments []string
}
+type inMemoryProcessFunc func(context.Context, string, string) (int, error)
+
// NewInMemory creates a new InMemory that asynchronously fetches
// from proxyClient and stores in db. It uses workerCount parallelism to
// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc func(context.Context, string, string) (int, error)) *InMemory {
+func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory {
q := &InMemory{
queue: make(chan moduleVersion, 1000),
sem: make(chan struct{}, workerCount),