aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2020-11-16 05:23:58 -0500
committerJonathan Amsterdam <jba@google.com>2020-11-16 18:45:07 +0000
commit9695d5bffab0a61a8a77f6d0f0fb0fa7e9fa96a4 (patch)
tree3c944ec2d484e4c4bc60c9e097ebc12a015c9a0d /internal/queue/queue.go
parentc2aa9709359b9357918d10c99ddcb1fd865c675d (diff)
downloadgo-x-pkgsite-9695d5bffab0a61a8a77f6d0f0fb0fa7e9fa96a4.tar.xz
internal/queue: use time-independent task IDs
The task names we generated for Cloud Tasks included time information, so that we could control how freqeuently tasks were de-duped. We've been using a 3-hour window, meaning that if a module's task finished or was deleted, another task for that module couldn't be added to the queue (would be de-duped) for 3 hours. This scheme has the unintended consequence that if a module is still on the queue after 3 hours, then another task for the same module could be enqueued (since its task ID would be different). That means that if the queue ever gets 3 hours behind, it could get filled with duplicate modules. For example, this morning the queue had about 25,000 tasks, of which about 7,000 were duplicates. This CL sets a task's name to a function of the module path and version. That will prevent a task on the queue from being duplicated. A repeat task can be added about 1 hour after the previous task finishes or is deleted (see the Task De-duplication section of https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task), which is fine. It is still possible to add a suffix to the task name to requeue sooner. Change-Id: I34ffce5ea67b9e00b88ca4cf38182e34a6ba8657 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/270337 Trust: Jonathan Amsterdam <jba@google.com> Run-TryBot: Jonathan Amsterdam <jba@google.com> TryBot-Result: kokoro <noreply+kokoro@google.com> Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go55
1 files changed, 38 insertions, 17 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 229631f5..93e141c9 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -8,9 +8,12 @@ package queue
import (
"context"
- "crypto/sha256"
"errors"
"fmt"
+ "hash/fnv"
+ "io"
+ "math"
+ "strings"
"time"
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
@@ -27,7 +30,7 @@ import (
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error)
+ ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error)
}
// New creates a new Queue with name queueName based on the configuration
@@ -121,13 +124,13 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (enqueued bool, err error) {
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) {
// the new taskqueue API requires a deadline of <= 30s
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
- defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval)
+ defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
- req := q.newTaskRequest(modulePath, version, suffix, taskIDChangeInterval)
+ req := q.newTaskRequest(modulePath, version, suffix)
enqueued = true
if _, err := q.client.CreateTask(ctx, req); err != nil {
if status.Code(err) == codes.AlreadyExists {
@@ -144,8 +147,8 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str
// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute
-func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInterval time.Duration) *taskspb.CreateTaskRequest {
- taskID := newTaskID(modulePath, version, time.Now(), taskIDChangeInterval)
+func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest {
+ taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
task := &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
@@ -184,15 +187,33 @@ func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInt
// Create a task ID for the given module path and version.
// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_).
-// Also include a truncated time in the hash, so it changes periodically.
-//
-// Since we truncate the time to the nearest taskIDChangeInterval, it's still possible
-// for two identical tasks to appear within that time period (for example, one at 2:59
-// and the other at 3:01) -- each is part of a different taskIDChangeInterval-sized chunk
-// of time. But there will never be a third identical task in that interval.
-func newTaskID(modulePath, version string, now time.Time, taskIDChangeInterval time.Duration) string {
- t := now.Truncate(taskIDChangeInterval)
- return fmt.Sprintf("%x", sha256.Sum256([]byte(modulePath+"@"+version+"-"+t.String())))
+func newTaskID(modulePath, version string) string {
+ mv := modulePath + "@" + version
+ // Compute a hash to use as a prefix, so the task IDs are distributed uniformly.
+ // See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task
+ // under "Task De-duplication".
+ hasher := fnv.New32()
+ io.WriteString(hasher, mv)
+ hash := hasher.Sum32() % math.MaxUint16
+ // Escape the name so it contains only valid characters. Do our best to make it readable.
+ var b strings.Builder
+ for _, r := range mv {
+ switch {
+ case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-':
+ b.WriteRune(r)
+ case r == '_':
+ b.WriteString("__")
+ case r == '/':
+ b.WriteString("_-")
+ case r == '@':
+ b.WriteString("_v")
+ case r == '.':
+ b.WriteString("_o")
+ default:
+ fmt.Fprintf(&b, "_%04x", r)
+ }
+ }
+ return fmt.Sprintf("%04x-%s", hash, &b)
}
type moduleVersion struct {
@@ -251,7 +272,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) {
q.queue <- moduleVersion{modulePath, version}
return true, nil
}