diff options
| author | Russ Cox <rsc@golang.org> | 2011-01-31 18:36:28 -0500 |
|---|---|---|
| committer | Russ Cox <rsc@golang.org> | 2011-01-31 18:36:28 -0500 |
| commit | f4e76d83091b43e88bb2a832c3b6424c3cc17e1d (patch) | |
| tree | 6527360b647fd1fabce1dbc839ba03e24c308163 /src | |
| parent | fc52d7029fcd667557230d4b4b6443886e261ef9 (diff) | |
| download | go-f4e76d83091b43e88bb2a832c3b6424c3cc17e1d.tar.xz | |
replace non-blocking send, receive syntax with select
R=golang-dev, nigeltao, niemeyer, r
CC=golang-dev
https://golang.org/cl/4079053
Diffstat (limited to 'src')
| -rw-r--r-- | src/pkg/compress/flate/deflate_test.go | 13 | ||||
| -rw-r--r-- | src/pkg/exp/draw/x11/conn.go | 11 | ||||
| -rw-r--r-- | src/pkg/fmt/print.go | 43 | ||||
| -rw-r--r-- | src/pkg/fmt/scan.go | 12 | ||||
| -rw-r--r-- | src/pkg/net/fd.go | 15 | ||||
| -rw-r--r-- | src/pkg/net/server_test.go | 9 | ||||
| -rw-r--r-- | src/pkg/netchan/common.go | 12 | ||||
| -rw-r--r-- | src/pkg/netchan/import.go | 6 | ||||
| -rw-r--r-- | src/pkg/os/inotify/inotify_linux.go | 6 | ||||
| -rw-r--r-- | src/pkg/path/path_test.go | 18 | ||||
| -rw-r--r-- | src/pkg/rpc/client.go | 20 | ||||
| -rw-r--r-- | src/pkg/rpc/server_test.go | 12 | ||||
| -rw-r--r-- | src/pkg/time/sleep_test.go | 10 | ||||
| -rw-r--r-- | src/pkg/time/tick.go | 12 | ||||
| -rw-r--r-- | src/pkg/time/tick_test.go | 6 |
15 files changed, 142 insertions, 63 deletions
diff --git a/src/pkg/compress/flate/deflate_test.go b/src/pkg/compress/flate/deflate_test.go index 3db955609d..68dcd7bcc5 100644 --- a/src/pkg/compress/flate/deflate_test.go +++ b/src/pkg/compress/flate/deflate_test.go @@ -116,9 +116,16 @@ func (b *syncBuffer) Read(p []byte) (n int, err os.Error) { panic("unreachable") } +func (b *syncBuffer) signal() { + select { + case b.ready <- true: + default: + } +} + func (b *syncBuffer) Write(p []byte) (n int, err os.Error) { n, err = b.buf.Write(p) - _ = b.ready <- true + b.signal() return } @@ -128,12 +135,12 @@ func (b *syncBuffer) WriteMode() { func (b *syncBuffer) ReadMode() { b.mu.Unlock() - _ = b.ready <- true + b.signal() } func (b *syncBuffer) Close() os.Error { b.closed = true - _ = b.ready <- true + b.signal() return nil } diff --git a/src/pkg/exp/draw/x11/conn.go b/src/pkg/exp/draw/x11/conn.go index da2181536f..e28fb21706 100644 --- a/src/pkg/exp/draw/x11/conn.go +++ b/src/pkg/exp/draw/x11/conn.go @@ -122,10 +122,13 @@ func (c *conn) writeSocket() { func (c *conn) Screen() draw.Image { return c.img } func (c *conn) FlushImage() { - // We do the send (the <- operator) in an expression context, rather than in - // a statement context, so that it does not block, and fails if the buffered - // channel is full (in which case there already is a flush request pending). - _ = c.flush <- false + select { + case c.flush <- false: + // Flush notification sent. + default: + // Could not send. + // Flush notification must be pending already. + } } func (c *conn) Close() os.Error { diff --git a/src/pkg/fmt/print.go b/src/pkg/fmt/print.go index 96029a8789..d6dc8eb3da 100644 --- a/src/pkg/fmt/print.go +++ b/src/pkg/fmt/print.go @@ -74,15 +74,42 @@ type pp struct { fmt fmt } -// A leaky bucket of reusable pp structures. -var ppFree = make(chan *pp, 100) +// A cache holds a set of reusable objects. +// The buffered channel holds the currently available objects. +// If more are needed, the cache creates them by calling new. +type cache struct { + saved chan interface{} + new func() interface{} +} -// Allocate a new pp struct. Probably can grab the previous one from ppFree. -func newPrinter() *pp { - p, ok := <-ppFree - if !ok { - p = new(pp) +func (c *cache) put(x interface{}) { + select { + case c.saved <- x: + // saved in cache + default: + // discard + } +} + +func (c *cache) get() interface{} { + select { + case x := <-c.saved: + return x // reused from cache + default: + return c.new() } + panic("not reached") +} + +func newCache(f func() interface{}) *cache { + return &cache{make(chan interface{}, 100), f} +} + +var ppFree = newCache(func() interface{} { return new(pp) }) + +// Allocate a new pp struct or grab a cached one. +func newPrinter() *pp { + p := ppFree.get().(*pp) p.fmt.init(&p.buf) return p } @@ -94,7 +121,7 @@ func (p *pp) free() { return } p.buf.Reset() - _ = ppFree <- p + ppFree.put(p) } func (p *pp) Width() (wid int, ok bool) { return p.fmt.wid, p.fmt.widPresent } diff --git a/src/pkg/fmt/scan.go b/src/pkg/fmt/scan.go index ebbb17155e..a408c42aaf 100644 --- a/src/pkg/fmt/scan.go +++ b/src/pkg/fmt/scan.go @@ -303,15 +303,11 @@ func (r *readRune) ReadRune() (rune int, size int, err os.Error) { } -// A leaky bucket of reusable ss structures. -var ssFree = make(chan *ss, 100) +var ssFree = newCache(func() interface{} { return new(ss) }) -// Allocate a new ss struct. Probably can grab the previous one from ssFree. +// Allocate a new ss struct or grab a cached one. func newScanState(r io.Reader, nlIsSpace bool) *ss { - s, ok := <-ssFree - if !ok { - s = new(ss) - } + s := ssFree.get().(*ss) if rr, ok := r.(readRuner); ok { s.rr = rr } else { @@ -333,7 +329,7 @@ func (s *ss) free() { } s.buf.Reset() s.rr = nil - _ = ssFree <- s + ssFree.put(s) } // skipSpace skips spaces and maybe newlines. diff --git a/src/pkg/net/fd.go b/src/pkg/net/fd.go index 896178f18e..2ba9296f31 100644 --- a/src/pkg/net/fd.go +++ b/src/pkg/net/fd.go @@ -220,11 +220,16 @@ func (s *pollServer) Run() { nn, _ = s.pr.Read(scratch[0:]) } // Read from channels - for fd, ok := <-s.cr; ok; fd, ok = <-s.cr { - s.AddFD(fd, 'r') - } - for fd, ok := <-s.cw; ok; fd, ok = <-s.cw { - s.AddFD(fd, 'w') + Update: + for { + select { + case fd := <-s.cr: + s.AddFD(fd, 'r') + case fd := <-s.cw: + s.AddFD(fd, 'w') + default: + break Update + } } } else { netfd := s.LookupFD(fd, mode) diff --git a/src/pkg/net/server_test.go b/src/pkg/net/server_test.go index 543227f7d1..3dda500e58 100644 --- a/src/pkg/net/server_test.go +++ b/src/pkg/net/server_test.go @@ -140,13 +140,16 @@ func runPacket(t *testing.T, network, addr string, listening chan<- string, done listening <- c.LocalAddr().String() c.SetReadTimeout(10e6) // 10ms var buf [1000]byte +Run: for { n, addr, err := c.ReadFrom(buf[0:]) if e, ok := err.(Error); ok && e.Timeout() { - if done <- 1 { - break + select { + case done <- 1: + break Run + default: + continue Run } - continue } if err != nil { break diff --git a/src/pkg/netchan/common.go b/src/pkg/netchan/common.go index 56c0b25199..6c085484e5 100644 --- a/src/pkg/netchan/common.go +++ b/src/pkg/netchan/common.go @@ -256,7 +256,10 @@ func (nch *netChan) send(val reflect.Value) { nch.sendCh = make(chan reflect.Value, nch.size) go nch.sender() } - if ok := nch.sendCh <- val; !ok { + select { + case nch.sendCh <- val: + // ok + default: // TODO: should this be more resilient? panic("netchan: remote sender sent more values than allowed") } @@ -318,8 +321,11 @@ func (nch *netChan) acked() { if nch.dir != Send { panic("recv on wrong direction of channel") } - if ok := nch.ackCh <- true; !ok { - panic("netchan: remote receiver sent too many acks") + select { + case nch.ackCh <- true: + // ok + default: // TODO: should this be more resilient? + panic("netchan: remote receiver sent too many acks") } } diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index a694fb41f6..d1e9bbd406 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -91,11 +91,13 @@ func (imp *Importer) run() { } if err.Error != "" { impLog("response error:", err.Error) - if sent := imp.errors <- os.ErrorString(err.Error); !sent { + select { + case imp.errors <- os.ErrorString(err.Error): + continue // errors are not acknowledged + default: imp.shutdown() return } - continue // errors are not acknowledged. } case payClosed: nch := imp.getChan(hdr.Id, false) diff --git a/src/pkg/os/inotify/inotify_linux.go b/src/pkg/os/inotify/inotify_linux.go index 1e74c7fbc5..9d7a074424 100644 --- a/src/pkg/os/inotify/inotify_linux.go +++ b/src/pkg/os/inotify/inotify_linux.go @@ -153,7 +153,11 @@ func (w *Watcher) readEvents() { for { n, errno = syscall.Read(w.fd, buf[0:]) // See if there is a message on the "done" channel - _, done := <-w.done + var done bool + select { + case done = <-w.done: + default: + } // If EOF or a "done" message is received if n == 0 || done { diff --git a/src/pkg/path/path_test.go b/src/pkg/path/path_test.go index 6b4be07a95..ab0b48ad6a 100644 --- a/src/pkg/path/path_test.go +++ b/src/pkg/path/path_test.go @@ -256,8 +256,11 @@ func TestWalk(t *testing.T) { // 2) handle errors, expect none errors := make(chan os.Error, 64) Walk(tree.name, v, errors) - if err, ok := <-errors; ok { + select { + case err := <-errors: t.Errorf("no error expected, found: %s", err) + default: + // ok } checkMarks(t) @@ -276,14 +279,21 @@ func TestWalk(t *testing.T) { errors = make(chan os.Error, 64) os.Chmod(Join(tree.name, tree.entries[1].name), 0) Walk(tree.name, v, errors) + Loop: for i := 1; i <= 2; i++ { - if _, ok := <-errors; !ok { + select { + case <-errors: + // ok + default: t.Errorf("%d. error expected, none found", i) - break + break Loop } } - if err, ok := <-errors; ok { + select { + case err := <-errors: t.Errorf("only two errors expected, found 3rd: %v", err) + default: + // ok } // the inaccessible subtrees were marked manually checkMarks(t) diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index 601c49715b..6f028c10d9 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -58,7 +58,7 @@ func (client *Client) send(c *Call) { if client.shutdown != nil { c.Error = client.shutdown client.mutex.Unlock() - _ = c.Done <- c // do not block + c.done() return } c.seq = client.seq @@ -102,16 +102,14 @@ func (client *Client) input() { // Empty strings should turn into nil os.Errors c.Error = nil } - // We don't want to block here. It is the caller's responsibility to make - // sure the channel has enough buffer space. See comment in Go(). - _ = c.Done <- c // do not block + c.done() } // Terminate pending calls. client.mutex.Lock() client.shutdown = err for _, call := range client.pending { call.Error = err - _ = call.Done <- call // do not block + call.done() } client.mutex.Unlock() if err != os.EOF || !client.closing { @@ -119,6 +117,16 @@ func (client *Client) input() { } } +func (call *Call) done() { + select { + case call.Done <- call: + // ok + default: + // We don't want to block here. It is the caller's responsibility to make + // sure the channel has enough buffer space. See comment in Go(). + } +} + // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. func NewClient(conn io.ReadWriteCloser) *Client { @@ -233,7 +241,7 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface c.Done = done if client.shutdown != nil { c.Error = client.shutdown - _ = c.Done <- c // do not block + c.done() return c } client.send(c) diff --git a/src/pkg/rpc/server_test.go b/src/pkg/rpc/server_test.go index 355d51ce46..67b8762fa5 100644 --- a/src/pkg/rpc/server_test.go +++ b/src/pkg/rpc/server_test.go @@ -364,14 +364,12 @@ func TestSendDeadlock(t *testing.T) { testSendDeadlock(client) done <- true }() - for i := 0; i < 50; i++ { - time.Sleep(100 * 1e6) - _, ok := <-done - if ok { - return - } + select { + case <-done: + return + case <-time.After(5e9): + t.Fatal("deadlock") } - t.Fatal("deadlock") } func testSendDeadlock(client *Client) { diff --git a/src/pkg/time/sleep_test.go b/src/pkg/time/sleep_test.go index 4007db561a..8bf599c3e1 100644 --- a/src/pkg/time/sleep_test.go +++ b/src/pkg/time/sleep_test.go @@ -120,10 +120,12 @@ func TestAfterStop(t *testing.T) { t.Fatalf("failed to stop event 1") } <-c2 - _, ok0 := <-t0.C - _, ok1 := <-c1 - if ok0 || ok1 { - t.Fatalf("events were not stopped") + select { + case <-t0.C: + t.Fatalf("event 0 was not stopped") + case <-c1: + t.Fatalf("event 1 was not stopped") + default: } if t1.Stop() { t.Fatalf("Stop returned true twice") diff --git a/src/pkg/time/tick.go b/src/pkg/time/tick.go index ddd7272702..6c21bf19b9 100644 --- a/src/pkg/time/tick.go +++ b/src/pkg/time/tick.go @@ -22,8 +22,12 @@ type Ticker struct { // Stop turns off a ticker. After Stop, no more ticks will be sent. func (t *Ticker) Stop() { - // Make it non-blocking so multiple Stops don't block. - _ = t.shutdown <- true + select { + case t.shutdown <- true: + // ok + default: + // Stop in progress already + } } // Tick is a convenience wrapper for NewTicker providing access to the ticking @@ -106,7 +110,8 @@ func tickerLoop() { // that need it and determining the next wake time. // TODO(r): list should be sorted in time order. for t := tickers; t != nil; t = t.next { - if _, ok := <-t.shutdown; ok { + select { + case <-t.shutdown: // Ticker is done; remove it from list. if prev == nil { tickers = t.next @@ -114,6 +119,7 @@ func tickerLoop() { prev.next = t.next } continue + default: } if t.nextTick <= now { if len(t.c) == 0 { diff --git a/src/pkg/time/tick_test.go b/src/pkg/time/tick_test.go index f2304a14e3..4dcb63956b 100644 --- a/src/pkg/time/tick_test.go +++ b/src/pkg/time/tick_test.go @@ -29,9 +29,11 @@ func TestTicker(t *testing.T) { } // Now test that the ticker stopped Sleep(2 * Delta) - _, received := <-ticker.C - if received { + select { + case <-ticker.C: t.Fatal("Ticker did not shut down") + default: + // ok } } |
