aboutsummaryrefslogtreecommitdiff
path: root/internal/queue
diff options
context:
space:
mode:
authorJulie Qiu <julie@golang.org>2020-06-01 11:45:41 -0400
committerJulie Qiu <julieqiu@google.com>2020-06-02 00:27:30 +0000
commit0bbc5df5bf4a85d1c44db11afcd1e2e7c2dc52d4 (patch)
tree580d776f8c7fcdc003895429b3901bdf918ba0a8 /internal/queue
parent1d5b18fc5d996d0c8e5596faeaef044bccf69220 (diff)
downloadgo-x-pkgsite-0bbc5df5bf4a85d1c44db11afcd1e2e7c2dc52d4.tar.xz
cmd,internal: move taskIDChangeInterval to config
At the moment, taskIDChangeInterval is a hardcoded value in internal/queue. However, we will soon have two task queues running, which require different change intervals, so this value is now set in internal/config. Additionally, the taskIDChangeInterval for the worker is changed to 3 hours. Change-Id: I498abefce6543005463be7da99a5a778f3a6e973 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/758919 CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue')
-rw-r--r--internal/queue/queue.go15
-rw-r--r--internal/queue/queue_test.go14
2 files changed, 13 insertions, 16 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index b0b1fb9f..6e567aa8 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -28,7 +28,7 @@ import (
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error
+ ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error
}
// GCP provides a Queue implementation backed by the Google Cloud Tasks
@@ -53,15 +53,15 @@ func NewGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) *GCP
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
// an error pushing the task to GCP.
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (err error) {
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (err error) {
// the new taskqueue API requires a deadline of <= 30s
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
- defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
+ defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval)
queueName := fmt.Sprintf("projects/%s/locations/%s/queues/%s", q.cfg.ProjectID, q.cfg.LocationID, q.queueID)
mod := fmt.Sprintf("%s/@v/%s", modulePath, version)
u := fmt.Sprintf("/fetch/" + mod)
- taskID := newTaskID(modulePath, version, time.Now())
+ taskID := newTaskID(modulePath, version, time.Now(), taskIDChangeInterval)
req := &taskspb.CreateTaskRequest{
Parent: queueName,
Task: &taskspb.Task{
@@ -93,9 +93,6 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str
return nil
}
-// How often the task ID for a given module and version will change.
-const taskIDChangeInterval = time.Hour
-
// Create a task ID for the given module path and version.
// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_).
// Also include a truncated time in the hash, so it changes periodically.
@@ -104,7 +101,7 @@ const taskIDChangeInterval = time.Hour
// for two identical tasks to appear within that time period (for example, one at 2:59
// and the other at 3:01) -- each is part of a different taskIDChangeInterval-sized chunk
// of time. But there will never be a third identical task in that interval.
-func newTaskID(modulePath, version string, now time.Time) string {
+func newTaskID(modulePath, version string, now time.Time, taskIDChangeInterval time.Duration) string {
t := now.Truncate(taskIDChangeInterval)
return fmt.Sprintf("%x", sha256.Sum256([]byte(modulePath+"@"+version+"-"+t.String())))
}
@@ -174,7 +171,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) error {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error {
q.queue <- moduleVersion{modulePath, version}
return nil
}
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index 3933325b..429e8017 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -12,18 +12,18 @@ import (
func TestNewTaskID(t *testing.T) {
// Verify that the task ID is the same within taskIDChangeInterval and changes
// afterwards.
- const (
- module = "mod"
- version = "ver"
+ var (
+ module = "mod"
+ version = "ver"
+ taskIDChangeInterval = 3 * time.Hour
)
-
tm := time.Now().Truncate(taskIDChangeInterval)
- id1 := newTaskID(module, version, tm)
- id2 := newTaskID(module, version, tm.Add(taskIDChangeInterval/2))
+ id1 := newTaskID(module, version, tm, taskIDChangeInterval)
+ id2 := newTaskID(module, version, tm.Add(taskIDChangeInterval/2), taskIDChangeInterval)
if id1 != id2 {
t.Error("wanted same task ID, got different")
}
- id3 := newTaskID(module, version, tm.Add(taskIDChangeInterval+1))
+ id3 := newTaskID(module, version, tm.Add(taskIDChangeInterval+1), taskIDChangeInterval)
if id1 == id3 {
t.Error("wanted different task ID, got same")
}