aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2020-09-11 12:01:26 -0400
committerJonathan Amsterdam <jba@google.com>2020-09-11 22:20:14 +0000
commit5b4ee7b71e59908675534086d3d8ce1fd10aeef9 (patch)
tree71eaff7581c5fa7414177a1e4b34ab3291053ed7 /internal/queue/queue.go
parent8edea6c529ae601bac2527a03d0b24bd0581519a (diff)
downloadgo-x-pkgsite-5b4ee7b71e59908675534086d3d8ce1fd10aeef9.tar.xz
internal/worker: improve logging of scheduled tasks
Report how many tasks were actually scheduled; take duplicate tasks into account. Change-Id: I855b7ca3a2c72da7cc104d6b5960e5f3a39c2f54 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/254381 Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go16
1 files changed, 9 insertions, 7 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index a428f5b8..6d470b0b 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -26,7 +26,7 @@ import (
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error
+ ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error)
}
// New creates a new Queue with name queueName based on the configuration
@@ -100,22 +100,24 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
-// an error pushing the task to GCP.
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (err error) {
+// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (enqueued bool, err error) {
// the new taskqueue API requires a deadline of <= 30s
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval)
req := q.newTaskRequest(modulePath, version, suffix, taskIDChangeInterval)
+ enqueued = true
if _, err := q.client.CreateTask(ctx, req); err != nil {
if status.Code(err) == codes.AlreadyExists {
log.Infof(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, modulePath, version)
+ enqueued = false
} else {
- return fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
+ return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
}
}
- return nil
+ return enqueued, nil
}
func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInterval time.Duration) *taskspb.CreateTaskRequest {
@@ -221,9 +223,9 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) {
q.queue <- moduleVersion{modulePath, version}
- return nil
+ return true, nil
}
// WaitForTesting waits for all queued requests to finish. It should only be