diff options
| author | Julie Qiu <julie@golang.org> | 2020-06-01 11:45:41 -0400 |
|---|---|---|
| committer | Julie Qiu <julieqiu@google.com> | 2020-06-02 00:27:30 +0000 |
| commit | 0bbc5df5bf4a85d1c44db11afcd1e2e7c2dc52d4 (patch) | |
| tree | 580d776f8c7fcdc003895429b3901bdf918ba0a8 /internal/queue/queue.go | |
| parent | 1d5b18fc5d996d0c8e5596faeaef044bccf69220 (diff) | |
| download | go-x-pkgsite-0bbc5df5bf4a85d1c44db11afcd1e2e7c2dc52d4.tar.xz | |
cmd,internal: move taskIDChangeInterval to config
At the moment, taskIDChangeInterval is a hardcoded value in
internal/queue. However, we will soon have two task queues running,
which require different change intervals, so this value is now set in
internal/config.
Additionally, the taskIDChangeInterval for the worker is changed to 3
hours.
Change-Id: I498abefce6543005463be7da99a5a778f3a6e973
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/758919
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 15 |
1 files changed, 6 insertions, 9 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index b0b1fb9f..6e567aa8 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -28,7 +28,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error + ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error } // GCP provides a Queue implementation backed by the Google Cloud Tasks @@ -53,15 +53,15 @@ func NewGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or // an error pushing the task to GCP. -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (err error) { +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (err error) { // the new taskqueue API requires a deadline of <= 30s ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix) + defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval) queueName := fmt.Sprintf("projects/%s/locations/%s/queues/%s", q.cfg.ProjectID, q.cfg.LocationID, q.queueID) mod := fmt.Sprintf("%s/@v/%s", modulePath, version) u := fmt.Sprintf("/fetch/" + mod) - taskID := newTaskID(modulePath, version, time.Now()) + taskID := newTaskID(modulePath, version, time.Now(), taskIDChangeInterval) req := &taskspb.CreateTaskRequest{ Parent: queueName, Task: &taskspb.Task{ @@ -93,9 +93,6 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str return nil } -// How often the task ID for a given module and version will change. -const taskIDChangeInterval = time.Hour - // Create a task ID for the given module path and version. // Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_). // Also include a truncated time in the hash, so it changes periodically. @@ -104,7 +101,7 @@ const taskIDChangeInterval = time.Hour // for two identical tasks to appear within that time period (for example, one at 2:59 // and the other at 3:01) -- each is part of a different taskIDChangeInterval-sized chunk // of time. But there will never be a third identical task in that interval. -func newTaskID(modulePath, version string, now time.Time) string { +func newTaskID(modulePath, version string, now time.Time, taskIDChangeInterval time.Duration) string { t := now.Truncate(taskIDChangeInterval) return fmt.Sprintf("%x", sha256.Sum256([]byte(modulePath+"@"+version+"-"+t.String()))) } @@ -174,7 +171,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error { q.queue <- moduleVersion{modulePath, version} return nil } |
