aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/select.go
diff options
context:
space:
mode:
authorKeith Randall <khr@golang.org>2015-11-07 21:28:21 -0800
committerKeith Randall <khr@golang.org>2015-11-08 23:20:25 +0000
commite410a527b208e0a9acd0cded3775b302d8f2b00a (patch)
tree5bb9f4ba8c8260570e5ac15ae11f923e71cb62f5 /src/runtime/select.go
parent1b4d28f8cf48f6d8cf346adc1d3cbd0ede338558 (diff)
downloadgo-e410a527b208e0a9acd0cded3775b302d8f2b00a.tar.xz
runtime: simplify chan ops, take 2
This change is the same as CL #9345 which was reverted, except for a small bug fix. The only change is to the body of sendDirect and its callsite. Also added a test. The problem was during a channel send operation. The target of the send was a sleeping goroutine waiting to receive. We basically do: 1) Read the destination pointer out of the sudog structure 2) Copy the value we're sending to that destination pointer Unfortunately, the previous change had a goroutine suspend point between 1 & 2 (the call to sendDirect). At that point the destination goroutine's stack could be copied (shrunk). The pointer we read in step 1 is no longer valid for step 2. Fixed by not allowing any suspension points between 1 & 2. I suspect the old code worked correctly basically by accident. Fixes #13169 The original 9345: This change removes the retry mechanism we use for buffered channels. Instead, any sender waking up a receiver or vice versa completes the full protocol with its counterpart. This means the counterpart does not need to relock the channel when it wakes up. (Currently buffered channels need to relock on wakeup.) For sends on a channel with waiting receivers, this change replaces two copies (sender->queue, queue->receiver) with one (sender->receiver). For receives on channels with a waiting sender, two copies are still required. This change unifies to a large degree the algorithm for buffered and unbuffered channels, simplifying the overall implementation. Fixes #11506 Change-Id: I57dfa3fc219cffa4d48301ee15fe5479299efa09 Reviewed-on: https://go-review.googlesource.com/16740 Reviewed-by: Ian Lance Taylor <iant@golang.org>
Diffstat (limited to 'src/runtime/select.go')
-rw-r--r--src/runtime/select.go118
1 files changed, 32 insertions, 86 deletions
diff --git a/src/runtime/select.go b/src/runtime/select.go
index 8b6c3ed4c0..508a19b630 100644
--- a/src/runtime/select.go
+++ b/src/runtime/select.go
@@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
k *scase
sglist *sudog
sgnext *sudog
- futile byte
+ qp unsafe.Pointer
)
loop:
@@ -317,15 +317,12 @@ loop:
switch cas.kind {
case caseRecv:
- if c.dataqsiz > 0 {
- if c.qcount > 0 {
- goto asyncrecv
- }
- } else {
- sg = c.sendq.dequeue()
- if sg != nil {
- goto syncrecv
- }
+ sg = c.sendq.dequeue()
+ if sg != nil {
+ goto recv
+ }
+ if c.qcount > 0 {
+ goto bufrecv
}
if c.closed != 0 {
goto rclose
@@ -338,15 +335,12 @@ loop:
if c.closed != 0 {
goto sclose
}
- if c.dataqsiz > 0 {
- if c.qcount < c.dataqsiz {
- goto asyncsend
- }
- } else {
- sg = c.recvq.dequeue()
- if sg != nil {
- goto syncsend
- }
+ sg = c.recvq.dequeue()
+ if sg != nil {
+ goto send
+ }
+ if c.qcount < c.dataqsiz {
+ goto bufsend
}
case caseDefault:
@@ -363,6 +357,9 @@ loop:
// pass 2 - enqueue on all chans
gp = getg()
done = 0
+ if gp.waiting != nil {
+ throw("gp.waiting != nil")
+ }
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas.c
@@ -389,7 +386,7 @@ loop:
// wait for someone to wake us up
gp.param = nil
- gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
+ gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
// someone woke us up
sellock(sel)
@@ -432,16 +429,13 @@ loop:
}
if cas == nil {
- futile = traceFutileWakeup
+ // This can happen if we were woken up by a close().
+ // TODO: figure that out explicitly so we don't need this loop.
goto loop
}
c = cas.c
- if c.dataqsiz > 0 {
- throw("selectgo: shouldn't happen")
- }
-
if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
@@ -470,7 +464,7 @@ loop:
selunlock(sel)
goto retc
-asyncrecv:
+bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
@@ -485,29 +479,20 @@ asyncrecv:
if cas.receivedp != nil {
*cas.receivedp = true
}
+ qp = chanbuf(c, c.recvx)
if cas.elem != nil {
- typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx))
+ typedmemmove(c.elemtype, cas.elem, qp)
}
- memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
+ memclr(qp, uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
- sg = c.sendq.dequeue()
- if sg != nil {
- gp = sg.g
- selunlock(sel)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
- } else {
- selunlock(sel)
- }
+ selunlock(sel)
goto retc
-asyncsend:
+bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
@@ -523,47 +508,18 @@ asyncsend:
c.sendx = 0
}
c.qcount++
- sg = c.recvq.dequeue()
- if sg != nil {
- gp = sg.g
- selunlock(sel)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
- } else {
- selunlock(sel)
- }
+ selunlock(sel)
goto retc
-syncrecv:
+recv:
// can receive from sleeping sender (sg)
- if raceenabled {
- if cas.elem != nil {
- raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
- }
- racesync(c, sg)
- }
- if msanenabled && cas.elem != nil {
- msanwrite(cas.elem, c.elemtype.size)
- }
- selunlock(sel)
+ recv(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "\n")
}
if cas.receivedp != nil {
*cas.receivedp = true
}
- if cas.elem != nil {
- typedmemmove(c.elemtype, cas.elem, sg.elem)
- }
- sg.elem = nil
- gp = sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
goto retc
rclose:
@@ -580,29 +536,19 @@ rclose:
}
goto retc
-syncsend:
- // can send to sleeping receiver (sg)
+send:
+ // can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
- racesync(c, sg)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
- selunlock(sel)
+ send(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncsend: sel=", sel, " c=", c, "\n")
}
- if sg.elem != nil {
- syncsend(c, sg, cas.elem)
- }
- sg.elem = nil
- gp = sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
+ goto retc
retc:
if cas.releasetime > 0 {