diff options
Diffstat (limited to 'internal/queue/inmemqueue/queue_test.go')
| -rw-r--r-- | internal/queue/inmemqueue/queue_test.go | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/internal/queue/inmemqueue/queue_test.go b/internal/queue/inmemqueue/queue_test.go new file mode 100644 index 00000000..79847140 --- /dev/null +++ b/internal/queue/inmemqueue/queue_test.go @@ -0,0 +1,118 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package inmemqueue + +import ( + "context" + "errors" + "sync" + "testing" +) + +func TestInMemory_ScheduleAndProcess(t *testing.T) { + ctx := context.Background() + + var processed int + var mu sync.Mutex + + processFunc := func(ctx context.Context, path, version string) (int, error) { + mu.Lock() + processed++ + mu.Unlock() + return 200, nil + } + + q := New(ctx, 2, nil, processFunc) + + tests := []struct { + path string + version string + }{ + {"example.com/mod1", "v1.0.0"}, + {"example.com/mod2", "v1.1.0"}, + {"example.com/mod3", "v2.0.0"}, + } + + for _, tc := range tests { + ok, err := q.ScheduleFetch(ctx, tc.path, tc.version, nil) + if err != nil { + t.Fatalf("ScheduleFetch(%q, %q) returned error: %v", tc.path, tc.version, err) + } + if !ok { + t.Errorf("ScheduleFetch(%q, %q) = false, want true", tc.path, tc.version) + } + } + + q.WaitForTesting(ctx) + + if got, want := processed, len(tests); got != want { + t.Errorf("processed tasks = %d, want %d", got, want) + } +} + +func TestInMemory_ProcessFuncError(t *testing.T) { + ctx := context.Background() + + processFunc := func(ctx context.Context, path, version string) (int, error) { + return 500, errors.New("simulated fetch error") + } + + q := New(ctx, 1, nil, processFunc) + + _, err := q.ScheduleFetch(ctx, "example.com/error-mod", "v1.0.0", nil) + if err != nil { + t.Fatalf("ScheduleFetch returned error: %v", err) + } + + // This should complete without panicking or deadlocking. + q.WaitForTesting(ctx) +} + +func TestInMemory_WorkerConcurrency(t *testing.T) { + ctx := context.Background() + workerCount := 2 + + var active, max int + var mu sync.Mutex + var wg sync.WaitGroup + + blockCh := make(chan struct{}) + + processFunc := func(ctx context.Context, path, version string) (int, error) { + mu.Lock() + active++ + if active > max { + max = active + } + mu.Unlock() + + <-blockCh + + mu.Lock() + active-- + mu.Unlock() + wg.Done() + return 200, nil + } + + q := New(ctx, workerCount, nil, processFunc) + + taskCount := 5 + wg.Add(taskCount) + for range taskCount { + _, err := q.ScheduleFetch(ctx, "example.com/concurrent", "v1.0.0", nil) + if err != nil { + t.Fatalf("ScheduleFetch returned error: %v", err) + } + } + + close(blockCh) + wg.Wait() + q.WaitForTesting(ctx) + + if max > workerCount { + t.Errorf("max concurrent workers = %d, want <= %d", max, workerCount) + } +} |
