diff options
| author | Jonathan Amsterdam <jba@google.com> | 2020-09-11 12:01:26 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-09-11 22:20:14 +0000 |
| commit | 5b4ee7b71e59908675534086d3d8ce1fd10aeef9 (patch) | |
| tree | 71eaff7581c5fa7414177a1e4b34ab3291053ed7 /internal/queue/queue.go | |
| parent | 8edea6c529ae601bac2527a03d0b24bd0581519a (diff) | |
| download | go-x-pkgsite-5b4ee7b71e59908675534086d3d8ce1fd10aeef9.tar.xz | |
internal/worker: improve logging of scheduled tasks
Report how many tasks were actually scheduled; take duplicate
tasks into account.
Change-Id: I855b7ca3a2c72da7cc104d6b5960e5f3a39c2f54
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/254381
Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index a428f5b8..6d470b0b 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -26,7 +26,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error + ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) } // New creates a new Queue with name queueName based on the configuration @@ -100,22 +100,24 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or -// an error pushing the task to GCP. -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (err error) { +// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (enqueued bool, err error) { // the new taskqueue API requires a deadline of <= 30s ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval) req := q.newTaskRequest(modulePath, version, suffix, taskIDChangeInterval) + enqueued = true if _, err := q.client.CreateTask(ctx, req); err != nil { if status.Code(err) == codes.AlreadyExists { log.Infof(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, modulePath, version) + enqueued = false } else { - return fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) + return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) } } - return nil + return enqueued, nil } func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInterval time.Duration) *taskspb.CreateTaskRequest { @@ -221,9 +223,9 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) { q.queue <- moduleVersion{modulePath, version} - return nil + return true, nil } // WaitForTesting waits for all queued requests to finish. It should only be |
