diff options
Diffstat (limited to 'src/pkg/database/sql')
| -rw-r--r-- | src/pkg/database/sql/fakedb_test.go | 24 | ||||
| -rw-r--r-- | src/pkg/database/sql/sql.go | 125 | ||||
| -rw-r--r-- | src/pkg/database/sql/sql_test.go | 183 |
3 files changed, 296 insertions, 36 deletions
diff --git a/src/pkg/database/sql/fakedb_test.go b/src/pkg/database/sql/fakedb_test.go index 07e7fd242a..d900e2cebe 100644 --- a/src/pkg/database/sql/fakedb_test.go +++ b/src/pkg/database/sql/fakedb_test.go @@ -35,9 +35,10 @@ var _ = log.Printf // When opening a fakeDriver's database, it starts empty with no // tables. All tables and data are stored in memory only. type fakeDriver struct { - mu sync.Mutex - openCount int - dbs map[string]*fakeDB + mu sync.Mutex // guards 3 following fields + openCount int // conn opens + closeCount int // conn closes + dbs map[string]*fakeDB } type fakeDB struct { @@ -250,6 +251,7 @@ func setStrictFakeConnClose(t *testing.T) { } func (c *fakeConn) Close() (err error) { + drv := fdriver.(*fakeDriver) defer func() { if err != nil && testStrictClose != nil { testStrictClose.Errorf("failed to close a test fakeConn: %v", err) @@ -260,6 +262,11 @@ func (c *fakeConn) Close() (err error) { if fn != nil { fn(c, err) } + if err == nil { + drv.mu.Lock() + drv.closeCount++ + drv.mu.Unlock() + } }() if c.currTx != nil { return errors.New("can't close fakeConn; in a Transaction") @@ -459,7 +466,7 @@ func (s *fakeStmt) Close() error { panic("nil conn in fakeStmt.Close") } if s.c.db == nil { - panic("in fakeSmt.Close, conn's db is nil (already closed)") + panic("in fakeStmt.Close, conn's db is nil (already closed)") } if !s.closed { s.c.incrStat(&s.c.stmtsClosed) @@ -552,6 +559,15 @@ func (s *fakeStmt) Query(args []driver.Value) (driver.Rows, error) { if !ok { return nil, fmt.Errorf("fakedb: table %q doesn't exist", s.table) } + + if s.table == "magicquery" { + if len(s.whereCol) == 2 && s.whereCol[0] == "op" && s.whereCol[1] == "millis" { + if args[0] == "sleep" { + time.Sleep(time.Duration(args[1].(int64)) * time.Millisecond) + } + } + } + t.mu.Lock() defer t.mu.Unlock() diff --git a/src/pkg/database/sql/sql.go b/src/pkg/database/sql/sql.go index 72289407c9..0646fb796f 100644 --- a/src/pkg/database/sql/sql.go +++ b/src/pkg/database/sql/sql.go @@ -207,13 +207,46 @@ type DB struct { type driverConn struct { db *DB - sync.Mutex // guards following - ci driver.Conn - closed bool + sync.Mutex // guards following + ci driver.Conn + closed bool + finalClosed bool // ci.Close has been called + openStmt map[driver.Stmt]bool // guarded by db.mu - inUse bool - onPut []func() // code (with db.mu held) run when conn is next returned + inUse bool + onPut []func() // code (with db.mu held) run when conn is next returned + dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree +} + +func (dc *driverConn) removeOpenStmt(si driver.Stmt) { + dc.Lock() + defer dc.Unlock() + delete(dc.openStmt, si) +} + +func (dc *driverConn) prepareLocked(query string) (driver.Stmt, error) { + si, err := dc.ci.Prepare(query) + if err == nil { + // Track each driverConn's open statements, so we can close them + // before closing the conn. + // + // TODO(bradfitz): let drivers opt out of caring about + // stmt closes if the conn is about to close anyway? For now + // do the safe thing, in case stmts need to be closed. + // + // TODO(bradfitz): after Go 1.1, closing driver.Stmts + // should be moved to driverStmt, using unique + // *driverStmts everywhere (including from + // *Stmt.connStmt, instead of returning a + // driver.Stmt), using driverStmt as a pointer + // everywhere, and making it a finalCloser. + if dc.openStmt == nil { + dc.openStmt = make(map[driver.Stmt]bool) + } + dc.openStmt[si] = true + } + return si, err } // the dc.db's Mutex is held. @@ -236,13 +269,27 @@ func (dc *driverConn) Close() error { } dc.closed = true dc.Unlock() // not defer; removeDep finalClose calls may need to lock - return dc.db.removeDep(dc, dc) + + // And now updates that require holding dc.mu.Lock. + dc.db.mu.Lock() + dc.dbmuClosed = true + fn := dc.db.removeDepLocked(dc, dc) + dc.db.mu.Unlock() + return fn() } func (dc *driverConn) finalClose() error { dc.Lock() + + for si := range dc.openStmt { + si.Close() + } + dc.openStmt = nil + err := dc.ci.Close() dc.ci = nil + dc.finalClosed = true + dc.Unlock() return err } @@ -264,7 +311,8 @@ func (ds *driverStmt) Close() error { // depSet is a finalCloser's outstanding dependencies type depSet map[interface{}]bool // set of true bools -// The finalCloser interface is used by (*DB).addDep and (*DB).get +// The finalCloser interface is used by (*DB).addDep and related +// dependency reference counting. type finalCloser interface { // finalClose is called when the reference count of an object // goes to zero. (*DB).mu is not held while calling it. @@ -448,16 +496,26 @@ func (db *DB) conn() (*driverConn, error) { return dc, nil } -// connIfFree returns (wanted, true) if wanted is still a valid conn and +var ( + errConnClosed = errors.New("database/sql: internal sentinel error: conn is closed") + errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy") +) + +// connIfFree returns (wanted, nil) if wanted is still a valid conn and // isn't in use. // -// If wanted is valid but in use, connIfFree returns (wanted, false). -// If wanted is invalid, connIfFre returns (nil, false). -func (db *DB) connIfFree(wanted *driverConn) (conn *driverConn, ok bool) { +// The error is errConnClosed if the connection if the requested connection +// is invalid because it's been closed. +// +// The error is errConnBusy if the connection is in use. +func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) { db.mu.Lock() defer db.mu.Unlock() if wanted.inUse { - return conn, false + return nil, errConnBusy + } + if wanted.dbmuClosed { + return nil, errConnClosed } for i, conn := range db.freeConn { if conn != wanted { @@ -466,9 +524,14 @@ func (db *DB) connIfFree(wanted *driverConn) (conn *driverConn, ok bool) { db.freeConn[i] = db.freeConn[len(db.freeConn)-1] db.freeConn = db.freeConn[:len(db.freeConn)-1] wanted.inUse = true - return wanted, true + return wanted, nil } - return nil, false + // TODO(bradfitz): shouldn't get here. After Go 1.1, change this to: + // panic("connIfFree call requested a non-closed, non-busy, non-free conn") + // Which passes all the tests, but I'm too paranoid to include this + // late in Go 1.1. + // Instead, treat it like a busy connection: + return nil, errConnBusy } // putConnHook is a hook for testing. @@ -485,7 +548,11 @@ func (db *DB) noteUnusedDriverStatement(c *driverConn, si driver.Stmt) { si.Close() }) } else { - si.Close() + c.Lock() + defer c.Unlock() + if !c.finalClosed { + si.Close() + } } } @@ -526,8 +593,6 @@ func (db *DB) putConn(dc *driverConn, err error) { db.mu.Unlock() return } - // TODO: check to see if we need this Conn for any prepared - // statements which are still active? db.mu.Unlock() dc.Close() @@ -560,7 +625,7 @@ func (db *DB) prepare(query string) (*Stmt, error) { return nil, err } dc.Lock() - si, err := dc.ci.Prepare(query) + si, err := dc.prepareLocked(query) dc.Unlock() if err != nil { db.putConn(dc, err) @@ -572,7 +637,6 @@ func (db *DB) prepare(query string) (*Stmt, error) { css: []connStmt{{dc, si}}, } db.addDep(stmt, stmt) - db.addDep(dc, stmt) db.putConn(dc, nil) return stmt, nil } @@ -623,7 +687,6 @@ func (db *DB) exec(query string, args []interface{}) (res Result, err error) { return nil, err } defer withLock(dc, func() { si.Close() }) - return resultFromStatement(driverStmt{dc, si}, args...) } @@ -1049,13 +1112,21 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St var cs connStmt match := false - for _, v := range s.css { - // TODO(bradfitz): lazily clean up entries in this - // list with dead conns while enumerating - if _, match = s.db.connIfFree(v.dc); match { + for i := 0; i < len(s.css); i++ { + v := s.css[i] + _, err := s.db.connIfFree(v.dc) + if err == nil { + match = true cs = v break } + if err == errConnClosed { + // Lazily remove dead conn from our freelist. + s.css[i] = s.css[len(s.css)-1] + s.css = s.css[:len(s.css)-1] + i-- + } + } s.mu.Unlock() @@ -1068,7 +1139,7 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St return nil, nil, nil, err } dc.Lock() - si, err := dc.ci.Prepare(s.query) + si, err := dc.prepareLocked(s.query) dc.Unlock() if err == driver.ErrBadConn && i < 10 { continue @@ -1076,7 +1147,6 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St if err != nil { return nil, nil, nil, err } - s.db.addDep(dc, s) s.mu.Lock() cs = connStmt{dc, si} s.css = append(s.css, cs) @@ -1195,6 +1265,7 @@ func (s *Stmt) Close() error { func (s *Stmt) finalClose() error { for _, v := range s.css { s.db.noteUnusedDriverStatement(v.dc, v.si) + v.dc.removeOpenStmt(v.si) s.db.removeDep(v.dc, s) } s.css = nil @@ -1389,7 +1460,7 @@ func (dr driverResult) RowsAffected() (int64, error) { } func stack() string { - var buf [1024]byte + var buf [2 << 10]byte return string(buf[:runtime.Stack(buf[:], false)]) } diff --git a/src/pkg/database/sql/sql_test.go b/src/pkg/database/sql/sql_test.go index 37fdd2795e..e6cc667fa9 100644 --- a/src/pkg/database/sql/sql_test.go +++ b/src/pkg/database/sql/sql_test.go @@ -43,6 +43,7 @@ type testOrBench interface { Errorf(string, ...interface{}) Fatal(...interface{}) Error(...interface{}) + Logf(string, ...interface{}) } func newTestDB(t testOrBench, name string) *DB { @@ -59,6 +60,11 @@ func newTestDB(t testOrBench, name string) *DB { exec(t, db, "INSERT|people|name=Bob,age=?,photo=BPHOTO", 2) exec(t, db, "INSERT|people|name=Chris,age=?,photo=CPHOTO,bdate=?", 3, chrisBirthday) } + if name == "magicquery" { + // Magic table name and column, known by fakedb_test.go. + exec(t, db, "CREATE|magicquery|op=string,millis=int32") + exec(t, db, "INSERT|magicquery|op=sleep,millis=10") + } return db } @@ -80,6 +86,16 @@ func closeDB(t testOrBench, db *DB) { t.Errorf("Error closing fakeConn: %v", err) } }) + for i, dc := range db.freeConn { + if n := len(dc.openStmt); n > 0 { + // Just a sanity check. This is legal in + // general, but if we make the tests clean up + // their statements first, then we can safely + // verify this is always zero here, and any + // other value is a leak. + t.Errorf("while closing db, freeConn %d/%d had %d open stmts; want 0", i, len(db.freeConn), n) + } + } err := db.Close() if err != nil { t.Fatalf("error closing DB: %v", err) @@ -95,6 +111,51 @@ func numPrepares(t *testing.T, db *DB) int { return db.freeConn[0].ci.(*fakeConn).numPrepare } +func (db *DB) numDeps() int { + db.mu.Lock() + defer db.mu.Unlock() + return len(db.dep) +} + +// Dependencies are closed via a goroutine, so this polls waiting for +// numDeps to fall to want, waiting up to d. +func (db *DB) numDepsPollUntil(want int, d time.Duration) int { + deadline := time.Now().Add(d) + for { + n := db.numDeps() + if n <= want || time.Now().After(deadline) { + return n + } + time.Sleep(50 * time.Millisecond) + } +} + +func (db *DB) numFreeConns() int { + db.mu.Lock() + defer db.mu.Unlock() + return len(db.freeConn) +} + +func (db *DB) dumpDeps(t *testing.T) { + for fc := range db.dep { + db.dumpDep(t, 0, fc, map[finalCloser]bool{}) + } +} + +func (db *DB) dumpDep(t *testing.T, depth int, dep finalCloser, seen map[finalCloser]bool) { + seen[dep] = true + indent := strings.Repeat(" ", depth) + ds := db.dep[dep] + for k := range ds { + t.Logf("%s%T (%p) waiting for -> %T (%p)", indent, dep, dep, k, k) + if fc, ok := k.(finalCloser); ok { + if !seen[fc] { + db.dumpDep(t, depth+1, fc, seen) + } + } + } +} + func TestQuery(t *testing.T) { db := newTestDB(t, "people") defer closeDB(t, db) @@ -131,7 +192,7 @@ func TestQuery(t *testing.T) { // And verify that the final rows.Next() call, which hit EOF, // also closed the rows connection. - if n := len(db.freeConn); n != 1 { + if n := db.numFreeConns(); n != 1 { t.Fatalf("free conns after query hitting EOF = %d; want 1", n) } if prepares := numPrepares(t, db) - prepares0; prepares != 1 { @@ -806,8 +867,11 @@ func TestMaxIdleConns(t *testing.T) { } } -// golang.org/issue/5046 -func TestCloseConnBeforeStmts(t *testing.T) { +// golang.org/issue/5323 +func TestStmtCloseDeps(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } defer setHookpostCloseConn(nil) setHookpostCloseConn(func(_ *fakeConn, err error) { if err != nil { @@ -815,7 +879,112 @@ func TestCloseConnBeforeStmts(t *testing.T) { } }) + db := newTestDB(t, "magicquery") + defer closeDB(t, db) + + driver := db.driver.(*fakeDriver) + + driver.mu.Lock() + opens0 := driver.openCount + closes0 := driver.closeCount + driver.mu.Unlock() + openDelta0 := opens0 - closes0 + + stmt, err := db.Prepare("SELECT|magicquery|op|op=?,millis=?") + if err != nil { + t.Fatal(err) + } + + // Start 50 parallel slow queries. + const ( + nquery = 50 + sleepMillis = 25 + nbatch = 2 + ) + var wg sync.WaitGroup + for batch := 0; batch < nbatch; batch++ { + for i := 0; i < nquery; i++ { + wg.Add(1) + go func() { + defer wg.Done() + var op string + if err := stmt.QueryRow("sleep", sleepMillis).Scan(&op); err != nil && err != ErrNoRows { + t.Error(err) + } + }() + } + // Sleep for twice the expected length of time for the + // batch of 50 queries above to finish before starting + // the next round. + time.Sleep(2 * sleepMillis * time.Millisecond) + } + wg.Wait() + + if g, w := db.numFreeConns(), 2; g != w { + t.Errorf("free conns = %d; want %d", g, w) + } + + if n := db.numDepsPollUntil(4, time.Second); n > 4 { + t.Errorf("number of dependencies = %d; expected <= 4", n) + db.dumpDeps(t) + } + + driver.mu.Lock() + opens := driver.openCount - opens0 + closes := driver.closeCount - closes0 + driver.mu.Unlock() + openDelta := (driver.openCount - driver.closeCount) - openDelta0 + + if openDelta > 2 { + t.Logf("open calls = %d", opens) + t.Logf("close calls = %d", closes) + t.Logf("open delta = %d", openDelta) + t.Errorf("db connections opened = %d; want <= 2", openDelta) + db.dumpDeps(t) + } + + if len(stmt.css) > nquery { + t.Errorf("len(stmt.css) = %d; want <= %d", len(stmt.css), nquery) + } + + if err := stmt.Close(); err != nil { + t.Fatal(err) + } + + if g, w := db.numFreeConns(), 2; g != w { + t.Errorf("free conns = %d; want %d", g, w) + } + + if n := db.numDepsPollUntil(2, time.Second); n > 2 { + t.Errorf("number of dependencies = %d; expected <= 2", n) + db.dumpDeps(t) + } + + db.SetMaxIdleConns(0) + + if g, w := db.numFreeConns(), 0; g != w { + t.Errorf("free conns = %d; want %d", g, w) + } + + if n := db.numDepsPollUntil(0, time.Second); n > 0 { + t.Errorf("number of dependencies = %d; expected 0", n) + db.dumpDeps(t) + } +} + +// golang.org/issue/5046 +func TestCloseConnBeforeStmts(t *testing.T) { db := newTestDB(t, "people") + defer closeDB(t, db) + + defer setHookpostCloseConn(nil) + setHookpostCloseConn(func(_ *fakeConn, err error) { + if err != nil { + t.Errorf("Error closing fakeConn: %v; from %s", err, stack()) + db.dumpDeps(t) + t.Errorf("DB = %#v", db) + } + }) stmt, err := db.Prepare("SELECT|people|name|") if err != nil { @@ -830,6 +999,9 @@ func TestCloseConnBeforeStmts(t *testing.T) { t.Errorf("conn shouldn't be closed") } + if n := len(dc.openStmt); n != 1 { + t.Errorf("driverConn num openStmt = %d; want 1", n) + } err = db.Close() if err != nil { t.Errorf("db Close = %v", err) @@ -837,8 +1009,8 @@ func TestCloseConnBeforeStmts(t *testing.T) { if !dc.closed { t.Errorf("after db.Close, driverConn should be closed") } - if dc.ci == nil { - t.Errorf("after db.Close, driverConn should still have its Conn interface") + if n := len(dc.openStmt); n != 0 { + t.Errorf("driverConn num openStmt = %d; want 0", n) } err = stmt.Close() @@ -888,6 +1060,7 @@ func manyConcurrentQueries(t testOrBench) { if err != nil { t.Fatal(err) } + defer stmt.Close() var wg sync.WaitGroup wg.Add(numReqs) |
