aboutsummaryrefslogtreecommitdiff
path: root/src/database/sql/sql.go
diff options
context:
space:
mode:
authorChris Broadfoot <cbro@golang.org>2017-01-26 09:24:20 -0800
committerChris Broadfoot <cbro@golang.org>2017-01-26 09:24:31 -0800
commit2f6c20b46c2964e3c8e2c985bc76fe2d660ac8af (patch)
tree183c99b60b2f31c7c2a63c630ff8b8bde849f4ab /src/database/sql/sql.go
parent59f181b6fda68ece22882945853ca2df9dbf1c88 (diff)
parent78860b2ad2261b6e3dc29006d2ffbdc4da14cb2e (diff)
downloadgo-2f6c20b46c2964e3c8e2c985bc76fe2d660ac8af.tar.xz
[release-branch.go1.8] all: merge master into release-branch.go1.8
78860b2ad2 cmd/go: don't reject ./... matching top-level file outside GOPATH 2b283cedef database/sql: fix race when canceling queries immediately 1cf08182f9 go/printer: fix format with leading comments in composite literal b531eb3062 runtime: reorder modules so main.main comes first 165cfbc409 database/sql: let tests wait for db pool to come to expected state ea73649343 doc: update gccgo docs 1db16711f5 doc: clarify what to do with Go 1.4 when installing from source 3717b429f2 doc: note that plugins are not fully baked 98842cabb6 net/http: don't send body on redirects for 301, 302, 303 when GetBody is set 314180e7f6 net/http: fix a nit aad06da2b9 cmd/link: mark DWARF function symbols as reachable be9dcfec29 doc: mention testing.MainStart signature change a96e117a58 runtime: amd64, use 4-byte ops for memmove of 4 bytes 4cce27a3fa cmd/compile: fix constant propagation through s390x MOVDNE instructions 1be957d703 misc/cgo/test: pass current environment to syscall.Exec ec654e2251 misc/cgo/test: fix test when using GCC 7 256a605faa cmd/compile: don't use nilcheck information until the next block e8d5989ed1 cmd/compile: fix compilebench -alloc ea7d9e6a52 runtime: check for nil g and m in msanread Change-Id: I61d508d4f0efe4b72e7396645c8ad6088d2bfa6e
Diffstat (limited to 'src/database/sql/sql.go')
-rw-r--r--src/database/sql/sql.go91
1 files changed, 64 insertions, 27 deletions
diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go
index 0fa7c34a13..feb91223a9 100644
--- a/src/database/sql/sql.go
+++ b/src/database/sql/sql.go
@@ -1357,16 +1357,7 @@ func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStra
cancel: cancel,
ctx: ctx,
}
- go func(tx *Tx) {
- select {
- case <-tx.ctx.Done():
- if !tx.isDone() {
- // Discard and close the connection used to ensure the transaction
- // is closed and the resources are released.
- tx.rollback(true)
- }
- }
- }(tx)
+ go tx.awaitDone()
return tx, nil
}
@@ -1388,6 +1379,11 @@ func (db *DB) Driver() driver.Driver {
type Tx struct {
db *DB
+ // closemu prevents the transaction from closing while there
+ // is an active query. It is held for read during queries
+ // and exclusively during close.
+ closemu sync.RWMutex
+
// dc is owned exclusively until Commit or Rollback, at which point
// it's returned with putConn.
dc *driverConn
@@ -1413,6 +1409,20 @@ type Tx struct {
ctx context.Context
}
+// awaitDone blocks until the context in Tx is canceled and rolls back
+// the transaction if it's not already done.
+func (tx *Tx) awaitDone() {
+ // Wait for either the transaction to be committed or rolled
+ // back, or for the associated context to be closed.
+ <-tx.ctx.Done()
+
+ // Discard and close the connection used to ensure the
+ // transaction is closed and the resources are released. This
+ // rollback does nothing if the transaction has already been
+ // committed or rolled back.
+ tx.rollback(true)
+}
+
func (tx *Tx) isDone() bool {
return atomic.LoadInt32(&tx.done) != 0
}
@@ -1424,16 +1434,31 @@ var ErrTxDone = errors.New("sql: Transaction has already been committed or rolle
// close returns the connection to the pool and
// must only be called by Tx.rollback or Tx.Commit.
func (tx *Tx) close(err error) {
+ tx.closemu.Lock()
+ defer tx.closemu.Unlock()
+
tx.db.putConn(tx.dc, err)
tx.cancel()
tx.dc = nil
tx.txi = nil
}
+// hookTxGrabConn specifies an optional hook to be called on
+// a successful call to (*Tx).grabConn. For tests.
+var hookTxGrabConn func()
+
func (tx *Tx) grabConn(ctx context.Context) (*driverConn, error) {
+ select {
+ default:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
if tx.isDone() {
return nil, ErrTxDone
}
+ if hookTxGrabConn != nil { // test hook
+ hookTxGrabConn()
+ }
return tx.dc, nil
}
@@ -1503,6 +1528,9 @@ func (tx *Tx) Rollback() error {
// for the execution of the returned statement. The returned statement
// will run in the transaction context.
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
+ tx.closemu.RLock()
+ defer tx.closemu.RUnlock()
+
// TODO(bradfitz): We could be more efficient here and either
// provide a method to take an existing Stmt (created on
// perhaps a different Conn), and re-create it on this Conn if
@@ -1567,6 +1595,9 @@ func (tx *Tx) Prepare(query string) (*Stmt, error) {
// The returned statement operates within the transaction and will be closed
// when the transaction has been committed or rolled back.
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
+ tx.closemu.RLock()
+ defer tx.closemu.RUnlock()
+
// TODO(bradfitz): optimize this. Currently this re-prepares
// each time. This is fine for now to illustrate the API but
// we should really cache already-prepared statements
@@ -1618,6 +1649,9 @@ func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
// ExecContext executes a query that doesn't return rows.
// For example: an INSERT and UPDATE.
func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
+ tx.closemu.RLock()
+ defer tx.closemu.RUnlock()
+
dc, err := tx.grabConn(ctx)
if err != nil {
return nil, err
@@ -1661,6 +1695,9 @@ func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
// QueryContext executes a query that returns rows, typically a SELECT.
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
+ tx.closemu.RLock()
+ defer tx.closemu.RUnlock()
+
dc, err := tx.grabConn(ctx)
if err != nil {
return nil, err
@@ -2038,25 +2075,21 @@ type Rows struct {
// closed value is 1 when the Rows is closed.
// Use atomic operations on value when checking value.
closed int32
- ctxClose chan struct{} // closed when Rows is closed, may be null.
+ cancel func() // called when Rows is closed, may be nil.
lastcols []driver.Value
lasterr error // non-nil only if closed is true
closeStmt *driverStmt // if non-nil, statement to Close on close
}
func (rs *Rows) initContextClose(ctx context.Context) {
- if ctx.Done() == context.Background().Done() {
- return
- }
+ ctx, rs.cancel = context.WithCancel(ctx)
+ go rs.awaitDone(ctx)
+}
- rs.ctxClose = make(chan struct{})
- go func() {
- select {
- case <-ctx.Done():
- rs.Close()
- case <-rs.ctxClose:
- }
- }()
+// awaitDone blocks until the rows are closed or the context canceled.
+func (rs *Rows) awaitDone(ctx context.Context) {
+ <-ctx.Done()
+ rs.Close()
}
// Next prepares the next result row for reading with the Scan method. It
@@ -2314,7 +2347,9 @@ func (rs *Rows) Scan(dest ...interface{}) error {
return nil
}
-var rowsCloseHook func(*Rows, *error)
+// rowsCloseHook returns a function so tests may install the
+// hook throug a test only mutex.
+var rowsCloseHook = func() func(*Rows, *error) { return nil }
func (rs *Rows) isClosed() bool {
return atomic.LoadInt32(&rs.closed) != 0
@@ -2328,13 +2363,15 @@ func (rs *Rows) Close() error {
if !atomic.CompareAndSwapInt32(&rs.closed, 0, 1) {
return nil
}
- if rs.ctxClose != nil {
- close(rs.ctxClose)
- }
+
err := rs.rowsi.Close()
- if fn := rowsCloseHook; fn != nil {
+ if fn := rowsCloseHook(); fn != nil {
fn(rs, &err)
}
+ if rs.cancel != nil {
+ rs.cancel()
+ }
+
if rs.closeStmt != nil {
rs.closeStmt.Close()
}