diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 4384c8d0..88752553 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -18,6 +18,7 @@ import ( "golang.org/x/discovery/internal/log" "golang.org/x/discovery/internal/postgres" "golang.org/x/discovery/internal/proxy" + "golang.org/x/discovery/internal/source" taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -114,8 +115,9 @@ type moduleVersion struct { // // This should only be used for local development. type InMemory struct { - proxyClient *proxy.Client - db *postgres.DB + proxyClient *proxy.Client + sourceClient *source.Client + db *postgres.DB queue chan moduleVersion sem chan struct{} @@ -124,19 +126,20 @@ type InMemory struct { // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, proxyClient *proxy.Client, db *postgres.DB, workerCount int, - processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) *InMemory { +func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, workerCount int, + processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) *InMemory { q := &InMemory{ - proxyClient: proxyClient, - db: db, - queue: make(chan moduleVersion, 1000), - sem: make(chan struct{}, workerCount), + proxyClient: proxyClient, + sourceClient: sourceClient, + db: db, + queue: make(chan moduleVersion, 1000), + sem: make(chan struct{}, workerCount), } go q.process(ctx, processFunc) return q } -func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *postgres.DB) (int, error)) { +func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB) (int, error)) { for v := range q.queue { select { @@ -155,7 +158,7 @@ func (q *InMemory) process(ctx context.Context, processFunc func(context.Context fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.db); err != nil { + if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db); err != nil { log.Error(fetchCtx, err) } }(v) |
