diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 8905e1f5..b8f9e76d 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -31,7 +31,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error) + ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error) } // New creates a new Queue with name queueName based on the configuration @@ -115,9 +115,11 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or // an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) { - defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix) - +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (enqueued bool, err error) { + defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %v)", modulePath, version, opts) + if opts == nil { + opts = &Options{} + } // Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this // in the documentation, but using a larger value, or no timeout, results in // an InvalidArgument error with the text "The deadline cannot be more than @@ -128,7 +130,7 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str if modulePath == internal.UnknownModulePath { return false, errors.New("given unknown module path") } - req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch) + req := q.newTaskRequest(modulePath, version, opts) enqueued = true if _, err := q.client.CreateTask(ctx, req); err != nil { if status.Code(err) == codes.AlreadyExists { @@ -141,6 +143,17 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str return enqueued, nil } +// Options is used to provide option arguments for a task queue. +type Options struct { + // DisableProxyFetch reports whether proxyfetch should be set to off when + // making a fetch request. + DisableProxyFetch bool + + // Suffix is used to force reprocessing of tasks that would normally be + // de-duplicated. It is appended to the task name. + Suffix string +} + // Maximum timeout for HTTP tasks. // See https://cloud.google.com/tasks/docs/creating-http-target-tasks. const maxCloudTasksTimeout = 30 * time.Minute @@ -150,10 +163,10 @@ const ( DisableProxyFetchValue = "off" ) -func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest { +func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest { taskID := newTaskID(modulePath, version) relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version) - if disableProxyFetch { + if opts.DisableProxyFetch { relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue) } task := &taskspb.Task{ @@ -173,8 +186,8 @@ func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFet } // If suffix is non-empty, append it to the task name. This lets us force reprocessing // of tasks that would normally be de-duplicated. - if suffix != "" { - req.Task.Name += "-" + suffix + if opts.Suffix != "" { + req.Task.Name += "-" + opts.Suffix } return req } @@ -271,7 +284,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) { q.queue <- internal.Modver{Path: modulePath, Version: version} return true, nil } |
