diff options
| author | Jonathan Amsterdam <jba@google.com> | 2020-06-02 09:32:40 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2020-06-04 02:18:45 +0000 |
| commit | bc42b7ce3cb16956272a77fb69c5e8700f0d8f08 (patch) | |
| tree | 26aa9149e37a43715f69680046f911f13d5ffa04 /internal/postgres/insert_module.go | |
| parent | ba4f3b3e6d7f4d52c6f2aa607dece5871a164027 (diff) | |
| download | go-x-pkgsite-bc42b7ce3cb16956272a77fb69c5e8700f0d8f08.tar.xz | |
internal/postgres: lock latest-version changes by module path
Use Postgres advisory locks (see
https://www.postgresql.org/docs/11/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS)
to prevent more than one transaction from thinking it is the latest
version of a particular module.
Change-Id: Ie928dac240b6e8679b41434b2433f89fcb2f49ae
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/760100
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
Diffstat (limited to 'internal/postgres/insert_module.go')
| -rw-r--r-- | internal/postgres/insert_module.go | 46 |
1 files changed, 40 insertions, 6 deletions
diff --git a/internal/postgres/insert_module.go b/internal/postgres/insert_module.go index 0284ed9e..edeca148 100644 --- a/internal/postgres/insert_module.go +++ b/internal/postgres/insert_module.go @@ -10,6 +10,8 @@ import ( "encoding/json" "errors" "fmt" + "hash/fnv" + "io" "sort" "strings" "unicode/utf8" @@ -71,12 +73,7 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) { ctx, span := trace.StartSpan(ctx, "saveModule") defer span.End() - iso := sql.LevelDefault - if experiment.IsActive(ctx, internal.ExperimentInsertSerializable) { - iso = sql.LevelSerializable - } - - return db.db.Transact(ctx, iso, func(tx *database.DB) error { + return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { moduleID, err := insertModule(ctx, tx, m) if err != nil { return err @@ -94,6 +91,16 @@ func (db *DB) saveModule(ctx context.Context, m *internal.Module) (err error) { } } + // Obtain a transaction-scoped exclusive advisory lock on the module + // path. The transaction that holds the lock is the only one that can + // execute the subsequent code on any module with the given path. That + // means that conflicts from two transactions both believing they are + // working on the latest version of a given module cannot happen. + // The lock is released automatically at the end of the transaction. + if err := lock(ctx, tx, m.ModulePath); err != nil { + return err + } + // We only insert into imports_unique and search_documents if this is // the latest version of the module. isLatest, err := isLatestVersion(ctx, tx, m.ModulePath, m.Version) @@ -510,6 +517,33 @@ func insertDirectories(ctx context.Context, db *database.DB, m *internal.Module, return nil } +// lock obtains an exclusive, transaction-scoped advisory lock on modulePath. +func lock(ctx context.Context, tx *database.DB, modulePath string) (err error) { + defer derrors.Wrap(&err, "lock(%s)", modulePath) + if !tx.InTransaction() { + return errors.New("not in a transaction") + } + // Postgres advisory locks use a 64-bit integer key. Convert modulePath to a + // key by hashing. + // + // This can result in collisions (two module paths hashing to the same key), + // but they are unlikely and at worst will slow things down a bit. + // + // We use the FNV hash algorithm from the standard library. It fits into 64 + // bits unlike a crypto hash, and is stable across processes, unlike + // hash/maphash. + hasher := fnv.New64() + io.WriteString(hasher, modulePath) // Writing to a hash.Hash never returns an error. + h := int64(hasher.Sum64()) + log.Debugf(ctx, "locking %s (%d) ...", modulePath, h) + // See https://www.postgresql.org/docs/11/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS. + if _, err := tx.Exec(ctx, `SELECT pg_advisory_xact_lock($1)`, h); err != nil { + return err + } + log.Debugf(ctx, "locking %s (%d) succeeded", modulePath, h) + return nil +} + // isLatestVersion reports whether version is the latest version of the module. func isLatestVersion(ctx context.Context, db *database.DB, modulePath, version string) (_ bool, err error) { defer derrors.Wrap(&err, "isLatestVersion(ctx, tx, %q)", modulePath) |
