diff options
| author | Jonathan Amsterdam <jba@google.com> | 2020-06-17 08:28:16 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-06-27 12:03:22 +0000 |
| commit | bf0874fd2ae9ce273e4764dec2bd38e2b8444fd8 (patch) | |
| tree | ca55816b44270c60487a7ed4e25490e0e5f6e071 /internal/queue | |
| parent | 08e8e660a65bf3ad770682a02fc3298b12e251b4 (diff) | |
| download | go-x-pkgsite-bf0874fd2ae9ce273e4764dec2bd38e2b8444fd8.tar.xz | |
internal/queue: reorganize InMemory
InMemory now expects the given function to close over things like the
proxy and source clients.
Change-Id: I7b3a2793a824ca29453b19b47b96bdedb2a91010
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/238441
Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue')
| -rw-r--r-- | internal/queue/queue.go | 74 |
1 files changed, 29 insertions, 45 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index a66365a1..c792f401 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -17,9 +17,6 @@ import ( "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" - "golang.org/x/pkgsite/internal/postgres" - "golang.org/x/pkgsite/internal/proxy" - "golang.org/x/pkgsite/internal/source" taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -115,59 +112,46 @@ type moduleVersion struct { // // This should only be used for local development. type InMemory struct { - proxyClient *proxy.Client - sourceClient *source.Client - db *postgres.DB - - queue chan moduleVersion - sem chan struct{} - experiments *experiment.Set - appVersionLabel string + queue chan moduleVersion + sem chan struct{} + experiments *experiment.Set } // NewInMemory creates a new InMemory that asynchronously fetches // from proxyClient and stores in db. It uses workerCount parallelism to // execute these fetches. -func NewInMemory(ctx context.Context, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, workerCount int, - processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB, string) (int, error), experiments *experiment.Set, appVersionLabel string) *InMemory { +func NewInMemory(ctx context.Context, workerCount int, experiments *experiment.Set, processFunc func(context.Context, string, string) (int, error)) *InMemory { q := &InMemory{ - proxyClient: proxyClient, - sourceClient: sourceClient, - db: db, - queue: make(chan moduleVersion, 1000), - sem: make(chan struct{}, workerCount), - experiments: experiments, - appVersionLabel: appVersionLabel, + queue: make(chan moduleVersion, 1000), + sem: make(chan struct{}, workerCount), + experiments: experiments, } - go q.process(ctx, processFunc) - return q -} - -func (q *InMemory) process(ctx context.Context, processFunc func(context.Context, string, string, *proxy.Client, *source.Client, *postgres.DB, string) (int, error)) { - - for v := range q.queue { - select { - case <-ctx.Done(): - return - case q.sem <- struct{}{}: - } + go func() { + for v := range q.queue { + select { + case <-ctx.Done(): + return + case q.sem <- struct{}{}: + } - // If a worker is available, make a request to the fetch service inside a - // goroutine and wait for it to finish. - go func(v moduleVersion) { - defer func() { <-q.sem }() + // If a worker is available, make a request to the fetch service inside a + // goroutine and wait for it to finish. + go func(v moduleVersion) { + defer func() { <-q.sem }() - log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) + log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - fetchCtx = experiment.NewContext(fetchCtx, q.experiments) - defer cancel() + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + fetchCtx = experiment.NewContext(fetchCtx, q.experiments) + defer cancel() - if _, err := processFunc(fetchCtx, v.modulePath, v.version, q.proxyClient, q.sourceClient, q.db, q.appVersionLabel); err != nil { - log.Error(fetchCtx, err) - } - }(v) - } + if _, err := processFunc(fetchCtx, v.modulePath, v.version); err != nil { + log.Error(fetchCtx, err) + } + }(v) + } + }() + return q } // ScheduleFetch pushes a fetch task into the local queue to be processed |
