diff options
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal-watcher.c')
-rw-r--r-- | collectors/systemd-journal.plugin/systemd-journal-watcher.c | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal-watcher.c b/collectors/systemd-journal.plugin/systemd-journal-watcher.c new file mode 100644 index 00000000..ed41f624 --- /dev/null +++ b/collectors/systemd-journal.plugin/systemd-journal-watcher.c @@ -0,0 +1,379 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "systemd-internals.h" +#include <sys/inotify.h> + +#define EVENT_SIZE (sizeof(struct inotify_event)) +#define INITIAL_WATCHES 256 + +#define WATCH_FOR (IN_CREATE | IN_MODIFY | IN_DELETE | IN_DELETE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_UNMOUNT) + +typedef struct watch_entry { + int slot; + + int wd; // Watch descriptor + char *path; // Dynamically allocated path + + struct watch_entry *next; // for the free list +} WatchEntry; + +typedef struct { + WatchEntry *watchList; + WatchEntry *freeList; + int watchCount; + int watchListSize; + + size_t errors; + + DICTIONARY *pending; +} Watcher; + +static WatchEntry *get_slot(Watcher *watcher) { + WatchEntry *t; + + if (watcher->freeList != NULL) { + t = watcher->freeList; + watcher->freeList = t->next; + t->next = NULL; + return t; + } + + if (watcher->watchCount == watcher->watchListSize) { + watcher->watchListSize *= 2; + watcher->watchList = reallocz(watcher->watchList, watcher->watchListSize * sizeof(WatchEntry)); + } + + watcher->watchList[watcher->watchCount] = (WatchEntry){ + .slot = watcher->watchCount, + .wd = -1, + .path = NULL, + .next = NULL, + }; + t = &watcher->watchList[watcher->watchCount]; + watcher->watchCount++; + + return t; +} + +static void free_slot(Watcher *watcher, WatchEntry *t) { + t->wd = -1; + freez(t->path); + t->path = NULL; + + // link it to the free list + t->next = watcher->freeList; + watcher->freeList = t; +} + +static int add_watch(Watcher *watcher, int inotifyFd, const char *path) { + WatchEntry *t = get_slot(watcher); + + t->wd = inotify_add_watch(inotifyFd, path, WATCH_FOR); + if (t->wd == -1) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, + "JOURNAL WATCHER: cannot watch directory: '%s'", + path); + + free_slot(watcher, t); + + struct stat info; + if(stat(path, &info) == 0 && S_ISDIR(info.st_mode)) { + // the directory exists, but we failed to add the watch + // increase errors + watcher->errors++; + } + } + else { + t->path = strdupz(path); + + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: watching directory: '%s'", + path); + + } + return t->wd; +} + +static void remove_watch(Watcher *watcher, int inotifyFd, int wd) { + int i; + for (i = 0; i < watcher->watchCount; ++i) { + if (watcher->watchList[i].wd == wd) { + + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: removing watch from directory: '%s'", + watcher->watchList[i].path); + + inotify_rm_watch(inotifyFd, watcher->watchList[i].wd); + free_slot(watcher, &watcher->watchList[i]); + return; + } + } + + nd_log(NDLS_COLLECTORS, NDLP_WARNING, + "JOURNAL WATCHER: cannot find directory watch %d to remove.", + wd); +} + +static void free_watches(Watcher *watcher, int inotifyFd) { + for (int i = 0; i < watcher->watchCount; ++i) { + if (watcher->watchList[i].wd != -1) { + inotify_rm_watch(inotifyFd, watcher->watchList[i].wd); + free_slot(watcher, &watcher->watchList[i]); + } + } + freez(watcher->watchList); + watcher->watchList = NULL; + + dictionary_destroy(watcher->pending); + watcher->pending = NULL; +} + +static char* get_path_from_wd(Watcher *watcher, int wd) { + for (int i = 0; i < watcher->watchCount; ++i) { + if (watcher->watchList[i].wd == wd) + return watcher->watchList[i].path; + } + return NULL; +} + +static bool is_directory_watched(Watcher *watcher, const char *path) { + for (int i = 0; i < watcher->watchCount; ++i) { + if (watcher->watchList[i].wd != -1 && strcmp(watcher->watchList[i].path, path) == 0) { + return true; + } + } + return false; +} + +static void watch_directory_and_subdirectories(Watcher *watcher, int inotifyFd, const char *basePath) { + DICTIONARY *dirs = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE); + + journal_directory_scan_recursively(NULL, dirs, basePath, 0); + + void *x; + dfe_start_read(dirs, x) { + const char *dirname = x_dfe.name; + // Check if this directory is already being watched + if (!is_directory_watched(watcher, dirname)) { + add_watch(watcher, inotifyFd, dirname); + } + } + dfe_done(x); + + dictionary_destroy(dirs); +} + +static bool is_subpath(const char *path, const char *subpath) { + // Use strncmp to compare the paths + if (strncmp(path, subpath, strlen(path)) == 0) { + // Ensure that the next character is a '/' or '\0' + char next_char = subpath[strlen(path)]; + return next_char == '/' || next_char == '\0'; + } + + return false; +} + +void remove_directory_watch(Watcher *watcher, int inotifyFd, const char *dirPath) { + for (int i = 0; i < watcher->watchCount; ++i) { + WatchEntry *t = &watcher->watchList[i]; + if (t->wd != -1 && is_subpath(t->path, dirPath)) { + inotify_rm_watch(inotifyFd, t->wd); + free_slot(watcher, t); + } + } + + struct journal_file *jf; + dfe_start_write(journal_files_registry, jf) { + if(is_subpath(jf->filename, dirPath)) + dictionary_del(journal_files_registry, jf->filename); + } + dfe_done(jf); + + dictionary_garbage_collect(journal_files_registry); +} + +void process_event(Watcher *watcher, int inotifyFd, struct inotify_event *event) { + if(!event->len) { + nd_log(NDLS_COLLECTORS, NDLP_NOTICE + , "JOURNAL WATCHER: received event with mask %u and len %u (this is zero) for path: '%s' - ignoring it." + , event->mask, event->len, event->name); + return; + } + + char *dirPath = get_path_from_wd(watcher, event->wd); + if(!dirPath) { + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, + "JOURNAL WATCHER: received event with mask %u and len %u for path: '%s' - " + "but we can't find its watch descriptor - ignoring it." + , event->mask, event->len, event->name); + return; + } + + if(event->mask & IN_DELETE_SELF) { + remove_watch(watcher, inotifyFd, event->wd); + return; + } + + static __thread char fullPath[PATH_MAX]; + snprintfz(fullPath, sizeof(fullPath), "%s/%s", dirPath, event->name); + // fullPath contains the full path to the file + + size_t len = strlen(event->name); + + if(event->mask & IN_ISDIR) { + if (event->mask & (IN_DELETE | IN_MOVED_FROM)) { + // A directory is deleted or moved out + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: Directory deleted or moved out: '%s'", + fullPath); + + // Remove the watch - implement this function based on how you manage your watches + remove_directory_watch(watcher, inotifyFd, fullPath); + } + else if (event->mask & (IN_CREATE | IN_MOVED_TO)) { + // A new directory is created or moved in + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: New directory created or moved in: '%s'", + fullPath); + + // Start watching the new directory - recursive watch + watch_directory_and_subdirectories(watcher, inotifyFd, fullPath); + } + else + nd_log(NDLS_COLLECTORS, NDLP_WARNING, + "JOURNAL WATCHER: Received unhandled event with mask %u for directory '%s'", + event->mask, fullPath); + } + else if(len > sizeof(".journal") - 1 && strcmp(&event->name[len - (sizeof(".journal") - 1)], ".journal") == 0) { + // It is a file that ends in .journal + // add it to our pending list + dictionary_set(watcher->pending, fullPath, NULL, 0); + } + else + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: ignoring event with mask %u for file '%s'", + event->mask, fullPath); +} + +static void process_pending(Watcher *watcher) { + void *x; + dfe_start_write(watcher->pending, x) { + struct stat info; + const char *fullPath = x_dfe.name; + + if(stat(fullPath, &info) != 0) { + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: file '%s' no longer exists, removing it from the registry", + fullPath); + + dictionary_del(journal_files_registry, fullPath); + } + else if(S_ISREG(info.st_mode)) { + nd_log(NDLS_COLLECTORS, NDLP_DEBUG, + "JOURNAL WATCHER: file '%s' has been added/updated, updating the registry", + fullPath); + + struct journal_file t = { + .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + + info.st_mtim.tv_nsec / NSEC_PER_USEC, + .last_scan_monotonic_ut = now_monotonic_usec(), + .size = info.st_size, + .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT, + }; + struct journal_file *jf = dictionary_set(journal_files_registry, fullPath, &t, sizeof(t)); + journal_file_update_header(jf->filename, jf); + } + + dictionary_del(watcher->pending, fullPath); + } + dfe_done(x); + + dictionary_garbage_collect(watcher->pending); +} + +void *journal_watcher_main(void *arg __maybe_unused) { + while(1) { + Watcher watcher = { + .watchList = mallocz(INITIAL_WATCHES * sizeof(WatchEntry)), + .freeList = NULL, + .watchCount = 0, + .watchListSize = INITIAL_WATCHES, + .pending = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE|DICT_OPTION_SINGLE_THREADED), + .errors = 0, + }; + + int inotifyFd = inotify_init(); + if (inotifyFd < 0) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, "inotify_init() failed."); + free_watches(&watcher, inotifyFd); + return NULL; + } + + for (unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) { + if (!journal_directories[i].path) break; + watch_directory_and_subdirectories(&watcher, inotifyFd, journal_directories[i].path); + } + + usec_t last_headers_update_ut = now_monotonic_usec(); + struct buffered_reader reader; + while (1) { + buffered_reader_ret_t rc = buffered_reader_read_timeout( + &reader, inotifyFd, SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS, false); + + if (rc != BUFFERED_READER_READ_OK && rc != BUFFERED_READER_READ_POLL_TIMEOUT) { + nd_log(NDLS_COLLECTORS, NDLP_CRIT, + "JOURNAL WATCHER: cannot read inotify events, buffered_reader_read_timeout() returned %d - " + "restarting the watcher.", + rc); + break; + } + + if(rc == BUFFERED_READER_READ_OK) { + bool unmount_event = false; + + ssize_t i = 0; + while (i < reader.read_len) { + struct inotify_event *event = (struct inotify_event *) &reader.read_buffer[i]; + + if(event->mask & IN_UNMOUNT) { + unmount_event = true; + break; + } + + process_event(&watcher, inotifyFd, event); + i += (ssize_t)EVENT_SIZE + event->len; + } + + reader.read_buffer[0] = '\0'; + reader.read_len = 0; + reader.pos = 0; + + if(unmount_event) + break; + } + + usec_t ut = now_monotonic_usec(); + if (dictionary_entries(watcher.pending) && (rc == BUFFERED_READER_READ_POLL_TIMEOUT || + last_headers_update_ut + (SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS * USEC_PER_MS) <= ut)) { + process_pending(&watcher); + last_headers_update_ut = ut; + } + + if(watcher.errors) { + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, + "JOURNAL WATCHER: there were errors in setting up inotify watches - restarting the watcher."); + } + } + + close(inotifyFd); + free_watches(&watcher, inotifyFd); + + // this will scan the directories and cleanup the registry + journal_files_registry_update(); + + sleep_usec(5 * USEC_PER_SEC); + } + + return NULL; +} |