diff options
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/mutex.go | 149 | ||||
| -rw-r--r-- | src/sync/mutex_test.go | 35 | ||||
| -rw-r--r-- | src/sync/runtime.go | 8 | ||||
| -rw-r--r-- | src/sync/runtime_sema_test.go | 6 | ||||
| -rw-r--r-- | src/sync/rwmutex.go | 4 | ||||
| -rw-r--r-- | src/sync/waitgroup.go | 2 |
6 files changed, 162 insertions, 42 deletions
diff --git a/src/sync/mutex.go b/src/sync/mutex.go index 8c9366f4fe..506b23f6ff 100644 --- a/src/sync/mutex.go +++ b/src/sync/mutex.go @@ -37,7 +37,34 @@ type Locker interface { const ( mutexLocked = 1 << iota // mutex is locked mutexWoken + mutexStarving mutexWaiterShift = iota + + // Mutex fairness. + // + // Mutex can be in 2 modes of operations: normal and starvation. + // In normal mode waiters are queued in FIFO order, but a woken up waiter + // does not own the mutex and competes with new arriving goroutines over + // the ownership. New arriving goroutines have an advantage -- they are + // already running on CPU and there can be lots of them, so a woken up + // waiter has good chances of losing. In such case it is queued at front + // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, + // it switches mutex to the starvation mode. + // + // In starvation mode ownership of the mutex is directly handed off from + // the unlocking goroutine to the waiter at the front of the queue. + // New arriving goroutines don't try to acquire the mutex even if it appears + // to be unlocked, and don't try to spin. Instead they queue themselves at + // the tail of the wait queue. + // + // If a waiter receives ownership of the mutex and sees that either + // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, + // it switches mutex back to normal operation mode. + // + // Normal mode has considerably better performance as a goroutine can acquire + // a mutex several times in a row even if there are blocked waiters. + // Starvation mode is important to prevent pathological cases of tail latency. + starvationThresholdNs = 1e6 ) // Lock locks m. @@ -52,41 +79,86 @@ func (m *Mutex) Lock() { return } + var waitStartTime int64 + starving := false awoke := false iter := 0 + old := m.state for { - old := m.state - new := old | mutexLocked - if old&mutexLocked != 0 { - if runtime_canSpin(iter) { - // Active spinning makes sense. - // Try to set mutexWoken flag to inform Unlock - // to not wake other blocked goroutines. - if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && - atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { - awoke = true - } - runtime_doSpin() - iter++ - continue + // Don't spin in starvation mode, ownership is handed off to waiters + // so we won't be able to acquire the mutex anyway. + if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { + // Active spinning makes sense. + // Try to set mutexWoken flag to inform Unlock + // to not wake other blocked goroutines. + if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && + atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { + awoke = true } - new = old + 1<<mutexWaiterShift + runtime_doSpin() + iter++ + old = m.state + continue + } + new := old + // Don't try to acquire starving mutex, new arriving goroutines must queue. + if old&mutexStarving == 0 { + new |= mutexLocked + } + if old&(mutexLocked|mutexStarving) != 0 { + new += 1 << mutexWaiterShift + } + // The current goroutine switches mutex to starvation mode. + // But if the mutex is currently unlocked, don't do the switch. + // Unlock expects that starving mutex has waiters, which will not + // be true in this case. + if starving && old&mutexLocked != 0 { + new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { - throw("sync: inconsistent mutex state") + panic("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { - if old&mutexLocked == 0 { + if old&(mutexLocked|mutexStarving) == 0 { + break // locked the mutex with CAS + } + // If we were already waiting before, queue at the front of the queue. + queueLifo := waitStartTime != 0 + if waitStartTime == 0 { + waitStartTime = runtime_nanotime() + } + runtime_SemacquireMutex(&m.sema, queueLifo) + starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs + old = m.state + if old&mutexStarving != 0 { + // If this goroutine was woken and mutex is in starvation mode, + // ownership was handed off to us but mutex is in somewhat + // inconsistent state: mutexLocked is not set and we are still + // accounted as waiter. Fix that. + if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { + panic("sync: inconsistent mutex state") + } + delta := int32(mutexLocked - 1<<mutexWaiterShift) + if !starving || old>>mutexWaiterShift == 1 { + // Exit starvation mode. + // Critical to do it here and consider wait time. + // Starvation mode is so inefficient, that two goroutines + // can go lock-step infinitely once they switch mutex + // to starvation mode. + delta -= mutexStarving + } + atomic.AddInt32(&m.state, delta) break } - runtime_SemacquireMutex(&m.sema) awoke = true iter = 0 + } else { + old = m.state } } @@ -110,22 +182,33 @@ func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { - throw("sync: unlock of unlocked mutex") + panic("sync: unlock of unlocked mutex") } - - old := new - for { - // If there are no waiters or a goroutine has already - // been woken or grabbed the lock, no need to wake anyone. - if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { - return - } - // Grab the right to wake someone. - new = (old - 1<<mutexWaiterShift) | mutexWoken - if atomic.CompareAndSwapInt32(&m.state, old, new) { - runtime_Semrelease(&m.sema) - return + if new&mutexStarving == 0 { + old := new + for { + // If there are no waiters or a goroutine has already + // been woken or grabbed the lock, no need to wake anyone. + // In starvation mode ownership is directly handed off from unlocking + // goroutine to the next waiter. We are not part of this chain, + // since we did not observe mutexStarving when we unlocked the mutex above. + // So get off the way. + if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { + return + } + // Grab the right to wake someone. + new = (old - 1<<mutexWaiterShift) | mutexWoken + if atomic.CompareAndSwapInt32(&m.state, old, new) { + runtime_Semrelease(&m.sema, false) + return + } + old = m.state } - old = m.state + } else { + // Starving mode: handoff mutex ownership to the next waiter. + // Note: mutexLocked is not set, the waiter will set it after wakeup. + // But mutex is still considered locked if mutexStarving is set, + // so new coming goroutines won't acquire it. + runtime_Semrelease(&m.sema, true) } } diff --git a/src/sync/mutex_test.go b/src/sync/mutex_test.go index 88dbccf3ad..784471df12 100644 --- a/src/sync/mutex_test.go +++ b/src/sync/mutex_test.go @@ -15,12 +15,13 @@ import ( "strings" . "sync" "testing" + "time" ) func HammerSemaphore(s *uint32, loops int, cdone chan bool) { for i := 0; i < loops; i++ { Runtime_Semacquire(s) - Runtime_Semrelease(s) + Runtime_Semrelease(s, false) } cdone <- true } @@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) { } } +func TestMutexFairness(t *testing.T) { + var mu Mutex + stop := make(chan bool) + defer close(stop) + go func() { + for { + mu.Lock() + time.Sleep(100 * time.Microsecond) + mu.Unlock() + select { + case <-stop: + return + default: + } + } + }() + done := make(chan bool) + go func() { + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Microsecond) + mu.Lock() + mu.Unlock() + } + done <- true + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("can't acquire Mutex in 10 seconds") + } +} + func BenchmarkMutexUncontended(b *testing.B) { type PaddedMutex struct { Mutex diff --git a/src/sync/runtime.go b/src/sync/runtime.go index 4d22ce6b0d..be16bcc8f7 100644 --- a/src/sync/runtime.go +++ b/src/sync/runtime.go @@ -14,13 +14,15 @@ import "unsafe" func runtime_Semacquire(s *uint32) // SemacquireMutex is like Semacquire, but for profiling contended Mutexes. -func runtime_SemacquireMutex(*uint32) +// If lifo is true, queue waiter at the head of wait queue. +func runtime_SemacquireMutex(s *uint32, lifo bool) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. -func runtime_Semrelease(s *uint32) +// If handoff is true, pass count directly to the first waiter. +func runtime_Semrelease(s *uint32, handoff bool) // Approximation of notifyList in runtime/sema.go. Size and alignment must // agree. @@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool // runtime_doSpin does active spinning. func runtime_doSpin() + +func runtime_nanotime() int64 diff --git a/src/sync/runtime_sema_test.go b/src/sync/runtime_sema_test.go index a2382f4655..a680847edf 100644 --- a/src/sync/runtime_sema_test.go +++ b/src/sync/runtime_sema_test.go @@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) { b.RunParallel(func(pb *testing.PB) { sem := new(PaddedSem) for pb.Next() { - Runtime_Semrelease(&sem.sem) + Runtime_Semrelease(&sem.sem, false) Runtime_Semacquire(&sem.sem) } }) @@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) { b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { - Runtime_Semrelease(&sem) + Runtime_Semrelease(&sem, false) if work { for i := 0; i < 100; i++ { foo *= 2 @@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) { Runtime_Semacquire(&sem) } _ = foo - Runtime_Semrelease(&sem) + Runtime_Semrelease(&sem, false) }) } diff --git a/src/sync/rwmutex.go b/src/sync/rwmutex.go index 71064eeeba..55b69f2bb8 100644 --- a/src/sync/rwmutex.go +++ b/src/sync/rwmutex.go @@ -66,7 +66,7 @@ func (rw *RWMutex) RUnlock() { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. - runtime_Semrelease(&rw.writerSem) + runtime_Semrelease(&rw.writerSem, false) } } if race.Enabled { @@ -119,7 +119,7 @@ func (rw *RWMutex) Unlock() { } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { - runtime_Semrelease(&rw.readerSem) + runtime_Semrelease(&rw.readerSem, false) } // Allow other writers to proceed. rw.w.Unlock() diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index b386e1fec2..4b23540ae7 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -91,7 +91,7 @@ func (wg *WaitGroup) Add(delta int) { // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { - runtime_Semrelease(&wg.sema) + runtime_Semrelease(&wg.sema, false) } } |
