aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go74
1 files changed, 29 insertions, 45 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index a66365a1..c792f401 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -17,9 +17,6 @@ import (
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
- "golang.org/x/pkgsite/internal/postgres"
- "golang.org/x/pkgsite/internal/proxy"
- "golang.org/x/pkgsite/internal/source"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -115,59 +112,46 @@ type moduleVersion struct {
//
// This should only be used for local development.
type InMemory struct {
- proxyClient *proxy.Client
- sourceClient *source.Client
- db *postgres.DB
-
- queue chan moduleVersion
- sem chan struct{}
- experiments *experiment.Set
- appVersionLabel string
+ queue chan moduleVersion
+ sem chan struct{}
+ experiments *experiment.Set
}
// NewInMemory creates a new InMemory that asynchronously fetches
// from proxyClient and stores in db. It uses workerCount parallelism to
// execute these fetches.
-func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, workerCount int,
- processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB, string) (int, error), experiments *experiment.Set, appVersionLabel string) *InMemory {
+func NewInMemory(ctx context.Context, workerCount int, experiments *experiment.Set, processFunc func(context.Context, string, string) (int, error)) *InMemory {
q := &InMemory{
- proxyClient: proxyClient,
- sourceClient: sourceClient,
- db: db,
- queue: make(chan moduleVersion, 1000),
- sem: make(chan struct{}, workerCount),
- experiments: experiments,
- appVersionLabel: appVersionLabel,
+ queue: make(chan moduleVersion, 1000),
+ sem: make(chan struct{}, workerCount),
+ experiments: experiments,
}
- go q.process(ctx, processFunc)
- return q
-}
-
-func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB, string) (int, error)) {
-
- for v := range q.queue {
- select {
- case <-ctx.Done():
- return
- case q.sem <- struct{}{}:
- }
+ go func() {
+ for v := range q.queue {
+ select {
+ case <-ctx.Done():
+ return
+ case q.sem <- struct{}{}:
+ }
- // If a worker is available, make a request to the fetch service inside a
- // goroutine and wait for it to finish.
- go func(v moduleVersion) {
- defer func() { <-q.sem }()
+ // If a worker is available, make a request to the fetch service inside a
+ // goroutine and wait for it to finish.
+ go func(v moduleVersion) {
+ defer func() { <-q.sem }()
- log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem))
+ log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem))
- fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
- fetchCtx = experiment.NewContext(fetchCtx, q.experiments)
- defer cancel()
+ fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ fetchCtx = experiment.NewContext(fetchCtx, q.experiments)
+ defer cancel()
- if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db, q.appVersionLabel); err != nil {
- log.Error(fetchCtx, err)
- }
- }(v)
- }
+ if _, err := processFunc(fetchCtx, v.modulePath, v.version); err != nil {
+ log.Error(fetchCtx, err)
+ }
+ }(v)
+ }
+ }()
+ return q
}
// ScheduleFetch pushes a fetch task into the local queue to be processed