aboutsummaryrefslogtreecommitdiff
path: root/src/pkg/database/sql/sql.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/pkg/database/sql/sql.go')
-rw-r--r--src/pkg/database/sql/sql.go259
1 files changed, 220 insertions, 39 deletions
diff --git a/src/pkg/database/sql/sql.go b/src/pkg/database/sql/sql.go
index d81f6fe984..44257778c1 100644
--- a/src/pkg/database/sql/sql.go
+++ b/src/pkg/database/sql/sql.go
@@ -10,6 +10,7 @@
package sql
import (
+ "container/list"
"database/sql/driver"
"errors"
"fmt"
@@ -192,12 +193,22 @@ type DB struct {
driver driver.Driver
dsn string
- mu sync.Mutex // protects following fields
- freeConn []*driverConn
+ mu sync.Mutex // protects following fields
+ freeConn *list.List // of *driverConn
+ connRequests *list.List // of connRequest
+ numOpen int
+ pendingOpens int
+ // Used to sygnal the need for new connections
+ // a goroutine running connectionOpener() reads on this chan and
+ // maybeOpenNewConnections sends on the chan (one send per needed connection)
+ // It is closed during db.Close(). The close tells the connectionOpener
+ // goroutine to exit.
+ openerCh chan struct{}
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdle int // zero means defaultMaxIdleConns; negative means 0
+ maxOpen int // <= 0 means unlimited
}
// driverConn wraps a driver.Conn with a mutex, to
@@ -217,6 +228,9 @@ type driverConn struct {
inUse bool
onPut []func() // code (with db.mu held) run when conn is next returned
dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree
+ // This is the Element returned by db.freeConn.PushFront(conn).
+ // It's used by connIfFree to remove the conn from the freeConn list.
+ listElem *list.Element
}
func (dc *driverConn) releaseConn(err error) {
@@ -254,15 +268,14 @@ func (dc *driverConn) prepareLocked(query string) (driver.Stmt, error) {
}
// the dc.db's Mutex is held.
-func (dc *driverConn) closeDBLocked() error {
+func (dc *driverConn) closeDBLocked() func() error {
dc.Lock()
+ defer dc.Unlock()
if dc.closed {
- dc.Unlock()
- return errors.New("sql: duplicate driverConn close")
+ return func() error { return errors.New("sql: duplicate driverConn close") }
}
dc.closed = true
- dc.Unlock() // not defer; removeDep finalClose calls may need to lock
- return dc.db.removeDepLocked(dc, dc)()
+ return dc.db.removeDepLocked(dc, dc)
}
func (dc *driverConn) Close() error {
@@ -293,8 +306,13 @@ func (dc *driverConn) finalClose() error {
err := dc.ci.Close()
dc.ci = nil
dc.finalClosed = true
-
dc.Unlock()
+
+ dc.db.mu.Lock()
+ dc.db.numOpen--
+ dc.db.maybeOpenNewConnections()
+ dc.db.mu.Unlock()
+
return err
}
@@ -380,6 +398,13 @@ func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
}
}
+// This is the size of the connectionOpener request chan (dn.openerCh).
+// This value should be larger than the maximum typical value
+// used for db.maxOpen. If maxOpen is significantly larger than
+// connectionRequestQueueSize then it is possible for ALL calls into the *DB
+// to block until the connectionOpener can satify the backlog of requests.
+var connectionRequestQueueSize = 1000000
+
// Open opens a database specified by its database driver name and a
// driver-specific data source name, usually consisting of at least a
// database name and connection information.
@@ -398,10 +423,14 @@ func Open(driverName, dataSourceName string) (*DB, error) {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
db := &DB{
- driver: driveri,
- dsn: dataSourceName,
- lastPut: make(map[*driverConn]string),
+ driver: driveri,
+ dsn: dataSourceName,
+ openerCh: make(chan struct{}, connectionRequestQueueSize),
+ lastPut: make(map[*driverConn]string),
}
+ db.freeConn = list.New()
+ db.connRequests = list.New()
+ go db.connectionOpener()
return db, nil
}
@@ -422,16 +451,32 @@ func (db *DB) Ping() error {
// Close closes the database, releasing any open resources.
func (db *DB) Close() error {
db.mu.Lock()
- defer db.mu.Unlock()
+ if db.closed { // Make DB.Close idempotent
+ db.mu.Unlock()
+ return nil
+ }
+ close(db.openerCh)
var err error
- for _, dc := range db.freeConn {
- err1 := dc.closeDBLocked()
+ fns := make([]func() error, 0, db.freeConn.Len())
+ for db.freeConn.Front() != nil {
+ dc := db.freeConn.Front().Value.(*driverConn)
+ dc.listElem = nil
+ fns = append(fns, dc.closeDBLocked())
+ db.freeConn.Remove(db.freeConn.Front())
+ }
+ db.closed = true
+ for db.connRequests.Front() != nil {
+ req := db.connRequests.Front().Value.(connRequest)
+ db.connRequests.Remove(db.connRequests.Front())
+ close(req)
+ }
+ db.mu.Unlock()
+ for _, fn := range fns {
+ err1 := fn()
if err1 != nil {
err = err1
}
}
- db.freeConn = nil
- db.closed = true
return err
}
@@ -453,6 +498,9 @@ func (db *DB) maxIdleConnsLocked() int {
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool.
//
+// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns
+// then the new MaxIdleConns will be reduced to match the MaxOpenConns limit
+//
// If n <= 0, no idle connections are retained.
func (db *DB) SetMaxIdleConns(n int) {
db.mu.Lock()
@@ -463,40 +511,148 @@ func (db *DB) SetMaxIdleConns(n int) {
// No idle connections.
db.maxIdle = -1
}
- for len(db.freeConn) > 0 && len(db.freeConn) > n {
- nfree := len(db.freeConn)
- dc := db.freeConn[nfree-1]
- db.freeConn[nfree-1] = nil
- db.freeConn = db.freeConn[:nfree-1]
+ // Make sure maxIdle doesn't exceed maxOpen
+ if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
+ db.maxIdle = db.maxOpen
+ }
+ for db.freeConn.Len() > db.maxIdleConnsLocked() {
+ dc := db.freeConn.Back().Value.(*driverConn)
+ dc.listElem = nil
+ db.freeConn.Remove(db.freeConn.Back())
go dc.Close()
}
}
+// SetMaxOpenConns sets the maximum number of open connections to the database.
+//
+// If MaxIdleConns is greater than 0 and the new MaxOpenConns is less than
+// MaxIdleConns, then MaxIdleConns will be reduced to match the new
+// MaxOpenConns limit
+//
+// If n <= 0, then there is no limit on the number of open connections.
+// The default is 0 (unlimited).
+func (db *DB) SetMaxOpenConns(n int) {
+ db.mu.Lock()
+ db.maxOpen = n
+ if n < 0 {
+ db.maxOpen = 0
+ }
+ syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
+ db.mu.Unlock()
+ if syncMaxIdle {
+ db.SetMaxIdleConns(n)
+ }
+}
+
+// Assumes db.mu is locked.
+// If there are connRequests and the connection limit hasn't been reached,
+// then tell the connectionOpener to open new connections.
+func (db *DB) maybeOpenNewConnections() {
+ numRequests := db.connRequests.Len() - db.pendingOpens
+ if db.maxOpen > 0 {
+ numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens)
+ if numRequests > numCanOpen {
+ numRequests = numCanOpen
+ }
+ }
+ for numRequests > 0 {
+ db.pendingOpens++
+ numRequests--
+ db.openerCh <- struct{}{}
+ }
+}
+
+// Runs in a seperate goroutine, opens new connections when requested.
+func (db *DB) connectionOpener() {
+ for _ = range db.openerCh {
+ db.openNewConnection()
+ }
+}
+
+// Open one new connection
+func (db *DB) openNewConnection() {
+ ci, err := db.driver.Open(db.dsn)
+ db.mu.Lock()
+ defer db.mu.Unlock()
+ if db.closed {
+ if err == nil {
+ ci.Close()
+ }
+ return
+ }
+ db.pendingOpens--
+ if err != nil {
+ db.putConnDBLocked(nil, err)
+ return
+ }
+ dc := &driverConn{
+ db: db,
+ ci: ci,
+ }
+ db.addDepLocked(dc, dc)
+ db.numOpen++
+ db.putConnDBLocked(dc, err)
+}
+
+// connRequest represents one request for a new connection
+// When there are no idle connections available, DB.conn will create
+// a new connRequest and put it on the db.connRequests list.
+type connRequest chan<- interface{} // takes either a *driverConn or an error
+
+var errDBClosed = errors.New("sql: database is closed")
+
// conn returns a newly-opened or cached *driverConn
func (db *DB) conn() (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
- return nil, errors.New("sql: database is closed")
+ return nil, errDBClosed
+ }
+
+ // If db.maxOpen > 0 and the number of open connections is over the limit
+ // or there are no free connection, then make a request and wait.
+ if db.maxOpen > 0 && (db.numOpen >= db.maxOpen || db.freeConn.Len() == 0) {
+ // Make the connRequest channel. It's buffered so that the
+ // connectionOpener doesn't block while waiting for the req to be read.
+ ch := make(chan interface{}, 1)
+ req := connRequest(ch)
+ db.connRequests.PushBack(req)
+ db.maybeOpenNewConnections()
+ db.mu.Unlock()
+ ret, ok := <-ch
+ if !ok {
+ return nil, errDBClosed
+ }
+ switch ret.(type) {
+ case *driverConn:
+ return ret.(*driverConn), nil
+ case error:
+ return nil, ret.(error)
+ default:
+ panic("sql: Unexpected type passed through connRequest.ch")
+ }
}
- if n := len(db.freeConn); n > 0 {
- conn := db.freeConn[n-1]
- db.freeConn = db.freeConn[:n-1]
+
+ if f := db.freeConn.Front(); f != nil {
+ conn := f.Value.(*driverConn)
+ conn.listElem = nil
+ db.freeConn.Remove(f)
conn.inUse = true
db.mu.Unlock()
return conn, nil
}
- db.mu.Unlock()
+ db.mu.Unlock()
ci, err := db.driver.Open(db.dsn)
if err != nil {
return nil, err
}
+ db.mu.Lock()
+ db.numOpen++
dc := &driverConn{
db: db,
ci: ci,
}
- db.mu.Lock()
db.addDepLocked(dc, dc)
dc.inUse = true
db.mu.Unlock()
@@ -524,12 +680,9 @@ func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) {
if wanted.inUse {
return nil, errConnBusy
}
- for i, conn := range db.freeConn {
- if conn != wanted {
- continue
- }
- db.freeConn[i] = db.freeConn[len(db.freeConn)-1]
- db.freeConn = db.freeConn[:len(db.freeConn)-1]
+ if wanted.listElem != nil {
+ db.freeConn.Remove(wanted.listElem)
+ wanted.listElem = nil
wanted.inUse = true
return wanted, nil
}
@@ -589,6 +742,10 @@ func (db *DB) putConn(dc *driverConn, err error) {
if err == driver.ErrBadConn {
// Don't reuse bad connections.
+ // Since the conn is considered bad and is being discarded, treat it
+ // as closed. Decrement the open count.
+ db.numOpen--
+ db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
@@ -596,14 +753,38 @@ func (db *DB) putConn(dc *driverConn, err error) {
if putConnHook != nil {
putConnHook(db, dc)
}
- if n := len(db.freeConn); !db.closed && n < db.maxIdleConnsLocked() {
- db.freeConn = append(db.freeConn, dc)
- db.mu.Unlock()
- return
- }
+ added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
+ if !added {
+ dc.Close()
+ }
+}
- dc.Close()
+// Satisfy a connRequest or put the driverConn in the idle pool and return true
+// or return false.
+// putConnDBLocked will satisfy a connRequest if there is one, or it will
+// return the *driverConn to the freeConn list if err != nil and the idle
+// connection limit would not be reached.
+// If err != nil, the value of dc is ignored.
+// If err == nil, then dc must not equal nil.
+// If a connRequest was fullfilled or the *driverConn was placed in the
+// freeConn list, then true is returned, otherwise false is returned.
+func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
+ if db.connRequests.Len() > 0 {
+ req := db.connRequests.Front().Value.(connRequest)
+ db.connRequests.Remove(db.connRequests.Front())
+ if err != nil {
+ req <- err
+ } else {
+ dc.inUse = true
+ req <- dc
+ }
+ return true
+ } else if err == nil && !db.closed && db.maxIdleConnsLocked() > 0 && db.maxIdleConnsLocked() > db.freeConn.Len() {
+ dc.listElem = db.freeConn.PushFront(dc)
+ return true
+ }
+ return false
}
// Prepare creates a prepared statement for later queries or executions.