From cdfc8c771301c8c1f32e50e773d620a6b8767078 Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Sat, 14 Mar 2026 19:54:47 -0700 Subject: database/sql: avoid deadlock from reentrant RLock RWMutex.RLock blocks until any pending Lock operations are satisfied. This prohibits recursive read-locking. Replace various RWMutexes used to synchronize between reads and closes with a variant where the reader side takes priority. Reads can starve out Close, but will not deadlock. Fixes #78304 Change-Id: Id36529bf86bed5dbf22f2af94283aeac6a6a6964 Reviewed-on: https://go-review.googlesource.com/c/go/+/758064 Auto-Submit: Damien Neil Reviewed-by: Neal Patel LUCI-TryBot-Result: Go LUCI --- src/database/sql/closemu.go | 111 ++++++++++++++++++++++++++++++++++ src/database/sql/closemu_test.go | 126 +++++++++++++++++++++++++++++++++++++++ src/database/sql/sql.go | 14 +++-- src/database/sql/sql_test.go | 2 + 4 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 src/database/sql/closemu.go create mode 100644 src/database/sql/closemu_test.go (limited to 'src/database/sql') diff --git a/src/database/sql/closemu.go b/src/database/sql/closemu.go new file mode 100644 index 0000000000..ee58a17edb --- /dev/null +++ b/src/database/sql/closemu.go @@ -0,0 +1,111 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sql + +import ( + "sync" + "sync/atomic" +) + +// A closingMutex is an RWMutex for synchronizing close. +// Unlike a sync.RWMutex, RLock takes priority over Lock. +// Reads can starve out close, but reads are safely reentrant. +type closingMutex struct { + // state is 2*readers+writerWaiting. + // 0 is unlocked + // 1 is unlocked and a writer needs to wake + // >0 is read-locked + // <0 is write-locked + state atomic.Int64 + mu sync.Mutex + read *sync.Cond + write *sync.Cond +} + +func (m *closingMutex) RLock() { + if m.TryRLock() { + return + } + + // Wait for writer. + m.mu.Lock() + defer m.mu.Unlock() + for { + if m.TryRLock() { + return + } + m.init() + m.read.Wait() + } +} + +func (m *closingMutex) RUnlock() { + for { + x := m.state.Load() + if x < 2 { + panic("runlock of un-rlocked mutex") + } + if m.state.CompareAndSwap(x, x-2) { + if x-2 == 1 { + // We were the last reader, and a writer is waiting. + // The lock makes sure the writer sees the broadcast. + m.mu.Lock() + defer m.mu.Unlock() + m.write.Broadcast() + } + return + } + } +} + +func (m *closingMutex) Lock() { + m.mu.Lock() + defer m.mu.Unlock() + for { + x := m.state.Load() + if (x == 0 || x == 1) && m.state.CompareAndSwap(x, -1) { + return + } + // Set writer waiting bit and sleep. + if x&1 == 0 && !m.state.CompareAndSwap(x, x|1) { + continue + } + m.init() + m.write.Wait() + } +} + +func (m *closingMutex) Unlock() { + m.mu.Lock() + defer m.mu.Unlock() + if !m.state.CompareAndSwap(-1, 0) { + panic("unlock of unlocked mutex") + } + if m.read != nil { + m.read.Broadcast() + m.write.Broadcast() + } +} + +func (m *closingMutex) TryRLock() bool { + for { + x := m.state.Load() + if x < 0 { + return false + } + if m.state.CompareAndSwap(x, x+2) { + return true + } + } +} + +func (m *closingMutex) init() { + // Lazily create the read/write Conds. + // In the common, uncontended case, we'll never need them. + if m.read == nil { + m.read = sync.NewCond(&m.mu) + m.write = sync.NewCond(&m.mu) + } +} diff --git a/src/database/sql/closemu_test.go b/src/database/sql/closemu_test.go new file mode 100644 index 0000000000..9a1d9da584 --- /dev/null +++ b/src/database/sql/closemu_test.go @@ -0,0 +1,126 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sql + +import ( + "testing" + "testing/synctest" +) + +func TestClosingMutex(t *testing.T) { + start := func(t *testing.T, f func()) func() bool { + done := false + go func() { + f() + done = true + }() + return func() bool { + synctest.Wait() + return done + } + } + + synctest.Test(t, func(t *testing.T) { + var m closingMutex + + // RLock does not block RLock. + m.RLock() + m.RLock() + m.RUnlock() + m.RUnlock() + + // RLock blocks Lock. + m.RLock() + lock1Done := start(t, m.Lock) + if lock1Done() { + t.Fatalf("m.Lock(): succeeded on RLocked mutex") + } + m.RLock() + m.RUnlock() + if lock1Done() { + t.Fatalf("m.Lock(): succeeded after one RUnlock, one RLock remains") + } + m.RUnlock() + if !lock1Done() { + t.Fatalf("m.Lock(): still blocking after all RUnlocks") + } + m.Unlock() + + // Lock blocks RLock. + m.Lock() + rlock1Done := start(t, m.RLock) + rlock2Done := start(t, m.RLock) + if rlock1Done() || rlock2Done() { + t.Fatalf("m.RLock(): succeeded on Locked mutex") + } + m.Unlock() + if !rlock1Done() || !rlock2Done() { + t.Fatalf("m.RLock(): succeeded on Locked mutex") + } + m.RUnlock() + m.RUnlock() + + // Lock blocks Lock. + m.Lock() + lock2Done := start(t, m.Lock) + if lock2Done() { + t.Fatalf("m.Lock(): succeeded on Locked mutex") + } + m.Unlock() + if !lock2Done() { + t.Fatalf("m.Lock(): still blocking after Unlock") + } + m.Unlock() + + // Lock on RLocked mutex does not block RLock. + m.RLock() + lock3Done := start(t, m.Lock) + if lock3Done() { + t.Fatalf("m.Lock(): succeeded on RLocked mutex") + } + m.RLock() + m.RUnlock() + m.RUnlock() + if !lock3Done() { + t.Fatalf("m.Lock(): still blocking after RUnlock") + } + m.Unlock() + + }) +} + +func TestClosingMutexPanics(t *testing.T) { + for _, test := range []struct { + name string + f func() + }{{ + name: "double RUnlock", + f: func() { + var m closingMutex + m.RLock() + m.RUnlock() + m.RUnlock() + }, + }, { + name: "double Unlock", + f: func() { + var m closingMutex + m.Lock() + m.Unlock() + m.Unlock() + }, + }} { + var got any + func() { + defer func() { + got = recover() + }() + test.f() + }() + if got == nil { + t.Errorf("no panic, want one") + } + } +} diff --git a/src/database/sql/sql.go b/src/database/sql/sql.go index c8ec91c1ec..bb771ccd46 100644 --- a/src/database/sql/sql.go +++ b/src/database/sql/sql.go @@ -1986,7 +1986,7 @@ type Conn struct { // closemu prevents the connection from closing while there // is an active query. It is held for read during queries // and exclusively during close. - closemu sync.RWMutex + closemu closingMutex // dc is owned until close, at which point // it's returned to the connection pool. @@ -2176,7 +2176,7 @@ type Tx struct { // closemu prevents the transaction from closing while there // is an active query. It is held for read during queries // and exclusively during close. - closemu sync.RWMutex + closemu closingMutex // dc is owned exclusively until Commit or Rollback, at which point // it's returned with putConn. @@ -2613,7 +2613,7 @@ type Stmt struct { query string // that created the Stmt stickyErr error // if non-nil, this error is returned for all operations - closemu sync.RWMutex // held exclusively during close, for read otherwise. + closemu closingMutex // held exclusively during close, for read otherwise. // If Stmt is prepared on a Tx or Conn then cg is present and will // only ever grab a connection from cg. @@ -2947,7 +2947,7 @@ type Rows struct { // and exclusively during close. // // closemu guards lasterr and closed. - closemu sync.RWMutex + closemu closingMutex lasterr error // non-nil only if closed is true closed bool @@ -3044,9 +3044,11 @@ func (rs *Rows) Next() bool { } var doClose, ok bool - withLock(rs.closemu.RLocker(), func() { + func() { + rs.closemu.RLock() + defer rs.closemu.RUnlock() doClose, ok = rs.nextLocked() - }) + }() if doClose { rs.Close() } diff --git a/src/database/sql/sql_test.go b/src/database/sql/sql_test.go index 5f093a2d6d..40da17472d 100644 --- a/src/database/sql/sql_test.go +++ b/src/database/sql/sql_test.go @@ -4483,6 +4483,8 @@ func testContextCancelDuringRawBytesScan(t *testing.T, mode string) { for _, b := range s { // some operation reading from the raw memory sink += b } + // r.Columns must not deadlock acquiring closemu.RLock. + _, _ = r.Columns() } if r.closemuScanHold { t.Errorf("closemu held; should not be") -- cgit v1.3