diff options
| author | Michael Matloob <matloob@golang.org> | 2023-07-18 15:09:40 -0400 |
|---|---|---|
| committer | Michael Matloob <matloob@golang.org> | 2023-07-19 19:28:32 +0000 |
| commit | 96b17712a822c1eded85e478adfa1244b1990d8e (patch) | |
| tree | fee2394061147b0110636f2a5b4a9ba61a4847e4 /internal/queue/queue.go | |
| parent | 64dbd74b14032a5f58252a47fd85377381f3610f (diff) | |
| download | go-x-pkgsite-96b17712a822c1eded85e478adfa1244b1990d8e.tar.xz | |
internal/queue: split out gcp queue into different package
internal/frontend depends on internal/queue. Move the parts with gcp
dependencies into its own package so internal/frontend no longer has
those gcp queue dependencies.
For #61399
Change-Id: I0083c31da1b367f135e53686e4c994d7c4d6420f
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/510815
TryBot-Result: Gopher Robot <gobot@golang.org>
kokoro-CI: kokoro <noreply+kokoro@google.com>
Run-TryBot: Michael Matloob <matloob@golang.org>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 200 |
1 files changed, 3 insertions, 197 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 6f64a4d1..00b580c7 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -2,31 +2,18 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Package queue provides queue implementations that can be used for +// Package queue provides a queue interface that can be used for // asynchronous scheduling of fetch actions. package queue import ( "context" - "errors" "fmt" - "hash/fnv" - "io" - "math" - "strings" "time" - cloudtasks "cloud.google.com/go/cloudtasks/apiv2" "golang.org/x/pkgsite/internal" - "golang.org/x/pkgsite/internal/config" - "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" - "golang.org/x/pkgsite/internal/middleware" - taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" ) // A Queue provides an interface for asynchronous scheduling of fetch actions. @@ -34,115 +21,6 @@ type Queue interface { ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error) } -// New creates a new Queue with name queueName based on the configuration -// in cfg. When running locally, Queue uses numWorkers concurrent workers. -func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, expGetter middleware.ExperimentGetter, processFunc inMemoryProcessFunc) (Queue, error) { - if !cfg.OnGCP() { - experiments, err := expGetter(ctx) - if err != nil { - return nil, err - } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) - } - } - return NewInMemory(ctx, numWorkers, names, processFunc), nil - } - - client, err := cloudtasks.NewClient(ctx) - if err != nil { - return nil, err - } - g, err := newGCP(cfg, client, queueName) - if err != nil { - return nil, err - } - log.Infof(ctx, "enqueuing at %s with queueURL=%q", g.queueName, g.queueURL) - return g, nil -} - -// GCP provides a Queue implementation backed by the Google Cloud Tasks -// API. -type GCP struct { - client *cloudtasks.Client - queueName string // full GCP name of the queue - queueURL string // non-AppEngine URL to post tasks to - // token holds information that lets the task queue construct an authorized request to the worker. - // Since the worker sits behind the IAP, the queue needs an identity token that includes the - // identity of a service account that has access, and the client ID for the IAP. - // We use the service account of the current process. - token *taskspb.HttpRequest_OidcToken -} - -// newGCP returns a new Queue that can be used to enqueue tasks using the -// cloud tasks API. The given queueID should be the name of the queue in the -// cloud tasks console. -func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *GCP, err error) { - defer derrors.Wrap(&err, "newGCP(cfg, client, %q)", queueID) - if queueID == "" { - return nil, errors.New("empty queueID") - } - if cfg.ProjectID == "" { - return nil, errors.New("empty ProjectID") - } - if cfg.LocationID == "" { - return nil, errors.New("empty LocationID") - } - if cfg.QueueURL == "" { - return nil, errors.New("empty QueueURL") - } - if cfg.ServiceAccount == "" { - return nil, errors.New("empty ServiceAccount") - } - if cfg.QueueAudience == "" { - return nil, errors.New("empty QueueAudience") - } - return &GCP{ - client: client, - queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, queueID), - queueURL: cfg.QueueURL, - token: &taskspb.HttpRequest_OidcToken{ - OidcToken: &taskspb.OidcToken{ - ServiceAccountEmail: cfg.ServiceAccount, - Audience: cfg.QueueAudience, - }, - }, - }, nil -} - -// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and -// version. It returns an error if there was an error hashing the task name, or -// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (enqueued bool, err error) { - defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %v)", modulePath, version, opts) - if opts == nil { - opts = &Options{} - } - // Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this - // in the documentation, but using a larger value, or no timeout, results in - // an InvalidArgument error with the text "The deadline cannot be more than - // 30s in the future." - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - if modulePath == internal.UnknownModulePath { - return false, errors.New("given unknown module path") - } - req := q.newTaskRequest(modulePath, version, opts) - enqueued = true - if _, err := q.client.CreateTask(ctx, req); err != nil { - if status.Code(err) == codes.AlreadyExists { - log.Debugf(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, modulePath, version) - enqueued = false - } else { - return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) - } - } - return enqueued, nil -} - // Options is used to provide option arguments for a task queue. type Options struct { // DisableProxyFetch reports whether proxyfetch should be set to off when @@ -158,10 +36,6 @@ type Options struct { Source string } -// Maximum timeout for HTTP tasks. -// See https://cloud.google.com/tasks/docs/creating-http-target-tasks. -const maxCloudTasksTimeout = 30 * time.Minute - const ( DisableProxyFetchParam = "proxyfetch" DisableProxyFetchValue = "off" @@ -170,74 +44,6 @@ const ( SourceWorkerValue = "worker" ) -func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest { - taskID := newTaskID(modulePath, version) - relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version) - var params []string - if opts.Source != "" { - params = append(params, fmt.Sprintf("%s=%s", SourceParam, opts.Source)) - } - if opts.DisableProxyFetch { - params = append(params, fmt.Sprintf("%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)) - } - if len(params) > 0 { - relativeURI += fmt.Sprintf("?%s", strings.Join(params, "&")) - } - - task := &taskspb.Task{ - Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID), - DispatchDeadline: durationpb.New(maxCloudTasksTimeout), - } - task.MessageType = &taskspb.Task_HttpRequest{ - HttpRequest: &taskspb.HttpRequest{ - HttpMethod: taskspb.HttpMethod_POST, - Url: q.queueURL + relativeURI, - AuthorizationHeader: q.token, - }, - } - req := &taskspb.CreateTaskRequest{ - Parent: q.queueName, - Task: task, - } - // If suffix is non-empty, append it to the task name. This lets us force reprocessing - // of tasks that would normally be de-duplicated. - if opts.Suffix != "" { - req.Task.Name += "-" + opts.Suffix - } - return req -} - -// Create a task ID for the given module path and version. -// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_). -func newTaskID(modulePath, version string) string { - mv := modulePath + "@" + version - // Compute a hash to use as a prefix, so the task IDs are distributed uniformly. - // See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task - // under "Task De-duplication". - hasher := fnv.New32() - io.WriteString(hasher, mv) - hash := hasher.Sum32() % math.MaxUint16 - // Escape the name so it contains only valid characters. Do our best to make it readable. - var b strings.Builder - for _, r := range mv { - switch { - case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-': - b.WriteRune(r) - case r == '_': - b.WriteString("__") - case r == '/': - b.WriteString("_-") - case r == '@': - b.WriteString("_v") - case r == '.': - b.WriteString("_o") - default: - fmt.Fprintf(&b, "_%04x", r) - } - } - return fmt.Sprintf("%04x-%s", hash, &b) -} - // InMemory is a Queue implementation that schedules in-process fetch // operations. Unlike the GCP task queue, it will not automatically retry tasks // on failure. @@ -249,12 +55,12 @@ type InMemory struct { experiments []string } -type inMemoryProcessFunc func(context.Context, string, string) (int, error) +type InMemoryProcessFunc func(context.Context, string, string) (int, error) // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory { +func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { q := &InMemory{ queue: make(chan internal.Modver, 1000), experiments: experiments, |
