summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/agent/discovery/file/watch.go220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
new file mode 100644
index 000000000..a723b706e
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/agent/discovery/file/watch.go
@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package file
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+
+ "github.com/fsnotify/fsnotify"
+)
+
+type (
+ Watcher struct {
+ *logger.Logger
+
+ paths []string
+ reg confgroup.Registry
+ watcher *fsnotify.Watcher
+ cache cache
+ refreshEvery time.Duration
+ }
+ cache map[string]time.Time
+)
+
+func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok }
+func (c cache) has(path string) bool { _, ok := c.lookup(path); return ok }
+func (c cache) remove(path string) { delete(c, path) }
+func (c cache) put(path string, modTime time.Time) { c[path] = modTime }
+
+func NewWatcher(reg confgroup.Registry, paths []string) *Watcher {
+ d := &Watcher{
+ Logger: log,
+ paths: paths,
+ reg: reg,
+ watcher: nil,
+ cache: make(cache),
+ refreshEvery: time.Minute,
+ }
+ return d
+}
+
+func (w *Watcher) String() string {
+ return w.Name()
+}
+
+func (w *Watcher) Name() string {
+ return "file watcher"
+}
+
+func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) {
+ w.Info("instance is started")
+ defer func() { w.Info("instance is stopped") }()
+
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ w.Errorf("fsnotify watcher initialization: %v", err)
+ return
+ }
+
+ w.watcher = watcher
+ defer w.stop()
+ w.refresh(ctx, in)
+
+ tk := time.NewTicker(w.refreshEvery)
+ defer tk.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tk.C:
+ w.refresh(ctx, in)
+ case event := <-w.watcher.Events:
+ // TODO: check if event.Has will do
+ if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) {
+ break
+ }
+ if event.Has(fsnotify.Create) && w.cache.has(event.Name) {
+ // vim "backupcopy=no" case, already collected after Rename event.
+ break
+ }
+ if event.Has(fsnotify.Rename) {
+ // It is common to modify files using vim.
+ // When writing to a file a backup is made. "backupcopy" option tells how it's done.
+ // Default is "no": rename the file and write a new one.
+ // This is cheap attempt to not send empty group for the old file.
+ time.Sleep(time.Millisecond * 100)
+ }
+ w.refresh(ctx, in)
+ case err := <-w.watcher.Errors:
+ if err != nil {
+ w.Warningf("watch: %v", err)
+ }
+ }
+ }
+}
+
+func (w *Watcher) fileMatches(file string) bool {
+ for _, pattern := range w.paths {
+ if ok, _ := filepath.Match(pattern, file); ok {
+ return true
+ }
+ }
+ return false
+}
+
+func (w *Watcher) listFiles() (files []string) {
+ for _, pattern := range w.paths {
+ if matches, err := filepath.Glob(pattern); err == nil {
+ files = append(files, matches...)
+ }
+ }
+ return files
+}
+
+func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ var groups []*confgroup.Group
+ seen := make(map[string]bool)
+
+ for _, file := range w.listFiles() {
+ fi, err := os.Lstat(file)
+ if err != nil {
+ w.Warningf("lstat '%s': %v", file, err)
+ continue
+ }
+
+ if !fi.Mode().IsRegular() {
+ continue
+ }
+
+ seen[file] = true
+ if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) {
+ continue
+ }
+ w.cache.put(file, fi.ModTime())
+
+ if group, err := parse(w.reg, file); err != nil {
+ w.Warningf("parse '%s': %v", file, err)
+ } else if group == nil {
+ groups = append(groups, &confgroup.Group{Source: file})
+ } else {
+ for _, cfg := range group.Configs {
+ cfg.SetProvider("file watcher")
+ cfg.SetSourceType(configSourceType(file))
+ cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file))
+ }
+ groups = append(groups, group)
+ }
+ }
+
+ for name := range w.cache {
+ if seen[name] {
+ continue
+ }
+ w.cache.remove(name)
+ groups = append(groups, &confgroup.Group{Source: name})
+ }
+
+ send(ctx, in, groups)
+
+ w.watchDirs()
+}
+
+func (w *Watcher) watchDirs() {
+ for _, path := range w.paths {
+ if idx := strings.LastIndex(path, "/"); idx > -1 {
+ path = path[:idx]
+ } else {
+ path = "./"
+ }
+ if err := w.watcher.Add(path); err != nil {
+ w.Errorf("start watching '%s': %v", path, err)
+ }
+ }
+}
+
+func (w *Watcher) stop() {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // closing the watcher deadlocks unless all events and errors are drained.
+ go func() {
+ for {
+ select {
+ case <-w.watcher.Errors:
+ case <-w.watcher.Events:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ _ = w.watcher.Close()
+}
+
+func isChmodOnly(event fsnotify.Event) bool {
+ return event.Op^fsnotify.Chmod == 0
+}
+
+func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) {
+ if len(groups) == 0 {
+ return
+ }
+ select {
+ case <-ctx.Done():
+ case in <- groups:
+ }
+}