Diffstat (limited to 'src/runtime/chan.go')
-rw-r--r--  src/runtime/chan.go  76
1 file changed, 63 insertions(+), 13 deletions(-)
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 0afe5d962b..ba56e2cc40 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -215,8 +215,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
- raceacquire(qp)
- racerelease(qp)
+ racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
@@ -250,6 +249,11 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
+ // Signal to anyone trying to shrink our stack that we're about
+ // to park on a channel. The window between when this G's status
+ // changes and when we set gp.activeStackChans is not safe for
+ // stack shrinking.
+ atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
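
The parkingOnChan flag published above is consumed by the stack shrinker running on another thread. The companion check lives in src/runtime/stack.go, not in this file; as a point of reference, it takes roughly the following shape around this change (a sketch, paraphrased comments; the exact wording in the CL may differ):

// isShrinkStackSafe reports whether it's safe to attempt to shrink
// gp's stack. The window between a goroutine's status changing for a
// channel park and gp.activeStackChans being set is unsafe, and that
// is exactly what gp.parkingOnChan guards.
func isShrinkStackSafe(gp *g) bool {
	return gp.syscallsp == 0 && // not in a syscall (imprecise pointer maps)
		!gp.asyncSafePoint && // not stopped at an async safe-point
		atomic.Load8(&gp.parkingOnChan) == 0 // not in the park window
}
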
@@ -293,11 +297,8 @@ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
- qp := chanbuf(c, c.recvx)
- raceacquire(qp)
- racerelease(qp)
- raceacquireg(sg.g, qp)
- racereleaseg(sg.g, qp)
+ racenotify(c, c.recvx, nil)
+ racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
@@ -530,8 +531,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
- raceacquire(qp)
- racerelease(qp)
+ racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
@@ -568,6 +568,11 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
+ // Signal to anyone trying to shrink our stack that we're about
+ // to park on a channel. The window between when this G's status
+ // changes and when we set gp.activeStackChans is not safe for
+ // stack shrinking.
+ atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
@@ -615,10 +620,8 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
- raceacquire(qp)
- racerelease(qp)
- raceacquireg(sg.g, qp)
- racereleaseg(sg.g, qp)
+ racenotify(c, c.recvx, nil)
+ racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
@@ -646,7 +649,19 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
// There are unlocked sudogs that point into gp's stack. Stack
// copying must lock the channels of those sudogs.
+ // Set activeStackChans here instead of before we try parking
+ // because we could self-deadlock in stack growth on the
+ // channel lock.
gp.activeStackChans = true
+ // Mark that it's safe for stack shrinking to occur now,
+ // because any thread acquiring this G's stack for shrinking
+ // is guaranteed to observe activeStackChans after this store.
+ atomic.Store8(&gp.parkingOnChan, 0)
+ // Make sure we unlock after setting activeStackChans and
+ // unsetting parkingOnChan. The moment we unlock chanLock
+ // we risk gp getting readied by a channel operation and
+ // so gp could continue running before everything before
+ // the unlock is visible (even to gp itself).
unlock((*mutex)(chanLock))
return true
}
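
The ordering chanparkcommit relies on (set activeStackChans, clear parkingOnChan, and only then unlock) is the usual publish-before-unlock pattern: whoever acquires chanLock next is guaranteed to observe both stores. Below is a minimal user-level model of that argument, using sync.Mutex and sync/atomic as stand-ins for the runtime's primitives; gState and its fields are hypothetical mirrors of the g fields named in this diff, not runtime code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// gState mirrors the two g fields from the diff: activeStackChans and
// parkingOnChan (a uint8 in the runtime; uint32 here for sync/atomic).
type gState struct {
	activeStackChans bool
	parkingOnChan    uint32
}

func main() {
	var chanLock sync.Mutex
	gp := &gState{}

	chanLock.Lock()
	atomic.StoreUint32(&gp.parkingOnChan, 1) // "about to park on a channel"

	done := make(chan struct{})
	go func() { // plays the role of the stack shrinker
		defer close(done)
		chanLock.Lock()
		defer chanLock.Unlock()
		// Acquiring chanLock orders these loads after everything the
		// parker did before its Unlock, so if the flag reads clear we
		// are guaranteed to observe activeStackChans == true.
		if atomic.LoadUint32(&gp.parkingOnChan) == 0 {
			fmt.Println("activeStackChans =", gp.activeStackChans)
		}
	}()

	// chanparkcommit's critical sequence, in miniature: publish, clear
	// the flag, then unlock; the Unlock makes both stores visible to
	// the next locker.
	gp.activeStackChans = true
	atomic.StoreUint32(&gp.parkingOnChan, 0)
	chanLock.Unlock()

	<-done
}
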
@@ -817,3 +832,38 @@ func racesync(c *hchan, sg *sudog) {
racereleaseg(sg.g, chanbuf(c, 0))
raceacquire(chanbuf(c, 0))
}
+
+// Notify the race detector of a send or receive involving buffer entry idx
+// and a channel c or its communicating partner sg.
+// This function handles the special case of c.elemsize==0.
+func racenotify(c *hchan, idx uint, sg *sudog) {
+ // We could have passed the unsafe.Pointer corresponding to entry idx
+ // instead of idx itself. However, in a future version of this function,
+ // we can use idx to better handle the case of elemsize==0.
+ // A future improvement to the detector is to call TSan with c and idx:
+ // this way, Go will continue to not allocate buffer entries for channels
+ // of elemsize==0, yet the race detector can be made to handle multiple
+ // sync objects under the hood (one sync object per idx).
+ qp := chanbuf(c, idx)
+ // When elemsize==0, we don't allocate a full buffer for the channel.
+ // Instead of individual buffer entries, the race detector uses
+ // c.buf as the only buffer entry. This simplification prevents us from
+ // following the memory model's happens-before rules (rules that are
+ // implemented in racereleaseacquire). Instead, we accumulate happens-before
+ // information in the synchronization object associated with c.buf.
+ if c.elemsize == 0 {
+ if sg == nil {
+ raceacquire(qp)
+ racerelease(qp)
+ } else {
+ raceacquireg(sg.g, qp)
+ racereleaseg(sg.g, qp)
+ }
+ } else {
+ if sg == nil {
+ racereleaseacquire(qp)
+ } else {
+ racereleaseacquireg(sg.g, qp)
+ }
+ }
+}
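
At the language level, the happens-before edges that racenotify teaches the race detector correspond to the buffered-channel rule of the Go memory model: a send on a channel happens before the corresponding receive completes. A small demonstration of the guarantee these annotations model, which should run clean under go run -race (this illustrates the user-visible rule, not the runtime internals):

package main

import "fmt"

func main() {
	type payload struct{ n int }
	ch := make(chan *payload, 1) // buffered: sender and receiver never meet

	go func() {
		p := &payload{}
		p.n = 42 // plain write, ordered before the send...
		ch <- p  // ...which the annotations model as a release on the buffer slot
	}()

	q := <-ch        // modeled as an acquire on the same slot
	fmt.Println(q.n) // safe: the write happens-before this read
}

For zero-sized elements (e.g. chan struct{}) no buffer entries exist, so, as the comment above explains, all indices collapse onto the single synchronization object at c.buf; that over-synchronizes, which can hide a real race from the detector but never produces a false report.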