diff options
| author | Jonathan Amsterdam <jba@google.com> | 2020-06-01 15:32:59 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-06-02 15:12:19 +0000 |
| commit | e66341ffa9aaedd9fee511576e2e501738faa281 (patch) | |
| tree | 081bf2bea81e7118e164dafd3787ad638658153e /internal | |
| parent | 7557ba489297ee225fc0cad232619d21fbc93f83 (diff) | |
| download | go-x-pkgsite-e66341ffa9aaedd9fee511576e2e501738faa281.tar.xz | |
internal/database: Transact supports any isolation level
- Add an arg to Transact for the isolation level
- Remove TransactSerializable
This makes it possible to use other levels, and makes it easier to see
which level is being used for each transaction.
Change-Id: Iba5e2920b4139e5e2f0f8c6b331a658d7c84f60f
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/758942
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/database/database.go | 38 | ||||
| -rw-r--r-- | internal/database/database_test.go | 11 | ||||
| -rw-r--r-- | internal/postgres/insert_module.go | 11 | ||||
| -rw-r--r-- | internal/postgres/search.go | 4 | ||||
| -rw-r--r-- | internal/postgres/test_helper.go | 2 | ||||
| -rw-r--r-- | internal/postgres/versionstate.go | 4 |
6 files changed, 34 insertions, 36 deletions
diff --git a/internal/database/database.go b/internal/database/database.go index 3e009e1c..2bd6b8cc 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -116,16 +116,26 @@ func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) erro return rows.Err() } -// Transact executes the given function in the context of a SQL transaction, -// rolling back the transaction if the function panics or returns an error. It -// uses the default transaction options. +// Transact executes the given function in the context of a SQL transaction at +// the given isolation level, rolling back the transaction if the function +// panics or returns an error. // // The given function is called with a DB that is associated with a transaction. // The DB should be used only inside the function; if it is used to access the // database after the function returns, the calls will return errors. -func (db *DB) Transact(ctx context.Context, txFunc func(*DB) error) (err error) { - defer derrors.Wrap(&err, "Transact") - return db.transact(ctx, nil, txFunc) +// +// If the isolation level requires it, Transact will retry the transaction upon +// serialization failure, so txFunc may be called more than once. + +func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func(*DB) error) (err error) { + defer derrors.Wrap(&err, "Transact(%s)", iso) + // For the levels which require retry, see + // https://www.postgresql.org/docs/11/transaction-iso.html. + opts := &sql.TxOptions{Isolation: iso} + if iso == sql.LevelRepeatableRead || iso == sql.LevelSerializable { + return db.transactWithRetry(ctx, opts, txFunc) + } + return db.transact(ctx, opts, txFunc) } // serializationFailureCode is the Postgres error code returned when a serializable @@ -133,23 +143,13 @@ func (db *DB) Transact(ctx context.Context, txFunc func(*DB) error) (err error) // See https://www.postgresql.org/docs/current/errcodes-appendix.html. const serializationFailureCode = "40001" -// TransactSerializable executes the given function in the context of a SQL transaction, -// rolling back the transaction if the function panics or returns an error. It uses -// the given context and the Serializable transaction isolation level. -// -// The given function is called with a DB that is associated with a transaction. -// The DB should be used only inside the function; if it is used to access the -// database after the function returns, the calls will return errors. -// -// TransactSerializable will retry the transaction upon serialization failure, so txFunc -// may be called more than once. -func (db *DB) TransactSerializable(ctx context.Context, txFunc func(*DB) error) (err error) { - defer derrors.Wrap(&err, "TransactSerializable") +func (db *DB) transactWithRetry(ctx context.Context, opts *sql.TxOptions, txFunc func(*DB) error) (err error) { + defer derrors.Wrap(&err, "transactWithRetry(%v)", opts) // Retry on serialization failure, up to some max. // See https://www.postgresql.org/docs/11/transaction-iso.html. const maxRetries = 30 for i := 0; i <= maxRetries; i++ { - err = db.transact(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}, txFunc) + err = db.transact(ctx, opts, txFunc) var perr *pq.Error if errors.As(err, &perr) && perr.Code == serializationFailureCode { db.mu.Lock() diff --git a/internal/database/database_test.go b/internal/database/database_test.go index 7d95bd83..935b6707 100644 --- a/internal/database/database_test.go +++ b/internal/database/database_test.go @@ -193,7 +193,7 @@ func TestLargeBulkInsert(t *testing.T) { for i := 0; i < size; i++ { vals[i] = i + 1 } - if err := testDB.Transact(ctx, func(db *DB) error { + if err := testDB.Transact(ctx, sql.LevelDefault, func(db *DB) error { return db.BulkInsert(ctx, "test_large_bulk", []string{"i"}, vals, "") }); err != nil { t.Fatal(err) @@ -220,7 +220,7 @@ func TestLargeBulkInsert(t *testing.T) { func TestDBAfterTransactFails(t *testing.T) { ctx := context.Background() var tx *DB - err := testDB.Transact(ctx, func(d *DB) error { + err := testDB.Transact(ctx, sql.LevelDefault, func(d *DB) error { tx = d return nil }) @@ -269,7 +269,7 @@ func TestBulkUpdate(t *testing.T) { for i := 0; i < 50; i++ { values = append(values, i, i) } - err := testDB.Transact(ctx, func(tx *DB) error { + err := testDB.Transact(ctx, sql.LevelDefault, func(tx *DB) error { return tx.BulkInsert(ctx, "bulk_update", cols, values, "") }) if err != nil { @@ -283,7 +283,7 @@ func TestBulkUpdate(t *testing.T) { updateVals[1] = append(updateVals[1], -i) } - err = testDB.Transact(ctx, func(tx *DB) error { + err = testDB.Transact(ctx, sql.LevelDefault, func(tx *DB) error { return tx.BulkUpdate(ctx, "bulk_update", cols, []string{"INT", "INT"}, updateVals) }) if err != nil { @@ -349,7 +349,8 @@ func TestTransactSerializable(t *testing.T) { for i := 0; i < numTransactions; i++ { i := i go func() { - errc <- testDB.TransactSerializable(ctx, func(tx *DB) error { return insertSum(tx, 1+i%2) }) + errc <- testDB.Transact(ctx, sql.LevelSerializable, + func(tx *DB) error { return insertSum(tx, 1+i%2) }) }() } // None of the transactions should fail. diff --git a/internal/postgres/insert_module.go b/internal/postgres/insert_module.go index 4c3e33f9..3f969f9c 100644 --- a/internal/postgres/insert_module.go +++ b/internal/postgres/insert_module.go @@ -70,14 +70,11 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) { ctx, span := trace.StartSpan(ctx, "saveModule") defer span.End() - var transact func(context.Context, func(*database.DB) error) error + iso := sql.LevelDefault if experiment.IsActive(ctx, internal.ExperimentInsertSerializable) { - transact = db.db.TransactSerializable - } else { - transact = db.db.Transact + iso = sql.LevelSerializable } - - return transact(ctx, func(tx *database.DB) error { + return db.db.Transact(ctx, iso, func(tx *database.DB) error { moduleID, err := insertModule(ctx, tx, m) if err != nil { return err @@ -645,7 +642,7 @@ func removeNonDistributableData(m *internal.Module) { // DeleteModule deletes a Version from the database. func (db *DB) DeleteModule(ctx context.Context, modulePath, version string) (err error) { defer derrors.Wrap(&err, "DeleteModule(ctx, db, %q, %q)", modulePath, version) - return db.db.Transact(ctx, func(tx *database.DB) error { + return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { // We only need to delete from the modules table. Thanks to ON DELETE // CASCADE constraints, that will trigger deletions from all other tables. const stmt = `DELETE FROM modules WHERE module_path=$1 AND version=$2` diff --git a/internal/postgres/search.go b/internal/postgres/search.go index 20cff0fe..1b697841 100644 --- a/internal/postgres/search.go +++ b/internal/postgres/search.go @@ -640,7 +640,7 @@ func (db *DB) UpdateSearchDocumentsImportedByCount(ctx context.Context) (nUpdate if err != nil { return 0, err } - err = db.db.Transact(ctx, func(tx *database.DB) error { + err = db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { if err := insertImportedByCounts(ctx, tx, counts); err != nil { return err } @@ -905,7 +905,7 @@ func isInternalPackage(path string) bool { func (db *DB) DeleteOlderVersionFromSearchDocuments(ctx context.Context, modulePath, version string) (err error) { defer derrors.Wrap(&err, "DeleteOlderVersionFromSearchDocuments(ctx, %q, %q)", modulePath, version) - return db.db.Transact(ctx, func(tx *database.DB) error { + return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { // Collect all package paths in search_documents with the given module path // and an older version. (package_path is the primary key of search_documents.) var ppaths []string diff --git a/internal/postgres/test_helper.go b/internal/postgres/test_helper.go index efe8bbd8..9d920b02 100644 --- a/internal/postgres/test_helper.go +++ b/internal/postgres/test_helper.go @@ -102,7 +102,7 @@ func SetupTestDB(dbName string) (_ *DB, err error) { func ResetTestDB(db *DB, t *testing.T) { ctx := context.Background() t.Helper() - if err := db.db.Transact(ctx, func(tx *database.DB) error { + if err := db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { if _, err := tx.Exec(ctx, ` TRUNCATE modules CASCADE; TRUNCATE version_map; diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go index 22a38e13..f2bfc99d 100644 --- a/internal/postgres/versionstate.go +++ b/internal/postgres/versionstate.go @@ -36,7 +36,7 @@ func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.Inde DO UPDATE SET index_timestamp=excluded.index_timestamp, next_processed_after=CURRENT_TIMESTAMP` - return db.db.Transact(ctx, func(tx *database.DB) error { + return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { return tx.BulkInsert(ctx, "module_version_states", cols, vals, conflictAction) }) } @@ -57,7 +57,7 @@ func (db *DB) UpsertModuleVersionState(ctx context.Context, modulePath, vers, ap numPackages = &n } - return db.db.Transact(ctx, func(tx *database.DB) error { + return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { if err := upsertModuleVersionState(ctx, tx, modulePath, vers, appVersion, numPackages, timestamp, status, goModPath, fetchErr); err != nil { return err } |
