diff options
| author | Keith Randall <khr@golang.org> | 2014-08-24 12:31:03 +0400 |
|---|---|---|
| committer | Dmitriy Vyukov <dvyukov@google.com> | 2014-08-24 12:31:03 +0400 |
| commit | 9a1e142bbc3209f8899a729ec857a1c8838bf9f4 (patch) | |
| tree | d490b16e05e6d80d05d295d4379f944fbf7317a9 /src/pkg/runtime | |
| parent | 7f223e3b3ba732c287db5a57b5091784baa9b86f (diff) | |
| download | go-9a1e142bbc3209f8899a729ec857a1c8838bf9f4.tar.xz | |
runtime: convert channel operations to Go, part 1 (chansend1).
LGTM=dvyukov
R=dvyukov, khr
CC=golang-codereviews
https://golang.org/cl/127460044
Diffstat (limited to 'src/pkg/runtime')
| -rw-r--r-- | src/pkg/runtime/chan.go | 278 | ||||
| -rw-r--r-- | src/pkg/runtime/chan.goc | 77 | ||||
| -rw-r--r-- | src/pkg/runtime/chan.h | 13 | ||||
| -rw-r--r-- | src/pkg/runtime/chan_test.go | 32 | ||||
| -rw-r--r-- | src/pkg/runtime/mprof.goc | 11 | ||||
| -rw-r--r-- | src/pkg/runtime/pprof/pprof_test.go | 2 | ||||
| -rw-r--r-- | src/pkg/runtime/proc.c | 1 | ||||
| -rw-r--r-- | src/pkg/runtime/runtime.h | 15 | ||||
| -rw-r--r-- | src/pkg/runtime/stack.c | 22 | ||||
| -rw-r--r-- | src/pkg/runtime/stubs.go | 12 |
10 files changed, 368 insertions, 95 deletions
diff --git a/src/pkg/runtime/chan.go b/src/pkg/runtime/chan.go new file mode 100644 index 0000000000..67427e960e --- /dev/null +++ b/src/pkg/runtime/chan.go @@ -0,0 +1,278 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package runtime + +// This file contains the implementation of Go channels +// and select statements. + +import "unsafe" + +const ( + maxAlign = 8 + hchanSize = unsafe.Sizeof(hchan{}) + debugChan = false +) + +// TODO: make hchan.buf an unsafe.Pointer, not a *uint8 + +func makechan(t *chantype, size int64) *hchan { + elem := t.elem + + // compiler checks this but be safe. + if elem.size >= 1<<16 { + gothrow("makechan: invalid channel element type") + } + if hchanSize%maxAlign != 0 || elem.align > maxAlign { + gothrow("makechan: bad alignment") + } + if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (maxMem-hchanSize)/uintptr(elem.size)) { + panic("makechan: size out of range") + } + + var c *hchan + if elem.kind&kindNoPointers != 0 || size == 0 { + // allocate memory in one call + c = (*hchan)(gomallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan)) + if size > 0 && elem.size != 0 { + c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize)) + } else { + c.buf = (*uint8)(unsafe.Pointer(c)) // race detector uses this location for synchronization + } + } else { + c = new(hchan) + c.buf = (*uint8)(newarray(elem, uintptr(size))) + } + c.elemsize = uint16(elem.size) + c.elemtype = elem + c.dataqsiz = uint(size) + + if debugChan { + println("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size) + } + return c +} + +// chanbuf(c, i) is pointer to the i'th slot in the buffer. +func chanbuf(c *hchan, i uint) unsafe.Pointer { + return add(unsafe.Pointer(c.buf), uintptr(i)*uintptr(c.elemsize)) +} + +// entry point for c <- x from compiled code +//go:nosplit +func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) { + chansend(t, c, elem, true, gogetcallerpc(unsafe.Pointer(&t))) +} + +/* + * generic single channel send/recv + * If block is not nil, + * then the protocol will not + * sleep but return if it could + * not complete. + * + * sleep can wake up with g.param == nil + * when a channel involved in the sleep has + * been closed. it is easiest to loop and re-run + * the operation; we'll see that it's now closed. + */ +func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { + if raceenabled { + fn := chansend + pc := **(**uintptr)(unsafe.Pointer(&fn)) + raceReadObjectPC(t.elem, ep, callerpc, pc) + } + + if c == nil { + if !block { + return false + } + gopark(nil, nil, "chan send (nil chan)") + return false // not reached + } + + if debugChan { + println("chansend: chan=", c) + } + + var t0 int64 + if blockprofilerate > 0 { + t0 = gocputicks() + } + + golock(&c.lock) + if raceenabled { + fn := chansend + pc := **(**uintptr)(unsafe.Pointer(&fn)) + racereadpc(unsafe.Pointer(c), pc, callerpc) + } + if c.closed != 0 { + gounlock(&c.lock) + panic("send on closed channel") + } + + if c.dataqsiz == 0 { // synchronous channel + sg := c.recvq.dequeue() + if sg != nil { // found a waiting receiver + if raceenabled { + racesync(c, sg) + } + gounlock(&c.lock) + + recvg := sg.g + recvg.param = unsafe.Pointer(sg) + if sg.elem != nil { + memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize)) + } + if sg.releasetime != 0 { + // Yes, this is ugly. On 64-bit sg.releasetime has type + // int. On 32-bit it has type int64. There's no easy way + // to assign to both types in Go. At some point we'll + // write the Go types directly instead of generating them + // via the C types. At that point, this nastiness goes away. + *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks() + } + goready(recvg) + return true + } + + if !block { + gounlock(&c.lock) + return false + } + + // no receiver available: block on this channel. + gp := getg() + mysg := acquireSudog() + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = (*uint8)(ep) + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send") + + // someone woke us up. + if gp.param == nil { + if c.closed == 0 { + gothrow("chansend: spurious wakeup") + } + panic("send on closed channel") + } + if mysg.releasetime > 0 { + goblockevent(int64(mysg.releasetime)-t0, 3) + } + if mysg != gp.waiting { + gothrow("G waiting list is corrupted!") + } + gp.waiting = nil + releaseSudog(mysg) + return true + } + + // asynchronous channel + // wait for some space to write our data + var t1 int64 + for c.qcount >= c.dataqsiz { + if !block { + gounlock(&c.lock) + return false + } + gp := getg() + mysg := acquireSudog() + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.g = gp + mysg.elem = nil + mysg.selectdone = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send") + + // someone woke us up - try again + if mysg.releasetime != 0 { + t1 = int64(mysg.releasetime) + } + releaseSudog(mysg) + golock(&c.lock) + if c.closed != 0 { + gounlock(&c.lock) + panic("send on closed channel") + } + } + + // write our data into the channel buffer + if raceenabled { + raceacquire(chanbuf(c, c.sendx)) + racerelease(chanbuf(c, c.sendx)) + } + memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize)) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 + } + c.qcount++ + + // wake up a waiting receiver + sg := c.recvq.dequeue() + if sg != nil { + recvg := sg.g + gounlock(&c.lock) + if sg.releasetime != 0 { + *(*int64)(unsafe.Pointer(&sg.releasetime)) = gocputicks() + } + goready(recvg) + } else { + gounlock(&c.lock) + } + if t1 > 0 { + goblockevent(t1-t0, 3) + } + return true +} + +func (q *waitq) enqueue(sgp *sudog) { + sgp.link = nil + if q.first == nil { + q.first = sgp + q.last = sgp + return + } + q.last.link = sgp + q.last = sgp +} + +func (q *waitq) dequeue() *sudog { + for { + sgp := q.first + if sgp == nil { + return nil + } + q.first = sgp.link + if q.last == sgp { + q.last = nil + } + + // if sgp participates in a select and is already signaled, ignore it + if sgp.selectdone != nil { + // claim the right to signal + if *sgp.selectdone != 0 || !gocas(sgp.selectdone, 0, 1) { + continue + } + } + + return sgp + } +} + +func racesync(c *hchan, sg *sudog) { + racerelease(chanbuf(c, 0)) + raceacquireg(sg.g, chanbuf(c, 0)) + racereleaseg(sg.g, chanbuf(c, 0)) + raceacquire(chanbuf(c, 0)) +} diff --git a/src/pkg/runtime/chan.goc b/src/pkg/runtime/chan.goc index 2ef1c8566e..7f6373dc81 100644 --- a/src/pkg/runtime/chan.goc +++ b/src/pkg/runtime/chan.goc @@ -18,78 +18,6 @@ static SudoG* dequeue(WaitQ*); static void enqueue(WaitQ*, SudoG*); static void racesync(Hchan*, SudoG*); -static Type hchanType; -static String hchanStr; - -void -runtime·chaninit(void) -{ - int32 i, off; - byte *mask; - - // Generate (bare minimum) type descriptor for Hchan. - hchanType.size = sizeof(Hchan); - hchanStr = runtime·gostringnocopy((byte*)"chan"); - hchanType.string = &hchanStr; - // Hchan has only one interesting pointer -- buf. - off = offsetof(Hchan, buf)/PtrSize*gcBits; - if(off%8) - runtime·throw("makechan: unaligned buffer"); - if(off+8 >= sizeof(hchanType.gc)*8) - runtime·throw("makechan: gc mask does not fit"); - mask = (byte*)hchanType.gc; - for(i = 0; i < off/8; i++) - mask[i] = (BitsScalar<<2) | (BitsScalar<<6); - mask[off/8] = (BitsPointer<<2) | (BitsDead<<6); -} - -static Hchan* -makechan(ChanType *t, int64 hint) -{ - Hchan *c; - Type *elem; - - elem = t->elem; - - // compiler checks this but be safe. - if(elem->size >= (1<<16)) - runtime·throw("makechan: invalid channel element type"); - if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN) - runtime·throw("makechan: bad alignment"); - - if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > (MaxMem - sizeof(*c)) / elem->size)) - runtime·panicstring("makechan: size out of range"); - - if((elem->kind&KindNoPointers) || hint == 0) { - // allocate memory in one call - c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, nil, FlagNoScan); - if(hint > 0 && elem->size != 0) - c->buf = (byte*)(c+1); - else - c->buf = (byte*)c; // race detector uses this location for synchronization - } else { - c = (Hchan*)runtime·cnew(&hchanType); - c->buf = runtime·cnewarray(elem, hint); - } - c->elemsize = elem->size; - c->elemtype = elem; - c->dataqsiz = hint; - - if(debug) - runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; dataqsiz=%D\n", - c, (int64)elem->size, elem->alg, (int64)c->dataqsiz); - - return c; -} - -func reflect·makechan(t *ChanType, size uint64) (c *Hchan) { - c = makechan(t, size); -} - -func makechan(t *ChanType, size int64) (c *Hchan) { - c = makechan(t, size); -} - /* * generic single channel send/recv * if the bool pointer is nil, @@ -376,11 +304,6 @@ closed: } #pragma textflag NOSPLIT -func chansend1(t *ChanType, c *Hchan, elem *byte) { - chansend(t, c, elem, true, runtime·getcallerpc(&t)); -} - -#pragma textflag NOSPLIT func chanrecv1(t *ChanType, c *Hchan, elem *byte) { chanrecv(t, c, elem, true, nil); } diff --git a/src/pkg/runtime/chan.h b/src/pkg/runtime/chan.h index 5ac39cab88..5ebbcfd4da 100644 --- a/src/pkg/runtime/chan.h +++ b/src/pkg/runtime/chan.h @@ -5,22 +5,9 @@ #define MAXALIGN 8 typedef struct WaitQ WaitQ; -typedef struct SudoG SudoG; typedef struct Select Select; typedef struct Scase Scase; -// Known to compiler. -// Changes here must also be made in src/cmd/gc/select.c's selecttype. -struct SudoG -{ - G* g; - uint32* selectdone; - SudoG* link; - byte* elem; // data element - int64 releasetime; - int32 nrelease; // -1 for acquire -}; - struct WaitQ { SudoG* first; diff --git a/src/pkg/runtime/chan_test.go b/src/pkg/runtime/chan_test.go index 9ffdc07dc7..bb0f28655d 100644 --- a/src/pkg/runtime/chan_test.go +++ b/src/pkg/runtime/chan_test.go @@ -430,6 +430,38 @@ func TestMultiConsumer(t *testing.T) { } } +func TestShrinkStackDuringBlockedSend(t *testing.T) { + // make sure that channel operations still work when we are + // blocked on a channel send and we shrink the stack. + // NOTE: this test probably won't fail unless stack.c:StackDebug + // is set to >= 1. + const n = 10 + c := make(chan int) + done := make(chan struct{}) + + go func() { + for i := 0; i < n; i++ { + c <- i + // use lots of stack, briefly. + stackGrowthRecursive(20) + } + done <- struct{}{} + }() + + for i := 0; i < n; i++ { + x := <-c + if x != i { + t.Errorf("bad channel read: want %d, got %d", i, x) + } + // Waste some time so sender can finish using lots of stack + // and block in channel send. + time.Sleep(1 * time.Millisecond) + // trigger GC which will shrink the stack of the sender. + runtime.GC() + } + <-done +} + func BenchmarkChanNonblocking(b *testing.B) { myc := make(chan int) b.RunParallel(func(pb *testing.PB) { diff --git a/src/pkg/runtime/mprof.goc b/src/pkg/runtime/mprof.goc index 3d8d790cdd..57596b2231 100644 --- a/src/pkg/runtime/mprof.goc +++ b/src/pkg/runtime/mprof.goc @@ -218,7 +218,10 @@ runtime·blockevent(int64 cycles, int32 skip) if(rate <= 0 || (rate > cycles && runtime·fastrand1()%rate > cycles)) return; - nstk = runtime·callers(skip, stk, nelem(stk)); + if(g->m->curg == nil || g->m->curg == g) + nstk = runtime·callers(skip, stk, nelem(stk)); + else + nstk = runtime·gcallers(g->m->curg, skip, stk, nelem(stk)); runtime·lock(&runtime·proflock); b = stkbucket(BProf, 0, stk, nstk, true); b->data.bp.count++; @@ -227,6 +230,12 @@ runtime·blockevent(int64 cycles, int32 skip) } void +runtime·blockevent_m(void) +{ + runtime·blockevent(g->m->scalararg[0] + ((int64)g->m->scalararg[1]<<32), g->m->scalararg[2]); +} + +void runtime·iterate_memprof(void (*callback)(Bucket*, uintptr, uintptr*, uintptr, uintptr, uintptr)) { Bucket *b; diff --git a/src/pkg/runtime/pprof/pprof_test.go b/src/pkg/runtime/pprof/pprof_test.go index aba538e755..9ab211c2da 100644 --- a/src/pkg/runtime/pprof/pprof_test.go +++ b/src/pkg/runtime/pprof/pprof_test.go @@ -287,7 +287,7 @@ func TestBlockProfile(t *testing.T) { `}, {"chan send", blockChanSend, ` [0-9]+ [0-9]+ @ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ 0x[0-9,a-f]+ -# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.goc:[0-9]+ +# 0x[0-9,a-f]+ runtime\.chansend1\+0x[0-9,a-f]+ .*/src/pkg/runtime/chan.go:[0-9]+ # 0x[0-9,a-f]+ runtime/pprof_test\.blockChanSend\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+ # 0x[0-9,a-f]+ runtime/pprof_test\.TestBlockProfile\+0x[0-9,a-f]+ .*/src/pkg/runtime/pprof/pprof_test.go:[0-9]+ `}, diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index 6767622846..722f44bb1b 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -158,7 +158,6 @@ runtime·schedinit(void) runtime·symtabinit(); runtime·stackinit(); runtime·mallocinit(); - runtime·chaninit(); mcommoninit(g->m); // Initialize the itable value for newErrorCString, diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index dcce369a5c..f12e50cbfb 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -63,6 +63,7 @@ typedef uint8 byte; typedef struct Func Func; typedef struct G G; typedef struct Gobuf Gobuf; +typedef struct SudoG SudoG; typedef struct Lock Lock; typedef struct M M; typedef struct P P; @@ -217,6 +218,18 @@ struct Gobuf uintreg ret; uintptr lr; }; +// Known to compiler. +// Changes here must also be made in src/cmd/gc/select.c's selecttype. +struct SudoG +{ + G* g; + uint32* selectdone; + SudoG* link; + byte* elem; // data element + int64 releasetime; + int32 nrelease; // -1 for acquire + SudoG* waitlink; // G.waiting list +}; struct GCStats { // the struct must consist of only uint64's, @@ -285,6 +298,7 @@ struct G uintptr sigpc; uintptr gopc; // pc of go statement that created this goroutine uintptr racectx; + SudoG *waiting; // sudog structures this G is waiting on (that have a valid elem ptr) uintptr end[]; }; @@ -884,7 +898,6 @@ MCache* runtime·allocmcache(void); void runtime·freemcache(MCache*); void runtime·mallocinit(void); void runtime·gcinit(void); -void runtime·chaninit(void); void* runtime·mallocgc(uintptr size, Type* typ, uint32 flag); void runtime·runpanic(Panic*); uintptr runtime·getcallersp(void*); diff --git a/src/pkg/runtime/stack.c b/src/pkg/runtime/stack.c index aeb5fb7211..fc11d98c9b 100644 --- a/src/pkg/runtime/stack.c +++ b/src/pkg/runtime/stack.c @@ -268,8 +268,10 @@ runtime·stackfree(G *gp, void *v, Stktop *top) n = (uintptr)(top+1) - (uintptr)v; if(n & (n-1)) runtime·throw("stack not a power of 2"); - if(StackDebug >= 1) + if(StackDebug >= 1) { runtime·printf("stackfree %p %d\n", v, (int32)n); + runtime·memclr(v, n); // for testing, clobber stack data + } gp->stacksize -= n; if(runtime·debug.efence || StackFromSystem) { if(runtime·debug.efence || StackFaultOnFree) @@ -753,6 +755,21 @@ adjustdefers(G *gp, AdjustInfo *adjinfo) } } +static void +adjustsudogs(G *gp, AdjustInfo *adjinfo) +{ + SudoG *s; + byte *e; + + // the data elements pointed to by a SudoG structure + // might be in the stack. + for(s = gp->waiting; s != nil; s = s->waitlink) { + e = s->elem; + if(adjinfo->oldstk <= e && e < adjinfo->oldbase) + s->elem = e + adjinfo->delta; + } +} + // Copies the top stack segment of gp to a new stack segment of a // different size. The top segment must contain nframes frames. static void @@ -791,6 +808,7 @@ copystack(G *gp, uintptr nframes, uintptr newsize) // adjust other miscellaneous things that have pointers into stacks. adjustctxt(gp, &adjinfo); adjustdefers(gp, &adjinfo); + adjustsudogs(gp, &adjinfo); // copy the stack (including Stktop) to the new location runtime·memmove(newbase - used, oldbase - used, used); @@ -1069,6 +1087,8 @@ runtime·shrinkstack(G *gp) if(gp->m != nil && gp->m->libcallsp != 0) return; #endif + if(StackDebug > 0) + runtime·printf("shrinking stack %D->%D\n", (uint64)oldsize, (uint64)newsize); nframes = copyabletopsegment(gp); if(nframes == -1) return; diff --git a/src/pkg/runtime/stubs.go b/src/pkg/runtime/stubs.go index a31589ca86..e7d7c38bf1 100644 --- a/src/pkg/runtime/stubs.go +++ b/src/pkg/runtime/stubs.go @@ -30,6 +30,18 @@ func racereadrangepc(addr unsafe.Pointer, len int, callpc, pc uintptr) //go:noescape func racewriterangepc(addr unsafe.Pointer, len int, callpc, pc uintptr) +//go:noescape +func raceacquire(addr unsafe.Pointer) + +//go:noescape +func racerelease(addr unsafe.Pointer) + +//go:noescape +func raceacquireg(gp *g, addr unsafe.Pointer) + +//go:noescape +func racereleaseg(gp *g, addr unsafe.Pointer) + // Should be a built-in for unsafe.Pointer? func add(p unsafe.Pointer, x uintptr) unsafe.Pointer { return unsafe.Pointer(uintptr(p) + x) |
