aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go33
1 files changed, 23 insertions, 10 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 8905e1f5..b8f9e76d 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -31,7 +31,7 @@ import (
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
+ ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error)
}
// New creates a new Queue with name queueName based on the configuration
@@ -115,9 +115,11 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) {
- defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
-
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (enqueued bool, err error) {
+ defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %v)", modulePath, version, opts)
+ if opts == nil {
+ opts = &Options{}
+ }
// Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this
// in the documentation, but using a larger value, or no timeout, results in
// an InvalidArgument error with the text "The deadline cannot be more than
@@ -128,7 +130,7 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str
if modulePath == internal.UnknownModulePath {
return false, errors.New("given unknown module path")
}
- req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch)
+ req := q.newTaskRequest(modulePath, version, opts)
enqueued = true
if _, err := q.client.CreateTask(ctx, req); err != nil {
if status.Code(err) == codes.AlreadyExists {
@@ -141,6 +143,17 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str
return enqueued, nil
}
+// Options is used to provide option arguments for a task queue.
+type Options struct {
+ // DisableProxyFetch reports whether proxyfetch should be set to off when
+ // making a fetch request.
+ DisableProxyFetch bool
+
+ // Suffix is used to force reprocessing of tasks that would normally be
+ // de-duplicated. It is appended to the task name.
+ Suffix string
+}
+
// Maximum timeout for HTTP tasks.
// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute
@@ -150,10 +163,10 @@ const (
DisableProxyFetchValue = "off"
)
-func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
+func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest {
taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
- if disableProxyFetch {
+ if opts.DisableProxyFetch {
relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
}
task := &taskspb.Task{
@@ -173,8 +186,8 @@ func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFet
}
// If suffix is non-empty, append it to the task name. This lets us force reprocessing
// of tasks that would normally be de-duplicated.
- if suffix != "" {
- req.Task.Name += "-" + suffix
+ if opts.Suffix != "" {
+ req.Task.Name += "-" + opts.Suffix
}
return req
}
@@ -271,7 +284,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
q.queue <- internal.Modver{Path: modulePath, Version: version}
return true, nil
}