aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorIan Lance Taylor <iant@golang.org>2022-10-04 14:35:39 -0700
committerGopher Robot <gobot@golang.org>2022-10-08 03:57:40 +0000
commit4fe1971b2dff1fa14cb8f5be47aed7fda76c0f7c (patch)
treedc3dbd8b4e173595674c520a1860fbcb0b17ac97 /src
parent669ec549b554ef7fe957bf284271ed3b7db82da1 (diff)
downloadgo-4fe1971b2dff1fa14cb8f5be47aed7fda76c0f7c.tar.xz
os: use poll.fdMutex for Plan 9 files
This permits us to safely support concurrent access to files on Plan 9. Concurrent access was already safe on other systems. This does introduce a change: if one goroutine calls a blocking read on a pipe, and another goroutine closes the pipe, then before this CL the close would occur. Now the close will be delayed until the blocking read completes. Also add tests that concurrent I/O and Close on a pipe are OK. For #50436 For #56043 Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5 Reviewed-on: https://go-review.googlesource.com/c/go/+/438347 Reviewed-by: Bryan Mills <bcmills@google.com> Reviewed-by: Ian Lance Taylor <iant@google.com> Reviewed-by: David du Colombier <0intro@gmail.com> Run-TryBot: Ian Lance Taylor <iant@google.com> Auto-Submit: Ian Lance Taylor <iant@google.com> TryBot-Result: Gopher Robot <gobot@golang.org> Reviewed-by: Rob Pike <r@golang.org>
Diffstat (limited to 'src')
-rw-r--r--src/internal/poll/export_test.go12
-rw-r--r--src/internal/poll/fd_mutex_test.go12
-rw-r--r--src/internal/poll/file_plan9.go42
-rw-r--r--src/os/file_mutex_plan9.go70
-rw-r--r--src/os/file_plan9.go80
-rw-r--r--src/os/os_test.go112
-rw-r--r--src/os/stat_plan9.go4
-rw-r--r--src/os/types_plan9.go2
8 files changed, 306 insertions, 28 deletions
diff --git a/src/internal/poll/export_test.go b/src/internal/poll/export_test.go
index 02664d9ea3..66d7c3274b 100644
--- a/src/internal/poll/export_test.go
+++ b/src/internal/poll/export_test.go
@@ -10,26 +10,26 @@ package poll
var Consume = consume
-type FDMutex struct {
+type XFDMutex struct {
fdMutex
}
-func (mu *FDMutex) Incref() bool {
+func (mu *XFDMutex) Incref() bool {
return mu.incref()
}
-func (mu *FDMutex) IncrefAndClose() bool {
+func (mu *XFDMutex) IncrefAndClose() bool {
return mu.increfAndClose()
}
-func (mu *FDMutex) Decref() bool {
+func (mu *XFDMutex) Decref() bool {
return mu.decref()
}
-func (mu *FDMutex) RWLock(read bool) bool {
+func (mu *XFDMutex) RWLock(read bool) bool {
return mu.rwlock(read)
}
-func (mu *FDMutex) RWUnlock(read bool) bool {
+func (mu *XFDMutex) RWUnlock(read bool) bool {
return mu.rwunlock(read)
}
diff --git a/src/internal/poll/fd_mutex_test.go b/src/internal/poll/fd_mutex_test.go
index 3029b9a681..62f953192d 100644
--- a/src/internal/poll/fd_mutex_test.go
+++ b/src/internal/poll/fd_mutex_test.go
@@ -14,7 +14,7 @@ import (
)
func TestMutexLock(t *testing.T) {
- var mu FDMutex
+ var mu XFDMutex
if !mu.Incref() {
t.Fatal("broken")
@@ -39,7 +39,7 @@ func TestMutexLock(t *testing.T) {
}
func TestMutexClose(t *testing.T) {
- var mu FDMutex
+ var mu XFDMutex
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
@@ -60,7 +60,7 @@ func TestMutexClose(t *testing.T) {
func TestMutexCloseUnblock(t *testing.T) {
c := make(chan bool, 4)
- var mu FDMutex
+ var mu XFDMutex
mu.RWLock(true)
for i := 0; i < 4; i++ {
go func() {
@@ -104,7 +104,7 @@ func TestMutexPanic(t *testing.T) {
f()
}
- var mu FDMutex
+ var mu XFDMutex
ensurePanics(func() { mu.Decref() })
ensurePanics(func() { mu.RWUnlock(true) })
ensurePanics(func() { mu.RWUnlock(false) })
@@ -137,7 +137,7 @@ func TestMutexOverflowPanic(t *testing.T) {
}
}()
- var mu1 FDMutex
+ var mu1 XFDMutex
for i := 0; i < 1<<21; i++ {
mu1.Incref()
}
@@ -152,7 +152,7 @@ func TestMutexStress(t *testing.T) {
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
done := make(chan bool, P)
- var mu FDMutex
+ var mu XFDMutex
var readState [2]uint64
var writeState [2]uint64
for p := 0; p < P; p++ {
diff --git a/src/internal/poll/file_plan9.go b/src/internal/poll/file_plan9.go
new file mode 100644
index 0000000000..57dc0c668f
--- /dev/null
+++ b/src/internal/poll/file_plan9.go
@@ -0,0 +1,42 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+// Expose fdMutex for use by the os package on Plan 9.
+// On Plan 9 we don't want to use async I/O for file operations,
+// but we still want the locking semantics that fdMutex provides.
+
+// FDMutex is an exported fdMutex, only for Plan 9.
+type FDMutex struct {
+ fdmu fdMutex
+}
+
+func (fdmu *FDMutex) Incref() bool {
+ return fdmu.fdmu.incref()
+}
+
+func (fdmu *FDMutex) Decref() bool {
+ return fdmu.fdmu.decref()
+}
+
+func (fdmu *FDMutex) IncrefAndClose() bool {
+ return fdmu.fdmu.increfAndClose()
+}
+
+func (fdmu *FDMutex) ReadLock() bool {
+ return fdmu.fdmu.rwlock(true)
+}
+
+func (fdmu *FDMutex) ReadUnlock() bool {
+ return fdmu.fdmu.rwunlock(true)
+}
+
+func (fdmu *FDMutex) WriteLock() bool {
+ return fdmu.fdmu.rwlock(false)
+}
+
+func (fdmu *FDMutex) WriteUnlock() bool {
+ return fdmu.fdmu.rwunlock(false)
+}
diff --git a/src/os/file_mutex_plan9.go b/src/os/file_mutex_plan9.go
new file mode 100644
index 0000000000..26bf5a7d1e
--- /dev/null
+++ b/src/os/file_mutex_plan9.go
@@ -0,0 +1,70 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package os
+
+// File locking support for Plan 9. This uses fdMutex from the
+// internal/poll package.
+
+// incref adds a reference to the file. It returns an error if the file
+// is already closed. This method is on File so that we can incorporate
+// a nil test.
+func (f *File) incref(op string) (err error) {
+ if f == nil {
+ return ErrInvalid
+ }
+ if !f.fdmu.Incref() {
+ err = ErrClosed
+ if op != "" {
+ err = &PathError{Op: op, Path: f.name, Err: err}
+ }
+ }
+ return err
+}
+
+// decref removes a reference to the file. If this is the last
+// remaining reference, and the file has been marked to be closed,
+// then actually close it.
+func (file *file) decref() error {
+ if file.fdmu.Decref() {
+ return file.destroy()
+ }
+ return nil
+}
+
+// readLock adds a reference to the file and locks it for reading.
+// It returns an error if the file is already closed.
+func (file *file) readLock() error {
+ if !file.fdmu.ReadLock() {
+ return ErrClosed
+ }
+ return nil
+}
+
+// readUnlock removes a reference from the file and unlocks it for reading.
+// It also closes the file if it marked as closed and there is no remaining
+// reference.
+func (file *file) readUnlock() {
+ if file.fdmu.ReadUnlock() {
+ file.destroy()
+ }
+}
+
+// writeLock adds a reference to the file and locks it for writing.
+// It returns an error if the file is already closed.
+func (file *file) writeLock() error {
+ if !file.fdmu.WriteLock() {
+ return ErrClosed
+ }
+ return nil
+}
+
+// writeUnlock removes a reference from the file and unlocks it for writing.
+// It also closes the file if it is marked as closed and there is no remaining
+// reference.
+func (file *file) writeUnlock() {
+ if file.fdmu.WriteUnlock() {
+ file.destroy()
+ }
+}
diff --git a/src/os/file_plan9.go b/src/os/file_plan9.go
index 93eb233e00..7a4a562783 100644
--- a/src/os/file_plan9.go
+++ b/src/os/file_plan9.go
@@ -22,6 +22,7 @@ func fixLongPath(path string) string {
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
+ fdmu poll.FDMutex
fd int
name string
dirinfo *dirInfo // nil unless directory being read
@@ -142,24 +143,35 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// be canceled and return immediately with an ErrClosed error.
// Close will return an error if it has already been called.
func (f *File) Close() error {
- if err := f.checkValid("close"); err != nil {
- return err
+ if f == nil {
+ return ErrInvalid
}
return f.file.close()
}
func (file *file) close() error {
- if file == nil || file.fd == badFd {
- return ErrInvalid
+ if !file.fdmu.IncrefAndClose() {
+ return &PathError{Op: "close", Path: file.name, Err: ErrClosed}
}
+
+ // At this point we should cancel any pending I/O.
+ // How do we do that on Plan 9?
+
+ err := file.decref()
+
+ // no need for a finalizer anymore
+ runtime.SetFinalizer(file, nil)
+ return err
+}
+
+// destroy actually closes the descriptor. This is called when
+// there are no remaining references, by the decref, readUnlock,
+// and writeUnlock methods.
+func (file *file) destroy() error {
var err error
if e := syscall.Close(file.fd); e != nil {
err = &PathError{Op: "close", Path: file.name, Err: e}
}
- file.fd = badFd // so it can't be closed again
-
- // no need for a finalizer anymore
- runtime.SetFinalizer(file, nil)
return err
}
@@ -193,6 +205,12 @@ func (f *File) Truncate(size int64) error {
if err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
+
+ if err := f.incref("truncate"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
@@ -219,6 +237,12 @@ func (f *File) chmod(mode FileMode) error {
if err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
+
+ if err := f.incref("chmod"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
@@ -240,6 +264,12 @@ func (f *File) Sync() error {
if err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
+
+ if err := f.incref("sync"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
@@ -249,6 +279,10 @@ func (f *File) Sync() error {
// read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) {
+ if err := f.readLock(); err != nil {
+ return 0, err
+ }
+ defer f.readUnlock()
n, e := fixCount(syscall.Read(f.fd, b))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
@@ -260,6 +294,10 @@ func (f *File) read(b []byte) (n int, err error) {
// It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) {
+ if err := f.readLock(); err != nil {
+ return 0, err
+ }
+ defer f.readUnlock()
n, e := fixCount(syscall.Pread(f.fd, b, off))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
@@ -272,6 +310,10 @@ func (f *File) pread(b []byte, off int64) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) write(b []byte) (n int, err error) {
+ if err := f.writeLock(); err != nil {
+ return 0, err
+ }
+ defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
@@ -283,6 +325,10 @@ func (f *File) write(b []byte) (n int, err error) {
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) pwrite(b []byte, off int64) (n int, err error) {
+ if err := f.writeLock(); err != nil {
+ return 0, err
+ }
+ defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
@@ -294,6 +340,10 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
// relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) {
+ if err := f.incref(""); err != nil {
+ return 0, err
+ }
+ defer f.decref()
if f.dirinfo != nil {
// Free cached dirinfo, so we allocate a new one if we
// access this file as a directory again. See #35767 and #37161.
@@ -493,9 +543,10 @@ func tempDir() string {
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
- if err := f.checkValid("chdir"); err != nil {
+ if err := f.incref("chdir"); err != nil {
return err
}
+ defer f.decref()
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{Op: "chdir", Path: f.name, Err: e}
}
@@ -526,16 +577,17 @@ func (f *File) setWriteDeadline(time.Time) error {
return poll.ErrNoDeadline
}
-// checkValid checks whether f is valid for use.
-// If not, it returns an appropriate error, perhaps incorporating the operation name op.
+// checkValid checks whether f is valid for use, but does not prepare
+// to actually use it. If f is not ready checkValid returns an appropriate
+// error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
- if f.fd == badFd {
- return &PathError{Op: op, Path: f.name, Err: ErrClosed}
+ if err := f.incref(op); err != nil {
+ return err
}
- return nil
+ return f.decref()
}
type rawConn struct{}
diff --git a/src/os/os_test.go b/src/os/os_test.go
index ff74598362..550b7db5a3 100644
--- a/src/os/os_test.go
+++ b/src/os/os_test.go
@@ -2799,3 +2799,115 @@ func TestWriteStringAlloc(t *testing.T) {
t.Errorf("expected 0 allocs for File.WriteString, got %v", allocs)
}
}
+
+// Test that it's OK to have parallel I/O and Close on a pipe.
+func TestPipeIOCloseRace(t *testing.T) {
+ // Skip on wasm, which doesn't have pipes.
+ if runtime.GOOS == "js" {
+ t.Skip("skipping on js: no pipes")
+ }
+
+ r, w, err := Pipe()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(3)
+
+ go func() {
+ defer wg.Done()
+ for {
+ n, err := w.Write([]byte("hi"))
+ if err != nil {
+ // We look at error strings as the
+ // expected errors are OS-specific.
+ switch {
+ case errors.Is(err, ErrClosed),
+ strings.Contains(err.Error(), "broken pipe"),
+ strings.Contains(err.Error(), "pipe is being closed"),
+ strings.Contains(err.Error(), "hungup channel"):
+ // Ignore an expected error.
+ default:
+ // Unexpected error.
+ t.Error(err)
+ }
+ return
+ }
+ if n != 2 {
+ t.Errorf("wrote %d bytes, expected 2", n)
+ return
+ }
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+ for {
+ var buf [2]byte
+ n, err := r.Read(buf[:])
+ if err != nil {
+ if err != io.EOF && !errors.Is(err, ErrClosed) {
+ t.Error(err)
+ }
+ return
+ }
+ if n != 2 {
+ t.Errorf("read %d bytes, want 2", n)
+ }
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+
+ // Let the other goroutines start. This is just to get
+ // a better test, the test will still pass if they
+ // don't start.
+ time.Sleep(time.Millisecond)
+
+ if err := r.Close(); err != nil {
+ t.Error(err)
+ }
+ if err := w.Close(); err != nil {
+ t.Error(err)
+ }
+ }()
+
+ wg.Wait()
+}
+
+// Test that it's OK to call Close concurrently on a pipe.
+func TestPipeCloseRace(t *testing.T) {
+ // Skip on wasm, which doesn't have pipes.
+ if runtime.GOOS == "js" {
+ t.Skip("skipping on js: no pipes")
+ }
+
+ r, w, err := Pipe()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var wg sync.WaitGroup
+ c := make(chan error, 4)
+ f := func() {
+ defer wg.Done()
+ c <- r.Close()
+ c <- w.Close()
+ }
+ wg.Add(2)
+ go f()
+ go f()
+ nils, errs := 0, 0
+ for i := 0; i < 4; i++ {
+ err := <-c
+ if err == nil {
+ nils++
+ } else {
+ errs++
+ }
+ }
+ if nils != 2 || errs != 2 {
+ t.Errorf("got nils %d errs %d, want 2 2", nils, errs)
+ }
+}
diff --git a/src/os/stat_plan9.go b/src/os/stat_plan9.go
index e20accf191..a5e9901379 100644
--- a/src/os/stat_plan9.go
+++ b/src/os/stat_plan9.go
@@ -56,7 +56,11 @@ func dirstat(arg any) (*syscall.Dir, error) {
switch a := arg.(type) {
case *File:
name = a.name
+ if err := a.incref("fstat"); err != nil {
+ return nil, err
+ }
n, err = syscall.Fstat(a.fd, buf)
+ a.decref()
case string:
name = a
n, err = syscall.Stat(a, buf)
diff --git a/src/os/types_plan9.go b/src/os/types_plan9.go
index ccf4fd932e..adb4013085 100644
--- a/src/os/types_plan9.go
+++ b/src/os/types_plan9.go
@@ -28,5 +28,3 @@ func sameFile(fs1, fs2 *fileStat) bool {
b := fs2.sys.(*syscall.Dir)
return a.Qid.Path == b.Qid.Path && a.Type == b.Type && a.Dev == b.Dev
}
-
-const badFd = -1