diff options
Diffstat (limited to 'src/internal')
| -rw-r--r-- | src/internal/poll/fd_windows.go | 132 |
1 files changed, 59 insertions, 73 deletions
diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index a6ecdafc34..6443f6eb30 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -78,22 +78,6 @@ type operation struct { mode int32 } -func (o *operation) setEvent() { - h, err := windows.CreateEvent(nil, 0, 0, nil) - if err != nil { - // This shouldn't happen when all CreateEvent arguments are zero. - panic(err) - } - // Set the low bit so that the external IOCP doesn't receive the completion packet. - o.o.HEvent = h | 1 -} - -func (o *operation) close() { - if o.o.HEvent != 0 { - syscall.CloseHandle(o.o.HEvent) - } -} - func (fd *FD) overlapped(o *operation) *syscall.Overlapped { if fd.isBlocking { // Don't return the overlapped object if the file handle @@ -208,6 +192,12 @@ var wsaRsaPool = sync.Pool{ }, } +var operationPool = sync.Pool{ + New: func() any { + return new(operation) + }, +} + // waitIO waits for the IO operation o to complete. func (fd *FD) waitIO(o *operation) error { if fd.isBlocking { @@ -246,27 +236,57 @@ func (fd *FD) cancelIO(o *operation) { fd.pd.waitCanceled(int(o.mode)) } +// pin pins ptr for the duration of the IO operation. +// If fd is in blocking mode, pin does nothing. +func (fd *FD) pin(mode int, ptr any) { + if fd.isBlocking { + return + } + if mode == 'r' { + fd.readPinner.Pin(ptr) + } else { + fd.writePinner.Pin(ptr) + } +} + // execIO executes a single IO operation o. // It supports both synchronous and asynchronous IO. -// o.qty and o.flags are set to zero before calling submit -// to avoid reusing the values from a previous call. func (fd *FD) execIO(mode int, submit func(o *operation) (uint32, error)) (int, error) { + if mode == 'r' { + defer fd.readPinner.Unpin() + } else { + defer fd.writePinner.Unpin() + } // Notify runtime netpoll about starting IO. err := fd.pd.prepare(mode, fd.isFile) if err != nil { return 0, err } - o := &fd.rop - if mode == 'w' { - o = &fd.wop + o := operationPool.Get().(*operation) + defer operationPool.Put(o) + *o = operation{ + o: syscall.Overlapped{ + OffsetHigh: uint32(fd.offset >> 32), + Offset: uint32(fd.offset), + }, + runtimeCtx: fd.pd.runtimeCtx, + mode: int32(mode), } // Start IO. - if !fd.isBlocking && o.o.HEvent == 0 && !fd.pollable() { + if !fd.isBlocking && !fd.pollable() { // If the handle is opened for overlapped IO but we can't // use the runtime poller, then we need to use an // event to wait for the IO to complete. - o.setEvent() + h, err := windows.CreateEvent(nil, 0, 0, nil) + if err != nil { + // This shouldn't happen when all CreateEvent arguments are zero. + panic(err) + } + // Set the low bit so that the external IOCP doesn't receive the completion packet. + o.o.HEvent = h | 1 + defer syscall.CloseHandle(h) } + fd.pin(mode, o) qty, err := submit(o) var waitErr error // Blocking operations shouldn't return ERROR_IO_PENDING. @@ -321,11 +341,6 @@ type FD struct { // System file descriptor. Immutable until Close. Sysfd syscall.Handle - // Read operation. - rop operation - // Write operation. - wop operation - // I/O poller. pd pollDesc @@ -364,6 +379,8 @@ type FD struct { disassociated atomic.Bool + // readPinner and writePinner are automatically unpinned + // before execIO returns. readPinner runtime.Pinner writePinner runtime.Pinner } @@ -383,8 +400,6 @@ type FD struct { // using an external mechanism. func (fd *FD) setOffset(off int64) { fd.offset = off - fd.rop.o.OffsetHigh, fd.rop.o.Offset = uint32(off>>32), uint32(off) - fd.wop.o.OffsetHigh, fd.wop.o.Offset = uint32(off>>32), uint32(off) } // addOffset adds the given offset to the current offset. @@ -435,8 +450,6 @@ func (fd *FD) Init(net string, pollable bool) error { } fd.isFile = fd.kind != kindNet fd.isBlocking = !pollable - fd.rop.mode = 'r' - fd.wop.mode = 'w' // It is safe to add overlapped handles that also perform I/O // outside of the runtime poller. The runtime poller will ignore @@ -445,8 +458,6 @@ func (fd *FD) Init(net string, pollable bool) error { if err != nil { return err } - fd.rop.runtimeCtx = fd.pd.runtimeCtx - fd.wop.runtimeCtx = fd.pd.runtimeCtx if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes { // Non-socket handles can use SetFileCompletionNotificationModes without problems. err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, @@ -485,8 +496,6 @@ func (fd *FD) destroy() error { if fd.Sysfd == syscall.InvalidHandle { return syscall.EINVAL } - fd.rop.close() - fd.wop.close() // Poller may want to unregister fd in readiness notification mechanism, // so this must be executed before fd.CloseFunc. fd.pd.close() @@ -541,9 +550,8 @@ func (fd *FD) Read(buf []byte) (int, error) { defer fd.readUnlock() } - if len(buf) > 0 && !fd.isBlocking { - fd.readPinner.Pin(&buf[0]) - defer fd.readPinner.Unpin() + if len(buf) > 0 { + fd.pin('r', &buf[0]) } if len(buf) > maxRW { @@ -672,9 +680,8 @@ func (fd *FD) Pread(buf []byte, off int64) (int, error) { } defer fd.readWriteUnlock() - if len(buf) > 0 && !fd.isBlocking { - fd.readPinner.Pin(&buf[0]) - defer fd.readPinner.Unpin() + if len(buf) > 0 { + fd.pin('r', &buf[0]) } if len(buf) > maxRW { @@ -724,10 +731,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) { } defer fd.readUnlock() - if !fd.isBlocking { - fd.readPinner.Pin(&buf[0]) - defer fd.readPinner.Unpin() - } + fd.pin('r', &buf[0]) rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) @@ -758,10 +762,7 @@ func (fd *FD) ReadFromInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) } defer fd.readUnlock() - if !fd.isBlocking { - fd.readPinner.Pin(&buf[0]) - defer fd.readPinner.Unpin() - } + fd.pin('r', &buf[0]) rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) @@ -792,10 +793,7 @@ func (fd *FD) ReadFromInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) } defer fd.readUnlock() - if !fd.isBlocking { - fd.readPinner.Pin(&buf[0]) - defer fd.readPinner.Unpin() - } + fd.pin('r', &buf[0]) rsa := wsaRsaPool.Get().(*syscall.RawSockaddrAny) defer wsaRsaPool.Put(rsa) @@ -827,11 +825,9 @@ func (fd *FD) Write(buf []byte) (int, error) { defer fd.writeUnlock() } - if len(buf) > 0 && !fd.isBlocking { - fd.writePinner.Pin(&buf[0]) - defer fd.writePinner.Unpin() + if len(buf) > 0 { + fd.pin('w', &buf[0]) } - var ntotal int for { max := len(buf) @@ -924,9 +920,8 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) { } defer fd.readWriteUnlock() - if len(buf) > 0 && !fd.isBlocking { - fd.writePinner.Pin(&buf[0]) - defer fd.writePinner.Unpin() + if len(buf) > 0 { + fd.pin('w', &buf[0]) } if fd.isBlocking { @@ -1008,10 +1003,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { return n, err } - if !fd.isBlocking { - fd.writePinner.Pin(&buf[0]) - defer fd.writePinner.Unpin() - } + fd.pin('w', &buf[0]) ntotal := 0 for len(buf) > 0 { @@ -1048,10 +1040,7 @@ func (fd *FD) WriteToInet4(buf []byte, sa4 *syscall.SockaddrInet4) (int, error) return n, err } - if !fd.isBlocking { - fd.writePinner.Pin(&buf[0]) - defer fd.writePinner.Unpin() - } + fd.pin('w', &buf[0]) ntotal := 0 for len(buf) > 0 { @@ -1088,10 +1077,7 @@ func (fd *FD) WriteToInet6(buf []byte, sa6 *syscall.SockaddrInet6) (int, error) return n, err } - if !fd.isBlocking { - fd.writePinner.Pin(&buf[0]) - defer fd.writePinner.Unpin() - } + fd.pin('w', &buf[0]) ntotal := 0 for len(buf) > 0 { |
