diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index a428f5b8..6d470b0b 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -26,7 +26,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error + ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) } // New creates a new Queue with name queueName based on the configuration @@ -100,22 +100,24 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or -// an error pushing the task to GCP. -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (err error) { +// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (enqueued bool, err error) { // the new taskqueue API requires a deadline of <= 30s ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q, %d)", modulePath, version, suffix, taskIDChangeInterval) req := q.newTaskRequest(modulePath, version, suffix, taskIDChangeInterval) + enqueued = true if _, err := q.client.CreateTask(ctx, req); err != nil { if status.Code(err) == codes.AlreadyExists { log.Infof(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, modulePath, version) + enqueued = false } else { - return fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) + return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) } } - return nil + return enqueued, nil } func (q *GCP) newTaskRequest(modulePath, version, suffix string, taskIDChangeInterval time.Duration) *taskspb.CreateTaskRequest { @@ -221,9 +223,9 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) error { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, taskIDChangeInterval time.Duration) (bool, error) { q.queue <- moduleVersion{modulePath, version} - return nil + return true, nil } // WaitForTesting waits for all queued requests to finish. It should only be |
