summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2024-02-03 18:59:32 +0700
committerShulhan <ms@kilabit.info>2024-02-04 00:29:34 +0700
commit0e5a09c952e7775f5b6e82f9f94d81eb977c2d5e (patch)
tree76dc8488edffab3768191237354454f918543762
parent0bdc54f2a7a3b2d685b083675856631d967eb17b (diff)
downloadpakakeh.go-0e5a09c952e7775f5b6e82f9f94d81eb977c2d5e.tar.xz
lib/memfs: protect internal [DirWatcher.fileWatcher] with mutex
This is to fix several flaky tests on ExampleDirWatcher [1]. [1] https://github.com/shuLhan/share/actions/runs/7765975142 [2] https://github.com/shuLhan/share/actions/runs/7672754239
-rw-r--r--lib/memfs/dirwatcher.go174
-rw-r--r--lib/memfs/dirwatcher_example_test.go14
2 files changed, 108 insertions, 80 deletions
diff --git a/lib/memfs/dirwatcher.go b/lib/memfs/dirwatcher.go
index c4011788..e1ab7dc1 100644
--- a/lib/memfs/dirwatcher.go
+++ b/lib/memfs/dirwatcher.go
@@ -32,12 +32,15 @@ type DirWatcher struct {
qrun chan struct{}
+ // The fs field initialized from Root if its nil.
+ fs *MemFS
+
+ // The root Node in fs.
root *Node
- fs *MemFS
// dirs contains list of directory and their sub-directories that is
// being watched for changes.
- // The map key is relative path to directory and its value is a node
+ // The map key is relative path to Root and its value is a Node
// information.
dirs map[string]*Node
@@ -59,38 +62,43 @@ type DirWatcher struct {
// Delay define a duration when the new changes will be fetched from
// system.
- // This field is optional, minimum is 100 milli second and default is
- // 5 seconds.
+ // This field is optional, minimum is 100 milli second and default
+ // is 5 seconds.
Delay time.Duration
+ // dirsLocker protect adding and removing key in [dirs].
dirsLocker sync.Mutex
+
+ // mtxFileWatcher protect adding and removing key in [fileWatcher].
+ mtxFileWatcher sync.Mutex
}
+// init validate and initialized all fields.
+// Once initialized the [DirWatcher.dirs] will contains all directories and
+// [DirWatcher.fileWatcher] start watching all regular files.
func (dw *DirWatcher) init() (err error) {
- var (
- logp = "init"
-
- fi fs.FileInfo
- )
+ var logp = `init`
if dw.Delay < 100*time.Millisecond {
dw.Delay = defWatchDelay
}
if dw.fs == nil {
+ var fi fs.FileInfo
+
fi, err = os.Stat(dw.Root)
if err != nil {
- return fmt.Errorf("%s: %w", logp, err)
+ return fmt.Errorf(`%s: %w`, logp, err)
}
if !fi.IsDir() {
- return fmt.Errorf("%s: %q is not a directory", logp, dw.Root)
+ return fmt.Errorf(`%s: %q is not a directory`, logp, dw.Root)
}
dw.Options.MaxFileSize = -1
dw.fs, err = New(&dw.Options)
if err != nil {
- return fmt.Errorf("%s: %w", logp, err)
+ return fmt.Errorf(`%s: %w`, logp, err)
}
}
dw.root = dw.fs.Root
@@ -111,13 +119,11 @@ func (dw *DirWatcher) init() (err error) {
// Start watching changes in directory and its content.
func (dw *DirWatcher) Start() (err error) {
- var (
- logp = "Start"
- )
+ var logp = `Start`
err = dw.init()
if err != nil {
- return fmt.Errorf("%s: %w", logp, err)
+ return fmt.Errorf(`%s: %w`, logp, err)
}
go dw.start()
@@ -127,11 +133,7 @@ func (dw *DirWatcher) Start() (err error) {
// Stop watching changes on directory.
func (dw *DirWatcher) Stop() {
- // Stop all file watchers.
- var watcher *Watcher
- for _, watcher = range dw.fileWatcher {
- watcher.Stop()
- }
+ dw.stopAllFileWatcher()
select {
case dw.qrun <- struct{}{}:
@@ -154,16 +156,15 @@ func (dw *DirWatcher) dirsKeys() (keys []string) {
return keys
}
-// mapSubdirs iterate each child node recursively and map any sub
-// directories into mapSubdirs.
-// If its a regular file, start a NewWatcher.
+// mapSubdirs iterate each child node recursively and map directories into
+// [DirWatcher.dirs].
+// If its a regular file, start a new file [Watcher].
func (dw *DirWatcher) mapSubdirs(node *Node) {
var (
- logp = `DirWatcher.mapSubdirs`
+ logp = `mapSubdirs`
- child *Node
- watcher *Watcher
- err error
+ child *Node
+ err error
)
for _, child = range node.Childs {
@@ -174,12 +175,10 @@ func (dw *DirWatcher) mapSubdirs(node *Node) {
dw.mapSubdirs(child)
continue
}
- watcher, err = newWatcher(node, child, dw.Delay, dw.qFileChanges)
+ err = dw.startWatchingFile(node, child)
if err != nil {
- log.Printf("%s %q: %s", logp, child.SysPath, err)
- continue
+ log.Printf(`%s: %s`, logp, err)
}
- dw.fileWatcher[child.Path] = watcher
}
}
@@ -190,15 +189,7 @@ func (dw *DirWatcher) onCreated(parent, child *Node) (err error) {
dw.dirs[child.Path] = child
dw.dirsLocker.Unlock()
} else {
- // Start watching the file for modification.
- var watcher *Watcher
-
- watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges)
- if err != nil {
- return fmt.Errorf(`onCreated: %w`, err)
- }
-
- dw.fileWatcher[child.Path] = watcher
+ dw.startWatchingFile(parent, child)
}
var ns = NodeState{
@@ -242,13 +233,7 @@ func (dw *DirWatcher) onDirDeleted(node *Node) {
}
func (dw *DirWatcher) onFileDeleted(node *Node) {
- var watcher = dw.fileWatcher[node.Path]
- if watcher != nil {
- watcher.Stop()
- delete(dw.fileWatcher, node.Path)
- }
-
- dw.fs.RemoveChild(node.Parent, node)
+ dw.stopWatchingFile(node)
var ns = NodeState{
State: FileStateDeleted,
@@ -372,16 +357,17 @@ func (dw *DirWatcher) onRootCreated() {
dw.mapSubdirs(dw.root)
}
-// onRootDeleted handle change when the root directory that we watch get
-// deleted. It will send deleted event and unmount the root directory from
-// memory.
+// onRootDeleted handle change when the [DirWatcher.Options.Root] directory
+// deleted.
+// It will send the [FileStateDeleted] event and unmount the root directory
+// from memory.
func (dw *DirWatcher) onRootDeleted() {
- var (
- ns = NodeState{
- Node: *dw.root,
- State: FileStateDeleted,
- }
- )
+ var ns = NodeState{
+ Node: *dw.root,
+ State: FileStateDeleted,
+ }
+
+ dw.stopAllFileWatcher()
dw.fs = nil
dw.root = nil
@@ -459,27 +445,26 @@ func (dw *DirWatcher) onUpdateMode(node *Node, newInfo os.FileInfo) {
func (dw *DirWatcher) start() {
var (
- logp = "DirWatcher"
+ logp = `DirWatcher`
ticker = time.NewTicker(dw.Delay)
- ever = true
- node *Node
- fi os.FileInfo
- ns NodeState
- err error
+ ns NodeState
+ err error
)
- for ever {
+ for {
select {
case <-ticker.C:
+ var fi os.FileInfo
+
fi, err = os.Stat(dw.Root)
if err != nil {
- if !os.IsNotExist(err) {
- log.Printf("%s: %s", logp, err)
- continue
- }
- if dw.fs != nil {
- dw.onRootDeleted()
+ if os.IsNotExist(err) {
+ if dw.fs != nil {
+ dw.onRootDeleted()
+ }
+ } else {
+ log.Printf(`%s: %s`, logp, err)
}
continue
}
@@ -497,6 +482,8 @@ func (dw *DirWatcher) start() {
dw.processSubdirs()
case ns = <-dw.qFileChanges:
+ var node *Node
+
node, err = dw.fs.Get(ns.Node.Path)
if err != nil {
log.Printf("%s: on file changes %s: %s", logp, ns.Node.Path, err)
@@ -514,14 +501,59 @@ func (dw *DirWatcher) start() {
}
case <-dw.qrun:
- ever = false
ticker.Stop()
// Signal back to the Stop caller.
dw.qrun <- struct{}{}
+ return
}
}
}
+func (dw *DirWatcher) startWatchingFile(parent, child *Node) (err error) {
+ var (
+ logp = `startWatchingFile`
+ watcher *Watcher
+ )
+
+ watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges)
+ if err != nil {
+ return fmt.Errorf(`%s %q: %s`, logp, child.SysPath, err)
+ }
+
+ dw.mtxFileWatcher.Lock()
+ dw.fileWatcher[child.Path] = watcher
+ dw.mtxFileWatcher.Unlock()
+
+ return nil
+}
+
+func (dw *DirWatcher) stopAllFileWatcher() {
+ var watcher *Watcher
+
+ dw.mtxFileWatcher.Lock()
+
+ for _, watcher = range dw.fileWatcher {
+ watcher.Stop()
+ dw.fs.RemoveChild(watcher.node.Parent, watcher.node)
+ }
+ dw.fileWatcher = map[string]*Watcher{}
+
+ dw.mtxFileWatcher.Unlock()
+}
+
+func (dw *DirWatcher) stopWatchingFile(node *Node) {
+ dw.mtxFileWatcher.Lock()
+
+ var watcher = dw.fileWatcher[node.Path]
+ if watcher != nil {
+ watcher.Stop()
+ delete(dw.fileWatcher, node.Path)
+ dw.fs.RemoveChild(node.Parent, node)
+ }
+
+ dw.mtxFileWatcher.Unlock()
+}
+
func (dw *DirWatcher) processSubdirs() {
var (
logp = `processSubdirs`
diff --git a/lib/memfs/dirwatcher_example_test.go b/lib/memfs/dirwatcher_example_test.go
index 5f9a69db..b6b52d16 100644
--- a/lib/memfs/dirwatcher_example_test.go
+++ b/lib/memfs/dirwatcher_example_test.go
@@ -16,7 +16,6 @@ import (
func ExampleDirWatcher() {
var (
- ns memfs.NodeState
rootDir string
err error
)
@@ -26,9 +25,9 @@ func ExampleDirWatcher() {
log.Fatal(err)
}
- // In this example, we watch sub directory "assets" and its contents,
- // include only file with .adoc extension and ignoring files with
- // .html extension.
+ // In this example, we watch sub directory "assets" and its
+ // contents, including only files with ".adoc" extension and
+ // excluding files with ".html" extension.
var dw = &memfs.DirWatcher{
Options: memfs.Options{
Root: rootDir,
@@ -48,16 +47,13 @@ func ExampleDirWatcher() {
log.Fatal(err)
}
- // Add delay for goroutine to catch up and modtime to changes.
- // We try with 100ms but sometimes it stuck on the first <-dw.C.
- time.Sleep(200 * time.Millisecond)
-
fmt.Println(`Deleting the root directory:`)
err = os.Remove(rootDir)
if err != nil {
log.Fatal(err)
}
- ns = <-dw.C
+
+ var ns = <-dw.C
fmt.Println(`--`, ns.State, ns.Node.Path)
// Create the root directory back with sub directory