diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 891df968..03578d12 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -30,7 +30,7 @@ import ( // A Queue provides an interface for asynchronous scheduling of fetch actions. type Queue interface { - ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) + ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error) } // New creates a new Queue with name queueName based on the configuration @@ -114,13 +114,13 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and // version. It returns an error if there was an error hashing the task name, or // an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil). -func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) { +func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) { // the new taskqueue API requires a deadline of <= 30s ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix) - req := q.newTaskRequest(modulePath, version, suffix) + req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch) enqueued = true if _, err := q.client.CreateTask(ctx, req); err != nil { if status.Code(err) == codes.AlreadyExists { @@ -137,9 +137,12 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str // See https://cloud.google.com/tasks/docs/creating-http-target-tasks. const maxCloudTasksTimeout = 30 * time.Minute -func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest { +func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest { taskID := newTaskID(modulePath, version) relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version) + if disableProxyFetch { + relativeURI += "?proxyfetch=off" + } task := &taskspb.Task{ Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID), DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout), @@ -250,7 +253,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro // ScheduleFetch pushes a fetch task into the local queue to be processed // asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) { +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) { q.queue <- moduleVersion{modulePath, version} return true, nil } |
