aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorMichael Matloob <matloob@golang.org>2023-07-18 15:09:40 -0400
committerMichael Matloob <matloob@golang.org>2023-07-19 19:28:32 +0000
commit96b17712a822c1eded85e478adfa1244b1990d8e (patch)
treefee2394061147b0110636f2a5b4a9ba61a4847e4 /internal/queue/queue.go
parent64dbd74b14032a5f58252a47fd85377381f3610f (diff)
downloadgo-x-pkgsite-96b17712a822c1eded85e478adfa1244b1990d8e.tar.xz
internal/queue: split out gcp queue into different package
internal/frontend depends on internal/queue. Move the parts with gcp dependencies into its own package so internal/frontend no longer has those gcp queue dependencies. For #61399 Change-Id: I0083c31da1b367f135e53686e4c994d7c4d6420f Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/510815 TryBot-Result: Gopher Robot <gobot@golang.org> kokoro-CI: kokoro <noreply+kokoro@google.com> Run-TryBot: Michael Matloob <matloob@golang.org> Reviewed-by: Jamal Carvalho <jamal@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go200
1 files changed, 3 insertions, 197 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 6f64a4d1..00b580c7 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -2,31 +2,18 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Package queue provides queue implementations that can be used for
+// Package queue provides a queue interface that can be used for
// asynchronous scheduling of fetch actions.
package queue
import (
"context"
- "errors"
"fmt"
- "hash/fnv"
- "io"
- "math"
- "strings"
"time"
- cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"golang.org/x/pkgsite/internal"
- "golang.org/x/pkgsite/internal/config"
- "golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
- "golang.org/x/pkgsite/internal/middleware"
- taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/durationpb"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
@@ -34,115 +21,6 @@ type Queue interface {
ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error)
}
-// New creates a new Queue with name queueName based on the configuration
-// in cfg. When running locally, Queue uses numWorkers concurrent workers.
-func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, expGetter middleware.ExperimentGetter, processFunc inMemoryProcessFunc) (Queue, error) {
- if !cfg.OnGCP() {
- experiments, err := expGetter(ctx)
- if err != nil {
- return nil, err
- }
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
- }
- }
- return NewInMemory(ctx, numWorkers, names, processFunc), nil
- }
-
- client, err := cloudtasks.NewClient(ctx)
- if err != nil {
- return nil, err
- }
- g, err := newGCP(cfg, client, queueName)
- if err != nil {
- return nil, err
- }
- log.Infof(ctx, "enqueuing at %s with queueURL=%q", g.queueName, g.queueURL)
- return g, nil
-}
-
-// GCP provides a Queue implementation backed by the Google Cloud Tasks
-// API.
-type GCP struct {
- client *cloudtasks.Client
- queueName string // full GCP name of the queue
- queueURL string // non-AppEngine URL to post tasks to
- // token holds information that lets the task queue construct an authorized request to the worker.
- // Since the worker sits behind the IAP, the queue needs an identity token that includes the
- // identity of a service account that has access, and the client ID for the IAP.
- // We use the service account of the current process.
- token *taskspb.HttpRequest_OidcToken
-}
-
-// newGCP returns a new Queue that can be used to enqueue tasks using the
-// cloud tasks API. The given queueID should be the name of the queue in the
-// cloud tasks console.
-func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *GCP, err error) {
- defer derrors.Wrap(&err, "newGCP(cfg, client, %q)", queueID)
- if queueID == "" {
- return nil, errors.New("empty queueID")
- }
- if cfg.ProjectID == "" {
- return nil, errors.New("empty ProjectID")
- }
- if cfg.LocationID == "" {
- return nil, errors.New("empty LocationID")
- }
- if cfg.QueueURL == "" {
- return nil, errors.New("empty QueueURL")
- }
- if cfg.ServiceAccount == "" {
- return nil, errors.New("empty ServiceAccount")
- }
- if cfg.QueueAudience == "" {
- return nil, errors.New("empty QueueAudience")
- }
- return &GCP{
- client: client,
- queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, queueID),
- queueURL: cfg.QueueURL,
- token: &taskspb.HttpRequest_OidcToken{
- OidcToken: &taskspb.OidcToken{
- ServiceAccountEmail: cfg.ServiceAccount,
- Audience: cfg.QueueAudience,
- },
- },
- }, nil
-}
-
-// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
-// version. It returns an error if there was an error hashing the task name, or
-// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (enqueued bool, err error) {
- defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %v)", modulePath, version, opts)
- if opts == nil {
- opts = &Options{}
- }
- // Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this
- // in the documentation, but using a larger value, or no timeout, results in
- // an InvalidArgument error with the text "The deadline cannot be more than
- // 30s in the future."
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
-
- if modulePath == internal.UnknownModulePath {
- return false, errors.New("given unknown module path")
- }
- req := q.newTaskRequest(modulePath, version, opts)
- enqueued = true
- if _, err := q.client.CreateTask(ctx, req); err != nil {
- if status.Code(err) == codes.AlreadyExists {
- log.Debugf(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, modulePath, version)
- enqueued = false
- } else {
- return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
- }
- }
- return enqueued, nil
-}
-
// Options is used to provide option arguments for a task queue.
type Options struct {
// DisableProxyFetch reports whether proxyfetch should be set to off when
@@ -158,10 +36,6 @@ type Options struct {
Source string
}
-// Maximum timeout for HTTP tasks.
-// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
-const maxCloudTasksTimeout = 30 * time.Minute
-
const (
DisableProxyFetchParam = "proxyfetch"
DisableProxyFetchValue = "off"
@@ -170,74 +44,6 @@ const (
SourceWorkerValue = "worker"
)
-func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest {
- taskID := newTaskID(modulePath, version)
- relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
- var params []string
- if opts.Source != "" {
- params = append(params, fmt.Sprintf("%s=%s", SourceParam, opts.Source))
- }
- if opts.DisableProxyFetch {
- params = append(params, fmt.Sprintf("%s=%s", DisableProxyFetchParam, DisableProxyFetchValue))
- }
- if len(params) > 0 {
- relativeURI += fmt.Sprintf("?%s", strings.Join(params, "&"))
- }
-
- task := &taskspb.Task{
- Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
- DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
- }
- task.MessageType = &taskspb.Task_HttpRequest{
- HttpRequest: &taskspb.HttpRequest{
- HttpMethod: taskspb.HttpMethod_POST,
- Url: q.queueURL + relativeURI,
- AuthorizationHeader: q.token,
- },
- }
- req := &taskspb.CreateTaskRequest{
- Parent: q.queueName,
- Task: task,
- }
- // If suffix is non-empty, append it to the task name. This lets us force reprocessing
- // of tasks that would normally be de-duplicated.
- if opts.Suffix != "" {
- req.Task.Name += "-" + opts.Suffix
- }
- return req
-}
-
-// Create a task ID for the given module path and version.
-// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_).
-func newTaskID(modulePath, version string) string {
- mv := modulePath + "@" + version
- // Compute a hash to use as a prefix, so the task IDs are distributed uniformly.
- // See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task
- // under "Task De-duplication".
- hasher := fnv.New32()
- io.WriteString(hasher, mv)
- hash := hasher.Sum32() % math.MaxUint16
- // Escape the name so it contains only valid characters. Do our best to make it readable.
- var b strings.Builder
- for _, r := range mv {
- switch {
- case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-':
- b.WriteRune(r)
- case r == '_':
- b.WriteString("__")
- case r == '/':
- b.WriteString("_-")
- case r == '@':
- b.WriteString("_v")
- case r == '.':
- b.WriteString("_o")
- default:
- fmt.Fprintf(&b, "_%04x", r)
- }
- }
- return fmt.Sprintf("%04x-%s", hash, &b)
-}
-
// InMemory is a Queue implementation that schedules in-process fetch
// operations. Unlike the GCP task queue, it will not automatically retry tasks
// on failure.
@@ -249,12 +55,12 @@ type InMemory struct {
experiments []string
}
-type inMemoryProcessFunc func(context.Context, string, string) (int, error)
+type InMemoryProcessFunc func(context.Context, string, string) (int, error)
// NewInMemory creates a new InMemory that asynchronously fetches
// from proxyClient and stores in db. It uses workerCount parallelism to
// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory {
+func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
q := &InMemory{
queue: make(chan internal.Modver, 1000),
experiments: experiments,