From eb3c6a93c3236bbde5dee6cc5bd4ca9f8ab1647a Mon Sep 17 00:00:00 2001 From: Michael Anthony Knyszek Date: Mon, 10 Aug 2020 20:02:22 +0000 Subject: runtime: disable stack shrinking in activeStackChans race window Currently activeStackChans is set before a goroutine blocks on a channel operation in an unlockf passed to gopark. The trouble is that the unlockf is called *after* the G's status is changed, and the G's status is what is used by a concurrent mark worker (calling suspendG) to determine that a G has successfully been suspended. In this window between the status change and unlockf, the mark worker could try to shrink the G's stack, and in particular observe that activeStackChans is false. This observation will cause the mark worker to *not* synchronize with concurrent channel operations when it should, and so updating pointers in the sudog for the blocked goroutine (which may point to the goroutine's stack) races with channel operations which may also manipulate the pointer (read it, dereference it, update it, etc.). Fix the problem by adding a new atomically-updated flag to the g struct called parkingOnChan, which is non-zero in the race window above. Then, in isShrinkStackSafe, check if parkingOnChan is zero. The race is resolved like so: * Blocking G sets parkingOnChan, then changes status in gopark. * Mark worker successfully suspends blocking G. * If the mark worker observes parkingOnChan is non-zero when checking isShrinkStackSafe, then it's not safe to shrink (we're in the race window). * If the mark worker observes parkingOnChan as zero, then because the mark worker observed the G status change, it can be sure that gopark's unlockf completed, and gp.activeStackChans will be correct. The risk of this change is low, since although it reduces the number of places that stack shrinking is allowed, the window here is incredibly small. Essentially, every place that it might crash now is replaced with no shrink. This change adds a test, but the race window is so small that it's hard to trigger without a well-placed sleep in park_m. Also, this change fixes stackGrowRecursive in proc_test.go to actually allocate a 128-byte stack frame. It turns out the compiler was destructuring the "pad" field and only allocating one uint64 on the stack. Fixes #40641. Change-Id: I7dfbe7d460f6972b8956116b137bc13bc24464e8 Reviewed-on: https://go-review.googlesource.com/c/go/+/247050 Run-TryBot: Michael Knyszek TryBot-Result: Go Bot Reviewed-by: Michael Pratt Trust: Michael Knyszek --- src/runtime/select.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) (limited to 'src/runtime/select.go') diff --git a/src/runtime/select.go b/src/runtime/select.go index a506747910..41e68a3746 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -7,6 +7,7 @@ package runtime // This file contains the implementation of Go select statements. import ( + "runtime/internal/atomic" "unsafe" ) @@ -61,7 +62,20 @@ func selunlock(scases []scase, lockorder []uint16) { func selparkcommit(gp *g, _ unsafe.Pointer) bool { // There are unlocked sudogs that point into gp's stack. Stack // copying must lock the channels of those sudogs. + // Set activeStackChans here instead of before we try parking + // because we could self-deadlock in stack growth on a + // channel lock. gp.activeStackChans = true + // Mark that it's safe for stack shrinking to occur now, + // because any thread acquiring this G's stack for shrinking + // is guaranteed to observe activeStackChans after this store. + atomic.Store8(&gp.parkingOnChan, 0) + // Make sure we unlock after setting activeStackChans and + // unsetting parkingOnChan. The moment we unlock any of the + // channel locks we risk gp getting readied by a channel operation + // and so gp could continue running before everything before the + // unlock is visible (even to gp itself). + // This must not access gp's stack (see gopark). In // particular, it must not access the *hselect. That's okay, // because by the time this is called, gp.waiting has all @@ -305,6 +319,11 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo // wait for someone to wake us up gp.param = nil + // Signal to anyone trying to shrink our stack that we're about + // to park on a channel. The window between when this G's status + // changes and when we set gp.activeStackChans is not safe for + // stack shrinking. + atomic.Store8(&gp.parkingOnChan, 1) gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) gp.activeStackChans = false -- cgit v1.3 From 35455fff0ebb7dd1b8e698f245a823ef8c711ac9 Mon Sep 17 00:00:00 2001 From: Daniel S Fava Date: Thu, 12 Nov 2020 17:25:47 +0100 Subject: runtime: swap the order of raceacquire() and racerelease() In chansend() and chanrecv() of chan.go, the order of calls to raceacquire() and racerelease() was swapped, which meant that the code was not following the memory model "by the letter of the law." Similar for bufrecv and bufsend in select.go The memory model says: - A send happens before the corresponding receive completes, and - the kth receive on a channel with capacity C happens before the k+C send on that channel completes. The operative word here is "completes." For example, a sender obtains happens-before information on completion of the send-operation, which means, after the sender has deposited its message onto the channel. Similarly for receives. If the order of raceacquire() and racerelease() is incorrect, the race detector may fail to report some race conditions. The fix is minimal from the point of view of Go. The fix does, however, rely on a new function added to TSan: https://reviews.llvm.org/D76322 This commit only affects execution when race detection is enabled. Added two tests into `runtime/race/output_test.go`: - `chanmm` tests for the issue addressed by this patch - `mutex` is a test for inverted semaphores, which must not be broken by this (or any other) patch Fixes #37355 Change-Id: I5e886879ead2bd456a4b7dd1d17253641b767f63 Reviewed-on: https://go-review.googlesource.com/c/go/+/220419 Run-TryBot: Ian Lance Taylor TryBot-Result: Go Bot Trust: Dmitri Shuralyov Reviewed-by: Dmitry Vyukov --- src/runtime/chan.go | 18 ++++------ src/runtime/race.go | 17 ++++++++++ src/runtime/race/output_test.go | 73 +++++++++++++++++++++++++++++++++++++++++ src/runtime/race0.go | 2 ++ src/runtime/select.go | 6 ++-- 5 files changed, 100 insertions(+), 16 deletions(-) (limited to 'src/runtime/select.go') diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 859f36c914..254816e369 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -215,8 +215,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { - raceacquire(qp) - racerelease(qp) + racereleaseacquire(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ @@ -299,10 +298,8 @@ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. qp := chanbuf(c, c.recvx) - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) + racereleaseacquire(qp) + racereleaseacquireg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 @@ -535,8 +532,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { - raceacquire(qp) - racerelease(qp) + racereleaseacquire(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) @@ -625,10 +621,8 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) + racereleaseacquire(qp) + racereleaseacquireg(sg.g, qp) } // copy data from queue to receiver if ep != nil { diff --git a/src/runtime/race.go b/src/runtime/race.go index 53910f991c..79fd21765d 100644 --- a/src/runtime/race.go +++ b/src/runtime/race.go @@ -268,6 +268,9 @@ var __tsan_acquire byte //go:linkname __tsan_release __tsan_release var __tsan_release byte +//go:linkname __tsan_release_acquire __tsan_release_acquire +var __tsan_release_acquire byte + //go:linkname __tsan_release_merge __tsan_release_merge var __tsan_release_merge byte @@ -293,6 +296,7 @@ var __tsan_report_count byte //go:cgo_import_static __tsan_free //go:cgo_import_static __tsan_acquire //go:cgo_import_static __tsan_release +//go:cgo_import_static __tsan_release_acquire //go:cgo_import_static __tsan_release_merge //go:cgo_import_static __tsan_go_ignore_sync_begin //go:cgo_import_static __tsan_go_ignore_sync_end @@ -535,6 +539,19 @@ func racereleaseg(gp *g, addr unsafe.Pointer) { racecall(&__tsan_release, gp.racectx, uintptr(addr), 0, 0) } +//go:nosplit +func racereleaseacquire(addr unsafe.Pointer) { + racereleaseacquireg(getg(), addr) +} + +//go:nosplit +func racereleaseacquireg(gp *g, addr unsafe.Pointer) { + if getg().raceignore != 0 || !isvalidaddr(addr) { + return + } + racecall(&__tsan_release_acquire, gp.racectx, uintptr(addr), 0, 0) +} + //go:nosplit func racereleasemerge(addr unsafe.Pointer) { racereleasemergeg(getg(), addr) diff --git a/src/runtime/race/output_test.go b/src/runtime/race/output_test.go index b4b8936c7c..5d0192f67f 100644 --- a/src/runtime/race/output_test.go +++ b/src/runtime/race/output_test.go @@ -338,4 +338,77 @@ func TestPass(t *testing.T) { --- FAIL: TestFail \(0...s\) .*testing.go:.*: race detected during execution of test FAIL`}, + {"mutex", "run", "", "atexit_sleep_ms=0", ` +package main +import ( + "sync" + "fmt" +) +func main() { + c := make(chan bool, 1) + threads := 1 + iterations := 20000 + data := 0 + var wg sync.WaitGroup + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + c <- true + data += 1 + <- c + } + }() + } + for i := 0; i < iterations; i++ { + c <- true + data += 1 + <- c + } + wg.Wait() + if (data == iterations*(threads+1)) { fmt.Println("pass") } +}`, `pass`}, + // Test for https://github.com/golang/go/issues/37355 + {"chanmm", "run", "", "atexit_sleep_ms=0", ` +package main +import ( + "sync" + "time" +) +func main() { + c := make(chan bool, 1) + var data uint64 + var wg sync.WaitGroup + wg.Add(2) + c <- true + go func() { + defer wg.Done() + c <- true + }() + go func() { + defer wg.Done() + time.Sleep(time.Second) + <-c + data = 2 + }() + data = 1 + <-c + wg.Wait() + _ = data +} +`, `================== +WARNING: DATA RACE +Write at 0x[0-9,a-f]+ by goroutine [0-9]: + main\.main\.func2\(\) + .*/main\.go:21 \+0x[0-9,a-f]+ + +Previous write at 0x[0-9,a-f]+ by main goroutine: + main\.main\(\) + .*/main\.go:23 \+0x[0-9,a-f]+ + +Goroutine [0-9] \(running\) created at: + main\.main\(\) + .*/main.go:[0-9]+ \+0x[0-9,a-f]+ +==================`}, } diff --git a/src/runtime/race0.go b/src/runtime/race0.go index 6f26afa854..180f707b1a 100644 --- a/src/runtime/race0.go +++ b/src/runtime/race0.go @@ -32,6 +32,8 @@ func raceacquireg(gp *g, addr unsafe.Pointer) { th func raceacquirectx(racectx uintptr, addr unsafe.Pointer) { throw("race") } func racerelease(addr unsafe.Pointer) { throw("race") } func racereleaseg(gp *g, addr unsafe.Pointer) { throw("race") } +func racereleaseacquire(addr unsafe.Pointer) { throw("race") } +func racereleaseacquireg(gp *g, addr unsafe.Pointer) { throw("race") } func racereleasemerge(addr unsafe.Pointer) { throw("race") } func racereleasemergeg(gp *g, addr unsafe.Pointer) { throw("race") } func racefingo() { throw("race") } diff --git a/src/runtime/select.go b/src/runtime/select.go index 41e68a3746..f04b130b15 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -415,8 +415,7 @@ bufrecv: if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } - raceacquire(chanbuf(c, c.recvx)) - racerelease(chanbuf(c, c.recvx)) + racereleaseacquire(chanbuf(c, c.recvx)) } if msanenabled && cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) @@ -438,8 +437,7 @@ bufrecv: bufsend: // can send to buffer if raceenabled { - raceacquire(chanbuf(c, c.sendx)) - racerelease(chanbuf(c, c.sendx)) + racereleaseacquire(chanbuf(c, c.sendx)) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { -- cgit v1.3 From df68e01b6860e585033156e84f8f9716d2f41a28 Mon Sep 17 00:00:00 2001 From: Daniel S Fava Date: Fri, 20 Nov 2020 21:23:45 +0100 Subject: runtime: check channel's elemsize before calling race detector When c.elemsize==0 we call raceacquire() and racerelease() as opposed to calling racereleaseacquire() The reason for this change is that, when elemsize==0, we don't allocate a full buffer for the channel. Instead of individual buffer entries, the race detector uses the c.buf as the only buffer entry. This simplification prevents us following the memory model's happens-before rules implemented in racereleaseacquire(). So, instead of calling racereleaseacquire(), we accumulate happens-before information in the synchronization object associated with c.buf. The functionality in this change is implemented in a new function called racenotify() Fixes #42598 Change-Id: I75b92708633fdfde658dc52e06264e2171824e51 Reviewed-on: https://go-review.googlesource.com/c/go/+/271987 Reviewed-by: Dmitry Vyukov Reviewed-by: Russ Cox Run-TryBot: Dmitry Vyukov TryBot-Result: Go Bot Trust: Ian Lance Taylor --- src/runtime/chan.go | 48 +++++++++++++++++++++++++++++----- src/runtime/race/testdata/chan_test.go | 22 ++++++++++++++++ src/runtime/select.go | 4 +-- 3 files changed, 65 insertions(+), 9 deletions(-) (limited to 'src/runtime/select.go') diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 254816e369..ba56e2cc40 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -215,7 +215,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { - racereleaseacquire(qp) + racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ @@ -297,9 +297,8 @@ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. - qp := chanbuf(c, c.recvx) - racereleaseacquire(qp) - racereleaseacquireg(sg.g, qp) + racenotify(c, c.recvx, nil) + racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 @@ -532,7 +531,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { - racereleaseacquire(qp) + racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) @@ -621,8 +620,8 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { - racereleaseacquire(qp) - racereleaseacquireg(sg.g, qp) + racenotify(c, c.recvx, nil) + racenotify(c, c.recvx, sg) } // copy data from queue to receiver if ep != nil { @@ -833,3 +832,38 @@ func racesync(c *hchan, sg *sudog) { racereleaseg(sg.g, chanbuf(c, 0)) raceacquire(chanbuf(c, 0)) } + +// Notify the race detector of a send or receive involving buffer entry idx +// and a channel c or its communicating partner sg. +// This function handles the special case of c.elemsize==0. +func racenotify(c *hchan, idx uint, sg *sudog) { + // We could have passed the unsafe.Pointer corresponding to entry idx + // instead of idx itself. However, in a future version of this function, + // we can use idx to better handle the case of elemsize==0. + // A future improvement to the detector is to call TSan with c and idx: + // this way, Go will continue to not allocating buffer entries for channels + // of elemsize==0, yet the race detector can be made to handle multiple + // sync objects underneath the hood (one sync object per idx) + qp := chanbuf(c, idx) + // When elemsize==0, we don't allocate a full buffer for the channel. + // Instead of individual buffer entries, the race detector uses the + // c.buf as the only buffer entry. This simplification prevents us from + // following the memory model's happens-before rules (rules that are + // implemented in racereleaseacquire). Instead, we accumulate happens-before + // information in the synchronization object associated with c.buf. + if c.elemsize == 0 { + if sg == nil { + raceacquire(qp) + racerelease(qp) + } else { + raceacquireg(sg.g, qp) + racereleaseg(sg.g, qp) + } + } else { + if sg == nil { + racereleaseacquire(qp) + } else { + racereleaseacquireg(sg.g, qp) + } + } +} diff --git a/src/runtime/race/testdata/chan_test.go b/src/runtime/race/testdata/chan_test.go index 3e57b8221c..e39ad4f99c 100644 --- a/src/runtime/race/testdata/chan_test.go +++ b/src/runtime/race/testdata/chan_test.go @@ -763,3 +763,25 @@ func TestNoRaceCloseHappensBeforeRead(t *testing.T) { <-read } } + +// Test that we call the proper race detector function when c.elemsize==0. +// See https://github.com/golang/go/issues/42598 +func TestNoRaceElemetSize0(t *testing.T) { + var x, y int + var c = make(chan struct{}, 2) + c <- struct{}{} + c <- struct{}{} + go func() { + x += 1 + <-c + }() + go func() { + y += 1 + <-c + }() + time.Sleep(10 * time.Millisecond) + c <- struct{}{} + c <- struct{}{} + x += 1 + y += 1 +} diff --git a/src/runtime/select.go b/src/runtime/select.go index f04b130b15..e72761bfa9 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -415,7 +415,7 @@ bufrecv: if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } - racereleaseacquire(chanbuf(c, c.recvx)) + racenotify(c, c.recvx, nil) } if msanenabled && cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) @@ -437,7 +437,7 @@ bufrecv: bufsend: // can send to buffer if raceenabled { - racereleaseacquire(chanbuf(c, c.sendx)) + racenotify(c, c.sendx, nil) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { -- cgit v1.3