From 87d772a7d708fec12f48cd8adc0dedff6e1025da Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 26 Aug 2024 10:15:20 +0200 Subject: Adding upstream version 1.47.0. Signed-off-by: Daniel Baumann --- .../go.d.plugin/agent/discovery/file/watch.go | 220 --------------------- 1 file changed, 220 deletions(-) delete mode 100644 src/go/collectors/go.d.plugin/agent/discovery/file/watch.go (limited to 'src/go/collectors/go.d.plugin/agent/discovery/file/watch.go') diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go deleted file mode 100644 index a723b706e..000000000 --- a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go +++ /dev/null @@ -1,220 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -package file - -import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - "time" - - "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup" - "github.com/netdata/netdata/go/go.d.plugin/logger" - - "github.com/fsnotify/fsnotify" -) - -type ( - Watcher struct { - *logger.Logger - - paths []string - reg confgroup.Registry - watcher *fsnotify.Watcher - cache cache - refreshEvery time.Duration - } - cache map[string]time.Time -) - -func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok } -func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok } -func (c cache) remove(path string) { delete(c, path) } -func (c cache) put(path string, modTime time.Time) { c[path] = modTime } - -func NewWatcher(reg confgroup.Registry, paths []string) *Watcher { - d := &Watcher{ - Logger: log, - paths: paths, - reg: reg, - watcher: nil, - cache: make(cache), - refreshEvery: time.Minute, - } - return d -} - -func (w *Watcher) String() string { - return w.Name() -} - -func (w *Watcher) Name() string { - return "file watcher" -} - -func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) { - w.Info("instance is started") - defer func() { w.Info("instance is stopped") }() - - watcher, err := fsnotify.NewWatcher() - if err != nil { - w.Errorf("fsnotify watcher initialization: %v", err) - return - } - - w.watcher = watcher - defer w.stop() - w.refresh(ctx, in) - - tk := time.NewTicker(w.refreshEvery) - defer tk.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-tk.C: - w.refresh(ctx, in) - case event := <-w.watcher.Events: - // TODO: check if event.Has will do - if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) { - break - } - if event.Has(fsnotify.Create) && w.cache.has(event.Name) { - // vim "backupcopy=no" case, already collected after Rename event. - break - } - if event.Has(fsnotify.Rename) { - // It is common to modify files using vim. - // When writing to a file a backup is made. "backupcopy" option tells how it's done. - // Default is "no": rename the file and write a new one. - // This is cheap attempt to not send empty group for the old file. - time.Sleep(time.Millisecond * 100) - } - w.refresh(ctx, in) - case err := <-w.watcher.Errors: - if err != nil { - w.Warningf("watch: %v", err) - } - } - } -} - -func (w *Watcher) fileMatches(file string) bool { - for _, pattern := range w.paths { - if ok, _ := filepath.Match(pattern, file); ok { - return true - } - } - return false -} - -func (w *Watcher) listFiles() (files []string) { - for _, pattern := range w.paths { - if matches, err := filepath.Glob(pattern); err == nil { - files = append(files, matches...) - } - } - return files -} - -func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) { - select { - case <-ctx.Done(): - return - default: - } - var groups []*confgroup.Group - seen := make(map[string]bool) - - for _, file := range w.listFiles() { - fi, err := os.Lstat(file) - if err != nil { - w.Warningf("lstat '%s': %v", file, err) - continue - } - - if !fi.Mode().IsRegular() { - continue - } - - seen[file] = true - if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) { - continue - } - w.cache.put(file, fi.ModTime()) - - if group, err := parse(w.reg, file); err != nil { - w.Warningf("parse '%s': %v", file, err) - } else if group == nil { - groups = append(groups, &confgroup.Group{Source: file}) - } else { - for _, cfg := range group.Configs { - cfg.SetProvider("file watcher") - cfg.SetSourceType(configSourceType(file)) - cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file)) - } - groups = append(groups, group) - } - } - - for name := range w.cache { - if seen[name] { - continue - } - w.cache.remove(name) - groups = append(groups, &confgroup.Group{Source: name}) - } - - send(ctx, in, groups) - - w.watchDirs() -} - -func (w *Watcher) watchDirs() { - for _, path := range w.paths { - if idx := strings.LastIndex(path, "/"); idx > -1 { - path = path[:idx] - } else { - path = "./" - } - if err := w.watcher.Add(path); err != nil { - w.Errorf("start watching '%s': %v", path, err) - } - } -} - -func (w *Watcher) stop() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // closing the watcher deadlocks unless all events and errors are drained. - go func() { - for { - select { - case <-w.watcher.Errors: - case <-w.watcher.Events: - case <-ctx.Done(): - return - } - } - }() - - _ = w.watcher.Close() -} - -func isChmodOnly(event fsnotify.Event) bool { - return event.Op^fsnotify.Chmod == 0 -} - -func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) { - if len(groups) == 0 { - return - } - select { - case <-ctx.Done(): - case in <- groups: - } -} -- cgit v1.2.3