diff options
| author | Julie Qiu <julie@golang.org> | 2020-04-03 13:15:18 -0400 |
|---|---|---|
| committer | Julie Qiu <julie@golang.org> | 2020-04-06 17:09:52 -0400 |
| commit | bdcbfe07d19874ca082eefb1b724b9f8973cd088 (patch) | |
| tree | 575be073ee5c49087255db9108dc265e37349606 /internal/queue/queue.go | |
| parent | 14287ce18e0bfab5e9e16f620a8038c24879066b (diff) | |
| download | go-x-pkgsite-bdcbfe07d19874ca082eefb1b724b9f8973cd088.tar.xz | |
internal/source: add Client
source.Client is added, which replaces the used of http.DefaultClient
when fetching source code. This also allows us to set a custom timeout
for tests vs when running the worker.
Change-Id: I5b7b0fd32fa7a2cf836b951af1ad471751fade00
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/709198
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 4384c8d0..88752553 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -18,6 +18,7 @@ import ( "golang.org/x/discovery/internal/log" "golang.org/x/discovery/internal/postgres" "golang.org/x/discovery/internal/proxy" + "golang.org/x/discovery/internal/source" taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -114,8 +115,9 @@ type moduleVersion struct { // // This should only be used for local development. type InMemory struct { - proxyClient *proxy.Client - db *postgres.DB + proxyClient *proxy.Client + sourceClient *source.Client + db *postgres.DB queue chan moduleVersion sem chan struct{} @@ -124,19 +126,20 @@ type InMemory struct { // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, proxyClient *proxy.Client, db *postgres.DB, workerCount int, - processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) *InMemory { +func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, workerCount int, + processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) *InMemory { q := &InMemory{ - proxyClient: proxyClient, - db: db, - queue: make(chan moduleVersion, 1000), - sem: make(chan struct{}, workerCount), + proxyClient: proxyClient, + sourceClient: sourceClient, + db: db, + queue: make(chan moduleVersion, 1000), + sem: make(chan struct{}, workerCount), } go q.process(ctx, processFunc) return q } -func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) { +func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) { for v := range q.queue { select { @@ -155,7 +158,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.db); err != nil { + if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil { log.Error(fetchCtx, err) } }(v) |
