diff options
| author | Shulhan <ms@kilabit.info> | 2024-02-03 18:59:32 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2024-02-04 00:29:34 +0700 |
| commit | 0e5a09c952e7775f5b6e82f9f94d81eb977c2d5e (patch) | |
| tree | 76dc8488edffab3768191237354454f918543762 | |
| parent | 0bdc54f2a7a3b2d685b083675856631d967eb17b (diff) | |
| download | pakakeh.go-0e5a09c952e7775f5b6e82f9f94d81eb977c2d5e.tar.xz | |
lib/memfs: protect internal [DirWatcher.fileWatcher] with mutex
This is to fix several flaky tests on ExampleDirWatcher [1].
[1] https://github.com/shuLhan/share/actions/runs/7765975142
[2] https://github.com/shuLhan/share/actions/runs/7672754239
| -rw-r--r-- | lib/memfs/dirwatcher.go | 174 | ||||
| -rw-r--r-- | lib/memfs/dirwatcher_example_test.go | 14 |
2 files changed, 108 insertions, 80 deletions
diff --git a/lib/memfs/dirwatcher.go b/lib/memfs/dirwatcher.go index c4011788..e1ab7dc1 100644 --- a/lib/memfs/dirwatcher.go +++ b/lib/memfs/dirwatcher.go @@ -32,12 +32,15 @@ type DirWatcher struct { qrun chan struct{} + // The fs field initialized from Root if its nil. + fs *MemFS + + // The root Node in fs. root *Node - fs *MemFS // dirs contains list of directory and their sub-directories that is // being watched for changes. - // The map key is relative path to directory and its value is a node + // The map key is relative path to Root and its value is a Node // information. dirs map[string]*Node @@ -59,38 +62,43 @@ type DirWatcher struct { // Delay define a duration when the new changes will be fetched from // system. - // This field is optional, minimum is 100 milli second and default is - // 5 seconds. + // This field is optional, minimum is 100 milli second and default + // is 5 seconds. Delay time.Duration + // dirsLocker protect adding and removing key in [dirs]. dirsLocker sync.Mutex + + // mtxFileWatcher protect adding and removing key in [fileWatcher]. + mtxFileWatcher sync.Mutex } +// init validate and initialized all fields. +// Once initialized the [DirWatcher.dirs] will contains all directories and +// [DirWatcher.fileWatcher] start watching all regular files. func (dw *DirWatcher) init() (err error) { - var ( - logp = "init" - - fi fs.FileInfo - ) + var logp = `init` if dw.Delay < 100*time.Millisecond { dw.Delay = defWatchDelay } if dw.fs == nil { + var fi fs.FileInfo + fi, err = os.Stat(dw.Root) if err != nil { - return fmt.Errorf("%s: %w", logp, err) + return fmt.Errorf(`%s: %w`, logp, err) } if !fi.IsDir() { - return fmt.Errorf("%s: %q is not a directory", logp, dw.Root) + return fmt.Errorf(`%s: %q is not a directory`, logp, dw.Root) } dw.Options.MaxFileSize = -1 dw.fs, err = New(&dw.Options) if err != nil { - return fmt.Errorf("%s: %w", logp, err) + return fmt.Errorf(`%s: %w`, logp, err) } } dw.root = dw.fs.Root @@ -111,13 +119,11 @@ func (dw *DirWatcher) init() (err error) { // Start watching changes in directory and its content. func (dw *DirWatcher) Start() (err error) { - var ( - logp = "Start" - ) + var logp = `Start` err = dw.init() if err != nil { - return fmt.Errorf("%s: %w", logp, err) + return fmt.Errorf(`%s: %w`, logp, err) } go dw.start() @@ -127,11 +133,7 @@ func (dw *DirWatcher) Start() (err error) { // Stop watching changes on directory. func (dw *DirWatcher) Stop() { - // Stop all file watchers. - var watcher *Watcher - for _, watcher = range dw.fileWatcher { - watcher.Stop() - } + dw.stopAllFileWatcher() select { case dw.qrun <- struct{}{}: @@ -154,16 +156,15 @@ func (dw *DirWatcher) dirsKeys() (keys []string) { return keys } -// mapSubdirs iterate each child node recursively and map any sub -// directories into mapSubdirs. -// If its a regular file, start a NewWatcher. +// mapSubdirs iterate each child node recursively and map directories into +// [DirWatcher.dirs]. +// If its a regular file, start a new file [Watcher]. func (dw *DirWatcher) mapSubdirs(node *Node) { var ( - logp = `DirWatcher.mapSubdirs` + logp = `mapSubdirs` - child *Node - watcher *Watcher - err error + child *Node + err error ) for _, child = range node.Childs { @@ -174,12 +175,10 @@ func (dw *DirWatcher) mapSubdirs(node *Node) { dw.mapSubdirs(child) continue } - watcher, err = newWatcher(node, child, dw.Delay, dw.qFileChanges) + err = dw.startWatchingFile(node, child) if err != nil { - log.Printf("%s %q: %s", logp, child.SysPath, err) - continue + log.Printf(`%s: %s`, logp, err) } - dw.fileWatcher[child.Path] = watcher } } @@ -190,15 +189,7 @@ func (dw *DirWatcher) onCreated(parent, child *Node) (err error) { dw.dirs[child.Path] = child dw.dirsLocker.Unlock() } else { - // Start watching the file for modification. - var watcher *Watcher - - watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges) - if err != nil { - return fmt.Errorf(`onCreated: %w`, err) - } - - dw.fileWatcher[child.Path] = watcher + dw.startWatchingFile(parent, child) } var ns = NodeState{ @@ -242,13 +233,7 @@ func (dw *DirWatcher) onDirDeleted(node *Node) { } func (dw *DirWatcher) onFileDeleted(node *Node) { - var watcher = dw.fileWatcher[node.Path] - if watcher != nil { - watcher.Stop() - delete(dw.fileWatcher, node.Path) - } - - dw.fs.RemoveChild(node.Parent, node) + dw.stopWatchingFile(node) var ns = NodeState{ State: FileStateDeleted, @@ -372,16 +357,17 @@ func (dw *DirWatcher) onRootCreated() { dw.mapSubdirs(dw.root) } -// onRootDeleted handle change when the root directory that we watch get -// deleted. It will send deleted event and unmount the root directory from -// memory. +// onRootDeleted handle change when the [DirWatcher.Options.Root] directory +// deleted. +// It will send the [FileStateDeleted] event and unmount the root directory +// from memory. func (dw *DirWatcher) onRootDeleted() { - var ( - ns = NodeState{ - Node: *dw.root, - State: FileStateDeleted, - } - ) + var ns = NodeState{ + Node: *dw.root, + State: FileStateDeleted, + } + + dw.stopAllFileWatcher() dw.fs = nil dw.root = nil @@ -459,27 +445,26 @@ func (dw *DirWatcher) onUpdateMode(node *Node, newInfo os.FileInfo) { func (dw *DirWatcher) start() { var ( - logp = "DirWatcher" + logp = `DirWatcher` ticker = time.NewTicker(dw.Delay) - ever = true - node *Node - fi os.FileInfo - ns NodeState - err error + ns NodeState + err error ) - for ever { + for { select { case <-ticker.C: + var fi os.FileInfo + fi, err = os.Stat(dw.Root) if err != nil { - if !os.IsNotExist(err) { - log.Printf("%s: %s", logp, err) - continue - } - if dw.fs != nil { - dw.onRootDeleted() + if os.IsNotExist(err) { + if dw.fs != nil { + dw.onRootDeleted() + } + } else { + log.Printf(`%s: %s`, logp, err) } continue } @@ -497,6 +482,8 @@ func (dw *DirWatcher) start() { dw.processSubdirs() case ns = <-dw.qFileChanges: + var node *Node + node, err = dw.fs.Get(ns.Node.Path) if err != nil { log.Printf("%s: on file changes %s: %s", logp, ns.Node.Path, err) @@ -514,14 +501,59 @@ func (dw *DirWatcher) start() { } case <-dw.qrun: - ever = false ticker.Stop() // Signal back to the Stop caller. dw.qrun <- struct{}{} + return } } } +func (dw *DirWatcher) startWatchingFile(parent, child *Node) (err error) { + var ( + logp = `startWatchingFile` + watcher *Watcher + ) + + watcher, err = newWatcher(parent, child, dw.Delay, dw.qFileChanges) + if err != nil { + return fmt.Errorf(`%s %q: %s`, logp, child.SysPath, err) + } + + dw.mtxFileWatcher.Lock() + dw.fileWatcher[child.Path] = watcher + dw.mtxFileWatcher.Unlock() + + return nil +} + +func (dw *DirWatcher) stopAllFileWatcher() { + var watcher *Watcher + + dw.mtxFileWatcher.Lock() + + for _, watcher = range dw.fileWatcher { + watcher.Stop() + dw.fs.RemoveChild(watcher.node.Parent, watcher.node) + } + dw.fileWatcher = map[string]*Watcher{} + + dw.mtxFileWatcher.Unlock() +} + +func (dw *DirWatcher) stopWatchingFile(node *Node) { + dw.mtxFileWatcher.Lock() + + var watcher = dw.fileWatcher[node.Path] + if watcher != nil { + watcher.Stop() + delete(dw.fileWatcher, node.Path) + dw.fs.RemoveChild(node.Parent, node) + } + + dw.mtxFileWatcher.Unlock() +} + func (dw *DirWatcher) processSubdirs() { var ( logp = `processSubdirs` diff --git a/lib/memfs/dirwatcher_example_test.go b/lib/memfs/dirwatcher_example_test.go index 5f9a69db..b6b52d16 100644 --- a/lib/memfs/dirwatcher_example_test.go +++ b/lib/memfs/dirwatcher_example_test.go @@ -16,7 +16,6 @@ import ( func ExampleDirWatcher() { var ( - ns memfs.NodeState rootDir string err error ) @@ -26,9 +25,9 @@ func ExampleDirWatcher() { log.Fatal(err) } - // In this example, we watch sub directory "assets" and its contents, - // include only file with .adoc extension and ignoring files with - // .html extension. + // In this example, we watch sub directory "assets" and its + // contents, including only files with ".adoc" extension and + // excluding files with ".html" extension. var dw = &memfs.DirWatcher{ Options: memfs.Options{ Root: rootDir, @@ -48,16 +47,13 @@ func ExampleDirWatcher() { log.Fatal(err) } - // Add delay for goroutine to catch up and modtime to changes. - // We try with 100ms but sometimes it stuck on the first <-dw.C. - time.Sleep(200 * time.Millisecond) - fmt.Println(`Deleting the root directory:`) err = os.Remove(rootDir) if err != nil { log.Fatal(err) } - ns = <-dw.C + + var ns = <-dw.C fmt.Println(`--`, ns.State, ns.Node.Path) // Create the root directory back with sub directory |
