diff options
| author | Julie Qiu <julie@golang.org> | 2020-02-19 16:15:04 -0500 |
|---|---|---|
| committer | Julie Qiu <julie@golang.org> | 2020-04-06 15:50:52 -0400 |
| commit | c0dfa6a74dcef191ce8ae99635a7cfdcc4c4ef83 (patch) | |
| tree | d34f4e607e258cfb6d7e75dd4ad87ca9e87a03b3 /internal/queue | |
| parent | f2324fa41381e39dd9f6c88786122d2236e483ea (diff) | |
| download | go-x-pkgsite-c0dfa6a74dcef191ce8ae99635a7cfdcc4c4ef83.tar.xz | |
internal/queue: create package
The code for creating a new package is moved to internal/queue. This
allows for adding items to a task queue from the frontend, without
importing the entire internal/etl package.
No logic is changed, except the signature of
queue.InMemoryQueue.process, so that etl.FetchAndUpdateState can be
passed in, and the entire internal/etl package does not need to be
imported by internal/queue, which would lead to a circular import.
Updates b/135954292
Change-Id: I33a6418c73d85e15c4aa5593d633e19eb7e4eb1b
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/671014
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue')
| -rw-r--r-- | internal/queue/queue.go | 181 | ||||
| -rw-r--r-- | internal/queue/queue_test.go | 30 |
2 files changed, 211 insertions, 0 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 00000000..c4b6a9ac --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,181 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package queue provides queue implementations that can be used for +// asynchronous scheduling of fetch actions. +package queue + +import ( + "context" + "crypto/sha256" + "fmt" + "os" + "time" + + cloudtasks "cloud.google.com/go/cloudtasks/apiv2" + "golang.org/x/discovery/internal/config" + "golang.org/x/discovery/internal/log" + "golang.org/x/discovery/internal/postgres" + "golang.org/x/discovery/internal/proxy" + taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// A Queue provides an interface for asynchronous scheduling of fetch actions. +type Queue interface { + ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error +} + +// GCP provides a Queue implementation backed by the Google Cloud Tasks +// API. +type GCP struct { + client *cloudtasks.Client + queueID string +} + +// NewGCP returns a new Queue that can be used to enqueue tasks using the +// cloud tasks API. The given queueID should be the name of the queue in the +// cloud tasks console. +func NewGCP(client *cloudtasks.Client, queueID string) *GCP { + return &GCP{ + client: client, + queueID: queueID, + } +} + +// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and +// version. It returns an error if there was an error hashing the task name, or +// an error pushing the task to GCP. +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error { + // the new taskqueue API requires a deadline of <= 30s + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + queueName := fmt.Sprintf("projects/%s/locations/%s/queues/%s", config.ProjectID(), config.LocationID(), q.queueID) + u := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version) + taskID := newTaskID(modulePath, version, time.Now()) + req := &taskspb.CreateTaskRequest{ + Parent: queueName, + Task: &taskspb.Task{ + Name: fmt.Sprintf("%s/tasks/%s", queueName, taskID), + MessageType: &taskspb.Task_AppEngineHttpRequest{ + AppEngineHttpRequest: &taskspb.AppEngineHttpRequest{ + HttpMethod: taskspb.HttpMethod_POST, + RelativeUri: u, + AppEngineRouting: &taskspb.AppEngineRouting{ + Service: os.Getenv("GAE_SERVICE"), + }, + }, + }, + }, + } + // If suffix is non-empty, append it to the task name. This lets us force reprocessing + // of tasks that would normally be de-duplicated. + if suffix != "" { + req.Task.Name += "-" + suffix + } + + if _, err := q.client.CreateTask(ctx, req); err != nil { + if status.Code(err) == codes.AlreadyExists { + log.Infof(ctx, "ignoring duplicate task ID %s", taskID) + } else { + return fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) + } + } + return nil +} + +// How often the task ID for a given module and version will change. +const taskIDChangeInterval = 3 * time.Hour + +// Create a task ID for the given module path and version. +// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_). +// Also include a truncated time in the hash, so it changes periodically. +// +// Since we truncate the time to the nearest taskIDChangeInterval, it's still possible +// for two identical tasks to appear within that time period (for example, one at 2:59 +// and the other at 3:01) -- each is part of a different taskIDChangeInterval-sized chunk +// of time. But there will never be a third identical task in that interval. +func newTaskID(modulePath, version string, now time.Time) string { + t := now.Truncate(taskIDChangeInterval) + return fmt.Sprintf("%x", sha256.Sum256([]byte(modulePath+"@"+version+"-"+t.String()))) +} + +type moduleVersion struct { + modulePath, version string +} + +// InMemory is a Queue implementation that schedules in-process fetch +// operations. Unlike the GCP task queue, it will not automatically retry tasks +// on failure. +// +// This should only be used for local development. +type InMemory struct { + proxyClient *proxy.Client + db *postgres.DB + + queue chan moduleVersion + sem chan struct{} +} + +// NewInMemory creates a new InMemory that asynchronously fetches +// from proxyClient and stores in db. It uses workerCount parallelism to +// execute these fetches. +func NewInMemory(ctx context.Context, proxyClient *proxy.Client, db *postgres.DB, workerCount int, + processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) *InMemory { + q := &InMemory{ + proxyClient: proxyClient, + db: db, + queue: make(chan moduleVersion, 1000), + sem: make(chan struct{}, workerCount), + } + go q.process(ctx, processFunc) + return q +} + +func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) { + + for v := range q.queue { + select { + case <-ctx.Done(): + return + case q.sem <- struct{}{}: + } + + // If a worker is available, make a request to the fetch service inside a + // goroutine and wait for it to finish. + go func(v moduleVersion) { + defer func() { <-q.sem }() + + log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) + + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.db); err != nil { + log.Error(fetchCtx, err) + } + }(v) + } +} + +// ScheduleFetch pushes a fetch task into the local queue to be processed +// asynchronously. +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error { + q.queue <- moduleVersion{modulePath, version} + return nil +} + +// WaitForTesting waits for all queued requests to finish. It should only be +// used by test code. +func (q InMemory) WaitForTesting(ctx context.Context) { + for i := 0; i < cap(q.sem); i++ { + select { + case <-ctx.Done(): + return + case q.sem <- struct{}{}: + } + } + close(q.queue) +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 00000000..3933325b --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,30 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "testing" + "time" +) + +func TestNewTaskID(t *testing.T) { + // Verify that the task ID is the same within taskIDChangeInterval and changes + // afterwards. + const ( + module = "mod" + version = "ver" + ) + + tm := time.Now().Truncate(taskIDChangeInterval) + id1 := newTaskID(module, version, tm) + id2 := newTaskID(module, version, tm.Add(taskIDChangeInterval/2)) + if id1 != id2 { + t.Error("wanted same task ID, got different") + } + id3 := newTaskID(module, version, tm.Add(taskIDChangeInterval+1)) + if id1 == id3 { + t.Error("wanted different task ID, got same") + } +} |
