aboutsummaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2016-12-13 16:45:55 +0100
committerRuss Cox <rsc@golang.org>2017-02-17 17:24:59 +0000
commit0556e26273f704db73df9e7c4c3d2e8434dec7be (patch)
tree4a674c2e48825e9a6e0ec437d16468a13d3f002e /src/sync
parent79f6a5c7bd684f2e6007ee505b522440beb86bf0 (diff)
downloadgo-0556e26273f704db73df9e7c4c3d2e8434dec7be.tar.xz
sync: make Mutex more fair
Add new starvation mode for Mutex. In starvation mode ownership is directly handed off from unlocking goroutine to the next waiter. New arriving goroutines don't compete for ownership. Unfair wait time is now limited to 1ms. Also fix a long standing bug that goroutines were requeued at the tail of the wait queue. That lead to even more unfair acquisition times with multiple waiters. Performance of normal mode is not considerably affected. Fixes #13086 On the provided in the issue lockskew program: done in 1.207853ms done in 1.177451ms done in 1.184168ms done in 1.198633ms done in 1.185797ms done in 1.182502ms done in 1.316485ms done in 1.211611ms done in 1.182418ms name old time/op new time/op delta MutexUncontended-48 0.65ns ± 0% 0.65ns ± 1% ~ (p=0.087 n=10+10) Mutex-48 112ns ± 1% 114ns ± 1% +1.69% (p=0.000 n=10+10) MutexSlack-48 113ns ± 0% 87ns ± 1% -22.65% (p=0.000 n=8+10) MutexWork-48 149ns ± 0% 145ns ± 0% -2.48% (p=0.000 n=9+10) MutexWorkSlack-48 149ns ± 0% 122ns ± 3% -18.26% (p=0.000 n=6+10) MutexNoSpin-48 103ns ± 4% 105ns ± 3% ~ (p=0.089 n=10+10) MutexSpin-48 490ns ± 4% 515ns ± 6% +5.08% (p=0.006 n=10+10) Cond32-48 13.4µs ± 6% 13.1µs ± 5% -2.75% (p=0.023 n=10+10) RWMutexWrite100-48 53.2ns ± 3% 41.2ns ± 3% -22.57% (p=0.000 n=10+10) RWMutexWrite10-48 45.9ns ± 2% 43.9ns ± 2% -4.38% (p=0.000 n=10+10) RWMutexWorkWrite100-48 122ns ± 2% 134ns ± 1% +9.92% (p=0.000 n=10+10) RWMutexWorkWrite10-48 206ns ± 1% 188ns ± 1% -8.52% (p=0.000 n=8+10) Cond32-24 12.1µs ± 3% 12.4µs ± 3% +1.98% (p=0.043 n=10+9) MutexUncontended-24 0.74ns ± 1% 0.75ns ± 1% ~ (p=0.650 n=10+10) Mutex-24 122ns ± 2% 124ns ± 1% +1.31% (p=0.007 n=10+10) MutexSlack-24 96.9ns ± 2% 102.8ns ± 2% +6.11% (p=0.000 n=10+10) MutexWork-24 146ns ± 1% 135ns ± 2% -7.70% (p=0.000 n=10+9) MutexWorkSlack-24 135ns ± 1% 128ns ± 2% -5.01% (p=0.000 n=10+9) MutexNoSpin-24 114ns ± 3% 110ns ± 4% -3.84% (p=0.000 n=10+10) MutexSpin-24 482ns ± 4% 475ns ± 8% ~ (p=0.286 n=10+10) RWMutexWrite100-24 43.0ns ± 3% 43.1ns ± 2% ~ (p=0.956 n=10+10) RWMutexWrite10-24 43.4ns ± 1% 43.2ns ± 1% ~ (p=0.085 n=10+9) RWMutexWorkWrite100-24 130ns ± 3% 131ns ± 3% ~ (p=0.747 n=10+10) RWMutexWorkWrite10-24 191ns ± 1% 192ns ± 1% ~ (p=0.210 n=10+10) Cond32-12 11.5µs ± 2% 11.7µs ± 2% +1.98% (p=0.002 n=10+10) MutexUncontended-12 1.48ns ± 0% 1.50ns ± 1% +1.08% (p=0.004 n=10+10) Mutex-12 141ns ± 1% 143ns ± 1% +1.63% (p=0.000 n=10+10) MutexSlack-12 121ns ± 0% 119ns ± 0% -1.65% (p=0.001 n=8+9) MutexWork-12 141ns ± 2% 150ns ± 3% +6.36% (p=0.000 n=9+10) MutexWorkSlack-12 131ns ± 0% 138ns ± 0% +5.73% (p=0.000 n=9+10) MutexNoSpin-12 87.0ns ± 1% 83.7ns ± 1% -3.80% (p=0.000 n=10+10) MutexSpin-12 364ns ± 1% 377ns ± 1% +3.77% (p=0.000 n=10+10) RWMutexWrite100-12 42.8ns ± 1% 43.9ns ± 1% +2.41% (p=0.000 n=8+10) RWMutexWrite10-12 39.8ns ± 4% 39.3ns ± 1% ~ (p=0.433 n=10+9) RWMutexWorkWrite100-12 131ns ± 1% 131ns ± 0% ~ (p=0.591 n=10+9) RWMutexWorkWrite10-12 173ns ± 1% 174ns ± 0% ~ (p=0.059 n=10+8) Cond32-6 10.9µs ± 2% 10.9µs ± 2% ~ (p=0.739 n=10+10) MutexUncontended-6 2.97ns ± 0% 2.97ns ± 0% ~ (all samples are equal) Mutex-6 122ns ± 6% 122ns ± 2% ~ (p=0.668 n=10+10) MutexSlack-6 149ns ± 3% 142ns ± 3% -4.63% (p=0.000 n=10+10) MutexWork-6 136ns ± 3% 140ns ± 5% ~ (p=0.077 n=10+10) MutexWorkSlack-6 152ns ± 0% 138ns ± 2% -9.21% (p=0.000 n=6+10) MutexNoSpin-6 150ns ± 1% 152ns ± 0% +1.50% (p=0.000 n=8+10) MutexSpin-6 726ns ± 0% 730ns ± 1% ~ (p=0.069 n=10+10) RWMutexWrite100-6 40.6ns ± 1% 40.9ns ± 1% +0.91% (p=0.001 n=8+10) RWMutexWrite10-6 37.1ns ± 0% 37.0ns ± 1% ~ (p=0.386 n=9+10) RWMutexWorkWrite100-6 133ns ± 1% 134ns ± 1% +1.01% (p=0.005 n=9+10) RWMutexWorkWrite10-6 152ns ± 0% 152ns ± 0% ~ (all samples are equal) Cond32-2 7.86µs ± 2% 7.95µs ± 2% +1.10% (p=0.023 n=10+10) MutexUncontended-2 8.10ns ± 0% 9.11ns ± 4% +12.44% (p=0.000 n=9+10) Mutex-2 32.9ns ± 9% 38.4ns ± 6% +16.58% (p=0.000 n=10+10) MutexSlack-2 93.4ns ± 1% 98.5ns ± 2% +5.39% (p=0.000 n=10+9) MutexWork-2 40.8ns ± 3% 43.8ns ± 7% +7.38% (p=0.000 n=10+9) MutexWorkSlack-2 98.6ns ± 5% 108.2ns ± 2% +9.80% (p=0.000 n=10+8) MutexNoSpin-2 399ns ± 1% 398ns ± 2% ~ (p=0.463 n=8+9) MutexSpin-2 1.99µs ± 3% 1.97µs ± 1% -0.81% (p=0.003 n=9+8) RWMutexWrite100-2 37.6ns ± 5% 46.0ns ± 4% +22.17% (p=0.000 n=10+8) RWMutexWrite10-2 50.1ns ± 6% 36.8ns ±12% -26.46% (p=0.000 n=9+10) RWMutexWorkWrite100-2 136ns ± 0% 134ns ± 2% -1.80% (p=0.001 n=7+9) RWMutexWorkWrite10-2 140ns ± 1% 138ns ± 1% -1.50% (p=0.000 n=10+10) Cond32 5.93µs ± 1% 5.91µs ± 0% ~ (p=0.411 n=9+10) MutexUncontended 15.9ns ± 0% 15.8ns ± 0% -0.63% (p=0.000 n=8+8) Mutex 15.9ns ± 0% 15.8ns ± 0% -0.44% (p=0.003 n=10+10) MutexSlack 26.9ns ± 3% 26.7ns ± 2% ~ (p=0.084 n=10+10) MutexWork 47.8ns ± 0% 47.9ns ± 0% +0.21% (p=0.014 n=9+8) MutexWorkSlack 54.9ns ± 3% 54.5ns ± 3% ~ (p=0.254 n=10+10) MutexNoSpin 786ns ± 2% 765ns ± 1% -2.66% (p=0.000 n=10+10) MutexSpin 3.87µs ± 1% 3.83µs ± 0% -0.85% (p=0.005 n=9+8) RWMutexWrite100 21.2ns ± 2% 21.0ns ± 1% -0.88% (p=0.018 n=10+9) RWMutexWrite10 22.6ns ± 1% 22.6ns ± 0% ~ (p=0.471 n=9+9) RWMutexWorkWrite100 132ns ± 0% 132ns ± 0% ~ (all samples are equal) RWMutexWorkWrite10 124ns ± 0% 123ns ± 0% ~ (p=0.656 n=10+10) Change-Id: I66412a3a0980df1233ad7a5a0cd9723b4274528b Reviewed-on: https://go-review.googlesource.com/34310 Run-TryBot: Russ Cox <rsc@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Russ Cox <rsc@golang.org>
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mutex.go149
-rw-r--r--src/sync/mutex_test.go35
-rw-r--r--src/sync/runtime.go8
-rw-r--r--src/sync/runtime_sema_test.go6
-rw-r--r--src/sync/rwmutex.go4
-rw-r--r--src/sync/waitgroup.go2
6 files changed, 162 insertions, 42 deletions
diff --git a/src/sync/mutex.go b/src/sync/mutex.go
index 8c9366f4fe..506b23f6ff 100644
--- a/src/sync/mutex.go
+++ b/src/sync/mutex.go
@@ -37,7 +37,34 @@ type Locker interface {
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
+ mutexStarving
mutexWaiterShift = iota
+
+ // Mutex fairness.
+ //
+ // Mutex can be in 2 modes of operations: normal and starvation.
+ // In normal mode waiters are queued in FIFO order, but a woken up waiter
+ // does not own the mutex and competes with new arriving goroutines over
+ // the ownership. New arriving goroutines have an advantage -- they are
+ // already running on CPU and there can be lots of them, so a woken up
+ // waiter has good chances of losing. In such case it is queued at front
+ // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
+ // it switches mutex to the starvation mode.
+ //
+ // In starvation mode ownership of the mutex is directly handed off from
+ // the unlocking goroutine to the waiter at the front of the queue.
+ // New arriving goroutines don't try to acquire the mutex even if it appears
+ // to be unlocked, and don't try to spin. Instead they queue themselves at
+ // the tail of the wait queue.
+ //
+ // If a waiter receives ownership of the mutex and sees that either
+ // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
+ // it switches mutex back to normal operation mode.
+ //
+ // Normal mode has considerably better performance as a goroutine can acquire
+ // a mutex several times in a row even if there are blocked waiters.
+ // Starvation mode is important to prevent pathological cases of tail latency.
+ starvationThresholdNs = 1e6
)
// Lock locks m.
@@ -52,41 +79,86 @@ func (m *Mutex) Lock() {
return
}
+ var waitStartTime int64
+ starving := false
awoke := false
iter := 0
+ old := m.state
for {
- old := m.state
- new := old | mutexLocked
- if old&mutexLocked != 0 {
- if runtime_canSpin(iter) {
- // Active spinning makes sense.
- // Try to set mutexWoken flag to inform Unlock
- // to not wake other blocked goroutines.
- if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
- atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
- awoke = true
- }
- runtime_doSpin()
- iter++
- continue
+ // Don't spin in starvation mode, ownership is handed off to waiters
+ // so we won't be able to acquire the mutex anyway.
+ if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
+ // Active spinning makes sense.
+ // Try to set mutexWoken flag to inform Unlock
+ // to not wake other blocked goroutines.
+ if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
+ atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
+ awoke = true
}
- new = old + 1<<mutexWaiterShift
+ runtime_doSpin()
+ iter++
+ old = m.state
+ continue
+ }
+ new := old
+ // Don't try to acquire starving mutex, new arriving goroutines must queue.
+ if old&mutexStarving == 0 {
+ new |= mutexLocked
+ }
+ if old&(mutexLocked|mutexStarving) != 0 {
+ new += 1 << mutexWaiterShift
+ }
+ // The current goroutine switches mutex to starvation mode.
+ // But if the mutex is currently unlocked, don't do the switch.
+ // Unlock expects that starving mutex has waiters, which will not
+ // be true in this case.
+ if starving && old&mutexLocked != 0 {
+ new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
- throw("sync: inconsistent mutex state")
+ panic("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
- if old&mutexLocked == 0 {
+ if old&(mutexLocked|mutexStarving) == 0 {
+ break // locked the mutex with CAS
+ }
+ // If we were already waiting before, queue at the front of the queue.
+ queueLifo := waitStartTime != 0
+ if waitStartTime == 0 {
+ waitStartTime = runtime_nanotime()
+ }
+ runtime_SemacquireMutex(&m.sema, queueLifo)
+ starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
+ old = m.state
+ if old&mutexStarving != 0 {
+ // If this goroutine was woken and mutex is in starvation mode,
+ // ownership was handed off to us but mutex is in somewhat
+ // inconsistent state: mutexLocked is not set and we are still
+ // accounted as waiter. Fix that.
+ if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
+ panic("sync: inconsistent mutex state")
+ }
+ delta := int32(mutexLocked - 1<<mutexWaiterShift)
+ if !starving || old>>mutexWaiterShift == 1 {
+ // Exit starvation mode.
+ // Critical to do it here and consider wait time.
+ // Starvation mode is so inefficient, that two goroutines
+ // can go lock-step infinitely once they switch mutex
+ // to starvation mode.
+ delta -= mutexStarving
+ }
+ atomic.AddInt32(&m.state, delta)
break
}
- runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
+ } else {
+ old = m.state
}
}
@@ -110,22 +182,33 @@ func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
- throw("sync: unlock of unlocked mutex")
+ panic("sync: unlock of unlocked mutex")
}
-
- old := new
- for {
- // If there are no waiters or a goroutine has already
- // been woken or grabbed the lock, no need to wake anyone.
- if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
- return
- }
- // Grab the right to wake someone.
- new = (old - 1<<mutexWaiterShift) | mutexWoken
- if atomic.CompareAndSwapInt32(&m.state, old, new) {
- runtime_Semrelease(&m.sema)
- return
+ if new&mutexStarving == 0 {
+ old := new
+ for {
+ // If there are no waiters or a goroutine has already
+ // been woken or grabbed the lock, no need to wake anyone.
+ // In starvation mode ownership is directly handed off from unlocking
+ // goroutine to the next waiter. We are not part of this chain,
+ // since we did not observe mutexStarving when we unlocked the mutex above.
+ // So get off the way.
+ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
+ return
+ }
+ // Grab the right to wake someone.
+ new = (old - 1<<mutexWaiterShift) | mutexWoken
+ if atomic.CompareAndSwapInt32(&m.state, old, new) {
+ runtime_Semrelease(&m.sema, false)
+ return
+ }
+ old = m.state
}
- old = m.state
+ } else {
+ // Starving mode: handoff mutex ownership to the next waiter.
+ // Note: mutexLocked is not set, the waiter will set it after wakeup.
+ // But mutex is still considered locked if mutexStarving is set,
+ // so new coming goroutines won't acquire it.
+ runtime_Semrelease(&m.sema, true)
}
}
diff --git a/src/sync/mutex_test.go b/src/sync/mutex_test.go
index 88dbccf3ad..784471df12 100644
--- a/src/sync/mutex_test.go
+++ b/src/sync/mutex_test.go
@@ -15,12 +15,13 @@ import (
"strings"
. "sync"
"testing"
+ "time"
)
func HammerSemaphore(s *uint32, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
Runtime_Semacquire(s)
- Runtime_Semrelease(s)
+ Runtime_Semrelease(s, false)
}
cdone <- true
}
@@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) {
}
}
+func TestMutexFairness(t *testing.T) {
+ var mu Mutex
+ stop := make(chan bool)
+ defer close(stop)
+ go func() {
+ for {
+ mu.Lock()
+ time.Sleep(100 * time.Microsecond)
+ mu.Unlock()
+ select {
+ case <-stop:
+ return
+ default:
+ }
+ }
+ }()
+ done := make(chan bool)
+ go func() {
+ for i := 0; i < 10; i++ {
+ time.Sleep(100 * time.Microsecond)
+ mu.Lock()
+ mu.Unlock()
+ }
+ done <- true
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ t.Fatalf("can't acquire Mutex in 10 seconds")
+ }
+}
+
func BenchmarkMutexUncontended(b *testing.B) {
type PaddedMutex struct {
Mutex
diff --git a/src/sync/runtime.go b/src/sync/runtime.go
index 4d22ce6b0d..be16bcc8f7 100644
--- a/src/sync/runtime.go
+++ b/src/sync/runtime.go
@@ -14,13 +14,15 @@ import "unsafe"
func runtime_Semacquire(s *uint32)
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
-func runtime_SemacquireMutex(*uint32)
+// If lifo is true, queue waiter at the head of wait queue.
+func runtime_SemacquireMutex(s *uint32, lifo bool)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
-func runtime_Semrelease(s *uint32)
+// If handoff is true, pass count directly to the first waiter.
+func runtime_Semrelease(s *uint32, handoff bool)
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
@@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool
// runtime_doSpin does active spinning.
func runtime_doSpin()
+
+func runtime_nanotime() int64
diff --git a/src/sync/runtime_sema_test.go b/src/sync/runtime_sema_test.go
index a2382f4655..a680847edf 100644
--- a/src/sync/runtime_sema_test.go
+++ b/src/sync/runtime_sema_test.go
@@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
sem := new(PaddedSem)
for pb.Next() {
- Runtime_Semrelease(&sem.sem)
+ Runtime_Semrelease(&sem.sem, false)
Runtime_Semacquire(&sem.sem)
}
})
@@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
- Runtime_Semrelease(&sem)
+ Runtime_Semrelease(&sem, false)
if work {
for i := 0; i < 100; i++ {
foo *= 2
@@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
Runtime_Semacquire(&sem)
}
_ = foo
- Runtime_Semrelease(&sem)
+ Runtime_Semrelease(&sem, false)
})
}
diff --git a/src/sync/rwmutex.go b/src/sync/rwmutex.go
index 71064eeeba..55b69f2bb8 100644
--- a/src/sync/rwmutex.go
+++ b/src/sync/rwmutex.go
@@ -66,7 +66,7 @@ func (rw *RWMutex) RUnlock() {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
- runtime_Semrelease(&rw.writerSem)
+ runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
@@ -119,7 +119,7 @@ func (rw *RWMutex) Unlock() {
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
- runtime_Semrelease(&rw.readerSem)
+ runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go
index b386e1fec2..4b23540ae7 100644
--- a/src/sync/waitgroup.go
+++ b/src/sync/waitgroup.go
@@ -91,7 +91,7 @@ func (wg *WaitGroup) Add(delta int) {
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
- runtime_Semrelease(&wg.sema)
+ runtime_Semrelease(&wg.sema, false)
}
}