diff options
Diffstat (limited to 'src/database/sql/sql.go')
| -rw-r--r-- | src/database/sql/sql.go | 72 |
1 files changed, 71 insertions, 1 deletions
diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index 8dd48107a6..3db387e841 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -2893,6 +2893,8 @@ type Rows struct { cancel func() // called when Rows is closed, may be nil. closeStmt *driverStmt // if non-nil, statement to Close on close + contextDone atomic.Pointer[error] // error that awaitDone saw; set before close attempt + // closemu prevents Rows from closing while there // is an active streaming result. It is held for read during non-close operations // and exclusively during close. @@ -2905,6 +2907,15 @@ type Rows struct { // lastcols is only used in Scan, Next, and NextResultSet which are expected // not to be called concurrently. lastcols []driver.Value + + // closemuScanHold is whether the previous call to Scan kept closemu RLock'ed + // without unlocking it. It does that when the user passes a *RawBytes scan + // target. In that case, we need to prevent awaitDone from closing the Rows + // while the user's still using the memory. See go.dev/issue/60304. + // + // It is only used by Scan, Next, and NextResultSet which are expected + // not to be called concurrently. + closemuScanHold bool } // lasterrOrErrLocked returns either lasterr or the provided err. @@ -2942,7 +2953,11 @@ func (rs *Rows) awaitDone(ctx, txctx context.Context) { } select { case <-ctx.Done(): + err := ctx.Err() + rs.contextDone.Store(&err) case <-txctxDone: + err := txctx.Err() + rs.contextDone.Store(&err) } rs.close(ctx.Err()) } @@ -2954,6 +2969,15 @@ func (rs *Rows) awaitDone(ctx, txctx context.Context) { // // Every call to Scan, even the first one, must be preceded by a call to Next. func (rs *Rows) Next() bool { + // If the user's calling Next, they're done with their previous row's Scan + // results (any RawBytes memory), so we can release the read lock that would + // be preventing awaitDone from calling close. + rs.closemuRUnlockIfHeldByScan() + + if rs.contextDone.Load() != nil { + return false + } + var doClose, ok bool withLock(rs.closemu.RLocker(), func() { doClose, ok = rs.nextLocked() @@ -3008,6 +3032,11 @@ func (rs *Rows) nextLocked() (doClose, ok bool) { // scanning. If there are further result sets they may not have rows in the result // set. func (rs *Rows) NextResultSet() bool { + // If the user's calling NextResultSet, they're done with their previous + // row's Scan results (any RawBytes memory), so we can release the read lock + // that would be preventing awaitDone from calling close. + rs.closemuRUnlockIfHeldByScan() + var doClose bool defer func() { if doClose { @@ -3044,6 +3073,10 @@ func (rs *Rows) NextResultSet() bool { // Err returns the error, if any, that was encountered during iteration. // Err may be called after an explicit or implicit Close. func (rs *Rows) Err() error { + if errp := rs.contextDone.Load(); errp != nil { + return *errp + } + rs.closemu.RLock() defer rs.closemu.RUnlock() return rs.lasterrOrErrLocked(nil) @@ -3237,6 +3270,11 @@ func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType { // If any of the first arguments implementing Scanner returns an error, // that error will be wrapped in the returned error. func (rs *Rows) Scan(dest ...any) error { + if rs.closemuScanHold { + // This should only be possible if the user calls Scan twice in a row + // without calling Next. + return fmt.Errorf("sql: Scan called without calling Next (closemuScanHold)") + } rs.closemu.RLock() if rs.lasterr != nil && rs.lasterr != io.EOF { @@ -3248,23 +3286,50 @@ func (rs *Rows) Scan(dest ...any) error { rs.closemu.RUnlock() return err } - rs.closemu.RUnlock() + + if scanArgsContainRawBytes(dest) { + rs.closemuScanHold = true + } else { + rs.closemu.RUnlock() + } if rs.lastcols == nil { + rs.closemuRUnlockIfHeldByScan() return errors.New("sql: Scan called without calling Next") } if len(dest) != len(rs.lastcols) { + rs.closemuRUnlockIfHeldByScan() return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest)) } + for i, sv := range rs.lastcols { err := convertAssignRows(dest[i], sv, rs) if err != nil { + rs.closemuRUnlockIfHeldByScan() return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err) } } return nil } +// closemuRUnlockIfHeldByScan releases any closemu.RLock held open by a previous +// call to Scan with *RawBytes. +func (rs *Rows) closemuRUnlockIfHeldByScan() { + if rs.closemuScanHold { + rs.closemuScanHold = false + rs.closemu.RUnlock() + } +} + +func scanArgsContainRawBytes(args []any) bool { + for _, a := range args { + if _, ok := a.(*RawBytes); ok { + return true + } + } + return false +} + // rowsCloseHook returns a function so tests may install the // hook through a test only mutex. var rowsCloseHook = func() func(*Rows, *error) { return nil } @@ -3274,6 +3339,11 @@ var rowsCloseHook = func() func(*Rows, *error) { return nil } // the Rows are closed automatically and it will suffice to check the // result of Err. Close is idempotent and does not affect the result of Err. func (rs *Rows) Close() error { + // If the user's calling Close, they're done with their previous row's Scan + // results (any RawBytes memory), so we can release the read lock that would + // be preventing awaitDone from calling the unexported close before we do so. + rs.closemuRUnlockIfHeldByScan() + return rs.close(nil) } |
