diff options
| author | Jonathan Amsterdam <jba@google.com> | 2020-11-16 05:23:58 -0500 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-11-16 18:45:07 +0000 |
| commit | 9695d5bffab0a61a8a77f6d0f0fb0fa7e9fa96a4 (patch) | |
| tree | 3c944ec2d484e4c4bc60c9e097ebc12a015c9a0d /internal/queue/queue.go | |
| parent | c2aa9709359b9357918d10c99ddcb1fd865c675d (diff) | |
| download | go-x-pkgsite-9695d5bffab0a61a8a77f6d0f0fb0fa7e9fa96a4.tar.xz | |
internal/queue: use time-independent task IDs
The task names we generated for Cloud Tasks included time information,
so that we could control how freqeuently tasks were de-duped. We've
been using a 3-hour window, meaning that if a module's task finished
or was deleted, another task for that module couldn't be added to the
queue (would be de-duped) for 3 hours.
This scheme has the unintended consequence that if a module is still
on the queue after 3 hours, then another task for the same module
could be enqueued (since its task ID would be different). That means
that if the queue ever gets 3 hours behind, it could get filled with
duplicate modules. For example, this morning the queue had about 25,000
tasks, of which about 7,000 were duplicates.
This CL sets a task's name to a function of the module path and
version. That will prevent a task on the queue from being duplicated.
A repeat task can be added about 1 hour after the previous task
finishes or is deleted (see the Task De-duplication section of
https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task),
which is fine. It is still possible to add a suffix to the task
name to requeue sooner.
Change-Id: I34ffce5ea67b9e00b88ca4cf38182e34a6ba8657
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/270337
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 55 |
1 files changed, 38 insertions, 17 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 229631f5..93e141c9 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -8,9 +8,12 @@ package queue import ( "context" - "crypto/sha256" "errors" "fmt" + "hash/fnv" + "io" + "math" + "strings" "time" cloudtasks "cloud.google.com/go/cloudtasks/apiv2" @@ -27,7 +30,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) + ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) } // New creates a new Queue with name queueName based on the configuration @@ -121,13 +124,13 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or // an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (enqueued bool, err error) { +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) { // the new taskqueue API requires a deadline of <= 30s ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval) + defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix) - req := q.newTaskRequest(modulePath, version, suffix, taskIDChangeInterval) + req := q.newTaskRequest(modulePath, version, suffix) enqueued = true if _, err := q.client.CreateTask(ctx, req); err != nil { if status.Code(err) == codes.AlreadyExists { @@ -144,8 +147,8 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str // See https://cloud.google.com/tasks/docs/creating-http-target-tasks. const maxCloudTasksTimeout = 30 * time.Minute -func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInterval time.Duration) *taskspb.CreateTaskRequest { - taskID := newTaskID(modulePath, version, time.Now(), taskIDChangeInterval) +func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest { + taskID := newTaskID(modulePath, version) relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version) task := &taskspb.Task{ Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID), @@ -184,15 +187,33 @@ func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInt // Create a task ID for the given module path and version. // Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_). -// Also include a truncated time in the hash, so it changes periodically. -// -// Since we truncate the time to the nearest taskIDChangeInterval, it's still possible -// for two identical tasks to appear within that time period (for example, one at 2:59 -// and the other at 3:01) -- each is part of a different taskIDChangeInterval-sized chunk -// of time. But there will never be a third identical task in that interval. -func newTaskID(modulePath, version string, now time.Time, taskIDChangeInterval time.Duration) string { - t := now.Truncate(taskIDChangeInterval) - return fmt.Sprintf("%x", sha256.Sum256([]byte(modulePath+"@"+version+"-"+t.String()))) +func newTaskID(modulePath, version string) string { + mv := modulePath + "@" + version + // Compute a hash to use as a prefix, so the task IDs are distributed uniformly. + // See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task + // under "Task De-duplication". + hasher := fnv.New32() + io.WriteString(hasher, mv) + hash := hasher.Sum32() % math.MaxUint16 + // Escape the name so it contains only valid characters. Do our best to make it readable. + var b strings.Builder + for _, r := range mv { + switch { + case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-': + b.WriteRune(r) + case r == '_': + b.WriteString("__") + case r == '/': + b.WriteString("_-") + case r == '@': + b.WriteString("_v") + case r == '.': + b.WriteString("_o") + default: + fmt.Fprintf(&b, "_%04x", r) + } + } + return fmt.Sprintf("%04x-%s", hash, &b) } type moduleVersion struct { @@ -251,7 +272,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) { q.queue <- moduleVersion{modulePath, version} return true, nil } |
