aboutsummaryrefslogtreecommitdiff
path: root/src/internal/poll
diff options
context:
space:
mode:
authorqmuntal <quimmuntal@gmail.com>2025-03-28 20:38:34 +0100
committerQuim Muntal <quimmuntal@gmail.com>2025-03-28 14:14:48 -0700
commitb9934d855c4635edf02092a72802017676abd8eb (patch)
tree1e349674d9b21af9d29933a9a92b7842cf2a261a /src/internal/poll
parent5ec76ae5aa965208d820a0bde8f0abd685c17ecc (diff)
downloadgo-b9934d855c4635edf02092a72802017676abd8eb.tar.xz
internal/poll: honor ERROR_OPERATION_ABORTED if pipe is not closed
FD.Read converts a syscall.ERROR_OPERATION_ABORTED error to ErrFileClosing. It does that in case the pipe operation was aborted by a CancelIoEx call in FD.Close. It doesn't take into account that the operation might have been aborted by a CancelIoEx call in external code. In that case, the operation should return the error as is. Change-Id: I75dcf0edaace8b57dc47b398ea591ca9f116112b Reviewed-on: https://go-review.googlesource.com/c/go/+/661555 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com> Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Diffstat (limited to 'src/internal/poll')
-rw-r--r--src/internal/poll/fd_mutex.go5
-rw-r--r--src/internal/poll/fd_windows.go12
-rw-r--r--src/internal/poll/fd_windows_test.go43
3 files changed, 55 insertions, 5 deletions
diff --git a/src/internal/poll/fd_mutex.go b/src/internal/poll/fd_mutex.go
index 0a8ee6f0d4..4d194df186 100644
--- a/src/internal/poll/fd_mutex.go
+++ b/src/internal/poll/fd_mutex.go
@@ -250,3 +250,8 @@ func (fd *FD) writeUnlock() {
fd.destroy()
}
}
+
+// closing returns true if fd is closing.
+func (fd *FD) closing() bool {
+ return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
+}
diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go
index 81c8293911..1caa760349 100644
--- a/src/internal/poll/fd_windows.go
+++ b/src/internal/poll/fd_windows.go
@@ -461,10 +461,12 @@ func (fd *FD) Read(buf []byte) (int, error) {
// Returned by pipes when the other end is closed.
err = nil
case syscall.ERROR_OPERATION_ABORTED:
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
- // If the fd is a pipe and the Read was interrupted by CancelIoEx,
- // we assume it is interrupted by Close.
- err = ErrFileClosing
+ if fd.closing() {
+ // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+ // If the fd is a pipe and the Read was interrupted by CancelIoEx,
+ // we assume it is interrupted by Close.
+ err = ErrFileClosing
+ }
}
}
case kindNet:
@@ -717,7 +719,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
- if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
+ if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() {
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
// If the fd is a pipe and the Write was interrupted by CancelIoEx,
// we assume it is interrupted by Close.
diff --git a/src/internal/poll/fd_windows_test.go b/src/internal/poll/fd_windows_test.go
index f5fa4a26e3..042bdf8bed 100644
--- a/src/internal/poll/fd_windows_test.go
+++ b/src/internal/poll/fd_windows_test.go
@@ -18,6 +18,7 @@ import (
"sync/atomic"
"syscall"
"testing"
+ "time"
"unsafe"
)
@@ -339,7 +340,9 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
}
func TestFile(t *testing.T) {
+ t.Parallel()
test := func(t *testing.T, r, w bool) {
+ t.Parallel()
name := filepath.Join(t.TempDir(), "foo")
rh := newFile(t, name, r)
wh := newFile(t, name, w)
@@ -361,27 +364,33 @@ func TestFile(t *testing.T) {
}
func TestPipe(t *testing.T) {
+ t.Parallel()
t.Run("overlapped", func(t *testing.T) {
+ t.Parallel()
name, pipe := newPipe(t, true, false)
file := newFile(t, name, true)
testReadWrite(t, pipe, file)
})
t.Run("overlapped-write", func(t *testing.T) {
+ t.Parallel()
name, pipe := newPipe(t, true, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
t.Run("overlapped-read", func(t *testing.T) {
+ t.Parallel()
name, pipe := newPipe(t, false, false)
file := newFile(t, name, true)
testReadWrite(t, file, pipe)
})
t.Run("sync", func(t *testing.T) {
+ t.Parallel()
name, pipe := newPipe(t, false, false)
file := newFile(t, name, false)
testReadWrite(t, file, pipe)
})
t.Run("anonymous", func(t *testing.T) {
+ t.Parallel()
var r, w syscall.Handle
if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil {
t.Fatal(err)
@@ -402,6 +411,7 @@ func TestPipe(t *testing.T) {
}
func TestPipeWriteEOF(t *testing.T) {
+ t.Parallel()
name, pipe := newPipe(t, false, true)
file := newFile(t, name, false)
read := make(chan struct{}, 1)
@@ -423,6 +433,39 @@ func TestPipeWriteEOF(t *testing.T) {
}
}
+func TestPipeCanceled(t *testing.T) {
+ t.Parallel()
+ name, _ := newPipe(t, true, false)
+ file := newFile(t, name, true)
+ ch := make(chan struct{}, 1)
+ go func() {
+ for {
+ select {
+ case <-ch:
+ return
+ default:
+ syscall.CancelIo(syscall.Handle(file.Sysfd))
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }()
+ // Try to cancel for max 1 second.
+ // Canceling is normally really fast, but it can take an
+ // arbitrary amount of time on busy systems.
+ // If it takes too long, we skip the test.
+ file.SetReadDeadline(time.Now().Add(1 * time.Second))
+ var tmp [1]byte
+ // Read will block until the cancel is complete.
+ _, err := file.Read(tmp[:])
+ ch <- struct{}{}
+ if err == poll.ErrDeadlineExceeded {
+ t.Skip("took too long to cancel")
+ }
+ if err != syscall.ERROR_OPERATION_ABORTED {
+ t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
+ }
+}
+
func BenchmarkReadOverlapped(b *testing.B) {
benchmarkRead(b, true)
}