aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2020-06-01 15:32:59 -0400
committerJonathan Amsterdam <jba@google.com>2020-06-02 15:12:19 +0000
commite66341ffa9aaedd9fee511576e2e501738faa281 (patch)
tree081bf2bea81e7118e164dafd3787ad638658153e /internal
parent7557ba489297ee225fc0cad232619d21fbc93f83 (diff)
downloadgo-x-pkgsite-e66341ffa9aaedd9fee511576e2e501738faa281.tar.xz
internal/database: Transact supports any isolation level
- Add an arg to Transact for the isolation level - Remove TransactSerializable This makes it possible to use other levels, and makes it easier to see which level is being used for each transaction. Change-Id: Iba5e2920b4139e5e2f0f8c6b331a658d7c84f60f Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/758942 CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com> Reviewed-by: Julie Qiu <julieqiu@google.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/database/database.go38
-rw-r--r--internal/database/database_test.go11
-rw-r--r--internal/postgres/insert_module.go11
-rw-r--r--internal/postgres/search.go4
-rw-r--r--internal/postgres/test_helper.go2
-rw-r--r--internal/postgres/versionstate.go4
6 files changed, 34 insertions, 36 deletions
diff --git a/internal/database/database.go b/internal/database/database.go
index 3e009e1c..2bd6b8cc 100644
--- a/internal/database/database.go
+++ b/internal/database/database.go
@@ -116,16 +116,26 @@ func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) erro
return rows.Err()
}
-// Transact executes the given function in the context of a SQL transaction,
-// rolling back the transaction if the function panics or returns an error. It
-// uses the default transaction options.
+// Transact executes the given function in the context of a SQL transaction at
+// the given isolation level, rolling back the transaction if the function
+// panics or returns an error.
//
// The given function is called with a DB that is associated with a transaction.
// The DB should be used only inside the function; if it is used to access the
// database after the function returns, the calls will return errors.
-func (db *DB) Transact(ctx context.Context, txFunc func(*DB) error) (err error) {
- defer derrors.Wrap(&err, "Transact")
- return db.transact(ctx, nil, txFunc)
+//
+// If the isolation level requires it, Transact will retry the transaction upon
+// serialization failure, so txFunc may be called more than once.
+
+func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func(*DB) error) (err error) {
+ defer derrors.Wrap(&err, "Transact(%s)", iso)
+ // For the levels which require retry, see
+ // https://www.postgresql.org/docs/11/transaction-iso.html.
+ opts := &sql.TxOptions{Isolation: iso}
+ if iso == sql.LevelRepeatableRead || iso == sql.LevelSerializable {
+ return db.transactWithRetry(ctx, opts, txFunc)
+ }
+ return db.transact(ctx, opts, txFunc)
}
// serializationFailureCode is the Postgres error code returned when a serializable
@@ -133,23 +143,13 @@ func (db *DB) Transact(ctx context.Context, txFunc func(*DB) error) (err error)
// See https://www.postgresql.org/docs/current/errcodes-appendix.html.
const serializationFailureCode = "40001"
-// TransactSerializable executes the given function in the context of a SQL transaction,
-// rolling back the transaction if the function panics or returns an error. It uses
-// the given context and the Serializable transaction isolation level.
-//
-// The given function is called with a DB that is associated with a transaction.
-// The DB should be used only inside the function; if it is used to access the
-// database after the function returns, the calls will return errors.
-//
-// TransactSerializable will retry the transaction upon serialization failure, so txFunc
-// may be called more than once.
-func (db *DB) TransactSerializable(ctx context.Context, txFunc func(*DB) error) (err error) {
- defer derrors.Wrap(&err, "TransactSerializable")
+func (db *DB) transactWithRetry(ctx context.Context, opts *sql.TxOptions, txFunc func(*DB) error) (err error) {
+ defer derrors.Wrap(&err, "transactWithRetry(%v)", opts)
// Retry on serialization failure, up to some max.
// See https://www.postgresql.org/docs/11/transaction-iso.html.
const maxRetries = 30
for i := 0; i <= maxRetries; i++ {
- err = db.transact(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}, txFunc)
+ err = db.transact(ctx, opts, txFunc)
var perr *pq.Error
if errors.As(err, &perr) && perr.Code == serializationFailureCode {
db.mu.Lock()
diff --git a/internal/database/database_test.go b/internal/database/database_test.go
index 7d95bd83..935b6707 100644
--- a/internal/database/database_test.go
+++ b/internal/database/database_test.go
@@ -193,7 +193,7 @@ func TestLargeBulkInsert(t *testing.T) {
for i := 0; i < size; i++ {
vals[i] = i + 1
}
- if err := testDB.Transact(ctx, func(db *DB) error {
+ if err := testDB.Transact(ctx, sql.LevelDefault, func(db *DB) error {
return db.BulkInsert(ctx, "test_large_bulk", []string{"i"}, vals, "")
}); err != nil {
t.Fatal(err)
@@ -220,7 +220,7 @@ func TestLargeBulkInsert(t *testing.T) {
func TestDBAfterTransactFails(t *testing.T) {
ctx := context.Background()
var tx *DB
- err := testDB.Transact(ctx, func(d *DB) error {
+ err := testDB.Transact(ctx, sql.LevelDefault, func(d *DB) error {
tx = d
return nil
})
@@ -269,7 +269,7 @@ func TestBulkUpdate(t *testing.T) {
for i := 0; i < 50; i++ {
values = append(values, i, i)
}
- err := testDB.Transact(ctx, func(tx *DB) error {
+ err := testDB.Transact(ctx, sql.LevelDefault, func(tx *DB) error {
return tx.BulkInsert(ctx, "bulk_update", cols, values, "")
})
if err != nil {
@@ -283,7 +283,7 @@ func TestBulkUpdate(t *testing.T) {
updateVals[1] = append(updateVals[1], -i)
}
- err = testDB.Transact(ctx, func(tx *DB) error {
+ err = testDB.Transact(ctx, sql.LevelDefault, func(tx *DB) error {
return tx.BulkUpdate(ctx, "bulk_update", cols, []string{"INT", "INT"}, updateVals)
})
if err != nil {
@@ -349,7 +349,8 @@ func TestTransactSerializable(t *testing.T) {
for i := 0; i < numTransactions; i++ {
i := i
go func() {
- errc <- testDB.TransactSerializable(ctx, func(tx *DB) error { return insertSum(tx, 1+i%2) })
+ errc <- testDB.Transact(ctx, sql.LevelSerializable,
+ func(tx *DB) error { return insertSum(tx, 1+i%2) })
}()
}
// None of the transactions should fail.
diff --git a/internal/postgres/insert_module.go b/internal/postgres/insert_module.go
index 4c3e33f9..3f969f9c 100644
--- a/internal/postgres/insert_module.go
+++ b/internal/postgres/insert_module.go
@@ -70,14 +70,11 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) {
ctx, span := trace.StartSpan(ctx, "saveModule")
defer span.End()
- var transact func(context.Context, func(*database.DB) error) error
+ iso := sql.LevelDefault
if experiment.IsActive(ctx, internal.ExperimentInsertSerializable) {
- transact = db.db.TransactSerializable
- } else {
- transact = db.db.Transact
+ iso = sql.LevelSerializable
}
-
- return transact(ctx, func(tx *database.DB) error {
+ return db.db.Transact(ctx, iso, func(tx *database.DB) error {
moduleID, err := insertModule(ctx, tx, m)
if err != nil {
return err
@@ -645,7 +642,7 @@ func removeNonDistributableData(m *internal.Module) {
// DeleteModule deletes a Version from the database.
func (db *DB) DeleteModule(ctx context.Context, modulePath, version string) (err error) {
defer derrors.Wrap(&err, "DeleteModule(ctx, db, %q, %q)", modulePath, version)
- return db.db.Transact(ctx, func(tx *database.DB) error {
+ return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
// We only need to delete from the modules table. Thanks to ON DELETE
// CASCADE constraints, that will trigger deletions from all other tables.
const stmt = `DELETE FROM modules WHERE module_path=$1 AND version=$2`
diff --git a/internal/postgres/search.go b/internal/postgres/search.go
index 20cff0fe..1b697841 100644
--- a/internal/postgres/search.go
+++ b/internal/postgres/search.go
@@ -640,7 +640,7 @@ func (db *DB) UpdateSearchDocumentsImportedByCount(ctx context.Context) (nUpdate
if err != nil {
return 0, err
}
- err = db.db.Transact(ctx, func(tx *database.DB) error {
+ err = db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if err := insertImportedByCounts(ctx, tx, counts); err != nil {
return err
}
@@ -905,7 +905,7 @@ func isInternalPackage(path string) bool {
func (db *DB) DeleteOlderVersionFromSearchDocuments(ctx context.Context, modulePath, version string) (err error) {
defer derrors.Wrap(&err, "DeleteOlderVersionFromSearchDocuments(ctx, %q, %q)", modulePath, version)
- return db.db.Transact(ctx, func(tx *database.DB) error {
+ return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
// Collect all package paths in search_documents with the given module path
// and an older version. (package_path is the primary key of search_documents.)
var ppaths []string
diff --git a/internal/postgres/test_helper.go b/internal/postgres/test_helper.go
index efe8bbd8..9d920b02 100644
--- a/internal/postgres/test_helper.go
+++ b/internal/postgres/test_helper.go
@@ -102,7 +102,7 @@ func SetupTestDB(dbName string) (_ *DB, err error) {
func ResetTestDB(db *DB, t *testing.T) {
ctx := context.Background()
t.Helper()
- if err := db.db.Transact(ctx, func(tx *database.DB) error {
+ if err := db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if _, err := tx.Exec(ctx, `
TRUNCATE modules CASCADE;
TRUNCATE version_map;
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index 22a38e13..f2bfc99d 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -36,7 +36,7 @@ func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.Inde
DO UPDATE SET
index_timestamp=excluded.index_timestamp,
next_processed_after=CURRENT_TIMESTAMP`
- return db.db.Transact(ctx, func(tx *database.DB) error {
+ return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
return tx.BulkInsert(ctx, "module_version_states", cols, vals, conflictAction)
})
}
@@ -57,7 +57,7 @@ func (db *DB) UpsertModuleVersionState(ctx context.Context, modulePath, vers, ap
numPackages = &n
}
- return db.db.Transact(ctx, func(tx *database.DB) error {
+ return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if err := upsertModuleVersionState(ctx, tx, modulePath, vers, appVersion, numPackages, timestamp, status, goModPath, fetchErr); err != nil {
return err
}