diff options
| -rw-r--r-- | src/internal/poll/fd_windows.go | 36 | ||||
| -rw-r--r-- | src/os/os_windows_test.go | 90 |
2 files changed, 102 insertions, 24 deletions
diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index 99891de763..469d078fa3 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -68,7 +68,7 @@ var InitWSA = sync.OnceFunc(func() { // operation contains superset of data necessary to perform all async IO. type operation struct { // Used by IOCP interface, it must be first field - // of the struct, as our code rely on it. + // of the struct, as our code relies on it. o syscall.Overlapped // fields used by runtime.netpoll @@ -88,6 +88,16 @@ type operation struct { bufs []syscall.WSABuf } +func (o *operation) setEvent() { + h, err := windows.CreateEvent(nil, 0, 0, nil) + if err != nil { + // This shouldn't happen when all CreateEvent arguments are zero. + panic(err) + } + // Set the low bit so that the external IOCP doesn't receive the completion packet. + o.o.HEvent = h | 1 +} + func (o *operation) overlapped() *syscall.Overlapped { if o.fd.isBlocking { // Don't return the overlapped object if the file handle @@ -155,11 +165,15 @@ func (o *operation) InitMsg(p []byte, oob []byte) { // waitIO waits for the IO operation o to complete. func waitIO(o *operation) error { + if o.fd.isBlocking { + panic("can't wait on blocking operations") + } fd := o.fd if !fd.pd.pollable() { // The overlapped handle is not added to the runtime poller, - // the only way to wait for the IO to complete is block. - _, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE) + // the only way to wait for the IO to complete is block until + // the overlapped event is signaled. + _, err := syscall.WaitForSingleObject(o.o.HEvent, syscall.INFINITE) return err } // Wait for our request to complete. @@ -202,11 +216,19 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) { return 0, err } // Start IO. + if !fd.isBlocking && o.o.HEvent == 0 && !fd.pd.pollable() { + // If the handle is opened for overlapped IO but we can't + // use the runtime poller, then we need to use an + // event to wait for the IO to complete. + o.setEvent() + } o.qty = 0 o.flags = 0 err = submit(o) var waitErr error - if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) { + // Blocking operations shouldn't return ERROR_IO_PENDING. + // Continue without waiting if that happens. + if !o.fd.isBlocking && (err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif)) { // IO started asynchronously or completed synchronously but // a sync notification is required. Wait for it to complete. waitErr = waitIO(o) @@ -345,11 +367,6 @@ func (fd *FD) initIO() error { // so it is safe to add handles owned by the caller. fd.initIOErr = fd.pd.init(fd) if fd.initIOErr != nil { - // This can happen if the handle is already associated - // with another IOCP or if the isBlocking flag is incorrect. - // In both cases, fallback to synchronous IO. - fd.isBlocking = true - fd.skipSyncNotif = true return } fd.rop.runtimeCtx = fd.pd.runtimeCtx @@ -389,7 +406,6 @@ func (fd *FD) Init(net string, pollable bool) error { } fd.isFile = fd.kind != kindNet fd.isBlocking = !pollable - fd.skipSyncNotif = fd.isBlocking fd.rop.mode = 'r' fd.wop.mode = 'w' fd.rop.fd = fd diff --git a/src/os/os_windows_test.go b/src/os/os_windows_test.go index 5fbf987291..15f1b616e6 100644 --- a/src/os/os_windows_test.go +++ b/src/os/os_windows_test.go @@ -1984,31 +1984,93 @@ func TestPipeCanceled(t *testing.T) { } } -func TestPipeExternalIOCP(t *testing.T) { +func iocpAssociateFile(f *os.File, iocp syscall.Handle) error { + sc, err := f.SyscallConn() + if err != nil { + return err + } + var syserr error + err = sc.Control(func(fd uintptr) { + if _, err = windows.CreateIoCompletionPort(syscall.Handle(fd), iocp, 0, 0); err != nil { + syserr = err + } + }) + if err == nil { + err = syserr + } + return err +} + +func TestFileAssociatedWithExternalIOCP(t *testing.T) { // Test that a caller can associate an overlapped handle to an external IOCP - // even when the handle is also associated to a poll.FD. Also test that - // the FD can still perform I/O after the association. + // after the handle has been passed to os.NewFile. + // Also test that the File can perform I/O after it is associated with the + // external IOCP and that those operations do not post to the external IOCP. t.Parallel() name := pipeName() pipe := newMessagePipe(t, name, true) - _ = newFileOverlapped(t, name, true) // Just open a pipe client + _ = newFileOverlapped(t, name, true) // just open a pipe client + + // Use a file to exercise WriteAt. + file := newFileOverlapped(t, filepath.Join(t.TempDir(), "a"), true) - sc, err := pipe.SyscallConn() + iocp, err := windows.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0) if err != nil { - t.Error(err) - return + t.Fatal(err) } - if err := sc.Control(func(fd uintptr) { - _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1) - if err != nil { + defer func() { + if iocp == syscall.InvalidHandle { + // Already closed at the end of the test. + return + } + if err := syscall.CloseHandle(iocp); err != nil { t.Fatal(err) } - }); err != nil { - t.Error(err) + }() + + ch := make(chan error, 1) + go func() { + var bytes, key uint32 + var overlapped *syscall.Overlapped + err := syscall.GetQueuedCompletionStatus(syscall.Handle(iocp), &bytes, &key, &overlapped, syscall.INFINITE) + ch <- err + }() + + if err := iocpAssociateFile(pipe, iocp); err != nil { + t.Fatal(err) + } + if err := iocpAssociateFile(file, iocp); err != nil { + t.Fatal(err) } - _, err = pipe.Write([]byte("hello")) - if err != nil { + if _, err := pipe.Write([]byte("hello")); err != nil { + t.Fatal(err) + } + if _, err := file.Write([]byte("hello")); err != nil { t.Fatal(err) } + if _, err := file.WriteAt([]byte("hello"), 0); err != nil { + t.Fatal(err) + } + + // Wait fot he goroutine to call GetQueuedCompletionStatus. + time.Sleep(100 * time.Millisecond) + + // Trigger ERROR_ABANDONED_WAIT_0. + if err := syscall.CloseHandle(iocp); err != nil { + t.Fatal(err) + } + + // Wait for the completion to be posted to the IOCP. + err = <-ch + iocp = syscall.InvalidHandle + const ERROR_ABANDONED_WAIT_0 = syscall.Errno(735) + switch err { + case ERROR_ABANDONED_WAIT_0: + // This is what we expect. + case nil: + t.Error("unexpected queued completion") + default: + t.Error(err) + } } |
