diff options
| author | Ian Lance Taylor <iant@golang.org> | 2017-02-10 14:59:38 -0800 |
|---|---|---|
| committer | Ian Lance Taylor <iant@golang.org> | 2017-02-13 18:36:28 +0000 |
| commit | 3792db518327c685da17ca6c6faa4e1d2da4c33c (patch) | |
| tree | 8d2456fa679526c8349a968aaf3cae09524aadfd /src/net | |
| parent | b548eee3d96fc0b6e962a243b28121e1f37ad792 (diff) | |
| download | go-3792db518327c685da17ca6c6faa4e1d2da4c33c.tar.xz | |
net: refactor poller into new internal/poll package
This will make it possible to use the poller with the os package.
This is a lot of code movement but the behavior is intended to be
unchanged.
Update #6817.
Update #7903.
Update #15021.
Update #18507.
Change-Id: I1413685928017c32df5654ded73a2643820977ae
Reviewed-on: https://go-review.googlesource.com/36799
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Crawshaw <crawshaw@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
Diffstat (limited to 'src/net')
47 files changed, 340 insertions, 2229 deletions
diff --git a/src/net/dial.go b/src/net/dial.go index 50bba5a49e..0a7da408fe 100644 --- a/src/net/dial.go +++ b/src/net/dial.go @@ -7,6 +7,7 @@ package net import ( "context" "internal/nettrace" + "internal/poll" "time" ) @@ -110,7 +111,7 @@ func partialDeadline(now, deadline time.Time, addrsRemaining int) (time.Time, er } timeRemaining := deadline.Sub(now) if timeRemaining <= 0 { - return time.Time{}, errTimeout + return time.Time{}, poll.ErrTimeout } // Tentatively allocate equal time to each remaining address. timeout := timeRemaining / time.Duration(addrsRemaining) diff --git a/src/net/dial_test.go b/src/net/dial_test.go index 9919d72ce3..9825bc92ab 100644 --- a/src/net/dial_test.go +++ b/src/net/dial_test.go @@ -7,6 +7,7 @@ package net import ( "bufio" "context" + "internal/poll" "internal/testenv" "io" "net/internal/socktest" @@ -94,7 +95,7 @@ func TestDialTimeoutFDLeak(t *testing.T) { default: sw.Set(socktest.FilterConnect, func(so *socktest.Status) (socktest.AfterFilter, error) { time.Sleep(2 * T) - return nil, errTimeout + return nil, poll.ErrTimeout }) defer sw.Set(socktest.FilterConnect, nil) } @@ -585,8 +586,8 @@ func TestDialerPartialDeadline(t *testing.T) { {now, noDeadline, 1, noDeadline, nil}, // Step the clock forward and cross the deadline. {now.Add(-1 * time.Millisecond), now, 1, now, nil}, - {now.Add(0 * time.Millisecond), now, 1, noDeadline, errTimeout}, - {now.Add(1 * time.Millisecond), now, 1, noDeadline, errTimeout}, + {now.Add(0 * time.Millisecond), now, 1, noDeadline, poll.ErrTimeout}, + {now.Add(1 * time.Millisecond), now, 1, noDeadline, poll.ErrTimeout}, } for i, tt := range testCases { deadline, err := partialDeadline(tt.now, tt.deadline, tt.addrs) diff --git a/src/net/dnsclient_unix_test.go b/src/net/dnsclient_unix_test.go index 85267bbddc..4464804c70 100644 --- a/src/net/dnsclient_unix_test.go +++ b/src/net/dnsclient_unix_test.go @@ -9,6 +9,7 @@ package net import ( "context" "fmt" + "internal/poll" "internal/testenv" "io/ioutil" "os" @@ -767,7 +768,7 @@ func TestRetryTimeout(t *testing.T) { if s == "192.0.2.1:53" { deadline0 = deadline time.Sleep(10 * time.Millisecond) - return nil, errTimeout + return nil, poll.ErrTimeout } if deadline == deadline0 { diff --git a/src/net/error_posix.go b/src/net/error_posix.go new file mode 100644 index 0000000000..dd9754c841 --- /dev/null +++ b/src/net/error_posix.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows + +package net + +import ( + "os" + "syscall" +) + +// wrapSyscallError takes an error and a syscall name. If the error is +// a syscall.Errno, it wraps it in a os.SyscallError using the syscall name. +func wrapSyscallError(name string, err error) error { + if _, ok := err.(syscall.Errno); ok { + err = os.NewSyscallError(name, err) + } + return err +} diff --git a/src/net/error_test.go b/src/net/error_test.go index c23da49fad..61abfae5f0 100644 --- a/src/net/error_test.go +++ b/src/net/error_test.go @@ -7,6 +7,7 @@ package net import ( "context" "fmt" + "internal/poll" "io" "io/ioutil" "net/internal/socktest" @@ -87,7 +88,7 @@ second: return nil } switch err := nestedErr.(type) { - case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *timeoutError, UnknownNetworkError: + case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *poll.TimeoutError, UnknownNetworkError: return nil case *os.SyscallError: nestedErr = err.Err @@ -97,7 +98,7 @@ second: goto third } switch nestedErr { - case errCanceled, errClosing, errMissingAddress, errNoSuitableAddress, + case errCanceled, poll.ErrClosing, errMissingAddress, errNoSuitableAddress, context.DeadlineExceeded, context.Canceled: return nil } @@ -432,7 +433,7 @@ second: goto third } switch nestedErr { - case errClosing, errTimeout: + case poll.ErrClosing, poll.ErrTimeout: return nil } return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr) @@ -467,14 +468,14 @@ second: return nil } switch err := nestedErr.(type) { - case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *timeoutError, UnknownNetworkError: + case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *poll.TimeoutError, UnknownNetworkError: return nil case *os.SyscallError: nestedErr = err.Err goto third } switch nestedErr { - case errCanceled, errClosing, errMissingAddress, errTimeout, ErrWriteToConnected, io.ErrUnexpectedEOF: + case errCanceled, poll.ErrClosing, errMissingAddress, poll.ErrTimeout, ErrWriteToConnected, io.ErrUnexpectedEOF: return nil } return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr) @@ -517,7 +518,7 @@ second: goto third } switch nestedErr { - case errClosing: + case poll.ErrClosing: return nil } return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr) @@ -613,7 +614,7 @@ second: goto third } switch nestedErr { - case errClosing, errTimeout: + case poll.ErrClosing, poll.ErrTimeout: return nil } return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr) @@ -692,7 +693,7 @@ second: goto third } switch nestedErr { - case errClosing: + case poll.ErrClosing: return nil } return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr) diff --git a/src/net/fd_io_plan9.go b/src/net/fd_io_plan9.go deleted file mode 100644 index 76da0c546c..0000000000 --- a/src/net/fd_io_plan9.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2016 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package net - -import ( - "os" - "runtime" - "sync" - "syscall" -) - -// asyncIO implements asynchronous cancelable I/O. -// An asyncIO represents a single asynchronous Read or Write -// operation. The result is returned on the result channel. -// The undergoing I/O system call can either complete or be -// interrupted by a note. -type asyncIO struct { - res chan result - - // mu guards the pid field. - mu sync.Mutex - - // pid holds the process id of - // the process running the IO operation. - pid int -} - -// result is the return value of a Read or Write operation. -type result struct { - n int - err error -} - -// newAsyncIO returns a new asyncIO that performs an I/O -// operation by calling fn, which must do one and only one -// interruptible system call. -func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO { - aio := &asyncIO{ - res: make(chan result, 0), - } - aio.mu.Lock() - go func() { - // Lock the current goroutine to its process - // and store the pid in io so that Cancel can - // interrupt it. We ignore the "hangup" signal, - // so the signal does not take down the entire - // Go runtime. - runtime.LockOSThread() - runtime_ignoreHangup() - aio.pid = os.Getpid() - aio.mu.Unlock() - - n, err := fn(b) - - aio.mu.Lock() - aio.pid = -1 - runtime_unignoreHangup() - aio.mu.Unlock() - - aio.res <- result{n, err} - }() - return aio -} - -var hangupNote os.Signal = syscall.Note("hangup") - -// Cancel interrupts the I/O operation, causing -// the Wait function to return. -func (aio *asyncIO) Cancel() { - aio.mu.Lock() - defer aio.mu.Unlock() - if aio.pid == -1 { - return - } - proc, err := os.FindProcess(aio.pid) - if err != nil { - return - } - proc.Signal(hangupNote) -} - -// Wait for the I/O operation to complete. -func (aio *asyncIO) Wait() (int, error) { - res := <-aio.res - return res.n, res.err -} - -// The following functions, provided by the runtime, are used to -// ignore and unignore the "hangup" signal received by the process. -func runtime_ignoreHangup() -func runtime_unignoreHangup() diff --git a/src/net/fd_mutex.go b/src/net/fd_mutex.go deleted file mode 100644 index 4591fd1cac..0000000000 --- a/src/net/fd_mutex.go +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package net - -import "sync/atomic" - -// fdMutex is a specialized synchronization primitive that manages -// lifetime of an fd and serializes access to Read, Write and Close -// methods on netFD. -type fdMutex struct { - state uint64 - rsema uint32 - wsema uint32 -} - -// fdMutex.state is organized as follows: -// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail. -// 1 bit - lock for read operations. -// 1 bit - lock for write operations. -// 20 bits - total number of references (read+write+misc). -// 20 bits - number of outstanding read waiters. -// 20 bits - number of outstanding write waiters. -const ( - mutexClosed = 1 << 0 - mutexRLock = 1 << 1 - mutexWLock = 1 << 2 - mutexRef = 1 << 3 - mutexRefMask = (1<<20 - 1) << 3 - mutexRWait = 1 << 23 - mutexRMask = (1<<20 - 1) << 23 - mutexWWait = 1 << 43 - mutexWMask = (1<<20 - 1) << 43 -) - -// Read operations must do rwlock(true)/rwunlock(true). -// -// Write operations must do rwlock(false)/rwunlock(false). -// -// Misc operations must do incref/decref. -// Misc operations include functions like setsockopt and setDeadline. -// They need to use incref/decref to ensure that they operate on the -// correct fd in presence of a concurrent close call (otherwise fd can -// be closed under their feet). -// -// Close operations must do increfAndClose/decref. - -// incref adds a reference to mu. -// It reports whether mu is available for reading or writing. -func (mu *fdMutex) incref() bool { - for { - old := atomic.LoadUint64(&mu.state) - if old&mutexClosed != 0 { - return false - } - new := old + mutexRef - if new&mutexRefMask == 0 { - panic("net: inconsistent fdMutex") - } - if atomic.CompareAndSwapUint64(&mu.state, old, new) { - return true - } - } -} - -// increfAndClose sets the state of mu to closed. -// It reports whether there is no remaining reference. -func (mu *fdMutex) increfAndClose() bool { - for { - old := atomic.LoadUint64(&mu.state) - if old&mutexClosed != 0 { - return false - } - // Mark as closed and acquire a reference. - new := (old | mutexClosed) + mutexRef - if new&mutexRefMask == 0 { - panic("net: inconsistent fdMutex") - } - // Remove all read and write waiters. - new &^= mutexRMask | mutexWMask - if atomic.CompareAndSwapUint64(&mu.state, old, new) { - // Wake all read and write waiters, - // they will observe closed flag after wakeup. - for old&mutexRMask != 0 { - old -= mutexRWait - runtime_Semrelease(&mu.rsema) - } - for old&mutexWMask != 0 { - old -= mutexWWait - runtime_Semrelease(&mu.wsema) - } - return true - } - } -} - -// decref removes a reference from mu. -// It reports whether there is no remaining reference. -func (mu *fdMutex) decref() bool { - for { - old := atomic.LoadUint64(&mu.state) - if old&mutexRefMask == 0 { - panic("net: inconsistent fdMutex") - } - new := old - mutexRef - if atomic.CompareAndSwapUint64(&mu.state, old, new) { - return new&(mutexClosed|mutexRefMask) == mutexClosed - } - } -} - -// lock adds a reference to mu and locks mu. -// It reports whether mu is available for reading or writing. -func (mu *fdMutex) rwlock(read bool) bool { - var mutexBit, mutexWait, mutexMask uint64 - var mutexSema *uint32 - if read { - mutexBit = mutexRLock - mutexWait = mutexRWait - mutexMask = mutexRMask - mutexSema = &mu.rsema - } else { - mutexBit = mutexWLock - mutexWait = mutexWWait - mutexMask = mutexWMask - mutexSema = &mu.wsema - } - for { - old := atomic.LoadUint64(&mu.state) - if old&mutexClosed != 0 { - return false - } - var new uint64 - if old&mutexBit == 0 { - // Lock is free, acquire it. - new = (old | mutexBit) + mutexRef - if new&mutexRefMask == 0 { - panic("net: inconsistent fdMutex") - } - } else { - // Wait for lock. - new = old + mutexWait - if new&mutexMask == 0 { - panic("net: inconsistent fdMutex") - } - } - if atomic.CompareAndSwapUint64(&mu.state, old, new) { - if old&mutexBit == 0 { - return true - } - runtime_Semacquire(mutexSema) - // The signaller has subtracted mutexWait. - } - } -} - -// unlock removes a reference from mu and unlocks mu. -// It reports whether there is no remaining reference. -func (mu *fdMutex) rwunlock(read bool) bool { - var mutexBit, mutexWait, mutexMask uint64 - var mutexSema *uint32 - if read { - mutexBit = mutexRLock - mutexWait = mutexRWait - mutexMask = mutexRMask - mutexSema = &mu.rsema - } else { - mutexBit = mutexWLock - mutexWait = mutexWWait - mutexMask = mutexWMask - mutexSema = &mu.wsema - } - for { - old := atomic.LoadUint64(&mu.state) - if old&mutexBit == 0 || old&mutexRefMask == 0 { - panic("net: inconsistent fdMutex") - } - // Drop lock, drop reference and wake read waiter if present. - new := (old &^ mutexBit) - mutexRef - if old&mutexMask != 0 { - new -= mutexWait - } - if atomic.CompareAndSwapUint64(&mu.state, old, new) { - if old&mutexMask != 0 { - runtime_Semrelease(mutexSema) - } - return new&(mutexClosed|mutexRefMask) == mutexClosed - } - } -} - -// Implemented in runtime package. -func runtime_Semacquire(sema *uint32) -func runtime_Semrelease(sema *uint32) - -// incref adds a reference to fd. -// It returns an error when fd cannot be used. -func (fd *netFD) incref() error { - if !fd.fdmu.incref() { - return errClosing - } - return nil -} - -// decref removes a reference from fd. -// It also closes fd when the state of fd is set to closed and there -// is no remaining reference. -func (fd *netFD) decref() { - if fd.fdmu.decref() { - fd.destroy() - } -} - -// readLock adds a reference to fd and locks fd for reading. -// It returns an error when fd cannot be used for reading. -func (fd *netFD) readLock() error { - if !fd.fdmu.rwlock(true) { - return errClosing - } - return nil -} - -// readUnlock removes a reference from fd and unlocks fd for reading. -// It also closes fd when the state of fd is set to closed and there -// is no remaining reference. -func (fd *netFD) readUnlock() { - if fd.fdmu.rwunlock(true) { - fd.destroy() - } -} - -// writeLock adds a reference to fd and locks fd for writing. -// It returns an error when fd cannot be used for writing. -func (fd *netFD) writeLock() error { - if !fd.fdmu.rwlock(false) { - return errClosing - } - return nil -} - -// writeUnlock removes a reference from fd and unlocks fd for writing. -// It also closes fd when the state of fd is set to closed and there -// is no remaining reference. -func (fd *netFD) writeUnlock() { - if fd.fdmu.rwunlock(false) { - fd.destroy() - } -} diff --git a/src/net/fd_mutex_test.go b/src/net/fd_mutex_test.go deleted file mode 100644 index 3542c70f9d..0000000000 --- a/src/net/fd_mutex_test.go +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package net - -import ( - "math/rand" - "runtime" - "testing" - "time" -) - -func TestMutexLock(t *testing.T) { - var mu fdMutex - - if !mu.incref() { - t.Fatal("broken") - } - if mu.decref() { - t.Fatal("broken") - } - - if !mu.rwlock(true) { - t.Fatal("broken") - } - if mu.rwunlock(true) { - t.Fatal("broken") - } - - if !mu.rwlock(false) { - t.Fatal("broken") - } - if mu.rwunlock(false) { - t.Fatal("broken") - } -} - -func TestMutexClose(t *testing.T) { - var mu fdMutex - if !mu.increfAndClose() { - t.Fatal("broken") - } - - if mu.incref() { - t.Fatal("broken") - } - if mu.rwlock(true) { - t.Fatal("broken") - } - if mu.rwlock(false) { - t.Fatal("broken") - } - if mu.increfAndClose() { - t.Fatal("broken") - } -} - -func TestMutexCloseUnblock(t *testing.T) { - c := make(chan bool) - var mu fdMutex - mu.rwlock(true) - for i := 0; i < 4; i++ { - go func() { - if mu.rwlock(true) { - t.Error("broken") - return - } - c <- true - }() - } - // Concurrent goroutines must not be able to read lock the mutex. - time.Sleep(time.Millisecond) - select { - case <-c: - t.Fatal("broken") - default: - } - mu.increfAndClose() // Must unblock the readers. - for i := 0; i < 4; i++ { - select { - case <-c: - case <-time.After(10 * time.Second): - t.Fatal("broken") - } - } - if mu.decref() { - t.Fatal("broken") - } - if !mu.rwunlock(true) { - t.Fatal("broken") - } -} - -func TestMutexPanic(t *testing.T) { - ensurePanics := func(f func()) { - defer func() { - if recover() == nil { - t.Fatal("does not panic") - } - }() - f() - } - - var mu fdMutex - ensurePanics(func() { mu.decref() }) - ensurePanics(func() { mu.rwunlock(true) }) - ensurePanics(func() { mu.rwunlock(false) }) - - ensurePanics(func() { mu.incref(); mu.decref(); mu.decref() }) - ensurePanics(func() { mu.rwlock(true); mu.rwunlock(true); mu.rwunlock(true) }) - ensurePanics(func() { mu.rwlock(false); mu.rwunlock(false); mu.rwunlock(false) }) - - // ensure that it's still not broken - mu.incref() - mu.decref() - mu.rwlock(true) - mu.rwunlock(true) - mu.rwlock(false) - mu.rwunlock(false) -} - -func TestMutexStress(t *testing.T) { - P := 8 - N := int(1e6) - if testing.Short() { - P = 4 - N = 1e4 - } - defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P)) - done := make(chan bool) - var mu fdMutex - var readState [2]uint64 - var writeState [2]uint64 - for p := 0; p < P; p++ { - go func() { - r := rand.New(rand.NewSource(rand.Int63())) - for i := 0; i < N; i++ { - switch r.Intn(3) { - case 0: - if !mu.incref() { - t.Error("broken") - return - } - if mu.decref() { - t.Error("broken") - return - } - case 1: - if !mu.rwlock(true) { - t.Error("broken") - return - } - // Ensure that it provides mutual exclusion for readers. - if readState[0] != readState[1] { - t.Error("broken") - return - } - readState[0]++ - readState[1]++ - if mu.rwunlock(true) { - t.Error("broken") - return - } - case 2: - if !mu.rwlock(false) { - t.Error("broken") - return - } - // Ensure that it provides mutual exclusion for writers. - if writeState[0] != writeState[1] { - t.Error("broken") - return - } - writeState[0]++ - writeState[1]++ - if mu.rwunlock(false) { - t.Error("broken") - return - } - } - } - done <- true - }() - } - for p := 0; p < P; p++ { - <-done - } - if !mu.increfAndClose() { - t.Fatal("broken") - } - if !mu.decref() { - t.Fatal("broken") - } -} diff --git a/src/net/fd_plan9.go b/src/net/fd_plan9.go index 300d8c4543..7496e36ca7 100644 --- a/src/net/fd_plan9.go +++ b/src/net/fd_plan9.go @@ -5,23 +5,15 @@ package net import ( + "internal/poll" "io" "os" - "sync/atomic" "syscall" - "time" ) -type atomicBool int32 - -func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } -func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } -func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } - // Network file descriptor. type netFD struct { - // locking/lifetime of sysfd + serialize access to Read and Write methods - fdmu fdMutex + pfd poll.FD // immutable until Close net string @@ -30,14 +22,6 @@ type netFD struct { listen, ctl, data *os.File laddr, raddr Addr isStream bool - - // deadlines - raio *asyncIO - waio *asyncIO - rtimer *time.Timer - wtimer *time.Timer - rtimedout atomicBool // set true when read deadline has been reached - wtimedout atomicBool // set true when write deadline has been reached } var ( @@ -49,7 +33,7 @@ func sysInit() { } func newFD(net, name string, listen, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) { - return &netFD{ + ret := &netFD{ net: net, n: name, dir: netdir + "/" + net + "/" + name, @@ -57,7 +41,9 @@ func newFD(net, name string, listen, ctl, data *os.File, laddr, raddr Addr) (*ne ctl: ctl, data: data, laddr: laddr, raddr: raddr, - }, nil + } + ret.pfd.Destroy = ret.destroy + return ret, nil } func (fd *netFD) init() error { @@ -99,28 +85,10 @@ func (fd *netFD) destroy() { } func (fd *netFD) Read(b []byte) (n int, err error) { - if fd.rtimedout.isSet() { - return 0, errTimeout - } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } - if err := fd.readLock(); err != nil { - return 0, err - } - defer fd.readUnlock() - if len(b) == 0 { - return 0, nil - } - fd.raio = newAsyncIO(fd.data.Read, b) - n, err = fd.raio.Wait() - fd.raio = nil - if isHangup(err) { - err = io.EOF - } - if isInterrupted(err) { - err = errTimeout - } + n, err = fd.pfd.Read(fd.data.Read, b) if fd.net == "udp" && err == io.EOF { n = 0 err = nil @@ -129,23 +97,10 @@ func (fd *netFD) Read(b []byte) (n int, err error) { } func (fd *netFD) Write(b []byte) (n int, err error) { - if fd.wtimedout.isSet() { - return 0, errTimeout - } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - fd.waio = newAsyncIO(fd.data.Write, b) - n, err = fd.waio.Wait() - fd.waio = nil - if isInterrupted(err) { - err = errTimeout - } - return + return fd.pfd.Write(fd.data.Write, b) } func (fd *netFD) closeRead() error { @@ -163,8 +118,8 @@ func (fd *netFD) closeWrite() error { } func (fd *netFD) Close() error { - if !fd.fdmu.increfAndClose() { - return errClosing + if err := fd.pfd.Close(); err != nil { + return err } if !fd.ok() { return syscall.EINVAL @@ -216,77 +171,6 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) { return os.NewFile(uintptr(dfd), s), nil } -func (fd *netFD) setDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r'+'w') -} - -func (fd *netFD) setReadDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r') -} - -func (fd *netFD) setWriteDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'w') -} - -func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { - d := t.Sub(time.Now()) - if mode == 'r' || mode == 'r'+'w' { - fd.rtimedout.setFalse() - } - if mode == 'w' || mode == 'r'+'w' { - fd.wtimedout.setFalse() - } - if t.IsZero() || d < 0 { - // Stop timer - if mode == 'r' || mode == 'r'+'w' { - if fd.rtimer != nil { - fd.rtimer.Stop() - } - fd.rtimer = nil - } - if mode == 'w' || mode == 'r'+'w' { - if fd.wtimer != nil { - fd.wtimer.Stop() - } - fd.wtimer = nil - } - } else { - // Interrupt I/O operation once timer has expired - if mode == 'r' || mode == 'r'+'w' { - fd.rtimer = time.AfterFunc(d, func() { - fd.rtimedout.setTrue() - if fd.raio != nil { - fd.raio.Cancel() - } - }) - } - if mode == 'w' || mode == 'r'+'w' { - fd.wtimer = time.AfterFunc(d, func() { - fd.wtimedout.setTrue() - if fd.waio != nil { - fd.waio.Cancel() - } - }) - } - } - if !t.IsZero() && d < 0 { - // Interrupt current I/O operation - if mode == 'r' || mode == 'r'+'w' { - fd.rtimedout.setTrue() - if fd.raio != nil { - fd.raio.Cancel() - } - } - if mode == 'w' || mode == 'r'+'w' { - fd.wtimedout.setTrue() - if fd.waio != nil { - fd.waio.Cancel() - } - } - } - return nil -} - func setReadBuffer(fd *netFD, bytes int) error { return syscall.EPLAN9 } @@ -294,11 +178,3 @@ func setReadBuffer(fd *netFD, bytes int) error { func setWriteBuffer(fd *netFD, bytes int) error { return syscall.EPLAN9 } - -func isHangup(err error) bool { - return err != nil && stringsHasSuffix(err.Error(), "Hangup") -} - -func isInterrupted(err error) bool { - return err != nil && stringsHasSuffix(err.Error(), "interrupted") -} diff --git a/src/net/fd_poll_nacl.go b/src/net/fd_poll_nacl.go deleted file mode 100644 index 83987602a5..0000000000 --- a/src/net/fd_poll_nacl.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package net - -import ( - "runtime" - "syscall" - "time" -) - -type pollDesc struct { - fd *netFD - closing bool -} - -func (pd *pollDesc) init(fd *netFD) error { pd.fd = fd; return nil } - -func (pd *pollDesc) close() {} - -func (pd *pollDesc) evict() { - pd.closing = true - if pd.fd != nil { - syscall.StopIO(pd.fd.sysfd) - runtime.KeepAlive(pd.fd) - } -} - -func (pd *pollDesc) prepare(mode int) error { - if pd.closing { - return errClosing - } - return nil -} - -func (pd *pollDesc) prepareRead() error { return pd.prepare('r') } - -func (pd *pollDesc) prepareWrite() error { return pd.prepare('w') } - -func (pd *pollDesc) wait(mode int) error { - if pd.closing { - return errClosing - } - return errTimeout -} - -func (pd *pollDesc) waitRead() error { return pd.wait('r') } - -func (pd *pollDesc) waitWrite() error { return pd.wait('w') } - -func (pd *pollDesc) waitCanceled(mode int) {} - -func (pd *pollDesc) waitCanceledRead() {} - -func (pd *pollDesc) waitCanceledWrite() {} - -func (fd *netFD) setDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r'+'w') -} - -func (fd *netFD) setReadDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r') -} - -func (fd *netFD) setWriteDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'w') -} - -func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { - d := t.UnixNano() - if t.IsZero() { - d = 0 - } - if err := fd.incref(); err != nil { - return err - } - switch mode { - case 'r': - syscall.SetReadDeadline(fd.sysfd, d) - case 'w': - syscall.SetWriteDeadline(fd.sysfd, d) - case 'r' + 'w': - syscall.SetReadDeadline(fd.sysfd, d) - syscall.SetWriteDeadline(fd.sysfd, d) - } - fd.decref() - return nil -} diff --git a/src/net/fd_poll_runtime.go b/src/net/fd_poll_runtime.go deleted file mode 100644 index 62b69fcbf1..0000000000 --- a/src/net/fd_poll_runtime.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build darwin dragonfly freebsd linux netbsd openbsd windows solaris - -package net - -import ( - "runtime" - "sync" - "syscall" - "time" -) - -// runtimeNano returns the current value of the runtime clock in nanoseconds. -func runtimeNano() int64 - -func runtime_pollServerInit() -func runtime_pollOpen(fd uintptr) (uintptr, int) -func runtime_pollClose(ctx uintptr) -func runtime_pollWait(ctx uintptr, mode int) int -func runtime_pollWaitCanceled(ctx uintptr, mode int) int -func runtime_pollReset(ctx uintptr, mode int) int -func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) -func runtime_pollUnblock(ctx uintptr) - -type pollDesc struct { - runtimeCtx uintptr -} - -var serverInit sync.Once - -func (pd *pollDesc) init(fd *netFD) error { - serverInit.Do(runtime_pollServerInit) - ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) - runtime.KeepAlive(fd) - if errno != 0 { - return syscall.Errno(errno) - } - pd.runtimeCtx = ctx - return nil -} - -func (pd *pollDesc) close() { - if pd.runtimeCtx == 0 { - return - } - runtime_pollClose(pd.runtimeCtx) - pd.runtimeCtx = 0 -} - -// Evict evicts fd from the pending list, unblocking any I/O running on fd. -func (pd *pollDesc) evict() { - if pd.runtimeCtx == 0 { - return - } - runtime_pollUnblock(pd.runtimeCtx) -} - -func (pd *pollDesc) prepare(mode int) error { - res := runtime_pollReset(pd.runtimeCtx, mode) - return convertErr(res) -} - -func (pd *pollDesc) prepareRead() error { - return pd.prepare('r') -} - -func (pd *pollDesc) prepareWrite() error { - return pd.prepare('w') -} - -func (pd *pollDesc) wait(mode int) error { - res := runtime_pollWait(pd.runtimeCtx, mode) - return convertErr(res) -} - -func (pd *pollDesc) waitRead() error { - return pd.wait('r') -} - -func (pd *pollDesc) waitWrite() error { - return pd.wait('w') -} - -func (pd *pollDesc) waitCanceled(mode int) { - runtime_pollWaitCanceled(pd.runtimeCtx, mode) -} - -func (pd *pollDesc) waitCanceledRead() { - pd.waitCanceled('r') -} - -func (pd *pollDesc) waitCanceledWrite() { - pd.waitCanceled('w') -} - -func convertErr(res int) error { - switch res { - case 0: - return nil - case 1: - return errClosing - case 2: - return errTimeout - } - println("unreachable: ", res) - panic("unreachable") -} - -func (fd *netFD) setDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r'+'w') -} - -func (fd *netFD) setReadDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'r') -} - -func (fd *netFD) setWriteDeadline(t time.Time) error { - return setDeadlineImpl(fd, t, 'w') -} - -func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { - diff := int64(time.Until(t)) - d := runtimeNano() + diff - if d <= 0 && diff > 0 { - // If the user has a deadline in the future, but the delay calculation - // overflows, then set the deadline to the maximum possible value. - d = 1<<63 - 1 - } - if t.IsZero() { - d = 0 - } - if err := fd.incref(); err != nil { - return err - } - runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) - fd.decref() - return nil -} diff --git a/src/net/fd_posix.go b/src/net/fd_posix.go deleted file mode 100644 index b4b908abac..0000000000 --- a/src/net/fd_posix.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2009 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows - -package net - -import ( - "io" - "syscall" -) - -// eofError returns io.EOF when fd is available for reading end of -// file. -func (fd *netFD) eofError(n int, err error) error { - if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW { - return io.EOF - } - return err -} diff --git a/src/net/fd_posix_test.go b/src/net/fd_posix_test.go deleted file mode 100644 index 85711ef1b7..0000000000 --- a/src/net/fd_posix_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2012 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows - -package net - -import ( - "io" - "syscall" - "testing" -) - -var eofErrorTests = []struct { - n int - err error - fd *netFD - expected error -}{ - {100, nil, &netFD{sotype: syscall.SOCK_STREAM}, nil}, - {100, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF}, - {100, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing}, - {0, nil, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF}, - {0, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF}, - {0, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing}, - - {100, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil}, - {100, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF}, - {100, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing}, - {0, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil}, - {0, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF}, - {0, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing}, - - {100, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, nil}, - {100, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF}, - {100, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing}, - {0, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF}, - {0, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF}, - {0, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing}, - - {100, nil, &netFD{sotype: syscall.SOCK_RAW}, nil}, - {100, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF}, - {100, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing}, - {0, nil, &netFD{sotype: syscall.SOCK_RAW}, nil}, - {0, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF}, - {0, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing}, -} - -func TestEOFError(t *testing.T) { - for _, tt := range eofErrorTests { - actual := tt.fd.eofError(tt.n, tt.err) - if actual != tt.expected { - t.Errorf("eofError(%v, %v, %v): expected %v, actual %v", tt.n, tt.err, tt.fd.sotype, tt.expected, actual) - } - } -} diff --git a/src/net/fd_unix.go b/src/net/fd_unix.go index 3c95fc01d4..9f36069bf3 100644 --- a/src/net/fd_unix.go +++ b/src/net/fd_unix.go @@ -8,7 +8,7 @@ package net import ( "context" - "io" + "internal/poll" "os" "runtime" "sync/atomic" @@ -17,38 +17,36 @@ import ( // Network file descriptor. type netFD struct { - // locking/lifetime of sysfd + serialize access to Read and Write methods - fdmu fdMutex + pfd poll.FD // immutable until Close - sysfd int family int sotype int - isStream bool isConnected bool net string laddr Addr raddr Addr - - // writev cache. - iovecs *[]syscall.Iovec - - // wait server - pd pollDesc } func sysInit() { } func newFD(sysfd, family, sotype int, net string) (*netFD, error) { - return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil + ret := &netFD{ + pfd: poll.FD{ + Sysfd: sysfd, + IsStream: sotype == syscall.SOCK_STREAM, + ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW, + }, + family: family, + sotype: sotype, + net: net, + } + return ret, nil } func (fd *netFD) init() error { - if err := fd.pd.init(fd); err != nil { - return err - } - return nil + return fd.pfd.Init() } func (fd *netFD) setAddr(laddr, raddr Addr) { @@ -72,7 +70,7 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro // Do not need to call fd.writeLock here, // because fd is not yet accessible to user, // so no concurrent operations are possible. - switch err := connectFunc(fd.sysfd, ra); err { + switch err := connectFunc(fd.pfd.Sysfd, ra); err { case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR: case nil, syscall.EISCONN: select { @@ -80,9 +78,10 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro return mapErr(ctx.Err()) default: } - if err := fd.init(); err != nil { + if err := fd.pfd.Init(); err != nil { return err } + runtime.KeepAlive(fd) return nil case syscall.EINVAL: // On Solaris we can see EINVAL if the socket has @@ -97,12 +96,12 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro default: return os.NewSyscallError("connect", err) } - if err := fd.init(); err != nil { + if err := fd.pfd.Init(); err != nil { return err } if deadline, _ := ctx.Deadline(); !deadline.IsZero() { - fd.setWriteDeadline(deadline) - defer fd.setWriteDeadline(noDeadline) + fd.pfd.SetWriteDeadline(deadline) + defer fd.pfd.SetWriteDeadline(noDeadline) } // Start the "interrupter" goroutine, if this context might be canceled. @@ -119,7 +118,7 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro defer func() { close(done) if ctxErr := <-interruptRes; ctxErr != nil && ret == nil { - // The interrupter goroutine called setWriteDeadline, + // The interrupter goroutine called SetWriteDeadline, // but the connect code below had returned from // waitWrite already and did a successful connect (ret // == nil). Because we've now poisoned the connection @@ -135,7 +134,7 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro // Force the runtime's poller to immediately give up // waiting for writability, unblocking waitWrite // below. - fd.setWriteDeadline(aLongTimeAgo) + fd.pfd.SetWriteDeadline(aLongTimeAgo) testHookCanceledDial() interruptRes <- ctx.Err() case <-done: @@ -153,7 +152,7 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro // SO_ERROR socket option to see if the connection // succeeded or failed. See issue 7474 for further // details. - if err := fd.pd.waitWrite(); err != nil { + if err := fd.pfd.WaitWrite(); err != nil { select { case <-ctx.Done(): return mapErr(ctx.Err()) @@ -161,7 +160,7 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro } return err } - nerr, err := getsockoptIntFunc(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) + nerr, err := getsockoptIntFunc(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR) if err != nil { return os.NewSyscallError("getsockopt", err) } @@ -174,45 +173,26 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret erro // See golang.org/issue/14548. // On Darwin, multiple connect system calls on // a non-blocking socket never harm SO_ERROR. - switch err := connectFunc(fd.sysfd, ra); err { + switch err := connectFunc(fd.pfd.Sysfd, ra); err { case nil, syscall.EISCONN: return nil } default: return os.NewSyscallError("getsockopt", err) } + runtime.KeepAlive(fd) } } -func (fd *netFD) destroy() { - // Poller may want to unregister fd in readiness notification mechanism, - // so this must be executed before closeFunc. - fd.pd.close() - closeFunc(fd.sysfd) - fd.sysfd = -1 - runtime.SetFinalizer(fd, nil) -} - func (fd *netFD) Close() error { - if !fd.fdmu.increfAndClose() { - return errClosing - } - // Unblock any I/O. Once it all unblocks and returns, - // so that it cannot be referring to fd.sysfd anymore, - // the final decref will close fd.sysfd. This should happen - // fairly quickly, since all the I/O is non-blocking, and any - // attempts to block in the pollDesc will return errClosing. - fd.pd.evict() - fd.decref() - return nil + runtime.SetFinalizer(fd, nil) + return fd.pfd.Close() } func (fd *netFD) shutdown(how int) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("shutdown", syscall.Shutdown(fd.sysfd, how)) + err := fd.pfd.Shutdown(how) + runtime.KeepAlive(fd) + return wrapSyscallError("shutdown", err) } func (fd *netFD) closeRead() error { @@ -224,233 +204,59 @@ func (fd *netFD) closeWrite() error { } func (fd *netFD) Read(p []byte) (n int, err error) { - if err := fd.readLock(); err != nil { - return 0, err - } - defer fd.readUnlock() - if len(p) == 0 { - // If the caller wanted a zero byte read, return immediately - // without trying. (But after acquiring the readLock.) Otherwise - // syscall.Read returns 0, nil and eofError turns that into - // io.EOF. - // TODO(bradfitz): make it wait for readability? (Issue 15735) - return 0, nil - } - if err := fd.pd.prepareRead(); err != nil { - return 0, err - } - if fd.isStream && len(p) > 1<<30 { - p = p[:1<<30] - } - for { - n, err = syscall.Read(fd.sysfd, p) - if err != nil { - n = 0 - if err == syscall.EAGAIN { - if err = fd.pd.waitRead(); err == nil { - continue - } - } - } - err = fd.eofError(n, err) - break - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("read", err) - } - return + n, err = fd.pfd.Read(p) + runtime.KeepAlive(fd) + return n, wrapSyscallError("read", err) } func (fd *netFD) readFrom(p []byte) (n int, sa syscall.Sockaddr, err error) { - if err := fd.readLock(); err != nil { - return 0, nil, err - } - defer fd.readUnlock() - if err := fd.pd.prepareRead(); err != nil { - return 0, nil, err - } - for { - n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0) - if err != nil { - n = 0 - if err == syscall.EAGAIN { - if err = fd.pd.waitRead(); err == nil { - continue - } - } - } - err = fd.eofError(n, err) - break - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("recvfrom", err) - } - return + n, sa, err = fd.pfd.RecvFrom(p) + runtime.KeepAlive(fd) + return n, sa, wrapSyscallError("recvfrom", err) } func (fd *netFD) readMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { - if err := fd.readLock(); err != nil { - return 0, 0, 0, nil, err - } - defer fd.readUnlock() - if err := fd.pd.prepareRead(); err != nil { - return 0, 0, 0, nil, err - } - for { - n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0) - if err != nil { - // TODO(dfc) should n and oobn be set to 0 - if err == syscall.EAGAIN { - if err = fd.pd.waitRead(); err == nil { - continue - } - } - } - err = fd.eofError(n, err) - break - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("recvmsg", err) - } - return + n, oobn, flags, sa, err = fd.pfd.ReadMsg(p, oob) + runtime.KeepAlive(fd) + return n, oobn, flags, sa, wrapSyscallError("recvmsg", err) } func (fd *netFD) Write(p []byte) (nn int, err error) { - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - if err := fd.pd.prepareWrite(); err != nil { - return 0, err - } - for { - var n int - max := len(p) - if fd.isStream && max-nn > 1<<30 { - max = nn + 1<<30 - } - n, err = syscall.Write(fd.sysfd, p[nn:max]) - if n > 0 { - nn += n - } - if nn == len(p) { - break - } - if err == syscall.EAGAIN { - if err = fd.pd.waitWrite(); err == nil { - continue - } - } - if err != nil { - break - } - if n == 0 { - err = io.ErrUnexpectedEOF - break - } - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("write", err) - } - return nn, err + nn, err = fd.pfd.Write(p) + runtime.KeepAlive(fd) + return nn, wrapSyscallError("write", err) } func (fd *netFD) writeTo(p []byte, sa syscall.Sockaddr) (n int, err error) { - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - if err := fd.pd.prepareWrite(); err != nil { - return 0, err - } - for { - err = syscall.Sendto(fd.sysfd, p, 0, sa) - if err == syscall.EAGAIN { - if err = fd.pd.waitWrite(); err == nil { - continue - } - } - break - } - if err == nil { - n = len(p) - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("sendto", err) - } - return + n, err = fd.pfd.WriteTo(p, sa) + runtime.KeepAlive(fd) + return n, wrapSyscallError("sendto", err) } func (fd *netFD) writeMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { - if err := fd.writeLock(); err != nil { - return 0, 0, err - } - defer fd.writeUnlock() - if err := fd.pd.prepareWrite(); err != nil { - return 0, 0, err - } - for { - n, err = syscall.SendmsgN(fd.sysfd, p, oob, sa, 0) - if err == syscall.EAGAIN { - if err = fd.pd.waitWrite(); err == nil { - continue - } - } - break - } - if err == nil { - oobn = len(oob) - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("sendmsg", err) - } - return + n, oobn, err = fd.pfd.WriteMsg(p, oob, sa) + runtime.KeepAlive(fd) + return n, oobn, wrapSyscallError("sendmsg", err) } func (fd *netFD) accept() (netfd *netFD, err error) { - if err := fd.readLock(); err != nil { - return nil, err - } - defer fd.readUnlock() - - var s int - var rsa syscall.Sockaddr - if err = fd.pd.prepareRead(); err != nil { - return nil, err - } - for { - s, rsa, err = accept(fd.sysfd) - if err != nil { - nerr, ok := err.(*os.SyscallError) - if !ok { - return nil, err - } - switch nerr.Err { - case syscall.EAGAIN: - if err = fd.pd.waitRead(); err == nil { - continue - } - case syscall.ECONNABORTED: - // This means that a socket on the - // listen queue was closed before we - // Accept()ed it; it's a silly error, - // so try again. - continue - } - return nil, err + d, rsa, errcall, err := fd.pfd.Accept() + if err != nil { + if errcall != "" { + err = wrapSyscallError(errcall, err) } - break + return nil, err } - if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil { - closeFunc(s) + if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { + poll.CloseFunc(d) return nil, err } if err = netfd.init(); err != nil { fd.Close() return nil, err } - lsa, _ := syscall.Getsockname(netfd.sysfd) + lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil } @@ -503,7 +309,7 @@ func dupCloseOnExecOld(fd int) (newfd int, err error) { } func (fd *netFD) dup() (f *os.File, err error) { - ns, err := dupCloseOnExec(fd.sysfd) + ns, err := dupCloseOnExec(fd.pfd.Sysfd) if err != nil { return nil, err } diff --git a/src/net/fd_windows.go b/src/net/fd_windows.go index a976f2ac7f..2182b730f9 100644 --- a/src/net/fd_windows.go +++ b/src/net/fd_windows.go @@ -6,62 +6,14 @@ package net import ( "context" - "internal/race" + "internal/poll" "os" "runtime" - "sync" "syscall" "unsafe" ) -var ( - initErr error - ioSync uint64 -) - -// CancelIo Windows API cancels all outstanding IO for a particular -// socket on current thread. To overcome that limitation, we run -// special goroutine, locked to OS single thread, that both starts -// and cancels IO. It means, there are 2 unavoidable thread switches -// for every IO. -// Some newer versions of Windows has new CancelIoEx API, that does -// not have that limitation and can be used from any thread. This -// package uses CancelIoEx API, if present, otherwise it fallback -// to CancelIo. - -var ( - canCancelIO bool // determines if CancelIoEx API is present - skipSyncNotif bool - hasLoadSetFileCompletionNotificationModes bool -) - func sysInit() { - var d syscall.WSAData - e := syscall.WSAStartup(uint32(0x202), &d) - if e != nil { - initErr = os.NewSyscallError("wsastartup", e) - } - canCancelIO = syscall.LoadCancelIoEx() == nil - hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil - if hasLoadSetFileCompletionNotificationModes { - // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed: - // http://support.microsoft.com/kb/2568167 - skipSyncNotif = true - protos := [2]int32{syscall.IPPROTO_TCP, 0} - var buf [32]syscall.WSAProtocolInfo - len := uint32(unsafe.Sizeof(buf)) - n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len) - if err != nil { - skipSyncNotif = false - } else { - for i := int32(0); i < n; i++ { - if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 { - skipSyncNotif = false - break - } - } - } - } } // canUseConnectEx reports whether we can use the ConnectEx Windows API call @@ -75,257 +27,39 @@ func canUseConnectEx(net string) bool { return false } -// operation contains superset of data necessary to perform all async IO. -type operation struct { - // Used by IOCP interface, it must be first field - // of the struct, as our code rely on it. - o syscall.Overlapped - - // fields used by runtime.netpoll - runtimeCtx uintptr - mode int32 - errno int32 - qty uint32 - - // fields used only by net package - fd *netFD - errc chan error - buf syscall.WSABuf - sa syscall.Sockaddr - rsa *syscall.RawSockaddrAny - rsan int32 - handle syscall.Handle - flags uint32 - bufs []syscall.WSABuf -} - -func (o *operation) InitBuf(buf []byte) { - o.buf.Len = uint32(len(buf)) - o.buf.Buf = nil - if len(buf) != 0 { - o.buf.Buf = &buf[0] - } -} - -func (o *operation) InitBufs(buf *Buffers) { - if o.bufs == nil { - o.bufs = make([]syscall.WSABuf, 0, len(*buf)) - } else { - o.bufs = o.bufs[:0] - } - for _, b := range *buf { - var p *byte - if len(b) > 0 { - p = &b[0] - } - o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: p}) - } -} - -// ClearBufs clears all pointers to Buffers parameter captured -// by InitBufs, so it can be released by garbage collector. -func (o *operation) ClearBufs() { - for i := range o.bufs { - o.bufs[i].Buf = nil - } - o.bufs = o.bufs[:0] -} - -// ioSrv executes net IO requests. -type ioSrv struct { - req chan ioSrvReq -} - -type ioSrvReq struct { - o *operation - submit func(o *operation) error // if nil, cancel the operation -} - -// ProcessRemoteIO will execute submit IO requests on behalf -// of other goroutines, all on a single os thread, so it can -// cancel them later. Results of all operations will be sent -// back to their requesters via channel supplied in request. -// It is used only when the CancelIoEx API is unavailable. -func (s *ioSrv) ProcessRemoteIO() { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - for r := range s.req { - if r.submit != nil { - r.o.errc <- r.submit(r.o) - } else { - r.o.errc <- syscall.CancelIo(r.o.fd.sysfd) - } - } -} - -// ExecIO executes a single IO operation o. It submits and cancels -// IO in the current thread for systems where Windows CancelIoEx API -// is available. Alternatively, it passes the request onto -// runtime netpoll and waits for completion or cancels request. -func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { - fd := o.fd - // Notify runtime netpoll about starting IO. - err := fd.pd.prepare(int(o.mode)) - if err != nil { - return 0, err - } - // Start IO. - if canCancelIO { - err = submit(o) - } else { - // Send request to a special dedicated thread, - // so it can stop the IO with CancelIO later. - s.req <- ioSrvReq{o, submit} - err = <-o.errc - } - switch err { - case nil: - // IO completed immediately - if o.fd.skipSyncNotif { - // No completion message will follow, so return immediately. - return int(o.qty), nil - } - // Need to get our completion message anyway. - case syscall.ERROR_IO_PENDING: - // IO started, and we have to wait for its completion. - err = nil - default: - return 0, err - } - // Wait for our request to complete. - err = fd.pd.wait(int(o.mode)) - if err == nil { - // All is good. Extract our IO results and return. - if o.errno != 0 { - err = syscall.Errno(o.errno) - return 0, err - } - return int(o.qty), nil - } - // IO is interrupted by "close" or "timeout" - netpollErr := err - switch netpollErr { - case errClosing, errTimeout: - // will deal with those. - default: - panic("net: unexpected runtime.netpoll error: " + netpollErr.Error()) - } - // Cancel our request. - if canCancelIO { - err := syscall.CancelIoEx(fd.sysfd, &o.o) - // Assuming ERROR_NOT_FOUND is returned, if IO is completed. - if err != nil && err != syscall.ERROR_NOT_FOUND { - // TODO(brainman): maybe do something else, but panic. - panic(err) - } - } else { - s.req <- ioSrvReq{o, nil} - <-o.errc - } - // Wait for cancelation to complete. - fd.pd.waitCanceled(int(o.mode)) - if o.errno != 0 { - err = syscall.Errno(o.errno) - if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled - err = netpollErr - } - return 0, err - } - // We issued a cancelation request. But, it seems, IO operation succeeded - // before the cancelation request run. We need to treat the IO operation as - // succeeded (the bytes are actually sent/recv from network). - return int(o.qty), nil -} - -// Start helper goroutines. -var rsrv, wsrv *ioSrv -var onceStartServer sync.Once - -func startServer() { - rsrv = new(ioSrv) - wsrv = new(ioSrv) - if !canCancelIO { - // Only CancelIo API is available. Lets start two special goroutines - // locked to an OS thread, that both starts and cancels IO. One will - // process read requests, while other will do writes. - rsrv.req = make(chan ioSrvReq) - go rsrv.ProcessRemoteIO() - wsrv.req = make(chan ioSrvReq) - go wsrv.ProcessRemoteIO() - } -} - // Network file descriptor. type netFD struct { - // locking/lifetime of sysfd + serialize access to Read and Write methods - fdmu fdMutex + pfd poll.FD // immutable until Close - sysfd syscall.Handle - family int - sotype int - isStream bool - isConnected bool - skipSyncNotif bool - net string - laddr Addr - raddr Addr - - rop operation // read operation - wop operation // write operation - - // wait server - pd pollDesc + family int + sotype int + isConnected bool + net string + laddr Addr + raddr Addr } func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { - if initErr != nil { - return nil, initErr + ret := &netFD{ + pfd: poll.FD{ + Sysfd: sysfd, + IsStream: sotype == syscall.SOCK_STREAM, + ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW, + }, + family: family, + sotype: sotype, + net: net, } - onceStartServer.Do(startServer) - return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil + return ret, nil } func (fd *netFD) init() error { - if err := fd.pd.init(fd); err != nil { - return err - } - if hasLoadSetFileCompletionNotificationModes { - // We do not use events, so we can skip them always. - flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE) - // It's not safe to skip completion notifications for UDP: - // http://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx - if skipSyncNotif && fd.net == "tcp" { - flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS - } - err := syscall.SetFileCompletionNotificationModes(fd.sysfd, flags) - if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 { - fd.skipSyncNotif = true - } - } - // Disable SIO_UDP_CONNRESET behavior. - // http://support.microsoft.com/kb/263823 - switch fd.net { - case "udp", "udp4", "udp6": - ret := uint32(0) - flag := uint32(0) - size := uint32(unsafe.Sizeof(flag)) - err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) - if err != nil { - return os.NewSyscallError("wsaioctl", err) - } - } - fd.rop.mode = 'r' - fd.wop.mode = 'w' - fd.rop.fd = fd - fd.wop.fd = fd - fd.rop.runtimeCtx = fd.pd.runtimeCtx - fd.wop.runtimeCtx = fd.pd.runtimeCtx - if !canCancelIO { - fd.rop.errc = make(chan error) - fd.wop.errc = make(chan error) + errcall, err := fd.pfd.Init(fd.net) + if errcall != "" { + err = wrapSyscallError(errcall, err) } - return nil + return err } func (fd *netFD) setAddr(laddr, raddr Addr) { @@ -342,11 +76,11 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { return err } if deadline, ok := ctx.Deadline(); ok && !deadline.IsZero() { - fd.setWriteDeadline(deadline) - defer fd.setWriteDeadline(noDeadline) + fd.pfd.SetWriteDeadline(deadline) + defer fd.pfd.SetWriteDeadline(noDeadline) } if !canUseConnectEx(fd.net) { - err := connectFunc(fd.sysfd, ra) + err := connectFunc(fd.pfd.Sysfd, ra) return os.NewSyscallError("connect", err) } // ConnectEx windows API requires an unconnected, previously bound socket. @@ -359,13 +93,10 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { default: panic("unexpected type in connect") } - if err := syscall.Bind(fd.sysfd, la); err != nil { + if err := syscall.Bind(fd.pfd.Sysfd, la); err != nil { return os.NewSyscallError("bind", err) } } - // Call ConnectEx API. - o := &fd.wop - o.sa = ra // Wait for the goroutine converting context.Done into a write timeout // to exist, otherwise our caller might cancel the context and @@ -377,16 +108,14 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { case <-ctx.Done(): // Force the runtime's poller to immediately give // up waiting for writability. - fd.setWriteDeadline(aLongTimeAgo) + fd.pfd.SetWriteDeadline(aLongTimeAgo) <-done case <-done: } }() - _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { - return connectExFunc(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) - }) - if err != nil { + // Call ConnectEx API. + if err := fd.pfd.ConnectEx(ra); err != nil { select { case <-ctx.Done(): return mapErr(ctx.Err()) @@ -398,38 +127,18 @@ func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { } } // Refresh socket properties. - return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))) -} - -func (fd *netFD) destroy() { - if fd.sysfd == syscall.InvalidHandle { - return - } - // Poller may want to unregister fd in readiness notification mechanism, - // so this must be executed before closeFunc. - fd.pd.close() - closeFunc(fd.sysfd) - fd.sysfd = syscall.InvalidHandle - // no need for a finalizer anymore - runtime.SetFinalizer(fd, nil) + return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.pfd.Sysfd)), int32(unsafe.Sizeof(fd.pfd.Sysfd)))) } func (fd *netFD) Close() error { - if !fd.fdmu.increfAndClose() { - return errClosing - } - // unblock pending reader and writer - fd.pd.evict() - fd.decref() - return nil + runtime.SetFinalizer(fd, nil) + return fd.pfd.Close() } func (fd *netFD) shutdown(how int) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return syscall.Shutdown(fd.sysfd, how) + err := fd.pfd.Shutdown(how) + runtime.KeepAlive(fd) + return err } func (fd *netFD) closeRead() error { @@ -441,72 +150,21 @@ func (fd *netFD) closeWrite() error { } func (fd *netFD) Read(buf []byte) (int, error) { - if err := fd.readLock(); err != nil { - return 0, err - } - defer fd.readUnlock() - o := &fd.rop - o.InitBuf(buf) - n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error { - return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) - }) - if race.Enabled { - race.Acquire(unsafe.Pointer(&ioSync)) - } - if len(buf) != 0 { - err = fd.eofError(n, err) - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("wsarecv", err) - } - return n, err + n, err := fd.pfd.Read(buf) + runtime.KeepAlive(fd) + return n, wrapSyscallError("wsarecv", err) } func (fd *netFD) readFrom(buf []byte) (int, syscall.Sockaddr, error) { - if len(buf) == 0 { - return 0, nil, nil - } - if err := fd.readLock(); err != nil { - return 0, nil, err - } - defer fd.readUnlock() - o := &fd.rop - o.InitBuf(buf) - n, err := rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { - if o.rsa == nil { - o.rsa = new(syscall.RawSockaddrAny) - } - o.rsan = int32(unsafe.Sizeof(*o.rsa)) - return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) - }) - err = fd.eofError(n, err) - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("wsarecvfrom", err) - } - if err != nil { - return n, nil, err - } - sa, _ := o.rsa.Sockaddr() - return n, sa, nil + n, sa, err := fd.pfd.RecvFrom(buf) + runtime.KeepAlive(fd) + return n, sa, wrapSyscallError("wsarecvfrom", err) } func (fd *netFD) Write(buf []byte) (int, error) { - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - if race.Enabled { - race.ReleaseMerge(unsafe.Pointer(&ioSync)) - } - o := &fd.wop - o.InitBuf(buf) - n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { - return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) - }) - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("wsasend", err) - } - return n, err + n, err := fd.pfd.Write(buf) + runtime.KeepAlive(fd) + return n, wrapSyscallError("wsasend", err) } func (c *conn) writeBuffers(v *Buffers) (int64, error) { @@ -521,61 +179,33 @@ func (c *conn) writeBuffers(v *Buffers) (int64, error) { } func (fd *netFD) writeBuffers(buf *Buffers) (int64, error) { - if len(*buf) == 0 { - return 0, nil - } - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - if race.Enabled { - race.ReleaseMerge(unsafe.Pointer(&ioSync)) - } - o := &fd.wop - o.InitBufs(buf) - n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { - return syscall.WSASend(o.fd.sysfd, &o.bufs[0], uint32(len(*buf)), &o.qty, 0, &o.o, nil) - }) - o.ClearBufs() - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("wsasend", err) - } - testHookDidWritev(n) - buf.consume(int64(n)) - return int64(n), err + n, err := fd.pfd.Writev((*[][]byte)(buf)) + runtime.KeepAlive(fd) + return n, wrapSyscallError("wsasend", err) } func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) { - if len(buf) == 0 { - return 0, nil - } - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - o := &fd.wop - o.InitBuf(buf) - o.sa = sa - n, err := wsrv.ExecIO(o, "WSASendto", func(o *operation) error { - return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) - }) - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("wsasendto", err) - } - return n, err + n, err := fd.pfd.WriteTo(buf, sa) + runtime.KeepAlive(fd) + return n, wrapSyscallError("wsasendto", err) } -func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD, error) { - // Get new socket. - s, err := sysSocket(fd.family, fd.sotype, 0) +func (fd *netFD) accept() (*netFD, error) { + s, rawsa, rsan, errcall, err := fd.pfd.Accept(func() (syscall.Handle, error) { + return sysSocket(fd.family, fd.sotype, 0) + }) + if err != nil { + if errcall != "" { + err = wrapSyscallError(errcall, err) + } return nil, err } // Associate our new socket with IOCP. netfd, err := newFD(s, fd.family, fd.sotype, fd.net) if err != nil { - closeFunc(s) + poll.CloseFunc(s) return nil, err } if err := netfd.init(); err != nil { @@ -583,71 +213,11 @@ func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD return nil, err } - // Submit accept request. - o.handle = s - o.rsan = int32(unsafe.Sizeof(rawsa[0])) - _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { - return acceptFunc(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) - }) - if err != nil { - netfd.Close() - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("acceptex", err) - } - return nil, err - } - - // Inherit properties of the listening socket. - err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) - if err != nil { - netfd.Close() - return nil, os.NewSyscallError("setsockopt", err) - } - runtime.KeepAlive(fd) - return netfd, nil -} - -func (fd *netFD) accept() (*netFD, error) { - if err := fd.readLock(); err != nil { - return nil, err - } - defer fd.readUnlock() - - o := &fd.rop - var netfd *netFD - var err error - var rawsa [2]syscall.RawSockaddrAny - for { - netfd, err = fd.acceptOne(rawsa[:], o) - if err == nil { - break - } - // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is - // returned here. These happen if connection reset is received - // before AcceptEx could complete. These errors relate to new - // connection, not to AcceptEx, so ignore broken connection and - // try AcceptEx again for more connections. - nerr, ok := err.(*os.SyscallError) - if !ok { - return nil, err - } - errno, ok := nerr.Err.(syscall.Errno) - if !ok { - return nil, err - } - switch errno { - case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET: - // ignore these and try again - default: - return nil, err - } - } - // Get local and peer addr out of AcceptEx buffer. var lrsa, rrsa *syscall.RawSockaddrAny var llen, rlen int32 syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])), - 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen) + 0, rsan, rsan, &lrsa, &llen, &rrsa, &rlen) lsa, _ := lrsa.Sockaddr() rsa, _ := rrsa.Sockaddr() diff --git a/src/net/file_unix.go b/src/net/file_unix.go index 9e581fcb41..d67dff8e05 100644 --- a/src/net/file_unix.go +++ b/src/net/file_unix.go @@ -7,6 +7,7 @@ package net import ( + "internal/poll" "os" "syscall" ) @@ -17,7 +18,7 @@ func dupSocket(f *os.File) (int, error) { return -1, err } if err := syscall.SetNonblock(s, true); err != nil { - closeFunc(s) + poll.CloseFunc(s) return -1, os.NewSyscallError("setnonblock", err) } return s, nil @@ -31,7 +32,7 @@ func newFileFD(f *os.File) (*netFD, error) { family := syscall.AF_UNSPEC sotype, err := syscall.GetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_TYPE) if err != nil { - closeFunc(s) + poll.CloseFunc(s) return nil, os.NewSyscallError("getsockopt", err) } lsa, _ := syscall.Getsockname(s) @@ -44,12 +45,12 @@ func newFileFD(f *os.File) (*netFD, error) { case *syscall.SockaddrUnix: family = syscall.AF_UNIX default: - closeFunc(s) + poll.CloseFunc(s) return nil, syscall.EPROTONOSUPPORT } fd, err := newFD(s, family, sotype, "") if err != nil { - closeFunc(s) + poll.CloseFunc(s) return nil, err } laddr := fd.addrFunc()(lsa) diff --git a/src/net/hook_cloexec.go b/src/net/hook_cloexec.go deleted file mode 100644 index 870f0d78b1..0000000000 --- a/src/net/hook_cloexec.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// +build freebsd linux - -package net - -import "syscall" - -var ( - // Placeholders for socket system calls. - accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4 -) diff --git a/src/net/hook_unix.go b/src/net/hook_unix.go index cf52567fcf..fee62a972f 100644 --- a/src/net/hook_unix.go +++ b/src/net/hook_unix.go @@ -13,10 +13,8 @@ var ( testHookCanceledDial = func() {} // for golang.org/issue/16523 // Placeholders for socket system calls. - socketFunc func(int, int, int) (int, error) = syscall.Socket - closeFunc func(int) error = syscall.Close - connectFunc func(int, syscall.Sockaddr) error = syscall.Connect - listenFunc func(int, int) error = syscall.Listen - acceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept - getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt + socketFunc func(int, int, int) (int, error) = syscall.Socket + connectFunc func(int, syscall.Sockaddr) error = syscall.Connect + listenFunc func(int, int) error = syscall.Listen + getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt ) diff --git a/src/net/hook_windows.go b/src/net/hook_windows.go index 63ea35ab8c..4e64dcef51 100644 --- a/src/net/hook_windows.go +++ b/src/net/hook_windows.go @@ -13,10 +13,7 @@ var ( testHookDialChannel = func() { time.Sleep(time.Millisecond) } // see golang.org/issue/5349 // Placeholders for socket system calls. - socketFunc func(int, int, int) (syscall.Handle, error) = syscall.Socket - closeFunc func(syscall.Handle) error = syscall.Closesocket - connectFunc func(syscall.Handle, syscall.Sockaddr) error = syscall.Connect - connectExFunc func(syscall.Handle, syscall.Sockaddr, *byte, uint32, *uint32, *syscall.Overlapped) error = syscall.ConnectEx - listenFunc func(syscall.Handle, int) error = syscall.Listen - acceptFunc func(syscall.Handle, syscall.Handle, *byte, uint32, uint32, uint32, *uint32, *syscall.Overlapped) error = syscall.AcceptEx + socketFunc func(int, int, int) (syscall.Handle, error) = syscall.Socket + connectFunc func(syscall.Handle, syscall.Sockaddr) error = syscall.Connect + listenFunc func(syscall.Handle, int) error = syscall.Listen ) diff --git a/src/net/ipsock_plan9.go b/src/net/ipsock_plan9.go index b7fd344c8a..1cd8fa23ff 100644 --- a/src/net/ipsock_plan9.go +++ b/src/net/ipsock_plan9.go @@ -249,10 +249,10 @@ func (fd *netFD) netFD() (*netFD, error) { func (fd *netFD) acceptPlan9() (nfd *netFD, err error) { defer func() { fixErr(err) }() - if err := fd.readLock(); err != nil { + if err := fd.pfd.ReadLock(); err != nil { return nil, err } - defer fd.readUnlock() + defer fd.pfd.ReadUnlock() listen, err := os.Open(fd.dir + "/listen") if err != nil { return nil, err diff --git a/src/net/ipsock_posix.go b/src/net/ipsock_posix.go index ff280c3e4e..5cb85f8c15 100644 --- a/src/net/ipsock_posix.go +++ b/src/net/ipsock_posix.go @@ -8,6 +8,7 @@ package net import ( "context" + "internal/poll" "runtime" "syscall" ) @@ -18,7 +19,7 @@ func probeIPv4Stack() bool { case syscall.EAFNOSUPPORT, syscall.EPROTONOSUPPORT: return false case nil: - closeFunc(s) + poll.CloseFunc(s) } return true } @@ -68,7 +69,7 @@ func probeIPv6Stack() (supportsIPv6, supportsIPv4map bool) { if err != nil { continue } - defer closeFunc(s) + defer poll.CloseFunc(s) syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, probes[i].value) sa, err := probes[i].laddr.sockaddr(syscall.AF_INET6) if err != nil { diff --git a/src/net/main_cloexec_test.go b/src/net/main_cloexec_test.go index 7903819585..ade71a9490 100644 --- a/src/net/main_cloexec_test.go +++ b/src/net/main_cloexec_test.go @@ -6,6 +6,8 @@ package net +import "internal/poll" + func init() { extraTestHookInstallers = append(extraTestHookInstallers, installAccept4TestHook) extraTestHookUninstallers = append(extraTestHookUninstallers, uninstallAccept4TestHook) @@ -13,13 +15,13 @@ func init() { var ( // Placeholders for saving original socket system calls. - origAccept4 = accept4Func + origAccept4 = poll.Accept4Func ) func installAccept4TestHook() { - accept4Func = sw.Accept4 + poll.Accept4Func = sw.Accept4 } func uninstallAccept4TestHook() { - accept4Func = origAccept4 + poll.Accept4Func = origAccept4 } diff --git a/src/net/main_unix_test.go b/src/net/main_unix_test.go index 0cc129f34d..9cfbc8efc4 100644 --- a/src/net/main_unix_test.go +++ b/src/net/main_unix_test.go @@ -6,13 +6,15 @@ package net +import "internal/poll" + var ( // Placeholders for saving original socket system calls. origSocket = socketFunc - origClose = closeFunc + origClose = poll.CloseFunc origConnect = connectFunc origListen = listenFunc - origAccept = acceptFunc + origAccept = poll.AcceptFunc origGetsockoptInt = getsockoptIntFunc extraTestHookInstallers []func() @@ -21,10 +23,10 @@ var ( func installTestHooks() { socketFunc = sw.Socket - closeFunc = sw.Close + poll.CloseFunc = sw.Close connectFunc = sw.Connect listenFunc = sw.Listen - acceptFunc = sw.Accept + poll.AcceptFunc = sw.Accept getsockoptIntFunc = sw.GetsockoptInt for _, fn := range extraTestHookInstallers { @@ -34,10 +36,10 @@ func installTestHooks() { func uninstallTestHooks() { socketFunc = origSocket - closeFunc = origClose + poll.CloseFunc = origClose connectFunc = origConnect listenFunc = origListen - acceptFunc = origAccept + poll.AcceptFunc = origAccept getsockoptIntFunc = origGetsockoptInt for _, fn := range extraTestHookUninstallers { @@ -48,6 +50,6 @@ func uninstallTestHooks() { // forceCloseSockets must be called only from TestMain. func forceCloseSockets() { for s := range sw.Sockets() { - closeFunc(s) + poll.CloseFunc(s) } } diff --git a/src/net/main_windows_test.go b/src/net/main_windows_test.go index 6ea318c2a5..f38a3a0d66 100644 --- a/src/net/main_windows_test.go +++ b/src/net/main_windows_test.go @@ -4,37 +4,39 @@ package net +import "internal/poll" + var ( // Placeholders for saving original socket system calls. origSocket = socketFunc - origClosesocket = closeFunc + origClosesocket = poll.CloseFunc origConnect = connectFunc - origConnectEx = connectExFunc + origConnectEx = poll.ConnectExFunc origListen = listenFunc - origAccept = acceptFunc + origAccept = poll.AcceptFunc ) func installTestHooks() { socketFunc = sw.Socket - closeFunc = sw.Closesocket + poll.CloseFunc = sw.Closesocket connectFunc = sw.Connect - connectExFunc = sw.ConnectEx + poll.ConnectExFunc = sw.ConnectEx listenFunc = sw.Listen - acceptFunc = sw.AcceptEx + poll.AcceptFunc = sw.AcceptEx } func uninstallTestHooks() { socketFunc = origSocket - closeFunc = origClosesocket + poll.CloseFunc = origClosesocket connectFunc = origConnect - connectExFunc = origConnectEx + poll.ConnectExFunc = origConnectEx listenFunc = origListen - acceptFunc = origAccept + poll.AcceptFunc = origAccept } // forceCloseSockets must be called only from TestMain. func forceCloseSockets() { for s := range sw.Sockets() { - closeFunc(s) + poll.CloseFunc(s) } } diff --git a/src/net/net.go b/src/net/net.go index 81206ea1cb..9c27f1baf9 100644 --- a/src/net/net.go +++ b/src/net/net.go @@ -81,6 +81,7 @@ package net import ( "context" "errors" + "internal/poll" "io" "os" "syscall" @@ -234,7 +235,7 @@ func (c *conn) SetDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } - if err := c.fd.setDeadline(t); err != nil { + if err := c.fd.pfd.SetDeadline(t); err != nil { return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err} } return nil @@ -245,7 +246,7 @@ func (c *conn) SetReadDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } - if err := c.fd.setReadDeadline(t); err != nil { + if err := c.fd.pfd.SetReadDeadline(t); err != nil { return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err} } return nil @@ -256,7 +257,7 @@ func (c *conn) SetWriteDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } - if err := c.fd.setWriteDeadline(t); err != nil { + if err := c.fd.pfd.SetWriteDeadline(t); err != nil { return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err} } return nil @@ -391,10 +392,8 @@ var ( errMissingAddress = errors.New("missing address") // For both read and write operations. - errTimeout error = &timeoutError{} - errCanceled = errors.New("operation was canceled") - errClosing = errors.New("use of closed network connection") - ErrWriteToConnected = errors.New("use of WriteTo with pre-connected connection") + errCanceled = errors.New("operation was canceled") + ErrWriteToConnected = errors.New("use of WriteTo with pre-connected connection") ) // mapErr maps from the context errors to the historical internal net @@ -407,7 +406,7 @@ func mapErr(err error) error { case context.Canceled: return errCanceled case context.DeadlineExceeded: - return errTimeout + return poll.ErrTimeout default: return err } @@ -502,12 +501,6 @@ func (e *OpError) Temporary() bool { return ok && t.Temporary() } -type timeoutError struct{} - -func (e *timeoutError) Error() string { return "i/o timeout" } -func (e *timeoutError) Timeout() bool { return true } -func (e *timeoutError) Temporary() bool { return true } - // A ParseError is the error type of literal network address parsers. type ParseError struct { // Type is the type of string that was expected, such as @@ -632,8 +625,6 @@ type buffersWriter interface { writeBuffers(*Buffers) (int64, error) } -var testHookDidWritev = func(wrote int) {} - // Buffers contains zero or more runs of bytes to write. // // On certain machines, for certain types of connections, this is diff --git a/src/net/sendfile_bsd.go b/src/net/sendfile_bsd.go index 67e80c9c6a..7a2b48c6cf 100644 --- a/src/net/sendfile_bsd.go +++ b/src/net/sendfile_bsd.go @@ -7,15 +7,11 @@ package net import ( + "internal/poll" "io" "os" - "syscall" ) -// maxSendfileSize is the largest chunk size we ask the kernel to copy -// at a time. -const maxSendfileSize int = 4 << 20 - // sendFile copies the contents of r to c using the sendfile // system call to minimize copies. // @@ -62,49 +58,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, err, false } - if err := c.writeLock(); err != nil { - return 0, err, true - } - defer c.writeUnlock() + written, err = poll.SendFile(&c.pfd, int(f.Fd()), pos, remain) - dst := c.sysfd - src := int(f.Fd()) - for remain > 0 { - n := maxSendfileSize - if int64(n) > remain { - n = int(remain) - } - pos1 := pos - n, err1 := syscall.Sendfile(dst, src, &pos1, n) - if n > 0 { - pos += int64(n) - written += int64(n) - remain -= int64(n) - } - if n == 0 && err1 == nil { - break - } - if err1 == syscall.EAGAIN { - if err1 = c.pd.waitWrite(); err1 == nil { - continue - } - } - if err1 == syscall.EINTR { - continue - } - if err1 != nil { - // This includes syscall.ENOSYS (no kernel - // support) and syscall.EINVAL (fd types which - // don't implement sendfile) - err = err1 - break - } - } if lr != nil { - lr.N = remain - } - if err != nil { - err = os.NewSyscallError("sendfile", err) + lr.N = remain - written } - return written, err, written > 0 + return written, wrapSyscallError("sendfile", err), written > 0 } diff --git a/src/net/sendfile_linux.go b/src/net/sendfile_linux.go index 7e741f9794..c537ea68b2 100644 --- a/src/net/sendfile_linux.go +++ b/src/net/sendfile_linux.go @@ -5,15 +5,11 @@ package net import ( + "internal/poll" "io" "os" - "syscall" ) -// maxSendfileSize is the largest chunk size we ask the kernel to copy -// at a time. -const maxSendfileSize int = 4 << 20 - // sendFile copies the contents of r to c using the sendfile // system call to minimize copies. // @@ -36,44 +32,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, false } - if err := c.writeLock(); err != nil { - return 0, err, true - } - defer c.writeUnlock() + written, err = poll.SendFile(&c.pfd, int(f.Fd()), remain) - dst := c.sysfd - src := int(f.Fd()) - for remain > 0 { - n := maxSendfileSize - if int64(n) > remain { - n = int(remain) - } - n, err1 := syscall.Sendfile(dst, src, nil, n) - if n > 0 { - written += int64(n) - remain -= int64(n) - } - if n == 0 && err1 == nil { - break - } - if err1 == syscall.EAGAIN { - if err1 = c.pd.waitWrite(); err1 == nil { - continue - } - } - if err1 != nil { - // This includes syscall.ENOSYS (no kernel - // support) and syscall.EINVAL (fd types which - // don't implement sendfile) - err = err1 - break - } - } if lr != nil { - lr.N = remain - } - if err != nil { - err = os.NewSyscallError("sendfile", err) + lr.N = remain - written } - return written, err, written > 0 + return written, wrapSyscallError("sendfile", err), written > 0 } diff --git a/src/net/sendfile_solaris.go b/src/net/sendfile_solaris.go index add70c3147..63ca9d47b8 100644 --- a/src/net/sendfile_solaris.go +++ b/src/net/sendfile_solaris.go @@ -5,19 +5,11 @@ package net import ( + "internal/poll" "io" "os" - "syscall" ) -// Not strictly needed, but very helpful for debugging, see issue #10221. -//go:cgo_import_dynamic _ _ "libsendfile.so" -//go:cgo_import_dynamic _ _ "libsocket.so" - -// maxSendfileSize is the largest chunk size we ask the kernel to copy -// at a time. -const maxSendfileSize int = 4 << 20 - // sendFile copies the contents of r to c using the sendfile // system call to minimize copies. // @@ -62,56 +54,10 @@ func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, err, false } - if err := c.writeLock(); err != nil { - return 0, err, true - } - defer c.writeUnlock() + written, err = poll.SendFile(&c.pfd, int(f.Fd()), pos, remain) - dst := c.sysfd - src := int(f.Fd()) - for remain > 0 { - n := maxSendfileSize - if int64(n) > remain { - n = int(remain) - } - pos1 := pos - n, err1 := syscall.Sendfile(dst, src, &pos1, n) - if err1 == syscall.EAGAIN || err1 == syscall.EINTR { - // partial write may have occurred - if n = int(pos1 - pos); n == 0 { - // nothing more to write - err1 = nil - } - } - if n > 0 { - pos += int64(n) - written += int64(n) - remain -= int64(n) - } - if n == 0 && err1 == nil { - break - } - if err1 == syscall.EAGAIN { - if err1 = c.pd.waitWrite(); err1 == nil { - continue - } - } - if err1 == syscall.EINTR { - continue - } - if err1 != nil { - // This includes syscall.ENOSYS (no kernel - // support) and syscall.EINVAL (fd types which - // don't implement sendfile) - err = err1 - break - } - } if lr != nil { - lr.N = remain - } - if err != nil { - err = os.NewSyscallError("sendfile", err) + lr.N = remain - written } - return written, err, written > 0 + return written, wrapSyscallError("sendfile", err), written > 0 } diff --git a/src/net/sendfile_windows.go b/src/net/sendfile_windows.go index bc0b7fb5b2..bccd8b149f 100644 --- a/src/net/sendfile_windows.go +++ b/src/net/sendfile_windows.go @@ -5,6 +5,7 @@ package net import ( + "internal/poll" "io" "os" "syscall" @@ -34,19 +35,10 @@ func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) { return 0, nil, false } - if err := fd.writeLock(); err != nil { - return 0, err, true - } - defer fd.writeUnlock() + done, err := poll.SendFile(&fd.pfd, syscall.Handle(f.Fd()), n) - o := &fd.wop - o.qty = uint32(n) - o.handle = syscall.Handle(f.Fd()) - done, err := wsrv.ExecIO(o, "TransmitFile", func(o *operation) error { - return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) - }) if err != nil { - return 0, os.NewSyscallError("transmitfile", err), false + return 0, wrapSyscallError("transmitfile", err), false } if lr != nil { lr.N -= int64(done) diff --git a/src/net/sock_cloexec.go b/src/net/sock_cloexec.go index 616a101eac..3f5be2d62c 100644 --- a/src/net/sock_cloexec.go +++ b/src/net/sock_cloexec.go @@ -10,6 +10,7 @@ package net import ( + "internal/poll" "os" "syscall" ) @@ -42,46 +43,8 @@ func sysSocket(family, sotype, proto int) (int, error) { return -1, os.NewSyscallError("socket", err) } if err = syscall.SetNonblock(s, true); err != nil { - closeFunc(s) + poll.CloseFunc(s) return -1, os.NewSyscallError("setnonblock", err) } return s, nil } - -// Wrapper around the accept system call that marks the returned file -// descriptor as nonblocking and close-on-exec. -func accept(s int) (int, syscall.Sockaddr, error) { - ns, sa, err := accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) - // On Linux the accept4 system call was introduced in 2.6.28 - // kernel and on FreeBSD it was introduced in 10 kernel. If we - // get an ENOSYS error on both Linux and FreeBSD, or EINVAL - // error on Linux, fall back to using accept. - switch err { - case nil: - return ns, sa, nil - default: // errors other than the ones listed - return -1, sa, os.NewSyscallError("accept4", err) - case syscall.ENOSYS: // syscall missing - case syscall.EINVAL: // some Linux use this instead of ENOSYS - case syscall.EACCES: // some Linux use this instead of ENOSYS - case syscall.EFAULT: // some Linux use this instead of ENOSYS - } - - // See ../syscall/exec_unix.go for description of ForkLock. - // It is probably okay to hold the lock across syscall.Accept - // because we have put fd.sysfd into non-blocking mode. - // However, a call to the File method will put it back into - // blocking mode. We can't take that risk, so no use of ForkLock here. - ns, sa, err = acceptFunc(s) - if err == nil { - syscall.CloseOnExec(ns) - } - if err != nil { - return -1, nil, os.NewSyscallError("accept", err) - } - if err = syscall.SetNonblock(ns, true); err != nil { - closeFunc(ns) - return -1, nil, os.NewSyscallError("setnonblock", err) - } - return ns, sa, nil -} diff --git a/src/net/sock_posix.go b/src/net/sock_posix.go index 16351e1f14..8985f8f23f 100644 --- a/src/net/sock_posix.go +++ b/src/net/sock_posix.go @@ -8,6 +8,7 @@ package net import ( "context" + "internal/poll" "os" "syscall" ) @@ -43,11 +44,11 @@ func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only return nil, err } if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil { - closeFunc(s) + poll.CloseFunc(s) return nil, err } if fd, err = newFD(s, family, sotype, net); err != nil { - closeFunc(s) + poll.CloseFunc(s) return nil, err } @@ -127,7 +128,7 @@ func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error { if lsa, err = laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { - if err := syscall.Bind(fd.sysfd, lsa); err != nil { + if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } @@ -146,8 +147,8 @@ func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error { return err } } - lsa, _ = syscall.Getsockname(fd.sysfd) - if rsa, _ = syscall.Getpeername(fd.sysfd); rsa != nil { + lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) + if rsa, _ = syscall.Getpeername(fd.pfd.Sysfd); rsa != nil { fd.setAddr(fd.addrFunc()(lsa), fd.addrFunc()(rsa)) } else { fd.setAddr(fd.addrFunc()(lsa), raddr) @@ -156,23 +157,23 @@ func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error { } func (fd *netFD) listenStream(laddr sockaddr, backlog int) error { - if err := setDefaultListenerSockopts(fd.sysfd); err != nil { + if err := setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil { return err } if lsa, err := laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { - if err := syscall.Bind(fd.sysfd, lsa); err != nil { + if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } - if err := listenFunc(fd.sysfd, backlog); err != nil { + if err := listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } if err := fd.init(); err != nil { return err } - lsa, _ := syscall.Getsockname(fd.sysfd) + lsa, _ := syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil } @@ -188,7 +189,7 @@ func (fd *netFD) listenDatagram(laddr sockaddr) error { // multiple UDP listeners that listen on the same UDP // port to join the same group address. if addr.IP != nil && addr.IP.IsMulticast() { - if err := setDefaultMulticastSockopts(fd.sysfd); err != nil { + if err := setDefaultMulticastSockopts(fd.pfd.Sysfd); err != nil { return err } addr := *addr @@ -204,14 +205,14 @@ func (fd *netFD) listenDatagram(laddr sockaddr) error { if lsa, err := laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { - if err := syscall.Bind(fd.sysfd, lsa); err != nil { + if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } if err := fd.init(); err != nil { return err } - lsa, _ := syscall.Getsockname(fd.sysfd) + lsa, _ := syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil } diff --git a/src/net/sockopt_posix.go b/src/net/sockopt_posix.go index cd3d562289..e8af84f418 100644 --- a/src/net/sockopt_posix.go +++ b/src/net/sockopt_posix.go @@ -7,7 +7,7 @@ package net import ( - "os" + "runtime" "syscall" ) @@ -101,27 +101,21 @@ done: } func setReadBuffer(fd *netFD, bytes int) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes)) + err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setWriteBuffer(fd *netFD, bytes int) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes)) + err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setKeepAlive(fd *netFD, keepalive bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive))) + err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive)) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setLinger(fd *netFD, sec int) error { @@ -133,9 +127,7 @@ func setLinger(fd *netFD, sec int) error { l.Onoff = 0 l.Linger = 0 } - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptLinger(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_LINGER, &l)) + err := fd.pfd.SetsockoptLinger(syscall.SOL_SOCKET, syscall.SO_LINGER, &l) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/sockoptip_bsd.go b/src/net/sockoptip_bsd.go index b15c6396ba..b11f3a4edb 100644 --- a/src/net/sockoptip_bsd.go +++ b/src/net/sockoptip_bsd.go @@ -7,28 +7,24 @@ package net import ( - "os" + "runtime" "syscall" ) func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { ip, err := interfaceToIPv4Addr(ifi) if err != nil { - return os.NewSyscallError("setsockopt", err) + return wrapSyscallError("setsockopt", err) } var a [4]byte copy(a[:], ip.To4()) - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInet4Addr(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, a)) + err = fd.pfd.SetsockoptInet4Addr(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, a) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptByte(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, byte(boolint(v)))) + err := fd.pfd.SetsockoptByte(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, byte(boolint(v))) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/sockoptip_linux.go b/src/net/sockoptip_linux.go index c1dcc911c7..bd7d834425 100644 --- a/src/net/sockoptip_linux.go +++ b/src/net/sockoptip_linux.go @@ -5,7 +5,7 @@ package net import ( - "os" + "runtime" "syscall" ) @@ -15,17 +15,13 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { v = int32(ifi.Index) } mreq := &syscall.IPMreqn{Ifindex: v} - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptIPMreqn(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, mreq)) + err := fd.pfd.SetsockoptIPMreqn(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, mreq) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v))) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v)) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/sockoptip_posix.go b/src/net/sockoptip_posix.go index d50886003e..4e10f2a6a5 100644 --- a/src/net/sockoptip_posix.go +++ b/src/net/sockoptip_posix.go @@ -7,7 +7,7 @@ package net import ( - "os" + "runtime" "syscall" ) @@ -16,11 +16,9 @@ func joinIPv4Group(fd *netFD, ifi *Interface, ip IP) error { if err := setIPv4MreqToInterface(mreq, ifi); err != nil { return err } - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptIPMreq(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, mreq)) + err := fd.pfd.SetsockoptIPMreq(syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, mreq) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error { @@ -28,19 +26,15 @@ func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error { if ifi != nil { v = ifi.Index } - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_IF, v)) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_IF, v) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setIPv6MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_LOOP, boolint(v))) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_LOOP, boolint(v)) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error { @@ -49,9 +43,7 @@ func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error { if ifi != nil { mreq.Interface = uint32(ifi.Index) } - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptIPv6Mreq(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_JOIN_GROUP, mreq)) + err := fd.pfd.SetsockoptIPv6Mreq(syscall.IPPROTO_IPV6, syscall.IPV6_JOIN_GROUP, mreq) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/sockoptip_windows.go b/src/net/sockoptip_windows.go index 916debebc6..62676039a3 100644 --- a/src/net/sockoptip_windows.go +++ b/src/net/sockoptip_windows.go @@ -6,6 +6,7 @@ package net import ( "os" + "runtime" "syscall" "unsafe" ) @@ -17,17 +18,13 @@ func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error { } var a [4]byte copy(a[:], ip.To4()) - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, (*byte)(unsafe.Pointer(&a[0])), 4)) + err = fd.pfd.Setsockopt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, (*byte)(unsafe.Pointer(&a[0])), 4) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } func setIPv4MulticastLoopback(fd *netFD, v bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v))) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v)) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/sys_cloexec.go b/src/net/sys_cloexec.go index ba266e6534..b7a842501e 100644 --- a/src/net/sys_cloexec.go +++ b/src/net/sys_cloexec.go @@ -10,6 +10,7 @@ package net import ( + "internal/poll" "os" "syscall" ) @@ -28,30 +29,8 @@ func sysSocket(family, sotype, proto int) (int, error) { return -1, os.NewSyscallError("socket", err) } if err = syscall.SetNonblock(s, true); err != nil { - closeFunc(s) + poll.CloseFunc(s) return -1, os.NewSyscallError("setnonblock", err) } return s, nil } - -// Wrapper around the accept system call that marks the returned file -// descriptor as nonblocking and close-on-exec. -func accept(s int) (int, syscall.Sockaddr, error) { - // See ../syscall/exec_unix.go for description of ForkLock. - // It is probably okay to hold the lock across syscall.Accept - // because we have put fd.sysfd into non-blocking mode. - // However, a call to the File method will put it back into - // blocking mode. We can't take that risk, so no use of ForkLock here. - ns, sa, err := acceptFunc(s) - if err == nil { - syscall.CloseOnExec(ns) - } - if err != nil { - return -1, nil, os.NewSyscallError("accept", err) - } - if err = syscall.SetNonblock(ns, true); err != nil { - closeFunc(ns) - return -1, nil, os.NewSyscallError("setnonblock", err) - } - return ns, sa, nil -} diff --git a/src/net/tcpsock.go b/src/net/tcpsock.go index 69731ebc91..a544a5b3c3 100644 --- a/src/net/tcpsock.go +++ b/src/net/tcpsock.go @@ -255,7 +255,7 @@ func (l *TCPListener) SetDeadline(t time.Time) error { if !l.ok() { return syscall.EINVAL } - if err := l.fd.setDeadline(t); err != nil { + if err := l.fd.pfd.SetDeadline(t); err != nil { return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return nil diff --git a/src/net/tcpsockopt_darwin.go b/src/net/tcpsockopt_darwin.go index 0d1310eaf9..7415c763c5 100644 --- a/src/net/tcpsockopt_darwin.go +++ b/src/net/tcpsockopt_darwin.go @@ -5,7 +5,7 @@ package net import ( - "os" + "runtime" "syscall" "time" ) @@ -13,17 +13,15 @@ import ( const sysTCP_KEEPINTVL = 0x101 func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() // The kernel expects seconds so round to next highest second. d += (time.Second - time.Nanosecond) secs := int(d.Seconds()) - switch err := syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, sysTCP_KEEPINTVL, secs); err { + switch err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, sysTCP_KEEPINTVL, secs); err { case nil, syscall.ENOPROTOOPT: // OS X 10.7 and earlier don't support this option default: - return os.NewSyscallError("setsockopt", err) + return wrapSyscallError("setsockopt", err) } - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE, secs)) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE, secs) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/tcpsockopt_posix.go b/src/net/tcpsockopt_posix.go index 805b56b5c7..9cef434b6f 100644 --- a/src/net/tcpsockopt_posix.go +++ b/src/net/tcpsockopt_posix.go @@ -7,14 +7,12 @@ package net import ( - "os" + "runtime" "syscall" ) func setNoDelay(fd *netFD, noDelay bool) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(noDelay))) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(noDelay)) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/tcpsockopt_solaris.go b/src/net/tcpsockopt_solaris.go index 76285e5d2e..019fe349eb 100644 --- a/src/net/tcpsockopt_solaris.go +++ b/src/net/tcpsockopt_solaris.go @@ -5,16 +5,12 @@ package net import ( - "os" + "runtime" "syscall" "time" ) func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() // The kernel expects milliseconds so round to next highest // millisecond. d += (time.Millisecond - time.Nanosecond) @@ -31,5 +27,7 @@ func setKeepAlivePeriod(fd *netFD, d time.Duration) error { // allocate a constant with a different meaning for the value of // TCP_KEEPINTVL on illumos. - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE_THRESHOLD, msecs)) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE_THRESHOLD, msecs) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/tcpsockopt_unix.go b/src/net/tcpsockopt_unix.go index 8d44fb2095..c1df6605be 100644 --- a/src/net/tcpsockopt_unix.go +++ b/src/net/tcpsockopt_unix.go @@ -7,21 +7,19 @@ package net import ( - "os" + "runtime" "syscall" "time" ) func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() // The kernel expects seconds so round to next highest second. d += (time.Second - time.Nanosecond) secs := int(d.Seconds()) - if err := syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil { - return os.NewSyscallError("setsockopt", err) + if err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil { + return wrapSyscallError("setsockopt", err) } - return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs)) + err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs) + runtime.KeepAlive(fd) + return wrapSyscallError("setsockopt", err) } diff --git a/src/net/tcpsockopt_windows.go b/src/net/tcpsockopt_windows.go index 45a4dca525..73dead11d0 100644 --- a/src/net/tcpsockopt_windows.go +++ b/src/net/tcpsockopt_windows.go @@ -6,16 +6,13 @@ package net import ( "os" + "runtime" "syscall" "time" "unsafe" ) func setKeepAlivePeriod(fd *netFD, d time.Duration) error { - if err := fd.incref(); err != nil { - return err - } - defer fd.decref() // The kernel expects milliseconds so round to next highest // millisecond. d += (time.Millisecond - time.Nanosecond) @@ -27,6 +24,7 @@ func setKeepAlivePeriod(fd *netFD, d time.Duration) error { } ret := uint32(0) size := uint32(unsafe.Sizeof(ka)) - err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_KEEPALIVE_VALS, (*byte)(unsafe.Pointer(&ka)), size, nil, 0, &ret, nil, 0) + err := fd.pfd.WSAIoctl(syscall.SIO_KEEPALIVE_VALS, (*byte)(unsafe.Pointer(&ka)), size, nil, 0, &ret, nil, 0) + runtime.KeepAlive(fd) return os.NewSyscallError("wsaioctl", err) } diff --git a/src/net/timeout_test.go b/src/net/timeout_test.go index 55bbf4402d..9de7801ad1 100644 --- a/src/net/timeout_test.go +++ b/src/net/timeout_test.go @@ -6,6 +6,7 @@ package net import ( "fmt" + "internal/poll" "internal/testenv" "io" "io/ioutil" @@ -145,9 +146,9 @@ var acceptTimeoutTests = []struct { }{ // Tests that accept deadlines in the past work, even if // there's incoming connections available. - {-5 * time.Second, [2]error{errTimeout, errTimeout}}, + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, - {50 * time.Millisecond, [2]error{nil, errTimeout}}, + {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, } func TestAcceptTimeout(t *testing.T) { @@ -299,9 +300,9 @@ var readTimeoutTests = []struct { }{ // Tests that read deadlines work, even if there's data ready // to be read. - {-5 * time.Second, [2]error{errTimeout, errTimeout}}, + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, - {50 * time.Millisecond, [2]error{nil, errTimeout}}, + {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, } func TestReadTimeout(t *testing.T) { @@ -423,9 +424,9 @@ var readFromTimeoutTests = []struct { }{ // Tests that read deadlines work, even if there's data ready // to be read. - {-5 * time.Second, [2]error{errTimeout, errTimeout}}, + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, - {50 * time.Millisecond, [2]error{nil, errTimeout}}, + {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, } func TestReadFromTimeout(t *testing.T) { @@ -496,9 +497,9 @@ var writeTimeoutTests = []struct { }{ // Tests that write deadlines work, even if there's buffer // space available to write. - {-5 * time.Second, [2]error{errTimeout, errTimeout}}, + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, - {10 * time.Millisecond, [2]error{nil, errTimeout}}, + {10 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, } func TestWriteTimeout(t *testing.T) { @@ -610,9 +611,9 @@ var writeToTimeoutTests = []struct { }{ // Tests that write deadlines work, even if there's buffer // space available to write. - {-5 * time.Second, [2]error{errTimeout, errTimeout}}, + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, - {10 * time.Millisecond, [2]error{nil, errTimeout}}, + {10 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, } func TestWriteToTimeout(t *testing.T) { diff --git a/src/net/unixsock.go b/src/net/unixsock.go index b25d492f59..d29514e47b 100644 --- a/src/net/unixsock.go +++ b/src/net/unixsock.go @@ -264,7 +264,7 @@ func (l *UnixListener) SetDeadline(t time.Time) error { if !l.ok() { return syscall.EINVAL } - if err := l.fd.setDeadline(t); err != nil { + if err := l.fd.pfd.SetDeadline(t); err != nil { return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return nil diff --git a/src/net/writev_test.go b/src/net/writev_test.go index 7160d28c3a..4c05be473d 100644 --- a/src/net/writev_test.go +++ b/src/net/writev_test.go @@ -7,6 +7,7 @@ package net import ( "bytes" "fmt" + "internal/poll" "io" "io/ioutil" "reflect" @@ -99,13 +100,13 @@ func TestBuffers_WriteTo(t *testing.T) { } func testBuffer_writeTo(t *testing.T, chunks int, useCopy bool) { - oldHook := testHookDidWritev - defer func() { testHookDidWritev = oldHook }() + oldHook := poll.TestHookDidWritev + defer func() { poll.TestHookDidWritev = oldHook }() var writeLog struct { sync.Mutex log []int } - testHookDidWritev = func(size int) { + poll.TestHookDidWritev = func(size int) { writeLog.Lock() writeLog.log = append(writeLog.log, size) writeLog.Unlock() diff --git a/src/net/writev_unix.go b/src/net/writev_unix.go index 174e6bc51e..bf0fbf8a13 100644 --- a/src/net/writev_unix.go +++ b/src/net/writev_unix.go @@ -7,10 +7,8 @@ package net import ( - "io" - "os" + "runtime" "syscall" - "unsafe" ) func (c *conn) writeBuffers(v *Buffers) (int64, error) { @@ -25,71 +23,7 @@ func (c *conn) writeBuffers(v *Buffers) (int64, error) { } func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) { - if err := fd.writeLock(); err != nil { - return 0, err - } - defer fd.writeUnlock() - if err := fd.pd.prepareWrite(); err != nil { - return 0, err - } - - var iovecs []syscall.Iovec - if fd.iovecs != nil { - iovecs = *fd.iovecs - } - // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is - // 1024 and this seems conservative enough for now. Darwin's - // UIO_MAXIOV also seems to be 1024. - maxVec := 1024 - - for len(*v) > 0 { - iovecs = iovecs[:0] - for _, chunk := range *v { - if len(chunk) == 0 { - continue - } - iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]}) - if fd.isStream && len(chunk) > 1<<30 { - iovecs[len(iovecs)-1].SetLen(1 << 30) - break // continue chunk on next writev - } - iovecs[len(iovecs)-1].SetLen(len(chunk)) - if len(iovecs) == maxVec { - break - } - } - if len(iovecs) == 0 { - break - } - fd.iovecs = &iovecs // cache - - wrote, _, e0 := syscall.Syscall(syscall.SYS_WRITEV, - uintptr(fd.sysfd), - uintptr(unsafe.Pointer(&iovecs[0])), - uintptr(len(iovecs))) - if wrote == ^uintptr(0) { - wrote = 0 - } - testHookDidWritev(int(wrote)) - n += int64(wrote) - v.consume(int64(wrote)) - if e0 == syscall.EAGAIN { - if err = fd.pd.waitWrite(); err == nil { - continue - } - } else if e0 != 0 { - err = syscall.Errno(e0) - } - if err != nil { - break - } - if n == 0 { - err = io.ErrUnexpectedEOF - break - } - } - if _, ok := err.(syscall.Errno); ok { - err = os.NewSyscallError("writev", err) - } - return n, err + n, err = fd.pfd.Writev((*[][]byte)(v)) + runtime.KeepAlive(fd) + return n, wrapSyscallError("writev", err) } |
