aboutsummaryrefslogtreecommitdiff
path: root/src/pkg/runtime
diff options
context:
space:
mode:
authorRuss Cox <rsc@golang.org>2011-11-09 15:17:05 -0500
committerRuss Cox <rsc@golang.org>2011-11-09 15:17:05 -0500
commit3b860269eeb0b2d6176da5c972139b7c21d5251b (patch)
tree247969f3d82d13d31928f470ec4c4cd6a7616847 /src/pkg/runtime
parentfbfed49134bca038184dbc1a427e82647fc1f12e (diff)
downloadgo-3b860269eeb0b2d6176da5c972139b7c21d5251b.tar.xz
runtime: add timer support, use for package time
This looks like it is just moving some code from time to runtime (and translating it to C), but the runtime can do a better job managing the goroutines, and it needs this functionality for its own maintenance (for example, for the garbage collector to hand back unused memory to the OS on a time delay). Might as well have just one copy of the timer logic, and runtime can't depend on time, so vice versa. It also unifies Sleep, NewTicker, and NewTimer behind one mechanism, so that there are no claims that one is more efficient than another. (For example, today people recommend using time.After instead of time.Sleep to avoid blocking an OS thread.) Fixes #1644. Fixes #1731. Fixes #2190. R=golang-dev, r, hectorchu, iant, iant, jsing, alex.brainman, dvyukov CC=golang-dev https://golang.org/cl/5334051
Diffstat (limited to 'src/pkg/runtime')
-rw-r--r--src/pkg/runtime/darwin/os.h2
-rw-r--r--src/pkg/runtime/darwin/thread.c20
-rw-r--r--src/pkg/runtime/freebsd/thread.c15
-rw-r--r--src/pkg/runtime/linux/thread.c18
-rw-r--r--src/pkg/runtime/lock_futex.c60
-rw-r--r--src/pkg/runtime/lock_sema.c125
-rw-r--r--src/pkg/runtime/openbsd/thread.c47
-rw-r--r--src/pkg/runtime/plan9/thread.c45
-rw-r--r--src/pkg/runtime/proc.c11
-rw-r--r--src/pkg/runtime/runtime.h51
-rw-r--r--src/pkg/runtime/time.goc232
-rw-r--r--src/pkg/runtime/windows/thread.c23
12 files changed, 578 insertions, 71 deletions
diff --git a/src/pkg/runtime/darwin/os.h b/src/pkg/runtime/darwin/os.h
index 37160f779c..3e96071ba3 100644
--- a/src/pkg/runtime/darwin/os.h
+++ b/src/pkg/runtime/darwin/os.h
@@ -9,7 +9,7 @@ int32 runtime·bsdthread_create(void*, M*, G*, void(*)(void));
void runtime·bsdthread_register(void);
int32 runtime·mach_msg_trap(MachHeader*, int32, uint32, uint32, uint32, uint32, uint32);
uint32 runtime·mach_reply_port(void);
-void runtime·mach_semacquire(uint32);
+int32 runtime·mach_semacquire(uint32, int64);
uint32 runtime·mach_semcreate(void);
void runtime·mach_semdestroy(uint32);
void runtime·mach_semrelease(uint32);
diff --git a/src/pkg/runtime/darwin/thread.c b/src/pkg/runtime/darwin/thread.c
index 92cc051e3f..4665408725 100644
--- a/src/pkg/runtime/darwin/thread.c
+++ b/src/pkg/runtime/darwin/thread.c
@@ -17,10 +17,10 @@ unimplemented(int8 *name)
*(int32*)1231 = 1231;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
- runtime·mach_semacquire(m->waitsema);
+ return runtime·mach_semacquire(m->waitsema, ns);
}
void
@@ -252,6 +252,7 @@ enum
// Mach calls that get interrupted by Unix signals
// return this error code. We retry them.
KERN_ABORTED = 14,
+ KERN_OPERATION_TIMED_OUT = 49,
};
typedef struct Tmach_semcreateMsg Tmach_semcreateMsg;
@@ -343,16 +344,25 @@ int32 runtime·mach_semaphore_timedwait(uint32 sema, uint32 sec, uint32 nsec);
int32 runtime·mach_semaphore_signal(uint32 sema);
int32 runtime·mach_semaphore_signal_all(uint32 sema);
-void
-runtime·mach_semacquire(uint32 sem)
+int32
+runtime·mach_semacquire(uint32 sem, int64 ns)
{
int32 r;
+ if(ns >= 0) {
+ r = runtime·mach_semaphore_timedwait(sem, ns/1000000000LL, ns%1000000000LL);
+ if(r == KERN_ABORTED || r == KERN_OPERATION_TIMED_OUT)
+ return -1;
+ if(r != 0)
+ macherror(r, "semaphore_wait");
+ return 0;
+ }
while((r = runtime·mach_semaphore_wait(sem)) != 0) {
if(r == KERN_ABORTED) // interrupted
continue;
macherror(r, "semaphore_wait");
}
+ return 0;
}
void
diff --git a/src/pkg/runtime/freebsd/thread.c b/src/pkg/runtime/freebsd/thread.c
index 8e60a11d0b..4a52a83570 100644
--- a/src/pkg/runtime/freebsd/thread.c
+++ b/src/pkg/runtime/freebsd/thread.c
@@ -10,14 +10,23 @@ extern SigTab runtime·sigtab[];
extern int32 runtime·sys_umtx_op(uint32*, int32, uint32, void*, void*);
// FreeBSD's umtx_op syscall is effectively the same as Linux's futex, and
-// thus the code is largely similar. See linux/thread.c for comments.
+// thus the code is largely similar. See linux/thread.c and lock_futex.c for comments.
void
-runtime·futexsleep(uint32 *addr, uint32 val)
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
int32 ret;
+ Timespec ts, *tsp;
- ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, nil);
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.sec = ns / 1000000000LL;
+ ts.nsec = ns % 1000000000LL;
+ tsp = &ts;
+ }
+
+ ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, tsp);
if(ret >= 0 || ret == -EINTR)
return;
diff --git a/src/pkg/runtime/linux/thread.c b/src/pkg/runtime/linux/thread.c
index b24aa4f453..5502bbbcab 100644
--- a/src/pkg/runtime/linux/thread.c
+++ b/src/pkg/runtime/linux/thread.c
@@ -34,15 +34,29 @@ enum
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
void
-runtime·futexsleep(uint32 *addr, uint32 val)
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
+ Timespec ts, *tsp;
+
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ // Avoid overflow
+ if(ts.tv_sec > 1<<30)
+ ts.tv_sec = 1<<30;
+ tsp = &ts;
+ }
+
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
- runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
+ runtime·futex(addr, FUTEX_WAIT, val, tsp, nil, 0);
}
// If any procs are sleeping on addr, wake up at most cnt.
diff --git a/src/pkg/runtime/lock_futex.c b/src/pkg/runtime/lock_futex.c
index e4d6c6aedf..375b7d7d7c 100644
--- a/src/pkg/runtime/lock_futex.c
+++ b/src/pkg/runtime/lock_futex.c
@@ -4,25 +4,28 @@
#include "runtime.h"
+// This implementation depends on OS-specific implementations of
+//
+// runtime.futexsleep(uint32 *addr, uint32 val, int64 ns)
+// Atomically,
+// if(*addr == val) sleep
+// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
+//
+// runtime.futexwakeup(uint32 *addr, uint32 cnt)
+// If any procs are sleeping on addr, wake up at most cnt.
+
enum
{
MUTEX_UNLOCKED = 0,
MUTEX_LOCKED = 1,
MUTEX_SLEEPING = 2,
-
+
ACTIVE_SPIN = 4,
ACTIVE_SPIN_CNT = 30,
PASSIVE_SPIN = 1,
};
-// Atomically,
-// if(*addr == val) sleep
-// Might be woken up spuriously; that's allowed.
-void runtime·futexsleep(uint32 *addr, uint32 val);
-
-// If any procs are sleeping on addr, wake up at most cnt.
-void runtime·futexwakeup(uint32 *addr, uint32 cnt);
-
// Possible lock states are MUTEX_UNLOCKED, MUTEX_LOCKED and MUTEX_SLEEPING.
// MUTEX_SLEEPING means that there is presumably at least one sleeping thread.
// Note that there can be spinning threads during all states - they do not
@@ -39,7 +42,7 @@ runtime·lock(Lock *l)
v = runtime·xchg(&l->key, MUTEX_LOCKED);
if(v == MUTEX_UNLOCKED)
return;
-
+
// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
// depending on whether there is a thread sleeping
// on this mutex. If we ever change l->key from
@@ -48,13 +51,13 @@ runtime·lock(Lock *l)
// returning, to ensure that the sleeping thread gets
// its wakeup call.
wait = v;
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(;;) {
// Try for lock, spinning.
for(i = 0; i < spin; i++) {
@@ -63,7 +66,7 @@ runtime·lock(Lock *l)
return;
runtime·procyield(ACTIVE_SPIN_CNT);
}
-
+
// Try for lock, rescheduling.
for(i=0; i < PASSIVE_SPIN; i++) {
while(l->key == MUTEX_UNLOCKED)
@@ -71,13 +74,13 @@ runtime·lock(Lock *l)
return;
runtime·osyield();
}
-
+
// Sleep.
v = runtime·xchg(&l->key, MUTEX_SLEEPING);
if(v == MUTEX_UNLOCKED)
return;
wait = MUTEX_SLEEPING;
- runtime·futexsleep(&l->key, MUTEX_SLEEPING);
+ runtime·futexsleep(&l->key, MUTEX_SLEEPING, -1);
}
}
@@ -114,5 +117,30 @@ void
runtime·notesleep(Note *n)
{
while(runtime·atomicload(&n->key) == 0)
- runtime·futexsleep(&n->key, 0);
+ runtime·futexsleep(&n->key, 0, -1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ runtime·futexsleep(&n->key, 0, ns);
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+ now = runtime·nanotime();
+ if(now >= deadline)
+ return;
+ ns = deadline - now;
+ }
}
diff --git a/src/pkg/runtime/lock_sema.c b/src/pkg/runtime/lock_sema.c
index c8f8b05ce9..8875b17a24 100644
--- a/src/pkg/runtime/lock_sema.c
+++ b/src/pkg/runtime/lock_sema.c
@@ -4,6 +4,22 @@
#include "runtime.h"
+// This implementation depends on OS-specific implementations of
+//
+// uintptr runtime.semacreate(void)
+// Create a semaphore, which will be assigned to m->waitsema.
+// The zero value is treated as absence of any semaphore,
+// so be sure to return a non-zero value.
+//
+// int32 runtime.semasleep(int64 ns)
+// If ns < 0, acquire m->waitsema and return 0.
+// If ns >= 0, try to acquire m->waitsema for at most ns nanoseconds.
+// Return 0 if the semaphore was acquired, -1 if interrupted or timed out.
+//
+// int32 runtime.semawakeup(M *mp)
+// Wake up mp, which is or will soon be sleeping on mp->waitsema.
+//
+
enum
{
LOCKED = 1,
@@ -13,13 +29,6 @@ enum
PASSIVE_SPIN = 1,
};
-// creates per-M semaphore (must not return 0)
-uintptr runtime·semacreate(void);
-// acquires per-M semaphore
-void runtime·semasleep(void);
-// releases mp's per-M semaphore
-void runtime·semawakeup(M *mp);
-
void
runtime·lock(Lock *l)
{
@@ -35,13 +44,13 @@ runtime·lock(Lock *l)
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(i=0;; i++) {
v = (uintptr)runtime·atomicloadp(&l->waitm);
if((v&LOCKED) == 0) {
@@ -68,11 +77,11 @@ unlocked:
goto unlocked;
}
if(v&LOCKED) {
- // Wait.
- runtime·semasleep();
+ // Queued. Wait.
+ runtime·semasleep(-1);
i = 0;
}
- }
+ }
}
}
@@ -95,7 +104,7 @@ runtime·unlock(Lock *l)
// Dequeue an M.
mp = (void*)(v&~LOCKED);
if(runtime·casp(&l->waitm, (void*)v, mp->nextwaitm)) {
- // Wake that M.
+ // Dequeued an M. Wake it.
runtime·semawakeup(mp);
break;
}
@@ -113,9 +122,23 @@ runtime·noteclear(Note *n)
void
runtime·notewakeup(Note *n)
{
- if(runtime·casp(&n->waitm, nil, (void*)LOCKED))
- return;
- runtime·semawakeup(n->waitm);
+ M *mp;
+
+ do
+ mp = runtime·atomicloadp(&n->waitm);
+ while(!runtime·casp(&n->waitm, mp, (void*)LOCKED));
+
+ // Successfully set waitm to LOCKED.
+ // What was it before?
+ if(mp == nil) {
+ // Nothing was waiting. Done.
+ } else if(mp == (M*)LOCKED) {
+ // Two notewakeups! Not allowed.
+ runtime·throw("notewakeup - double wakeup");
+ } else {
+ // Must be the waiting m. Wake it up.
+ runtime·semawakeup(mp);
+ }
}
void
@@ -123,6 +146,72 @@ runtime·notesleep(Note *n)
{
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
- if(runtime·casp(&n->waitm, nil, m))
- runtime·semasleep();
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notesleep - waitm out of sync");
+ return;
+ }
+ // Queued. Sleep.
+ runtime·semasleep(-1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ M *mp;
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(m->waitsema == 0)
+ m->waitsema = runtime·semacreate();
+
+ // Register for wakeup on n->waitm.
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup already)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notetsleep - waitm out of sync");
+ return;
+ }
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ // Registered. Sleep.
+ if(runtime·semasleep(ns) >= 0) {
+ // Acquired semaphore, semawakeup unregistered us.
+ // Done.
+ return;
+ }
+
+ // Interrupted or timed out. Still registered. Semaphore not acquired.
+ now = runtime·nanotime();
+ if(now >= deadline)
+ break;
+
+ // Deadline hasn't arrived. Keep sleeping.
+ ns = deadline - now;
+ }
+
+ // Deadline arrived. Still registered. Semaphore not acquired.
+ // Want to give up and return, but have to unregister first,
+ // so that any notewakeup racing with the return does not
+ // try to grant us the semaphore when we don't expect it.
+ for(;;) {
+ mp = runtime·atomicloadp(&n->waitm);
+ if(mp == m) {
+ // No wakeup yet; unregister if possible.
+ if(runtime·casp(&n->waitm, mp, nil))
+ return;
+ } else if(mp == (M*)LOCKED) {
+ // Wakeup happened so semaphore is available.
+ // Grab it to avoid getting out of sync.
+ if(runtime·semasleep(-1) < 0)
+ runtime·throw("runtime: unable to acquire - semaphore out of sync");
+ return;
+ } else {
+ runtime·throw("runtime: unexpected waitm - semaphore out of sync");
+ }
+ }
}
diff --git a/src/pkg/runtime/openbsd/thread.c b/src/pkg/runtime/openbsd/thread.c
index e16bc47627..fd8cbfd033 100644
--- a/src/pkg/runtime/openbsd/thread.c
+++ b/src/pkg/runtime/openbsd/thread.c
@@ -62,21 +62,52 @@ runtime·semacreate(void)
return 1;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
-retry:
+ Timespec ts;
+
// spin-mutex lock
while(runtime·xchg(&m->waitsemalock, 1))
runtime·osyield();
- if(m->waitsemacount == 0) {
- // the function unlocks the spinlock
- runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
- goto retry;
+
+ for(;;) {
+ // lock held
+ if(m->waitsemacount == 0) {
+ // sleep until semaphore != 0 or timeout.
+ // thrsleep unlocks m->waitsemalock.
+ if(ns < 0)
+ runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ runtime·thrsleep(&m->waitsemacount, CLOCK_REALTIME, &ts, &m->waitsemalock);
+ }
+ // reacquire lock
+ while(runtime·xchg(&m->waitsemalock, 1))
+ runtime·osyield();
+ }
+
+ // lock held (again)
+ if(m->waitsemacount != 0) {
+ // semaphore is available.
+ m->waitsemacount--;
+ // spin-mutex unlock
+ runtime·atomicstore(&m->waitsemalock, 0);
+ return 0; // semaphore acquired
+ }
+
+ // semaphore not available.
+ // if there is a timeout, stop now.
+ // otherwise keep trying.
+ if(ns >= 0)
+ break;
}
- m->waitsemacount--;
+
+ // lock held but giving up
// spin-mutex unlock
runtime·atomicstore(&m->waitsemalock, 0);
+ return -1;
}
void
diff --git a/src/pkg/runtime/plan9/thread.c b/src/pkg/runtime/plan9/thread.c
index 29ac5f2dc7..8ad06ca1e4 100644
--- a/src/pkg/runtime/plan9/thread.c
+++ b/src/pkg/runtime/plan9/thread.c
@@ -78,7 +78,7 @@ runtime·exit(int32)
uint8 tmp[16];
uint8 *p, *q;
int32 pid;
-
+
runtime·memclr(buf, sizeof buf);
runtime·memclr(tmp, sizeof tmp);
pid = _tos->pid;
@@ -94,7 +94,7 @@ runtime·exit(int32)
for(q--; q >= tmp;)
*p++ = *q--;
runtime·memmove((void*)p, (void*)"/notepg", 7);
-
+
/* post interrupt note */
fd = runtime·open(buf, OWRITE);
runtime·write(fd, "interrupt", 9);
@@ -108,8 +108,8 @@ runtime·newosproc(M *m, G *g, void *stk, void (*fn)(void))
if(0){
runtime·printf("newosproc stk=%p m=%p g=%p fn=%p rfork=%p id=%d/%d ostk=%p\n",
stk, m, g, fn, runtime·rfork, m->id, m->tls[0], &m);
- }
-
+ }
+
if(runtime·rfork(RFPROC|RFMEM|RFNOWAIT, stk, m, g, fn) < 0)
runtime·throw("newosproc: rfork failed");
}
@@ -120,12 +120,45 @@ runtime·semacreate(void)
return 1;
}
-void
-runtime·semasleep(void)
+int32
+runtime·semasleep(int64 ns)
{
+ int32 ret;
+ int32 ms;
+
+ if(ns >= 0) {
+ // TODO: Plan 9 needs a new system call, tsemacquire.
+ // The kernel implementation is the same as semacquire
+ // except with a tsleep and check for timeout.
+ // It would be great if the implementation returned the
+ // value that was added to the semaphore, so that on
+ // timeout the return value would be 0, on success 1.
+ // Then the error string does not have to be parsed
+ // to detect timeout.
+ //
+ // If a negative time indicates no timeout, then
+ // semacquire can be implemented (in the kernel)
+ // as tsemacquire(p, v, -1).
+ runtime·throw("semasleep: timed sleep not implemented on Plan 9");
+
+ /*
+ if(ns < 0)
+ ms = -1;
+ else if(ns/1000 > 0x7fffffffll)
+ ms = 0x7fffffff;
+ else
+ ms = ns/1000;
+ ret = runtime·plan9_tsemacquire(&m->waitsemacount, 1, ms);
+ if(ret == 1)
+ return 0; // success
+ return -1; // timeout or interrupted
+ */
+ }
+
while(runtime·plan9_semacquire(&m->waitsemacount, 1) < 0) {
/* interrupted; try again */
}
+ return 0; // success
}
void
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index bd56c7f27e..2f8a40a2d9 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -15,7 +15,6 @@ static void unwindstack(G*, byte*);
static void schedule(G*);
static void acquireproc(void);
static void releaseproc(void);
-static M *startm(void);
typedef struct Sched Sched;
@@ -72,7 +71,7 @@ struct Sched {
volatile uint32 atomic; // atomic scheduling word (see below)
int32 profilehz; // cpu profiling rate
-
+
bool init; // running initialization
bool lockmain; // init called runtime.LockOSThread
@@ -701,7 +700,7 @@ runtime·starttheworld(bool extra)
// but m is not running a specific goroutine,
// so set the helpgc flag as a signal to m's
// first schedule(nil) to mcpu-- and grunning--.
- m = startm();
+ m = runtime·newm();
m->helpgc = 1;
runtime·sched.grunning++;
}
@@ -756,14 +755,14 @@ matchmg(void)
// Find the m that will run gp.
if((mp = mget(gp)) == nil)
- mp = startm();
+ mp = runtime·newm();
mnextg(mp, gp);
}
}
// Create a new m. It will start off with a call to runtime·mstart.
-static M*
-startm(void)
+M*
+runtime·newm(void)
{
M *m;
diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h
index da80b99eb8..9324ef76bb 100644
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -70,6 +70,8 @@ typedef struct Hchan Hchan;
typedef struct Complex64 Complex64;
typedef struct Complex128 Complex128;
typedef struct WinCall WinCall;
+typedef struct Timers Timers;
+typedef struct Timer Timer;
/*
* per-cpu declaration.
@@ -239,7 +241,7 @@ struct M
uintptr waitsema; // semaphore for parking on locks
uint32 waitsemacount;
uint32 waitsemalock;
-
+
#ifdef __WINDOWS__
void* thread; // thread handle
#endif
@@ -315,6 +317,33 @@ enum {
};
#endif
+struct Timers
+{
+ Lock;
+ G *timerproc;
+ bool sleeping;
+ bool rescheduling;
+ Note waitnote;
+ Timer **t;
+ int32 len;
+ int32 cap;
+};
+
+// Package time knows the layout of this structure.
+// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
+struct Timer
+{
+ int32 i; // heap index
+
+ // Timer wakes up at when, and then at when+period, ... (period > 0 only)
+ // each time calling f(now, arg) in the timer goroutine, so f must be
+ // a well-behaved function and not block.
+ int64 when;
+ int64 period;
+ void (*f)(int64, Eface);
+ Eface arg;
+};
+
/*
* defined macros
* you need super-gopher-guru privilege
@@ -483,6 +512,8 @@ uint32 runtime·fastrand1(void);
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
+void runtime·tsleep(int64);
+M* runtime·newm(void);
void runtime·goexit(void);
void runtime·asmcgocall(void (*fn)(void*), void*);
void runtime·entersyscall(void);
@@ -540,10 +571,28 @@ void runtime·unlock(Lock*);
* subsequent noteclear must be called only after
* previous notesleep has returned, e.g. it's disallowed
* to call noteclear straight after notewakeup.
+ *
+ * notetsleep is like notesleep but wakes up after
+ * a given number of nanoseconds even if the event
+ * has not yet happened. if a goroutine uses notetsleep to
+ * wake up early, it must wait to call noteclear until it
+ * can be sure that no other goroutine is calling
+ * notewakeup.
*/
void runtime·noteclear(Note*);
void runtime·notesleep(Note*);
void runtime·notewakeup(Note*);
+void runtime·notetsleep(Note*, int64);
+
+/*
+ * low-level synchronization for implementing the above
+ */
+uintptr runtime·semacreate(void);
+int32 runtime·semasleep(int64);
+void runtime·semawakeup(M*);
+// or
+void runtime·futexsleep(uint32*, uint32, int64);
+void runtime·futexwakeup(uint32*, uint32);
/*
* This is consistent across Linux and BSD.
diff --git a/src/pkg/runtime/time.goc b/src/pkg/runtime/time.goc
index a620f2b328..5904f887fe 100644
--- a/src/pkg/runtime/time.goc
+++ b/src/pkg/runtime/time.goc
@@ -2,12 +2,242 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Runtime implementations to help package time.
+// Time-related runtime and pieces of package time.
package time
#include "runtime.h"
+#include "defs.h"
+#include "os.h"
+#include "arch.h"
+#include "malloc.h"
+static Timers timers;
+static void addtimer(Timer*);
+static bool deltimer(Timer*);
+
+// Package time APIs.
+// Godoc uses the comments in package time, not these.
+
+// Nanoseconds returns the current time in nanoseconds.
func Nanoseconds() (ret int64) {
ret = runtime·nanotime();
}
+
+// Sleep puts the current goroutine to sleep for at least ns nanoseconds.
+func Sleep(ns int64) {
+ g->status = Gwaiting;
+ g->waitreason = "sleep";
+ runtime·tsleep(ns);
+}
+
+// startTimer adds t to the timer heap.
+func startTimer(t *Timer) {
+ addtimer(t);
+}
+
+// stopTimer removes t from the timer heap if it is there.
+// It returns true if t was removed, false if t wasn't even there.
+func stopTimer(t *Timer) (stopped bool) {
+ stopped = deltimer(t);
+}
+
+// C runtime.
+
+static void timerproc(void);
+static void siftup(int32);
+static void siftdown(int32);
+
+// Ready the goroutine e.data.
+static void
+ready(int64 now, Eface e)
+{
+ USED(now);
+
+ runtime·ready(e.data);
+}
+
+// Put the current goroutine to sleep for ns nanoseconds.
+// The caller must have set g->status and g->waitreason.
+void
+runtime·tsleep(int64 ns)
+{
+ Timer t;
+
+ if(ns <= 0)
+ return;
+
+ t.when = runtime·nanotime() + ns;
+ t.period = 0;
+ t.f = ready;
+ t.arg.data = g;
+ addtimer(&t);
+ runtime·gosched();
+}
+
+// Add a timer to the heap and start or kick the timer proc
+// if the new timer is earlier than any of the others.
+static void
+addtimer(Timer *t)
+{
+ int32 n;
+ Timer **nt;
+
+ runtime·lock(&timers);
+ if(timers.len >= timers.cap) {
+ // Grow slice.
+ n = 16;
+ if(n <= timers.cap)
+ n = timers.cap*3 / 2;
+ nt = runtime·malloc(n*sizeof nt[0]);
+ runtime·memmove(nt, timers.t, timers.len*sizeof nt[0]);
+ runtime·free(timers.t);
+ timers.t = nt;
+ timers.cap = n;
+ }
+ t->i = timers.len++;
+ timers.t[t->i] = t;
+ siftup(t->i);
+ if(t->i == 0) {
+ // siftup moved to top: new earliest deadline.
+ if(timers.sleeping) {
+ timers.sleeping = false;
+ runtime·notewakeup(&timers.waitnote);
+ }
+ if(timers.rescheduling) {
+ timers.rescheduling = false;
+ runtime·ready(timers.timerproc);
+ }
+ }
+ if(timers.timerproc == nil)
+ timers.timerproc = runtime·newproc1((byte*)timerproc, nil, 0, 0, addtimer);
+ runtime·unlock(&timers);
+}
+
+// Delete timer t from the heap.
+// Do not need to update the timerproc:
+// if it wakes up early, no big deal.
+static bool
+deltimer(Timer *t)
+{
+ int32 i;
+
+ runtime·lock(&timers);
+
+ // t may not be registered anymore and may have
+ // a bogus i (typically 0, if generated by Go).
+ // Verify it before proceeding.
+ i = t->i;
+ if(i < 0 || i >= timers.len || timers.t[i] != t) {
+ runtime·unlock(&timers);
+ return false;
+ }
+
+ timers.t[i] = timers.t[--timers.len];
+ siftup(i);
+ siftdown(i);
+ runtime·unlock(&timers);
+ return true;
+}
+
+// Timerproc runs the time-driven events.
+// It sleeps until the next event in the timers heap.
+// If addtimer inserts a new earlier event, addtimer
+// wakes timerproc early.
+static void
+timerproc(void)
+{
+ int64 delta, now;
+ Timer *t;
+
+ for(;;) {
+ runtime·lock(&timers);
+ now = runtime·nanotime();
+ for(;;) {
+ if(timers.len == 0) {
+ delta = -1;
+ break;
+ }
+ t = timers.t[0];
+ delta = t->when - now;
+ if(delta > 0)
+ break;
+ if(t->period > 0) {
+ // leave in heap but adjust next time to fire
+ t->when += t->period * (1 + -delta/t->period);
+ siftdown(0);
+ } else {
+ // remove from heap
+ timers.t[0] = timers.t[--timers.len];
+ timers.t[0]->i = 0;
+ siftdown(0);
+ t->i = -1; // mark as removed
+ }
+ t->f(now, t->arg);
+ }
+ if(delta < 0) {
+ // No timers left - put goroutine to sleep.
+ timers.rescheduling = true;
+ g->status = Gwaiting;
+ g->waitreason = "timer goroutine (idle)";
+ runtime·unlock(&timers);
+ runtime·gosched();
+ continue;
+ }
+ // At least one timer pending. Sleep until then.
+ timers.sleeping = true;
+ runtime·noteclear(&timers.waitnote);
+ runtime·unlock(&timers);
+ runtime·entersyscall();
+ runtime·notetsleep(&timers.waitnote, delta);
+ runtime·exitsyscall();
+ }
+}
+
+// heap maintenance algorithms.
+
+static void
+siftup(int32 i)
+{
+ int32 p;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ while(i > 0) {
+ p = (i-1)/2; // parent
+ if(t[i]->when >= t[p]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[p];
+ t[p] = tmp;
+ t[i]->i = i;
+ t[p]->i = p;
+ i = p;
+ }
+}
+
+static void
+siftdown(int32 i)
+{
+ int32 c, len;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ len = timers.len;
+ for(;;) {
+ c = i*2 + 1; // left child
+ if(c >= len) {
+ break;
+ }
+ if(c+1 < len && t[c+1]->when < t[c]->when)
+ c++;
+ if(t[c]->when >= t[i]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[c];
+ t[c] = tmp;
+ t[i]->i = i;
+ t[c]->i = c;
+ i = c;
+ }
+}
diff --git a/src/pkg/runtime/windows/thread.c b/src/pkg/runtime/windows/thread.c
index c00485b1a8..aec78509d5 100644
--- a/src/pkg/runtime/windows/thread.c
+++ b/src/pkg/runtime/windows/thread.c
@@ -150,10 +150,25 @@ runtime·usleep(uint32 us)
runtime·stdcall(runtime·Sleep, 1, (uintptr)us);
}
-void
-runtime·semasleep(void)
+#define INFINITE ((uintptr)0xFFFFFFFF)
+
+int32
+runtime·semasleep(int64 ns)
{
- runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, (uintptr)-1);
+ uintptr ms;
+
+ if(ns < 0)
+ ms = INFINITE;
+ else if(ns/1000000 > 0x7fffffffLL)
+ ms = 0x7fffffff;
+ else {
+ ms = ns/1000000;
+ if(ms == 0)
+ ms = 1;
+ }
+ if(runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, ms) != 0)
+ return -1; // timeout
+ return 0;
}
void
@@ -198,7 +213,7 @@ runtime·nanotime(void)
int64 filetime;
runtime·stdcall(runtime·GetSystemTimeAsFileTime, 1, &filetime);
-
+
// Filetime is 100s of nanoseconds since January 1, 1601.
// Convert to nanoseconds since January 1, 1970.
return (filetime - 116444736000000000LL) * 100LL;