diff options
Diffstat (limited to '')
-rw-r--r-- | src/go/collectors/go.d.plugin/agent/discovery/file/watch.go | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go new file mode 100644 index 000000000..a723b706e --- /dev/null +++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go @@ -0,0 +1,220 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package file + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "github.com/fsnotify/fsnotify" +) + +type ( + Watcher struct { + *logger.Logger + + paths []string + reg confgroup.Registry + watcher *fsnotify.Watcher + cache cache + refreshEvery time.Duration + } + cache map[string]time.Time +) + +func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok } +func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok } +func (c cache) remove(path string) { delete(c, path) } +func (c cache) put(path string, modTime time.Time) { c[path] = modTime } + +func NewWatcher(reg confgroup.Registry, paths []string) *Watcher { + d := &Watcher{ + Logger: log, + paths: paths, + reg: reg, + watcher: nil, + cache: make(cache), + refreshEvery: time.Minute, + } + return d +} + +func (w *Watcher) String() string { + return w.Name() +} + +func (w *Watcher) Name() string { + return "file watcher" +} + +func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) { + w.Info("instance is started") + defer func() { w.Info("instance is stopped") }() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + w.Errorf("fsnotify watcher initialization: %v", err) + return + } + + w.watcher = watcher + defer w.stop() + w.refresh(ctx, in) + + tk := time.NewTicker(w.refreshEvery) + defer tk.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tk.C: + w.refresh(ctx, in) + case event := <-w.watcher.Events: + // TODO: check if event.Has will do + if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) { + break + } + if event.Has(fsnotify.Create) && w.cache.has(event.Name) { + // vim "backupcopy=no" case, already collected after Rename event. + break + } + if event.Has(fsnotify.Rename) { + // It is common to modify files using vim. + // When writing to a file a backup is made. "backupcopy" option tells how it's done. + // Default is "no": rename the file and write a new one. + // This is cheap attempt to not send empty group for the old file. + time.Sleep(time.Millisecond * 100) + } + w.refresh(ctx, in) + case err := <-w.watcher.Errors: + if err != nil { + w.Warningf("watch: %v", err) + } + } + } +} + +func (w *Watcher) fileMatches(file string) bool { + for _, pattern := range w.paths { + if ok, _ := filepath.Match(pattern, file); ok { + return true + } + } + return false +} + +func (w *Watcher) listFiles() (files []string) { + for _, pattern := range w.paths { + if matches, err := filepath.Glob(pattern); err == nil { + files = append(files, matches...) + } + } + return files +} + +func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) { + select { + case <-ctx.Done(): + return + default: + } + var groups []*confgroup.Group + seen := make(map[string]bool) + + for _, file := range w.listFiles() { + fi, err := os.Lstat(file) + if err != nil { + w.Warningf("lstat '%s': %v", file, err) + continue + } + + if !fi.Mode().IsRegular() { + continue + } + + seen[file] = true + if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) { + continue + } + w.cache.put(file, fi.ModTime()) + + if group, err := parse(w.reg, file); err != nil { + w.Warningf("parse '%s': %v", file, err) + } else if group == nil { + groups = append(groups, &confgroup.Group{Source: file}) + } else { + for _, cfg := range group.Configs { + cfg.SetProvider("file watcher") + cfg.SetSourceType(configSourceType(file)) + cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file)) + } + groups = append(groups, group) + } + } + + for name := range w.cache { + if seen[name] { + continue + } + w.cache.remove(name) + groups = append(groups, &confgroup.Group{Source: name}) + } + + send(ctx, in, groups) + + w.watchDirs() +} + +func (w *Watcher) watchDirs() { + for _, path := range w.paths { + if idx := strings.LastIndex(path, "/"); idx > -1 { + path = path[:idx] + } else { + path = "./" + } + if err := w.watcher.Add(path); err != nil { + w.Errorf("start watching '%s': %v", path, err) + } + } +} + +func (w *Watcher) stop() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // closing the watcher deadlocks unless all events and errors are drained. + go func() { + for { + select { + case <-w.watcher.Errors: + case <-w.watcher.Events: + case <-ctx.Done(): + return + } + } + }() + + _ = w.watcher.Close() +} + +func isChmodOnly(event fsnotify.Event) bool { + return event.Op^fsnotify.Chmod == 0 +} + +func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) { + if len(groups) == 0 { + return + } + select { + case <-ctx.Done(): + case in <- groups: + } +} |