aboutsummaryrefslogtreecommitdiff
path: root/internal/queue
diff options
context:
space:
mode:
authorJulie Qiu <julie@golang.org>2020-04-03 13:15:18 -0400
committerJulie Qiu <julie@golang.org>2020-04-06 17:09:52 -0400
commitbdcbfe07d19874ca082eefb1b724b9f8973cd088 (patch)
tree575be073ee5c49087255db9108dc265e37349606 /internal/queue
parent14287ce18e0bfab5e9e16f620a8038c24879066b (diff)
downloadgo-x-pkgsite-bdcbfe07d19874ca082eefb1b724b9f8973cd088.tar.xz
internal/source: add Client
source.Client is added, which replaces the used of http.DefaultClient when fetching source code. This also allows us to set a custom timeout for tests vs when running the worker. Change-Id: I5b7b0fd32fa7a2cf836b951af1ad471751fade00 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/709198 Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue')
-rw-r--r--internal/queue/queue.go23
1 files changed, 13 insertions, 10 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 4384c8d0..88752553 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -18,6 +18,7 @@ import (
"golang.org/x/discovery/internal/log"
"golang.org/x/discovery/internal/postgres"
"golang.org/x/discovery/internal/proxy"
+ "golang.org/x/discovery/internal/source"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -114,8 +115,9 @@ type moduleVersion struct {
//
// This should only be used for local development.
type InMemory struct {
- proxyClient *proxy.Client
- db *postgres.DB
+ proxyClient *proxy.Client
+ sourceClient *source.Client
+ db *postgres.DB
queue chan moduleVersion
sem chan struct{}
@@ -124,19 +126,20 @@ type InMemory struct {
// NewInMemory creates a new InMemory that asynchronously fetches
// from proxyClient and stores in db. It uses workerCount parallelism to
// execute these fetches.
-func NewInMemory(ctx context.Context, proxyClient *proxy.Client, db *postgres.DB, workerCount int,
- processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) *InMemory {
+func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, workerCount int,
+ processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) *InMemory {
q := &InMemory{
- proxyClient: proxyClient,
- db: db,
- queue: make(chan moduleVersion, 1000),
- sem: make(chan struct{}, workerCount),
+ proxyClient: proxyClient,
+ sourceClient: sourceClient,
+ db: db,
+ queue: make(chan moduleVersion, 1000),
+ sem: make(chan struct{}, workerCount),
}
go q.process(ctx, processFunc)
return q
}
-func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) {
+func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) {
for v := range q.queue {
select {
@@ -155,7 +158,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context
fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
- if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.db); err != nil {
+ if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil {
log.Error(fetchCtx, err)
}
}(v)