aboutsummaryrefslogtreecommitdiff
path: root/src/database/sql/sql.go
diff options
context:
space:
mode:
authorEmmanuel T Odeke <emmanuel@orijtech.com>2020-01-23 18:18:39 -0800
committerAndrew Bonventre <andybons@golang.org>2020-07-16 00:35:30 +0000
commit399ce807381f897d5ea7bdd3970017976d249738 (patch)
treeb3f44d6ceb375f32ba1ce67082c01020a5e289f1 /src/database/sql/sql.go
parentbce174c435f531d429c36540f3a52dccb4426245 (diff)
downloadgo-399ce807381f897d5ea7bdd3970017976d249738.tar.xz
[release-branch.go1.14] database/sql: backport 5 Tx rollback related CLs
Manually backported the subject CLs, because of lack of Gerrit "forge-author" permissions, but also because the prior cherry picks didn't apply cleanly, due to a tight relation chain. The backport comprises of: * CL 174122 * CL 216197 * CL 223963 * CL 216240 * CL 216241 Note: Due to the restrictions that we cannot retroactively introduce API changes to Go1.14.6 that weren't in Go1.14, the Conn.Validator interface (from CL 174122, CL 223963) isn't exposed, and drivers will just be inspected, for if they have an IsValid() bool method implemented. For a description of the content of each CL: * CL 174122: database/sql: process all Session Resets synchronously Adds a new interface, driver.ConnectionValidator, to allow drivers to signal they should not be used again, separatly from the session resetter interface. This is done now that the session reset is done after the connection is put into the connection pool. Previous behavior attempted to run Session Resets in a background worker. This implementation had two problems: untested performance gains for additional complexity, and failures when the pool size exceeded the connection reset channel buffer size. * CL 216197: database/sql: check conn expiry when returning to pool, not when handing it out With the original connection reuse strategy, it was possible that when a new connection was requested, the pool would wait for an an existing connection to return for re-use in a full connection pool, and then it would check if the returned connection was expired. If the returned connection expired while awaiting re-use, it would return an error to the location requestiong the new connection. The existing call sites requesting a new connection was often the last attempt at returning a connection for a query. This would then result in a failed query. This change ensures that we perform the expiry check right before a connection is inserted back in to the connection pool for while requesting a new connection. If requesting a new connection it will no longer fail due to the connection expiring. * CL 216240: database/sql: prevent Tx statement from committing after rollback It was possible for a Tx that was aborted for rollback asynchronously to execute a query after the rollback had completed on the database, which often would auto commit the query outside of the transaction. By W-locking the tx.closemu prior to issuing the rollback connection it ensures any Tx query either fails or finishes on the Tx, and never after the Tx has rolled back. * CL 216241: database/sql: on Tx rollback, retain connection if driver can reset session Previously the Tx would drop the connection after rolling back from a context cancel. Now if the driver can reset the session, keep the connection. * CL 223963 database/sql: add test for Conn.Validator interface This addresses comments made by Russ after https://golang.org/cl/174122 was merged. It addes a test for the connection validator and renames the interface to just "Validator". Updates #31480 Updates #32530 Updates #32942 Updates #34775 Fixes #39101 Change-Id: I043d2d724a367588689fd7d6f3cecb39abeb042c Reviewed-on: https://go-review.googlesource.com/c/go/+/242102 Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Daniel Theophanes <kardianos@gmail.com>
Diffstat (limited to 'src/database/sql/sql.go')
-rw-r--r--src/database/sql/sql.go200
1 files changed, 110 insertions, 90 deletions
diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go
index 0f5bbc01c9..a0b7ca8f08 100644
--- a/src/database/sql/sql.go
+++ b/src/database/sql/sql.go
@@ -421,7 +421,6 @@ type DB struct {
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
- resetterCh chan *driverConn
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
@@ -458,10 +457,10 @@ type driverConn struct {
sync.Mutex // guards following
ci driver.Conn
+ needReset bool // The connection session should be reset before use if true.
closed bool
finalClosed bool // ci.Close has been called
openStmt map[*driverStmt]bool
- lastErr error // lastError captures the result of the session resetter.
// guarded by db.mu
inUse bool
@@ -486,6 +485,41 @@ func (dc *driverConn) expired(timeout time.Duration) bool {
return dc.createdAt.Add(timeout).Before(nowFunc())
}
+// resetSession checks if the driver connection needs the
+// session to be reset and if required, resets it.
+func (dc *driverConn) resetSession(ctx context.Context) error {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if !dc.needReset {
+ return nil
+ }
+ if cr, ok := dc.ci.(driver.SessionResetter); ok {
+ return cr.ResetSession(ctx)
+ }
+ return nil
+}
+
+// validator was introduced for Go1.15, but backported to Go1.14.
+type validator interface {
+ IsValid() bool
+}
+
+// validateConnection checks if the connection is valid and can
+// still be used. It also marks the session for reset if required.
+func (dc *driverConn) validateConnection(needsReset bool) bool {
+ dc.Lock()
+ defer dc.Unlock()
+
+ if needsReset {
+ dc.needReset = true
+ }
+ if cv, ok := dc.ci.(validator); ok {
+ return cv.IsValid()
+ }
+ return true
+}
+
// prepareLocked prepares the query on dc. When cg == nil the dc must keep track of
// the prepared statements in a pool.
func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
@@ -511,19 +545,6 @@ func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, que
return ds, nil
}
-// resetSession resets the connection session and sets the lastErr
-// that is checked before returning the connection to another query.
-//
-// resetSession assumes that the embedded mutex is locked when the connection
-// was returned to the pool. This unlocks the mutex.
-func (dc *driverConn) resetSession(ctx context.Context) {
- defer dc.Unlock() // In case of panic.
- if dc.closed { // Check if the database has been closed.
- return
- }
- dc.lastErr = dc.ci.(driver.SessionResetter).ResetSession(ctx)
-}
-
// the dc.db's Mutex is held.
func (dc *driverConn) closeDBLocked() func() error {
dc.Lock()
@@ -713,14 +734,12 @@ func OpenDB(c driver.Connector) *DB {
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
- resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
- go db.connectionResetter(ctx)
return db
}
@@ -1058,23 +1077,6 @@ func (db *DB) connectionOpener(ctx context.Context) {
}
}
-// connectionResetter runs in a separate goroutine to reset connections async
-// to exported API.
-func (db *DB) connectionResetter(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- close(db.resetterCh)
- for dc := range db.resetterCh {
- dc.Unlock()
- }
- return
- case dc := <-db.resetterCh:
- dc.resetSession(ctx)
- }
- }
-}
-
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
@@ -1155,14 +1157,13 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
conn.Close()
return nil, driver.ErrBadConn
}
- // Lock around reading lastErr to ensure the session resetter finished.
- conn.Lock()
- err := conn.lastErr
- conn.Unlock()
- if err == driver.ErrBadConn {
+
+ // Reset the session if required.
+ if err := conn.resetSession(ctx); err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
+
return conn, nil
}
@@ -1204,18 +1205,22 @@ func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn
if !ok {
return nil, errDBClosed
}
- if ret.err == nil && ret.conn.expired(lifetime) {
+ // Only check if the connection is expired if the strategy is cachedOrNewConns.
+ // If we require a new connection, just re-use the connection without looking
+ // at the expiry time. If it is expired, it will be checked when it is placed
+ // back into the connection pool.
+ // This prioritizes giving a valid connection to a client over the exact connection
+ // lifetime, which could expire exactly after this point anyway.
+ if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
- // Lock around reading lastErr to ensure the session resetter finished.
- ret.conn.Lock()
- err := ret.conn.lastErr
- ret.conn.Unlock()
- if err == driver.ErrBadConn {
+
+ // Reset the session if required.
+ if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
@@ -1275,13 +1280,23 @@ const debugGetPut = false
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
+ if err != driver.ErrBadConn {
+ if !dc.validateConnection(resetSession) {
+ err = driver.ErrBadConn
+ }
+ }
db.mu.Lock()
if !dc.inUse {
+ db.mu.Unlock()
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
+
+ if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
+ err = driver.ErrBadConn
+ }
if debugGetPut {
db.lastPut[dc] = stack()
}
@@ -1305,41 +1320,13 @@ func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if putConnHook != nil {
putConnHook(db, dc)
}
- if db.closed {
- // Connections do not need to be reset if they will be closed.
- // Prevents writing to resetterCh after the DB has closed.
- resetSession = false
- }
- if resetSession {
- if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
- // Lock the driverConn here so it isn't released until
- // the connection is reset.
- // The lock must be taken before the connection is put into
- // the pool to prevent it from being taken out before it is reset.
- dc.Lock()
- }
- }
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
if !added {
- if resetSession {
- dc.Unlock()
- }
dc.Close()
return
}
- if !resetSession {
- return
- }
- select {
- default:
- // If the resetterCh is blocking then mark the connection
- // as bad and continue on.
- dc.lastErr = driver.ErrBadConn
- dc.Unlock()
- case db.resetterCh <- dc:
- }
}
// Satisfy a connRequest or put the driverConn in the idle pool and return true
@@ -1701,7 +1688,11 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
+ keepConnOnRollback := false
withLock(dc, func() {
+ _, hasSessionResetter := dc.ci.(driver.SessionResetter)
+ _, hasConnectionValidator := dc.ci.(validator)
+ keepConnOnRollback = hasSessionResetter && hasConnectionValidator
txi, err = ctxDriverBegin(ctx, opts, dc.ci)
})
if err != nil {
@@ -1713,12 +1704,13 @@ func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error),
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
- db: db,
- dc: dc,
- releaseConn: release,
- txi: txi,
- cancel: cancel,
- ctx: ctx,
+ db: db,
+ dc: dc,
+ releaseConn: release,
+ txi: txi,
+ cancel: cancel,
+ keepConnOnRollback: keepConnOnRollback,
+ ctx: ctx,
}
go tx.awaitDone()
return tx, nil
@@ -1980,6 +1972,11 @@ type Tx struct {
// Use atomic operations on value when checking value.
done int32
+ // keepConnOnRollback is true if the driver knows
+ // how to reset the connection's session and if need be discard
+ // the connection.
+ keepConnOnRollback bool
+
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
@@ -2005,7 +2002,10 @@ func (tx *Tx) awaitDone() {
// transaction is closed and the resources are released. This
// rollback does nothing if the transaction has already been
// committed or rolled back.
- tx.rollback(true)
+ // Do not discard the connection if the connection knows
+ // how to reset the session.
+ discardConnection := !tx.keepConnOnRollback
+ tx.rollback(discardConnection)
}
func (tx *Tx) isDone() bool {
@@ -2016,14 +2016,10 @@ func (tx *Tx) isDone() bool {
// that has already been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
-// close returns the connection to the pool and
-// must only be called by Tx.rollback or Tx.Commit.
-func (tx *Tx) close(err error) {
- tx.cancel()
-
- tx.closemu.Lock()
- defer tx.closemu.Unlock()
-
+// closeLocked returns the connection to the pool and
+// must only be called by Tx.rollback or Tx.Commit while
+// closemu is Locked and tx already canceled.
+func (tx *Tx) closeLocked(err error) {
tx.releaseConn(err)
tx.dc = nil
tx.txi = nil
@@ -2090,6 +2086,15 @@ func (tx *Tx) Commit() error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit()
@@ -2097,16 +2102,31 @@ func (tx *Tx) Commit() error {
if err != driver.ErrBadConn {
tx.closePrepared()
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}
+var rollbackHook func()
+
// rollback aborts the transaction and optionally forces the pool to discard
// the connection.
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
+
+ if rollbackHook != nil {
+ rollbackHook()
+ }
+
+ // Cancel the Tx to release any active R-closemu locks.
+ // This is safe to do because tx.done has already transitioned
+ // from 0 to 1. Hold the W-closemu lock prior to rollback
+ // to ensure no other connection has an active query.
+ tx.cancel()
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback()
@@ -2117,7 +2137,7 @@ func (tx *Tx) rollback(discardConn bool) error {
if discardConn {
err = driver.ErrBadConn
}
- tx.close(err)
+ tx.closeLocked(err)
return err
}