about summary refs log tree commit diff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mutex.go149
-rw-r--r--src/sync/mutex_test.go35
-rw-r--r--src/sync/runtime.go8
-rw-r--r--src/sync/runtime_sema_test.go6
-rw-r--r--src/sync/rwmutex.go4
-rw-r--r--src/sync/waitgroup.go2
6 files changed, 162 insertions, 42 deletions
diff --git a/src/sync/mutex.go b/src/sync/mutex.go
index 8c9366f4fe..506b23f6ff 100644
--- a/src/sync/mutex.go
+++ b/src/sync/mutex.go
@@ -37,7 +37,34 @@ type Locker interface {
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
+ mutexStarving
mutexWaiterShift = iota
+
+ // Mutex fairness.
+ //
+ // Mutex can be in 2 modes of operations: normal and starvation.
+ // In normal mode waiters are queued in FIFO order, but a woken up waiter
+ // does not own the mutex and competes with new arriving goroutines over
+ // the ownership. New arriving goroutines have an advantage -- they are
+ // already running on CPU and there can be lots of them, so a woken up
+ // waiter has good chances of losing. In such case it is queued at front
+ // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
+ // it switches mutex to the starvation mode.
+ //
+ // In starvation mode ownership of the mutex is directly handed off from
+ // the unlocking goroutine to the waiter at the front of the queue.
+ // New arriving goroutines don't try to acquire the mutex even if it appears
+ // to be unlocked, and don't try to spin. Instead they queue themselves at
+ // the tail of the wait queue.
+ //
+ // If a waiter receives ownership of the mutex and sees that either
+ // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
+ // it switches mutex back to normal operation mode.
+ //
+ // Normal mode has considerably better performance as a goroutine can acquire
+ // a mutex several times in a row even if there are blocked waiters.
+ // Starvation mode is important to prevent pathological cases of tail latency.
+ starvationThresholdNs = 1e6
)
// Lock locks m.
@@ -52,41 +79,86 @@ func (m *Mutex) Lock() {
return
}
+ var waitStartTime int64
+ starving := false
awoke := false
iter := 0
+ old := m.state
for {
- old := m.state
- new := old | mutexLocked
- if old&mutexLocked != 0 {
- if runtime_canSpin(iter) {
- // Active spinning makes sense.
- // Try to set mutexWoken flag to inform Unlock
- // to not wake other blocked goroutines.
- if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
- atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
- awoke = true
- }
- runtime_doSpin()
- iter++
- continue
+ // Don't spin in starvation mode, ownership is handed off to waiters
+ // so we won't be able to acquire the mutex anyway.
+ if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
+ // Active spinning makes sense.
+ // Try to set mutexWoken flag to inform Unlock
+ // to not wake other blocked goroutines.
+ if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
+ atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
+ awoke = true
}
- new = old + 1<<mutexWaiterShift
+ runtime_doSpin()
+ iter++
+ old = m.state
+ continue
+ }
+ new := old
+ // Don't try to acquire starving mutex, new arriving goroutines must queue.
+ if old&mutexStarving == 0 {
+ new |= mutexLocked
+ }
+ if old&(mutexLocked|mutexStarving) != 0 {
+ new += 1 << mutexWaiterShift
+ }
+ // The current goroutine switches mutex to starvation mode.
+ // But if the mutex is currently unlocked, don't do the switch.
+ // Unlock expects that starving mutex has waiters, which will not
+ // be true in this case.
+ if starving && old&mutexLocked != 0 {
+ new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
- throw("sync: inconsistent mutex state")
+ panic("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
- if old&mutexLocked == 0 {
+ if old&(mutexLocked|mutexStarving) == 0 {
+ break // locked the mutex with CAS
+ }
+ // If we were already waiting before, queue at the front of the queue.
+ queueLifo := waitStartTime != 0
+ if waitStartTime == 0 {
+ waitStartTime = runtime_nanotime()
+ }
+ runtime_SemacquireMutex(&m.sema, queueLifo)
+ starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
+ old = m.state
+ if old&mutexStarving != 0 {
+ // If this goroutine was woken and mutex is in starvation mode,
+ // ownership was handed off to us but mutex is in somewhat
+ // inconsistent state: mutexLocked is not set and we are still
+ // accounted as waiter. Fix that.
+ if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
+ panic("sync: inconsistent mutex state")
+ }
+ delta := int32(mutexLocked - 1<<mutexWaiterShift)
+ if !starving || old>>mutexWaiterShift == 1 {
+ // Exit starvation mode.
+ // Critical to do it here and consider wait time.
+ // Starvation mode is so inefficient, that two goroutines
+ // can go lock-step infinitely once they switch mutex
+ // to starvation mode.
+ delta -= mutexStarving
+ }
+ atomic.AddInt32(&m.state, delta)
break
}
- runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
+ } else {
+ old = m.state
}
}
@@ -110,22 +182,33 @@ func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
- throw("sync: unlock of unlocked mutex")
+ panic("sync: unlock of unlocked mutex")
}
-
- old := new
- for {
- // If there are no waiters or a goroutine has already
- // been woken or grabbed the lock, no need to wake anyone.
- if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
- return
- }
- // Grab the right to wake someone.
- new = (old - 1<<mutexWaiterShift) | mutexWoken
- if atomic.CompareAndSwapInt32(&m.state, old, new) {
- runtime_Semrelease(&m.sema)
- return
+ if new&mutexStarving == 0 {
+ old := new
+ for {
+ // If there are no waiters or a goroutine has already
+ // been woken or grabbed the lock, no need to wake anyone.
+ // In starvation mode ownership is directly handed off from unlocking
+ // goroutine to the next waiter. We are not part of this chain,
+ // since we did not observe mutexStarving when we unlocked the mutex above.
+ // So get off the way.
+ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
+ return
+ }
+ // Grab the right to wake someone.
+ new = (old - 1<<mutexWaiterShift) | mutexWoken
+ if atomic.CompareAndSwapInt32(&m.state, old, new) {
+ runtime_Semrelease(&m.sema, false)
+ return
+ }
+ old = m.state
}
- old = m.state
+ } else {
+ // Starving mode: handoff mutex ownership to the next waiter.
+ // Note: mutexLocked is not set, the waiter will set it after wakeup.
+ // But mutex is still considered locked if mutexStarving is set,
+ // so new coming goroutines won't acquire it.
+ runtime_Semrelease(&m.sema, true)
}
}
diff --git a/src/sync/mutex_test.go b/src/sync/mutex_test.go
index 88dbccf3ad..784471df12 100644
--- a/src/sync/mutex_test.go
+++ b/src/sync/mutex_test.go
@@ -15,12 +15,13 @@ import (
"strings"
. "sync"
"testing"
+ "time"
)
func HammerSemaphore(s *uint32, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
Runtime_Semacquire(s)
- Runtime_Semrelease(s)
+ Runtime_Semrelease(s, false)
}
cdone <- true
}
@@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) {
}
}
+func TestMutexFairness(t *testing.T) {
+ var mu Mutex
+ stop := make(chan bool)
+ defer close(stop)
+ go func() {
+ for {
+ mu.Lock()
+ time.Sleep(100 * time.Microsecond)
+ mu.Unlock()
+ select {
+ case <-stop:
+ return
+ default:
+ }
+ }
+ }()
+ done := make(chan bool)
+ go func() {
+ for i := 0; i < 10; i++ {
+ time.Sleep(100 * time.Microsecond)
+ mu.Lock()
+ mu.Unlock()
+ }
+ done <- true
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatalf("can't acquire Mutex in 10 seconds")
+ }
+}
+
func BenchmarkMutexUncontended(b *testing.B) {
type PaddedMutex struct {
Mutex
diff --git a/src/sync/runtime.go b/src/sync/runtime.go
index 4d22ce6b0d..be16bcc8f7 100644
--- a/src/sync/runtime.go
+++ b/src/sync/runtime.go
@@ -14,13 +14,15 @@ import "unsafe"
func runtime_Semacquire(s *uint32)
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
-func runtime_SemacquireMutex(*uint32)
+// If lifo is true, queue waiter at the head of wait queue.
+func runtime_SemacquireMutex(s *uint32, lifo bool)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
-func runtime_Semrelease(s *uint32)
+// If handoff is true, pass count directly to the first waiter.
+func runtime_Semrelease(s *uint32, handoff bool)
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
@@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool
// runtime_doSpin does active spinning.
func runtime_doSpin()
+
+func runtime_nanotime() int64
diff --git a/src/sync/runtime_sema_test.go b/src/sync/runtime_sema_test.go
index a2382f4655..a680847edf 100644
--- a/src/sync/runtime_sema_test.go
+++ b/src/sync/runtime_sema_test.go
@@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
sem := new(PaddedSem)
for pb.Next() {
- Runtime_Semrelease(&sem.sem)
+ Runtime_Semrelease(&sem.sem, false)
Runtime_Semacquire(&sem.sem)
}
})
@@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
- Runtime_Semrelease(&sem)
+ Runtime_Semrelease(&sem, false)
if work {
for i := 0; i < 100; i++ {
foo *= 2
@@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
Runtime_Semacquire(&sem)
}
_ = foo
- Runtime_Semrelease(&sem)
+ Runtime_Semrelease(&sem, false)
})
}
diff --git a/src/sync/rwmutex.go b/src/sync/rwmutex.go
index 71064eeeba..55b69f2bb8 100644
--- a/src/sync/rwmutex.go
+++ b/src/sync/rwmutex.go
@@ -66,7 +66,7 @@ func (rw *RWMutex) RUnlock() {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
- runtime_Semrelease(&rw.writerSem)
+ runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
@@ -119,7 +119,7 @@ func (rw *RWMutex) Unlock() {
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
- runtime_Semrelease(&rw.readerSem)
+ runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go
index b386e1fec2..4b23540ae7 100644
--- a/src/sync/waitgroup.go
+++ b/src/sync/waitgroup.go
@@ -91,7 +91,7 @@ func (wg *WaitGroup) Add(delta int) {
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
- runtime_Semrelease(&wg.sema)
+ runtime_Semrelease(&wg.sema, false)
}
}