aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2020-12-16 11:29:14 -0500
committerJonathan Amsterdam <jba@google.com>2020-12-21 18:22:28 +0000
commit337d847dbf5fed82d0358040c75a4ea1f9572a4b (patch)
treed93aa78a5e6292ca10c28ce672051e127241f48a /internal/queue/queue.go
parent119183b4e9c74b8237d4f7be93df0ee748e2dad5 (diff)
downloadgo-x-pkgsite-337d847dbf5fed82d0358040c75a4ea1f9572a4b.tar.xz
internal/queue,frontend,worker: enqueue tasks disabling proxy fetch
If the status of a task to be enqueued indicates reprocessing, then enqueue the task with a URL whose query param indicates that proxy fetching should be disabled. Change-Id: Id7255e861c1dea87d131d83151eb1a46df0ea4ff Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278712 Trust: Jonathan Amsterdam <jba@google.com> Run-TryBot: Jonathan Amsterdam <jba@google.com> TryBot-Result: kokoro <noreply+kokoro@google.com> Reviewed-by: Jamal Carvalho <jamal@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go13
1 files changed, 8 insertions, 5 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 891df968..03578d12 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -30,7 +30,7 @@ import (
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error)
+ ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
}
// New creates a new Queue with name queueName based on the configuration
@@ -114,13 +114,13 @@ func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *G
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) {
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) {
// the new taskqueue API requires a deadline of <= 30s
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
- req := q.newTaskRequest(modulePath, version, suffix)
+ req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch)
enqueued = true
if _, err := q.client.CreateTask(ctx, req); err != nil {
if status.Code(err) == codes.AlreadyExists {
@@ -137,9 +137,12 @@ func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix str
// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute
-func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest {
+func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
+ if disableProxyFetch {
+ relativeURI += "?proxyfetch=off"
+ }
task := &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
@@ -250,7 +253,7 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) {
q.queue <- moduleVersion{modulePath, version}
return true, nil
}