diff options
Diffstat (limited to 'src/pkg')
| -rw-r--r-- | src/pkg/runtime/asm_386.s | 6 | ||||
| -rw-r--r-- | src/pkg/runtime/asm_amd64.s | 6 | ||||
| -rw-r--r-- | src/pkg/runtime/atomic_arm.c | 13 | ||||
| -rw-r--r-- | src/pkg/runtime/chan.c | 18 | ||||
| -rw-r--r-- | src/pkg/runtime/mgc0.c | 2 | ||||
| -rw-r--r-- | src/pkg/runtime/netpoll.goc | 124 | ||||
| -rw-r--r-- | src/pkg/runtime/proc.c | 30 | ||||
| -rw-r--r-- | src/pkg/runtime/runtime.h | 42 | ||||
| -rw-r--r-- | src/pkg/runtime/sema.goc | 6 | ||||
| -rw-r--r-- | src/pkg/runtime/time.goc | 4 |
10 files changed, 173 insertions, 78 deletions
diff --git a/src/pkg/runtime/asm_386.s b/src/pkg/runtime/asm_386.s index 5c642c0ed8..ccd2567fdc 100644 --- a/src/pkg/runtime/asm_386.s +++ b/src/pkg/runtime/asm_386.s @@ -483,6 +483,12 @@ TEXT runtime·xchg(SB), NOSPLIT, $0-8 XCHGL AX, 0(BX) RET +TEXT runtime·xchgp(SB), NOSPLIT, $0-8 + MOVL 4(SP), BX + MOVL 8(SP), AX + XCHGL AX, 0(BX) + RET + TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL 4(SP), AX again: diff --git a/src/pkg/runtime/asm_amd64.s b/src/pkg/runtime/asm_amd64.s index 980bcd4520..17e91c04db 100644 --- a/src/pkg/runtime/asm_amd64.s +++ b/src/pkg/runtime/asm_amd64.s @@ -549,6 +549,12 @@ TEXT runtime·xchg64(SB), NOSPLIT, $0-16 XCHGQ AX, 0(BX) RET +TEXT runtime·xchgp(SB), NOSPLIT, $0-16 + MOVQ 8(SP), BX + MOVQ 16(SP), AX + XCHGQ AX, 0(BX) + RET + TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL 8(SP), AX again: diff --git a/src/pkg/runtime/atomic_arm.c b/src/pkg/runtime/atomic_arm.c index b1e97b27dd..87e88d7563 100644 --- a/src/pkg/runtime/atomic_arm.c +++ b/src/pkg/runtime/atomic_arm.c @@ -42,6 +42,19 @@ runtime·xchg(uint32 volatile* addr, uint32 v) } #pragma textflag NOSPLIT +void* +runtime·xchgp(void* volatile* addr, void* v) +{ + void *old; + + for(;;) { + old = *addr; + if(runtime·cas(addr, old, v)) + return old; + } +} + +#pragma textflag NOSPLIT void runtime·procyield(uint32 cnt) { diff --git a/src/pkg/runtime/chan.c b/src/pkg/runtime/chan.c index bb3388548d..fd382f80f1 100644 --- a/src/pkg/runtime/chan.c +++ b/src/pkg/runtime/chan.c @@ -224,7 +224,7 @@ runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc) mysg.selgen = NOSELGEN; g->param = nil; enqueue(&c->sendq, &mysg); - runtime·park(runtime·unlock, c, "chan send"); + runtime·parkunlock(c, "chan send"); if(g->param == nil) { runtime·lock(c); @@ -252,7 +252,7 @@ asynch: mysg.elem = nil; mysg.selgen = NOSELGEN; enqueue(&c->sendq, &mysg); - runtime·park(runtime·unlock, c, "chan send"); + runtime·parkunlock(c, "chan send"); runtime·lock(c); goto asynch; @@ -356,7 +356,7 @@ runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *receive mysg.selgen = NOSELGEN; g->param = nil; enqueue(&c->recvq, &mysg); - runtime·park(runtime·unlock, c, "chan receive"); + runtime·parkunlock(c, "chan receive"); if(g->param == nil) { runtime·lock(c); @@ -387,7 +387,7 @@ asynch: mysg.elem = nil; mysg.selgen = NOSELGEN; enqueue(&c->recvq, &mysg); - runtime·park(runtime·unlock, c, "chan receive"); + runtime·parkunlock(c, "chan receive"); runtime·lock(c); goto asynch; @@ -799,6 +799,14 @@ selunlock(Select *sel) } } +static bool +selparkcommit(G *gp, void *sel) +{ + USED(gp); + selunlock(sel); + return true; +} + void runtime·block(void) { @@ -971,7 +979,7 @@ loop: } g->param = nil; - runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select"); + runtime·park(selparkcommit, sel, "select"); sellock(sel); sg = g->param; diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c index ebcc364618..2c82fb3ac4 100644 --- a/src/pkg/runtime/mgc0.c +++ b/src/pkg/runtime/mgc0.c @@ -2307,7 +2307,7 @@ runfinq(void) finq = nil; if(fb == nil) { fingwait = 1; - runtime·park(runtime·unlock, &finlock, "finalizer wait"); + runtime·parkunlock(&finlock, "finalizer wait"); continue; } runtime·unlock(&finlock); diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc index 9b5176645a..2830f882d8 100644 --- a/src/pkg/runtime/netpoll.goc +++ b/src/pkg/runtime/netpoll.goc @@ -19,21 +19,40 @@ package net // An implementation must call the following function to denote that the pd is ready. // void runtime·netpollready(G **gpp, PollDesc *pd, int32 mode); +// PollDesc contains 2 binary semaphores, rg and wg, to park reader and writer +// goroutines respectively. The semaphore can be in the following states: +// READY - io readiness notification is pending; +// a goroutine consumes the notification by changing the state to nil. +// WAIT - a goroutine prepares to park on the semaphore, but not yet parked; +// the goroutine commits to park by changing the state to G pointer, +// or, alternatively, concurrent io notification changes the state to READY, +// or, alternatively, concurrent timeout/close changes the state to nil. +// G pointer - the goroutine is blocked on the semaphore; +// io notification or timeout/close changes the state to READY or nil respectively +// and unparks the goroutine. +// nil - nothing of the above. #define READY ((G*)1) +#define WAIT ((G*)2) struct PollDesc { PollDesc* link; // in pollcache, protected by pollcache.Lock + + // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. + // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. + // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO rediness notification) + // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated + // in a lock-free way by all operations. Lock; // protectes the following fields uintptr fd; bool closing; uintptr seq; // protects from stale timers and ready notifications - G* rg; // G waiting for read or READY (binary semaphore) + G* rg; // READY, WAIT, G waiting for read or nil Timer rt; // read deadline timer (set if rt.fv != nil) int64 rd; // read deadline - G* wg; // the same for writes - Timer wt; - int64 wd; + G* wg; // READY, WAIT, G waiting for write or nil + Timer wt; // write deadline timer + int64 wd; // write deadline }; static struct @@ -47,7 +66,7 @@ static struct // seq is incremented when deadlines are changed or descriptor is reused. } pollcache; -static bool netpollblock(PollDesc*, int32); +static bool netpollblock(PollDesc*, int32, bool); static G* netpollunblock(PollDesc*, int32, bool); static void deadline(int64, Eface); static void readDeadline(int64, Eface); @@ -97,7 +116,6 @@ func runtime_pollClose(pd *PollDesc) { } func runtime_pollReset(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err) goto ret; @@ -106,11 +124,9 @@ func runtime_pollReset(pd *PollDesc, mode int) (err int) { else if(mode == 'w') pd->wg = nil; ret: - runtime·unlock(pd); } func runtime_pollWait(pd *PollDesc, mode int) (err int) { - runtime·lock(pd); err = checkerr(pd, mode); if(err == 0) { #ifdef GOOS_solaris @@ -119,7 +135,7 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) { else if(mode == 'w') runtime·netpollarmwrite(pd->fd); #endif - while(!netpollblock(pd, mode)) { + while(!netpollblock(pd, mode, false)) { err = checkerr(pd, mode); if(err != 0) break; @@ -128,11 +144,9 @@ func runtime_pollWait(pd *PollDesc, mode int) (err int) { // Pretend it has not happened and retry. } } - runtime·unlock(pd); } func runtime_pollWaitCanceled(pd *PollDesc, mode int) { - runtime·lock(pd); #ifdef GOOS_solaris if(mode == 'r') runtime·netpollarmread(pd->fd); @@ -140,9 +154,8 @@ func runtime_pollWaitCanceled(pd *PollDesc, mode int) { runtime·netpollarmwrite(pd->fd); #endif // wait for ioready, ignore closing or timeouts. - while(!netpollblock(pd, mode)) + while(!netpollblock(pd, mode, true)) ; - runtime·unlock(pd); } func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { @@ -197,7 +210,7 @@ func runtime_pollSetDeadline(pd *PollDesc, d int64, mode int) { } // If we set the new deadline in the past, unblock currently pending IO if any. rg = nil; - wg = nil; + runtime·atomicstorep(&wg, nil); // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock if(pd->rd < 0) rg = netpollunblock(pd, 'r', false); if(pd->wd < 0) @@ -217,6 +230,7 @@ func runtime_pollUnblock(pd *PollDesc) { runtime·throw("runtime_pollUnblock: already closing"); pd->closing = true; pd->seq++; + runtime·atomicstorep(&rg, nil); // full memory barrier between store to closing and read of rg/wg in netpollunblock rg = netpollunblock(pd, 'r', false); wg = netpollunblock(pd, 'w', false); if(pd->rt.fv) { @@ -247,12 +261,10 @@ runtime·netpollready(G **gpp, PollDesc *pd, int32 mode) G *rg, *wg; rg = wg = nil; - runtime·lock(pd); if(mode == 'r' || mode == 'r'+'w') rg = netpollunblock(pd, 'r', true); if(mode == 'w' || mode == 'r'+'w') wg = netpollunblock(pd, 'w', true); - runtime·unlock(pd); if(rg) { rg->schedlink = *gpp; *gpp = rg; @@ -273,51 +285,75 @@ checkerr(PollDesc *pd, int32 mode) return 0; } +static bool +blockcommit(G *gp, G **gpp) +{ + return runtime·casp(gpp, WAIT, gp); +} + // returns true if IO is ready, or false if timedout or closed +// waitio - wait only for completed IO, ignore errors static bool -netpollblock(PollDesc *pd, int32 mode) +netpollblock(PollDesc *pd, int32 mode, bool waitio) { - G **gpp; + G **gpp, *old; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) { - *gpp = nil; - return true; + + // set the gpp semaphore to WAIT + for(;;) { + old = *gpp; + if(old == READY) { + *gpp = nil; + return true; + } + if(old != nil) + runtime·throw("netpollblock: double wait"); + if(runtime·casp(gpp, nil, WAIT)) + break; } - if(*gpp != nil) - runtime·throw("netpollblock: double wait"); - *gpp = g; - runtime·park(runtime·unlock, &pd->Lock, "IO wait"); - runtime·lock(pd); - if(g->param) - return true; - return false; + + // need to recheck error states after setting gpp to WAIT + // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl + // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg + if(waitio || checkerr(pd, mode) == 0) + runtime·park((bool(*)(G*, void*))blockcommit, gpp, "IO wait"); + // be careful to not lose concurrent READY notification + old = runtime·xchgp(gpp, nil); + if(old > WAIT) + runtime·throw("netpollblock: corrupted state"); + return old == READY; } static G* netpollunblock(PollDesc *pd, int32 mode, bool ioready) { - G **gpp, *old; + G **gpp, *old, *new; gpp = &pd->rg; if(mode == 'w') gpp = &pd->wg; - if(*gpp == READY) - return nil; - if(*gpp == nil) { - // Only set READY for ioready. runtime_pollWait - // will check for timeout/cancel before waiting. + + for(;;) { + old = *gpp; + if(old == READY) + return nil; + if(old == nil && !ioready) { + // Only set READY for ioready. runtime_pollWait + // will check for timeout/cancel before waiting. + return nil; + } + new = nil; if(ioready) - *gpp = READY; - return nil; + new = READY; + if(runtime·casp(gpp, old, new)) + break; } - old = *gpp; - // pass unblock reason onto blocked g - old->param = (void*)ioready; - *gpp = nil; - return old; + if(old > WAIT) + return old; // must be G* + return nil; } static void @@ -343,14 +379,14 @@ deadlineimpl(int64 now, Eface arg, bool read, bool write) if(pd->rd <= 0 || pd->rt.fv == nil) runtime·throw("deadlineimpl: inconsistent read deadline"); pd->rd = -1; - pd->rt.fv = nil; + runtime·atomicstorep(&pd->rt.fv, nil); // full memory barrier between store to rd and load of rg in netpollunblock rg = netpollunblock(pd, 'r', false); } if(write) { if(pd->wd <= 0 || (pd->wt.fv == nil && !read)) runtime·throw("deadlineimpl: inconsistent write deadline"); pd->wd = -1; - pd->wt.fv = nil; + runtime·atomicstorep(&pd->wt.fv, nil); // full memory barrier between store to wd and load of wg in netpollunblock wg = netpollunblock(pd, 'w', false); } runtime·unlock(pd); diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c index 9eb4ad9f95..24feda4183 100644 --- a/src/pkg/runtime/proc.c +++ b/src/pkg/runtime/proc.c @@ -1353,10 +1353,10 @@ top: execute(gp); } -// Puts the current goroutine into a waiting state and unlocks the lock. -// The goroutine can be made runnable again by calling runtime·ready(gp). +// Puts the current goroutine into a waiting state and calls unlockf. +// If unlockf returns false, the goroutine is resumed. void -runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason) +runtime·park(bool(*unlockf)(G*, void*), void *lock, int8 *reason) { m->waitlock = lock; m->waitunlockf = unlockf; @@ -1364,17 +1364,39 @@ runtime·park(void(*unlockf)(Lock*), Lock *lock, int8 *reason) runtime·mcall(park0); } +static bool +parkunlock(G *gp, void *lock) +{ + USED(gp); + runtime·unlock(lock); + return true; +} + +// Puts the current goroutine into a waiting state and unlocks the lock. +// The goroutine can be made runnable again by calling runtime·ready(gp). +void +runtime·parkunlock(Lock *lock, int8 *reason) +{ + runtime·park(parkunlock, lock, reason); +} + // runtime·park continuation on g0. static void park0(G *gp) { + bool ok; + gp->status = Gwaiting; gp->m = nil; m->curg = nil; if(m->waitunlockf) { - m->waitunlockf(m->waitlock); + ok = m->waitunlockf(gp, m->waitlock); m->waitunlockf = nil; m->waitlock = nil; + if(!ok) { + gp->status = Grunnable; + execute(gp); // Schedule it back, never returns. + } } if(m->lockedg) { stoplockedm(); diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h index 5e3c0c497f..c4c47964b9 100644 --- a/src/pkg/runtime/runtime.h +++ b/src/pkg/runtime/runtime.h @@ -339,7 +339,7 @@ struct M GCStats gcstats; bool racecall; bool needextram; - void (*waitunlockf)(Lock*); + bool (*waitunlockf)(G*, void*); void* waitlock; uintptr settype_buf[1024]; @@ -790,21 +790,6 @@ int32 runtime·read(int32, void*, int32); int32 runtime·write(int32, void*, int32); int32 runtime·close(int32); int32 runtime·mincore(void*, uintptr, byte*); -bool runtime·cas(uint32*, uint32, uint32); -bool runtime·cas64(uint64*, uint64, uint64); -bool runtime·casp(void**, void*, void*); -// Don't confuse with XADD x86 instruction, -// this one is actually 'addx', that is, add-and-fetch. -uint32 runtime·xadd(uint32 volatile*, int32); -uint64 runtime·xadd64(uint64 volatile*, int64); -uint32 runtime·xchg(uint32 volatile*, uint32); -uint64 runtime·xchg64(uint64 volatile*, uint64); -uint32 runtime·atomicload(uint32 volatile*); -void runtime·atomicstore(uint32 volatile*, uint32); -void runtime·atomicstore64(uint64 volatile*, uint64); -uint64 runtime·atomicload64(uint64 volatile*); -void* runtime·atomicloadp(void* volatile*); -void runtime·atomicstorep(void* volatile*, void*); void runtime·jmpdefer(FuncVal*, void*); void runtime·exit1(int32); void runtime·ready(G*); @@ -845,14 +830,33 @@ uint32 runtime·fastrand1(void); void runtime·rewindmorestack(Gobuf*); int32 runtime·timediv(int64, int32, int32*); -void runtime·setmg(M*, G*); -void runtime·newextram(void); +// atomic operations +bool runtime·cas(uint32*, uint32, uint32); +bool runtime·cas64(uint64*, uint64, uint64); +bool runtime·casp(void**, void*, void*); +// Don't confuse with XADD x86 instruction, +// this one is actually 'addx', that is, add-and-fetch. +uint32 runtime·xadd(uint32 volatile*, int32); +uint64 runtime·xadd64(uint64 volatile*, int64); +uint32 runtime·xchg(uint32 volatile*, uint32); +uint64 runtime·xchg64(uint64 volatile*, uint64); +void* runtime·xchgp(void* volatile*, void*); +uint32 runtime·atomicload(uint32 volatile*); +void runtime·atomicstore(uint32 volatile*, uint32); +void runtime·atomicstore64(uint64 volatile*, uint64); +uint64 runtime·atomicload64(uint64 volatile*); +void* runtime·atomicloadp(void* volatile*); +void runtime·atomicstorep(void* volatile*, void*); + +void runtime·setmg(M*, G*); +void runtime·newextram(void); void runtime·exit(int32); void runtime·breakpoint(void); void runtime·gosched(void); void runtime·gosched0(G*); void runtime·schedtrace(bool); -void runtime·park(void(*)(Lock*), Lock*, int8*); +void runtime·park(bool(*)(G*, void*), void*, int8*); +void runtime·parkunlock(Lock*, int8*); void runtime·tsleep(int64, int8*); M* runtime·newm(void); void runtime·goexit(void); diff --git a/src/pkg/runtime/sema.goc b/src/pkg/runtime/sema.goc index 57f32a0ddb..c1e8e4e18b 100644 --- a/src/pkg/runtime/sema.goc +++ b/src/pkg/runtime/sema.goc @@ -137,7 +137,7 @@ runtime·semacquire(uint32 volatile *addr, bool profile) // Any semrelease after the cansemacquire knows we're waiting // (we set nwait above), so go to sleep. semqueue(root, addr, &s); - runtime·park(runtime·unlock, root, "semacquire"); + runtime·parkunlock(root, "semacquire"); if(cansemacquire(addr)) { if(t0) runtime·blockevent(s.releasetime - t0, 3); @@ -254,7 +254,7 @@ func runtime_Syncsemacquire(s *SyncSema) { else s->tail->next = &w; s->tail = &w; - runtime·park(runtime·unlock, s, "semacquire"); + runtime·parkunlock(s, "semacquire"); if(t0) runtime·blockevent(w.releasetime - t0, 2); } @@ -288,7 +288,7 @@ func runtime_Syncsemrelease(s *SyncSema, n uint32) { else s->tail->next = &w; s->tail = &w; - runtime·park(runtime·unlock, s, "semarelease"); + runtime·parkunlock(s, "semarelease"); } else runtime·unlock(s); } diff --git a/src/pkg/runtime/time.goc b/src/pkg/runtime/time.goc index b575696f71..d52a3b3217 100644 --- a/src/pkg/runtime/time.goc +++ b/src/pkg/runtime/time.goc @@ -76,7 +76,7 @@ runtime·tsleep(int64 ns, int8 *reason) t.arg.data = g; runtime·lock(&timers); addtimer(&t); - runtime·park(runtime·unlock, &timers, reason); + runtime·parkunlock(&timers, reason); } static FuncVal timerprocv = {timerproc}; @@ -222,7 +222,7 @@ timerproc(void) if(delta < 0) { // No timers left - put goroutine to sleep. timers.rescheduling = true; - runtime·park(runtime·unlock, &timers, "timer goroutine (idle)"); + runtime·parkunlock(&timers, "timer goroutine (idle)"); continue; } // At least one timer pending. Sleep until then. |
