diff options
Diffstat (limited to 'internal/database')
| -rw-r--r-- | internal/database/database.go | 23 | ||||
| -rw-r--r-- | internal/database/logging.go | 19 |
2 files changed, 27 insertions, 15 deletions
diff --git a/internal/database/database.go b/internal/database/database.go index f2d29f9e..96e858f4 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -33,6 +33,7 @@ type DB struct { db *sql.DB instanceID string tx *sql.Tx + opts sql.TxOptions // valid when tx != nil mu sync.Mutex maxRetries int // max times a single transaction was retried } @@ -68,6 +69,10 @@ func (db *DB) InTransaction() bool { return db.tx != nil } +func (db *DB) IsRetryable() bool { + return db.tx != nil && isRetryable(db.opts.Isolation) +} + var passwordRegexp = regexp.MustCompile(`password=\S+`) func redactPassword(dbinfo string) string { @@ -81,7 +86,7 @@ func (db *DB) Close() error { // Exec executes a SQL statement and returns the number of rows it affected. func (db *DB) Exec(ctx context.Context, query string, args ...interface{}) (_ int64, err error) { - defer logQuery(ctx, query, args, db.instanceID)(&err) + defer logQuery(ctx, query, args, db.instanceID, db.IsRetryable())(&err) res, err := db.execResult(ctx, query, args...) if err != nil { return 0, err @@ -103,7 +108,7 @@ func (db *DB) execResult(ctx context.Context, query string, args ...interface{}) // Query runs the DB query. func (db *DB) Query(ctx context.Context, query string, args ...interface{}) (_ *sql.Rows, err error) { - defer logQuery(ctx, query, args, db.instanceID)(&err) + defer logQuery(ctx, query, args, db.instanceID, db.IsRetryable())(&err) if db.tx != nil { return db.tx.QueryContext(ctx, query, args...) } @@ -112,7 +117,7 @@ func (db *DB) Query(ctx context.Context, query string, args ...interface{}) (_ * // QueryRow runs the query and returns a single row. func (db *DB) QueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row { - defer logQuery(ctx, query, args, db.instanceID)(nil) + defer logQuery(ctx, query, args, db.instanceID, db.IsRetryable())(nil) start := time.Now() defer func() { if ctx.Err() != nil { @@ -128,7 +133,7 @@ func (db *DB) QueryRow(ctx context.Context, query string, args ...interface{}) * } func (db *DB) Prepare(ctx context.Context, query string) (*sql.Stmt, error) { - defer logQuery(ctx, "preparing "+query, nil, db.instanceID) + defer logQuery(ctx, "preparing "+query, nil, db.instanceID, db.IsRetryable()) if db.tx != nil { return db.tx.PrepareContext(ctx, query) } @@ -169,12 +174,16 @@ func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func( // For the levels which require retry, see // https://www.postgresql.org/docs/11/transaction-iso.html. opts := &sql.TxOptions{Isolation: iso} - if iso == sql.LevelRepeatableRead || iso == sql.LevelSerializable { + if isRetryable(iso) { return db.transactWithRetry(ctx, opts, txFunc) } return db.transact(ctx, opts, txFunc) } +func isRetryable(iso sql.IsolationLevel) bool { + return iso == sql.LevelRepeatableRead || iso == sql.LevelSerializable +} + // serializationFailureCode is the Postgres error code returned when a serializable // transaction fails because it would violate serializability. // See https://www.postgresql.org/docs/current/errcodes-appendix.html. @@ -193,6 +202,7 @@ func (db *DB) transactWithRetry(ctx context.Context, opts *sql.TxOptions, txFunc db.maxRetries = i } db.mu.Unlock() + log.Debugf(ctx, "serialization failure; retrying") continue } if err != nil { @@ -246,7 +256,8 @@ func (db *DB) transact(ctx context.Context, opts *sql.TxOptions, txFunc func(*DB dbtx := New(db.db, db.instanceID) dbtx.tx = tx - defer dbtx.logTransaction(ctx, opts)(&err) + dbtx.opts = *opts + defer dbtx.logTransaction(ctx)(&err) if err := txFunc(dbtx); err != nil { return fmt.Errorf("txFunc(tx): %w", err) } diff --git a/internal/database/logging.go b/internal/database/logging.go index 5979dfa1..fffd2c55 100644 --- a/internal/database/logging.go +++ b/internal/database/logging.go @@ -6,7 +6,6 @@ package database import ( "context" - "database/sql" "errors" "fmt" "strings" @@ -32,7 +31,7 @@ type queryEndLogEntry struct { Error string `json:",omitempty"` } -func logQuery(ctx context.Context, query string, args []interface{}, instanceID string) func(*error) { +func logQuery(ctx context.Context, query string, args []interface{}, instanceID string, retryable bool) func(*error) { if QueryLoggingDisabled { return func(*error) {} } @@ -109,26 +108,28 @@ func logQuery(ctx context.Context, query string, args []interface{}, instanceID strings.Contains(entry.Error, "pq: canceling statement due to user request") { logf = log.Debug } + // If the transaction is retryable and this is a serialization error, + // then it's not really an error at all. Log it as a warning, so if + // we get a "failed due to max retries" error, we can find these easily. + if retryable && isSerializationFailure(*errp) { + logf = log.Warning + } logf(ctx, entry) } } } } -func (db *DB) logTransaction(ctx context.Context, opts *sql.TxOptions) func(*error) { +func (db *DB) logTransaction(ctx context.Context) func(*error) { if QueryLoggingDisabled { return func(*error) {} } uid := generateLoggingID(db.instanceID) - isoLevel := "default" - if opts != nil { - isoLevel = opts.Isolation.String() - } - log.Debugf(ctx, "%s transaction (isolation %s) started", uid, isoLevel) + log.Debugf(ctx, "%s transaction (isolation %s) started", uid, db.opts.Isolation) start := time.Now() return func(errp *error) { log.Debugf(ctx, "%s transaction (isolation %s) finished in %s with error %v", - uid, isoLevel, time.Since(start), *errp) + uid, db.opts.Isolation, time.Since(start), *errp) } } |
