diff options
| author | Jonathan Amsterdam <jba@google.com> | 2021-06-17 10:17:27 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2021-06-18 17:41:02 +0000 |
| commit | 9b1c9bd2b0c3df16a819a464b4ecf410b0557a48 (patch) | |
| tree | 570f90bd90906f4be7094cfb62530de4ee1d9bea /internal/database/database.go | |
| parent | 2cfdcb5d9ede961efdbbbc790ee3c7f2722ef1fa (diff) | |
| download | go-x-pkgsite-9b1c9bd2b0c3df16a819a464b4ecf410b0557a48.tar.xz | |
internal/database: add ability to run queries incrementally
Postgres lets you define a cursor and fetch rows from it as top-level
statements, without needing to write any PL/pgSQL. See
https://www.postgresql.org/docs/11/sql-declare.html and
https://www.postgresql.org/docs/11/sql-fetch.html for details.
Use this feature to define RunQueryIncrementally, which repeatedly
fetches query rows in batches until there are no more rows or the passed
function returns io.EOF to signal that it is done.
If grouping by module paths is enabled, use RunQueryIncrementally
with a very large limit to read rows until we've seen a page's worth
of module paths.
This CL doesn't handle pagination correctly. Any page after the first
is going to be wrong.
Change-Id: Idf8233160b0cf74412a688e1a6b95f4f2b720008
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/329469
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/database/database.go')
| -rw-r--r-- | internal/database/database.go | 47 |
1 files changed, 41 insertions, 6 deletions
diff --git a/internal/database/database.go b/internal/database/database.go index 30963f02..dbbde425 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -12,6 +12,7 @@ import ( "database/sql" "errors" "fmt" + "io" "regexp" "strings" "sync" @@ -141,23 +142,57 @@ func (db *DB) Prepare(ctx context.Context, query string) (*sql.Stmt, error) { return db.db.PrepareContext(ctx, query) } -// RunQuery executes query, then calls f on each row. +// RunQuery executes query, then calls f on each row. It stops when there are no +// more rows or f returns a non-nil error. func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) error, params ...interface{}) error { rows, err := db.Query(ctx, query, params...) if err != nil { return err } - return processRows(rows, f) + _, err = processRows(rows, f) + return err } -func processRows(rows *sql.Rows, f func(*sql.Rows) error) error { +func processRows(rows *sql.Rows, f func(*sql.Rows) error) (int, error) { defer rows.Close() + n := 0 for rows.Next() { + n++ if err := f(rows); err != nil { - return err + return n, err } } - return rows.Err() + return n, rows.Err() +} + +// RunQueryIncrementally executes query, then calls f on each row. It fetches +// rows in groups of size batchSize. It stops when there are no more rows, or +// when f returns io.EOF. +func (db *DB) RunQueryIncrementally(ctx context.Context, query string, batchSize int, f func(*sql.Rows) error, params ...interface{}) (err error) { + // Run in a transaction, because cursors require one. + return db.Transact(ctx, sql.LevelDefault, func(tx *DB) error { + // Declare a cursor and associate it with the query. + // It will be closed when the transaction commits. + _, err = tx.Exec(ctx, fmt.Sprintf(`DECLARE c CURSOR FOR %s`, query), params...) + if err != nil { + return err + } + for { + // Fetch batchSize rows and process them. 
+ rows, err := tx.Query(ctx, fmt.Sprintf(`FETCH %d FROM c`, batchSize)) + if err != nil { + return err + } + n, err := processRows(rows, f) + // Stop if there were no rows, or the processing function returned io.EOF. + if n == 0 || err == io.EOF { + return nil + } + if err != nil { + return err + } + } + }) } // Transact executes the given function in the context of a SQL transaction at @@ -369,7 +404,7 @@ func (db *DB) bulkInsert(ctx context.Context, table string, columns, returningCo if err != nil { return err } - err = processRows(rows, scanFunc) + _, err = processRows(rows, scanFunc) } if err != nil { return fmt.Errorf("running bulk insert query, values[%d:%d]): %w", leftBound, rightBound, err) |
