aboutsummaryrefslogtreecommitdiff
path: root/src/internal
diff options
context:
space:
mode:
Diffstat (limited to 'src/internal')
-rw-r--r--src/internal/poll/fd_windows.go132
1 files changed, 59 insertions, 73 deletions
diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go
index a6ecdafc34..6443f6eb30 100644
--- a/src/internal/poll/fd_windows.go
+++ b/src/internal/poll/fd_windows.go
@@ -78,22 +78,6 @@ type operation struct {
mode int32
}
-func (o *operation) setEvent() {
- h, err := windows.CreateEvent(nil, 0, 0, nil)
- if err != nil {
- // This shouldn't happen when all CreateEvent arguments are zero.
- panic(err)
- }
- // Set the low bit so that the external IOCP doesn't receive the completion packet.
- o.o.HEvent = h | 1
-}
-
-func (o *operation) close() {
- if o.o.HEvent != 0 {
- syscall.CloseHandle(o.o.HEvent)
- }
-}
-
func (fd *FD) overlapped(o *operation) *syscall.Overlapped {
if fd.isBlocking {
// Don't return the overlapped object if the file handle
@@ -208,6 +192,12 @@ var wsaRsaPool = sync.Pool{
},
}
+var operationPool = sync.Pool{
+ New: func() any {
+ return new(operation)
+ },
+}
+
// waitIO waits for the IO operation o to complete.
func (fd *FD) waitIO(o *operation) error {
if fd.isBlocking {
@@ -246,27 +236,57 @@ func (fd *FD) cancelIO(o *operation) {
fd.pd.waitCanceled(int(o.mode))
}
+// pin pins ptr for the duration of the IO operation.
+// If fd is in blocking mode, pin does nothing.
+func (fd *FD) pin(mode int, ptr any) {
+ if fd.isBlocking {
+ return
+ }
+ if mode == 'r' {
+ fd.readPinner.Pin(ptr)
+ } else {
+ fd.writePinner.Pin(ptr)
+ }
+}
+
// execIO executes a single IO operation o.
// It supports both synchronous and asynchronous IO.
-// o.qty and o.flags are set to zero before calling submit
-// to avoid reusing the values from a previous call.
func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) {
+ if mode == 'r' {
+ defer fd.readPinner.Unpin()
+ } else {
+ defer fd.writePinner.Unpin()
+ }
// Notify runtime netpoll about starting IO.
err := fd.pd.prepare(mode, fd.isFile)
if err != nil {
return 0, err
}
- o := &fd.rop
- if mode == 'w' {
- o = &fd.wop
+ o := operationPool.Get().(*operation)
+ defer operationPool.Put(o)
+ *o = operation{
+ o: syscall.Overlapped{
+ OffsetHigh: uint32(fd.offset >> 32),
+ Offset: uint32(fd.offset),
+ },
+ runtimeCtx: fd.pd.runtimeCtx,
+ mode: int32(mode),
}
// Start IO.
- if !fd.isBlocking && o.o.HEvent == 0 && !fd.pollable() {
+ if !fd.isBlocking && !fd.pollable() {
// If the handle is opened for overlapped IO but we can't
// use the runtime poller, then we need to use an
// event to wait for the IO to complete.
- o.setEvent()
+ h, err := windows.CreateEvent(nil, 0, 0, nil)
+ if err != nil {
+ // This shouldn't happen when all CreateEvent arguments are zero.
+ panic(err)
+ }
+ // Set the low bit so that the external IOCP doesn't receive the completion packet.
+ o.o.HEvent = h | 1
+ defer syscall.CloseHandle(h)
}
+ fd.pin(mode, o)
qty, err := submit(o)
var waitErr error
// Blocking operations shouldn't return ERROR_IO_PENDING.
@@ -321,11 +341,6 @@ type FD struct {
// System file descriptor. Immutable until Close.
Sysfd syscall.Handle
- // Read operation.
- rop operation
- // Write operation.
- wop operation
-
// I/O poller.
pd pollDesc
@@ -364,6 +379,8 @@ type FD struct {
disassociated atomic.Bool
+ // readPinner and writePinner are automatically unpinned
+ // before execIO returns.
readPinner runtime.Pinner
writePinner runtime.Pinner
}
@@ -383,8 +400,6 @@ type FD struct {
// using an external mechanism.
func (fd *FD) setOffset(off int64) {
fd.offset = off
- fd.rop.o.OffsetHigh, fd.rop.o.Offset = uint32(off>>32), uint32(off)
- fd.wop.o.OffsetHigh, fd.wop.o.Offset = uint32(off>>32), uint32(off)
}
// addOffset adds the given offset to the current offset.
@@ -435,8 +450,6 @@ func (fd *FD) Init(net string, pollable bool) error {
}
fd.isFile = fd.kind != kindNet
fd.isBlocking = !pollable
- fd.rop.mode = 'r'
- fd.wop.mode = 'w'
// It is safe to add overlapped handles that also perform I/O
// outside of the runtime poller. The runtime poller will ignore
@@ -445,8 +458,6 @@ func (fd *FD) Init(net string, pollable bool) error {
if err != nil {
return err
}
- fd.rop.runtimeCtx = fd.pd.runtimeCtx
- fd.wop.runtimeCtx = fd.pd.runtimeCtx
if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
// Non-socket handles can use SetFileCompletionNotificationModes without problems.
err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
@@ -485,8 +496,6 @@ func (fd *FD) destroy() error {
if fd.Sysfd == syscall.InvalidHandle {
return syscall.EINVAL
}
- fd.rop.close()
- fd.wop.close()
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before fd.CloseFunc.
fd.pd.close()
@@ -541,9 +550,8 @@ func (fd *FD) Read(buf []byte) (int, error) {
defer fd.readUnlock()
}
- if len(buf) > 0 && !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
@@ -672,9 +680,8 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
- if len(buf) > 0 && !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('r', &buf[0])
}
if len(buf) > maxRW {
@@ -724,10 +731,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
@@ -758,10 +762,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
@@ -792,10 +793,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
}
defer fd.readUnlock()
- if !fd.isBlocking {
- fd.readPinner.Pin(&buf[0])
- defer fd.readPinner.Unpin()
- }
+ fd.pin('r', &buf[0])
rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny)
defer wsaRsaPool.Put(rsa)
@@ -827,11 +825,9 @@ func (fd *FD) Write(buf []byte) (int, error) {
defer fd.writeUnlock()
}
- if len(buf) > 0 && !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('w', &buf[0])
}
-
var ntotal int
for {
max := len(buf)
@@ -924,9 +920,8 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
}
defer fd.readWriteUnlock()
- if len(buf) > 0 && !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
+ if len(buf) > 0 {
+ fd.pin('w', &buf[0])
}
if fd.isBlocking {
@@ -1008,10 +1003,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
@@ -1048,10 +1040,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error)
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {
@@ -1088,10 +1077,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error)
return n, err
}
- if !fd.isBlocking {
- fd.writePinner.Pin(&buf[0])
- defer fd.writePinner.Unpin()
- }
+ fd.pin('w', &buf[0])
ntotal := 0
for len(buf) > 0 {