Diffstat (limited to 'src/collectors/log2journal')
-rw-r--r-- | src/collectors/log2journal/README.md | 37
-rw-r--r-- | src/collectors/log2journal/log2journal-hashed-key.h | 80
-rw-r--r-- | src/collectors/log2journal/log2journal-help.c | 2
-rw-r--r-- | src/collectors/log2journal/log2journal-inject.c | 11
-rw-r--r-- | src/collectors/log2journal/log2journal-params.c | 58
-rw-r--r-- | src/collectors/log2journal/log2journal-pattern.c | 4
-rw-r--r-- | src/collectors/log2journal/log2journal-pcre2.c | 9
-rw-r--r-- | src/collectors/log2journal/log2journal-rename.c | 6
-rw-r--r-- | src/collectors/log2journal/log2journal-replace.c | 12
-rw-r--r-- | src/collectors/log2journal/log2journal-rewrite.c | 7
-rw-r--r-- | src/collectors/log2journal/log2journal-txt.h | 90
-rw-r--r-- | src/collectors/log2journal/log2journal-yaml.c | 301
-rw-r--r-- | src/collectors/log2journal/log2journal.c | 64
-rw-r--r-- | src/collectors/log2journal/log2journal.h | 251 |
14 files changed, 477 insertions(+), 455 deletions(-)
diff --git a/src/collectors/log2journal/README.md b/src/collectors/log2journal/README.md index 9807b33ee..d9764d5d5 100644 --- a/src/collectors/log2journal/README.md +++ b/src/collectors/log2journal/README.md @@ -1,4 +1,3 @@ - # log2journal `log2journal` and `systemd-cat-native` can be used to convert a structured log file, such as the ones generated by web servers, into `systemd-journal` entries. @@ -11,7 +10,6 @@ The result is like this: nginx logs into systemd-journal: ![image](https://github.com/netdata/netdata/assets/2662304/16b471ff-c5a1-4fcc-bcd5-83551e089f6c) - The overall process looks like this: ```bash @@ -23,7 +21,8 @@ tail -F /var/log/nginx/*.log |\ # outputs log lines These are the steps: 1. `tail -F /var/log/nginx/*.log`<br/>this command will tail all `*.log` files in `/var/log/nginx/`. We use `-F` instead of `-f` to ensure that files will still be tailed after log rotation. -2. `log2joural` is a Netdata program. It reads log entries and extracts fields, according to the PCRE2 pattern it accepts. It can also apply some basic operations on the fields, like injecting new fields or duplicating existing ones or rewriting their values. The output of `log2journal` is in Systemd Journal Export Format, and it looks like this: +2. `log2journal` is a Netdata program. It reads log entries and extracts fields, according to the PCRE2 pattern it accepts. It can also apply some basic operations on the fields, like injecting new fields or duplicating existing ones or rewriting their values. The output of `log2journal` is in Systemd Journal Export Format, and it looks like this: + ```bash KEY1=VALUE1 # << start of the first log line KEY2=VALUE2 @@ -31,8 +30,8 @@ These are the steps: KEY1=VALUE1 # << start of the second log line KEY2=VALUE2 ``` -3. `systemd-cat-native` is a Netdata program. I can send the logs to a local `systemd-journald` (journal namespaces supported), or to a remote `systemd-journal-remote`. +3. `systemd-cat-native` is a Netdata program. 
It can send the logs to a local `systemd-journald` (journal namespaces supported), or to a remote `systemd-journal-remote`. ## Processing pipeline @@ -44,19 +43,19 @@ The sequence of processing in Netdata's `log2journal` is designed to methodicall 2. **Extract Fields and Values**<br/> Based on the input format (JSON, logfmt, or custom pattern), it extracts fields and their values from each log line. In the case of JSON and logfmt, it automatically extracts all fields. For custom patterns, it uses PCRE2 regular expressions, and fields are extracted based on sub-expressions defined in the pattern. -3. **Transliteration**<br/> +3. **Transliteration**<br/> Extracted fields are transliterated to the limited character set accepted by systemd-journal: capitals A-Z, digits 0-9, underscores. 4. **Apply Optional Prefix**<br/> If a prefix is specified, it is added to all keys. This happens before any other processing so that all subsequent matches and manipulations take the prefix into account. -5. **Rename Fields**<br/> +5. **Rename Fields**<br/> Renames fields as specified in the configuration. This is used to change the names of the fields to match desired or required naming conventions. 6. **Inject New Fields**<br/> New fields are injected into the log data. This can include constants or values derived from other fields, using variable substitution. -7. **Rewrite Field Values**<br/> +7. **Rewrite Field Values**<br/> Applies rewriting rules to alter the values of the fields. This can involve complex transformations, including regular expressions and variable substitutions. The rewrite rules can also inject new fields into the data. 8. **Filter Fields**<br/> @@ -81,7 +80,7 @@ We have an nginx server logging in this standard combined log format: First, let's find the right pattern for `log2journal`. 
We ask ChatGPT: -``` +```text My nginx log uses this log format: log_format access '$remote_addr - $remote_user [$time_local] ' @@ -122,11 +121,11 @@ ChatGPT replies with this: Let's see what the above says: 1. `(?x)`: enable PCRE2 extended mode. In this mode spaces and newlines in the pattern are ignored. To match a space you have to use `\s`. This mode allows us to split the pattern in multiple lines and add comments to it. -1. `^`: match the beginning of the line -2. `(?<remote_addr[^ ]+)`: match anything up to the first space (`[^ ]+`), and name it `remote_addr`. -3. `\s`: match a space -4. `-`: match a hyphen -5. and so on... +2. `^`: match the beginning of the line +3. `(?<remote_addr>[^ ]+)`: match anything up to the first space (`[^ ]+`), and name it `remote_addr`. +4. `\s`: match a space +5. `-`: match a hyphen +6. and so on... We edit `nginx.yaml` and add it, like this: @@ -427,7 +426,6 @@ Rewrite rules are powerful. You can have named groups in them, like in the main Now the message is ready to be sent to a systemd-journal. For this we use `systemd-cat-native`. This command can send such messages to a journal running on the localhost, a local journal namespace, or a `systemd-journal-remote` running on another server. By just appending `| systemd-cat-native` to the command, the message will be sent to the local journal. 
- ```bash # echo '1.2.3.4 - - [19/Nov/2023:00:24:43 +0000] "GET /index.html HTTP/1.1" 200 4172 "-" "Go-http-client/1.1"' | log2journal -f nginx.yaml | systemd-cat-native # no output @@ -486,7 +484,7 @@ tail -F /var/log/nginx/access.log |\ Create the file `/etc/systemd/system/nginx-logs.service` (change `/path/to/nginx.yaml` to the right path): -``` +```text [Unit] Description=NGINX Log to Systemd Journal After=network.target @@ -524,7 +522,6 @@ Netdata will automatically pick the new namespace and present it at the list of You can also instruct `systemd-cat-native` to log to a remote system, sending the logs to a `systemd-journal-remote` instance running on another server. Check [the manual of systemd-cat-native](/src/libnetdata/log/systemd-cat-native.md). - ## Performance `log2journal` and `systemd-cat-native` have been designed to process hundreds of thousands of log lines per second. They both utilize high performance indexing hashtables to speed up lookups, and queues that dynamically adapt to the number of log lines offered, offering a smooth and fast experience under all conditions. @@ -537,15 +534,15 @@ The key characteristic that can influence the performance of a logs processing p Especially the pattern `.*` seems to have the biggest impact on CPU consumption, especially when multiple `.*` are on the same pattern. -Usually we use `.*` to indicate that we need to match everything up to a character, e.g. `.* ` to match up to a space. By replacing it with `[^ ]+` (meaning: match at least a character up to a space), the regular expression engine can be a lot more efficient, reducing the overall CPU utilization significantly. +Usually we use `.*` to indicate that we need to match everything up to a character, e.g. `.*` to match up to a space. By replacing it with `[^ ]+` (meaning: match at least a character up to a space), the regular expression engine can be a lot more efficient, reducing the overall CPU utilization significantly. 
### Performance of systemd journals The ingestion pipeline of logs, from `tail` to `systemd-journald` or `systemd-journal-remote` is very efficient in all aspects. CPU utilization is better than any other system we tested and RAM usage is independent of the number of fields indexed, making systemd-journal one of the most efficient log management engines for ingesting high volumes of structured logs. -High fields cardinality does not have a noticable impact on systemd-journal. The amount of fields indexed and the amount of unique values per field, have a linear and predictable result in the resource utilization of `systemd-journald` and `systemd-journal-remote`. This is unlike other logs management solutions, like Loki, that their RAM requirements grow exponentially as the cardinality increases, making it impractical for them to index the amount of information systemd journals can index. +High fields cardinality does not have a noticeable impact on systemd-journal. The amount of fields indexed and the amount of unique values per field have a linear and predictable result in the resource utilization of `systemd-journald` and `systemd-journal-remote`. This is unlike other log management solutions, like Loki, whose RAM requirements grow exponentially as the cardinality increases, making it impractical for them to index the amount of information systemd journals can index. -However, the number of fields added to journals influences the overall disk footprint. Less fields means more log entries per journal file, smaller overall disk footprint and faster queries. +However, the number of fields added to journals influences the overall disk footprint. Fewer fields mean more log entries per journal file, a smaller overall disk footprint and faster queries. systemd-journal files are primarily designed for security and reliability. This comes at the cost of disk footprint. 
The internal structure of journal files is such that in case of corruption, minimum data loss will incur. To achieve such a unique characteristic, certain data within the files need to be aligned at predefined boundaries, so that in case there is a corruption, non-corrupted parts of the journal file can be recovered. @@ -578,7 +575,7 @@ If on other hand your organization prefers to maintain the full logs and control ## `log2journal` options -``` +```text Netdata log2journal v1.43.0-341-gdac4df856 diff --git a/src/collectors/log2journal/log2journal-hashed-key.h b/src/collectors/log2journal/log2journal-hashed-key.h new file mode 100644 index 000000000..0618d9538 --- /dev/null +++ b/src/collectors/log2journal/log2journal-hashed-key.h @@ -0,0 +1,80 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LOG2JOURNAL_HASHED_KEY_H +#define NETDATA_LOG2JOURNAL_HASHED_KEY_H + +#include "log2journal.h" + +typedef enum __attribute__((__packed__)) { + HK_NONE = 0, + + // permanent flags - they are set once to optimize various decisions and lookups + + HK_HASHTABLE_ALLOCATED = (1 << 0), // this is the key object allocated in the hashtable + // objects that do not have this, have a pointer to a key in the hashtable + // objects that have this, value is allocated + + HK_FILTERED = (1 << 1), // we checked once if this key in filtered + HK_FILTERED_INCLUDED = (1 << 2), // the result of the filtering was to include it in the output + + HK_COLLISION_CHECKED = (1 << 3), // we checked once for collision check of this key + + HK_RENAMES_CHECKED = (1 << 4), // we checked once if there are renames on this key + HK_HAS_RENAMES = (1 << 5), // and we found there is a rename rule related to it + + // ephemeral flags - they are unset at the end of each log line + + HK_VALUE_FROM_LOG = (1 << 14), // the value of this key has been read from the log (or from injection, duplication) + HK_VALUE_REWRITTEN = (1 << 15), // the value of this key has been rewritten due to one of our rewrite 
rules + +} HASHED_KEY_FLAGS; + +typedef struct hashed_key { + const char *key; + uint32_t len; + HASHED_KEY_FLAGS flags; + XXH64_hash_t hash; + union { + struct hashed_key *hashtable_ptr; // HK_HASHTABLE_ALLOCATED is not set + TXT_L2J value; // HK_HASHTABLE_ALLOCATED is set + }; +} HASHED_KEY; + +static inline void hashed_key_cleanup(HASHED_KEY *k) { + if(k->flags & HK_HASHTABLE_ALLOCATED) + txt_l2j_cleanup(&k->value); + else + k->hashtable_ptr = NULL; + + freez((void *)k->key); + k->key = NULL; + k->len = 0; + k->hash = 0; + k->flags = HK_NONE; +} + +static inline void hashed_key_set(HASHED_KEY *k, const char *name, int32_t len) { + hashed_key_cleanup(k); + + if(len == -1) { + k->key = strdupz(name); + k->len = strlen(k->key); + } + else { + k->key = strndupz(name, len); + k->len = len; + } + + k->hash = XXH3_64bits(k->key, k->len); + k->flags = HK_NONE; +} + +static inline bool hashed_keys_match(HASHED_KEY *k1, HASHED_KEY *k2) { + return ((k1 == k2) || (k1->hash == k2->hash && strcmp(k1->key, k2->key) == 0)); +} + +static inline int compare_keys(struct hashed_key *k1, struct hashed_key *k2) { + return strcmp(k1->key, k2->key); +} + +#endif //NETDATA_LOG2JOURNAL_HASHED_KEY_H diff --git a/src/collectors/log2journal/log2journal-help.c b/src/collectors/log2journal/log2journal-help.c index 23ff4c056..0cb35bb0f 100644 --- a/src/collectors/log2journal/log2journal-help.c +++ b/src/collectors/log2journal/log2journal-help.c @@ -10,7 +10,7 @@ static void config_dir_print_available(void) { dir = opendir(path); if (dir == NULL) { - log2stderr(" >>> Cannot open directory:\n %s", path); + l2j_log(" >>> Cannot open directory:\n %s", path); return; } diff --git a/src/collectors/log2journal/log2journal-inject.c b/src/collectors/log2journal/log2journal-inject.c index 45158066b..f1a70ac8b 100644 --- a/src/collectors/log2journal/log2journal-inject.c +++ b/src/collectors/log2journal/log2journal-inject.c @@ -9,12 +9,13 @@ void injection_cleanup(INJECTION *inj) { static inline bool 
log_job_injection_replace(INJECTION *inj, const char *key, size_t key_len, const char *value, size_t value_len) { if(key_len > JOURNAL_MAX_KEY_LEN) - log2stderr("WARNING: injection key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key); + l2j_log("WARNING: injection key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key); if(value_len > JOURNAL_MAX_VALUE_LEN) - log2stderr("WARNING: injection value of key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key); + l2j_log( + "WARNING: injection value of key '%.*s' is too long for journal. Will be truncated.", (int)key_len, key); - hashed_key_len_set(&inj->key, key, key_len); + hashed_key_set(&inj->key, key, key_len); char *v = strndupz(value, value_len); bool ret = replace_pattern_set(&inj->value, v); freez(v); @@ -25,13 +26,13 @@ static inline bool log_job_injection_replace(INJECTION *inj, const char *key, si bool log_job_injection_add(LOG_JOB *jb, const char *key, size_t key_len, const char *value, size_t value_len, bool unmatched) { if (unmatched) { if (jb->unmatched.injections.used >= MAX_INJECTIONS) { - log2stderr("Error: too many unmatched injections. You can inject up to %d lines.", MAX_INJECTIONS); + l2j_log("Error: too many unmatched injections. You can inject up to %d lines.", MAX_INJECTIONS); return false; } } else { if (jb->injections.used >= MAX_INJECTIONS) { - log2stderr("Error: too many injections. You can inject up to %d lines.", MAX_INJECTIONS); + l2j_log("Error: too many injections. 
You can inject up to %d lines.", MAX_INJECTIONS); return false; } } diff --git a/src/collectors/log2journal/log2journal-params.c b/src/collectors/log2journal/log2journal-params.c index a7bb3e263..a56d1711e 100644 --- a/src/collectors/log2journal/log2journal-params.c +++ b/src/collectors/log2journal/log2journal-params.c @@ -7,7 +7,7 @@ void log_job_init(LOG_JOB *jb) { memset(jb, 0, sizeof(*jb)); simple_hashtable_init_KEY(&jb->hashtable, 32); - hashed_key_set(&jb->line.key, "LINE"); + hashed_key_set(&jb->line.key, "LINE", -1); } static void simple_hashtable_cleanup_allocated_keys(SIMPLE_HASHTABLE_KEY *ht) { @@ -47,8 +47,14 @@ void log_job_cleanup(LOG_JOB *jb) { for(size_t i = 0; i < jb->rewrites.used; i++) rewrite_cleanup(&jb->rewrites.array[i]); - txt_cleanup(&jb->rewrites.tmp); - txt_cleanup(&jb->filename.current); + search_pattern_cleanup(&jb->filter.include); + search_pattern_cleanup(&jb->filter.exclude); + + hashed_key_cleanup(&jb->filename.key); + hashed_key_cleanup(&jb->unmatched.key); + + txt_l2j_cleanup(&jb->rewrites.tmp); + txt_l2j_cleanup(&jb->filename.current); simple_hashtable_cleanup_allocated_keys(&jb->hashtable); simple_hashtable_destroy_KEY(&jb->hashtable); @@ -61,18 +67,18 @@ void log_job_cleanup(LOG_JOB *jb) { bool log_job_filename_key_set(LOG_JOB *jb, const char *key, size_t key_len) { if(!key || !*key) { - log2stderr("filename key cannot be empty."); + l2j_log("filename key cannot be empty."); return false; } - hashed_key_len_set(&jb->filename.key, key, key_len); + hashed_key_set(&jb->filename.key, key, key_len); return true; } bool log_job_key_prefix_set(LOG_JOB *jb, const char *prefix, size_t prefix_len) { if(!prefix || !*prefix) { - log2stderr("filename key cannot be empty."); + l2j_log("filename key cannot be empty."); return false; } @@ -86,7 +92,7 @@ bool log_job_key_prefix_set(LOG_JOB *jb, const char *prefix, size_t prefix_len) bool log_job_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) { if(!pattern || !*pattern) { - 
log2stderr("filename key cannot be empty."); + l2j_log("filename key cannot be empty."); return false; } @@ -100,12 +106,12 @@ bool log_job_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) { bool log_job_include_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) { if(jb->filter.include.re) { - log2stderr("FILTER INCLUDE: there is already an include filter set"); + l2j_log("FILTER INCLUDE: there is already an include filter set"); return false; } if(!search_pattern_set(&jb->filter.include, pattern, pattern_len)) { - log2stderr("FILTER INCLUDE: failed: %s", jb->filter.include.error.txt); + l2j_log("FILTER INCLUDE: failed: %s", jb->filter.include.error.txt); return false; } @@ -114,12 +120,12 @@ bool log_job_include_pattern_set(LOG_JOB *jb, const char *pattern, size_t patter bool log_job_exclude_pattern_set(LOG_JOB *jb, const char *pattern, size_t pattern_len) { if(jb->filter.exclude.re) { - log2stderr("FILTER INCLUDE: there is already an exclude filter set"); + l2j_log("FILTER INCLUDE: there is already an exclude filter set"); return false; } if(!search_pattern_set(&jb->filter.exclude, pattern, pattern_len)) { - log2stderr("FILTER EXCLUDE: failed: %s", jb->filter.exclude.error.txt); + l2j_log("FILTER EXCLUDE: failed: %s", jb->filter.exclude.error.txt); return false; } @@ -132,7 +138,7 @@ static bool parse_rename(LOG_JOB *jb, const char *param) { // Search for '=' in param const char *equal_sign = strchr(param, '='); if (!equal_sign || equal_sign == param) { - log2stderr("Error: Invalid rename format, '=' not found in %s", param); + l2j_log("Error: Invalid rename format, '=' not found in %s", param); return false; } @@ -210,7 +216,7 @@ RW_FLAGS parse_rewrite_flags(const char *options) { } if(!found) - log2stderr("Warning: rewrite options '%s' is not understood.", token); + l2j_log("Warning: rewrite options '%s' is not understood.", token); // Get the next token token = strtok(NULL, ","); @@ -226,33 +232,33 @@ static bool 
parse_rewrite(LOG_JOB *jb, const char *param) { // Search for '=' in param const char *equal_sign = strchr(param, '='); if (!equal_sign || equal_sign == param) { - log2stderr("Error: Invalid rewrite format, '=' not found in %s", param); + l2j_log("Error: Invalid rewrite format, '=' not found in %s", param); return false; } // Get the next character as the separator char separator = *(equal_sign + 1); if (!separator || !is_symbol(separator)) { - log2stderr("Error: rewrite separator not found after '=', or is not one of /\\|-# in: %s", param); + l2j_log("Error: rewrite separator not found after '=', or is not one of /\\|-# in: %s", param); return false; } // Find the next occurrence of the separator const char *second_separator = strchr(equal_sign + 2, separator); if (!second_separator) { - log2stderr("Error: rewrite second separator not found in: %s", param); + l2j_log("Error: rewrite second separator not found in: %s", param); return false; } // Check if the search pattern is empty if (equal_sign + 1 == second_separator) { - log2stderr("Error: rewrite search pattern is empty in: %s", param); + l2j_log("Error: rewrite search pattern is empty in: %s", param); return false; } // Check if the replacement pattern is empty if (*(second_separator + 1) == '\0') { - log2stderr("Error: rewrite replacement pattern is empty in: %s", param); + l2j_log("Error: rewrite replacement pattern is empty in: %s", param); return false; } @@ -281,7 +287,7 @@ static bool parse_rewrite(LOG_JOB *jb, const char *param) { static bool parse_inject(LOG_JOB *jb, const char *value, bool unmatched) { const char *equal = strchr(value, '='); if (!equal) { - log2stderr("Error: injection '%s' does not have an equal sign.", value); + l2j_log("Error: injection '%s' does not have an equal sign.", value); return false; } @@ -330,7 +336,10 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) { log_job_pattern_set(jb, arg, strlen(arg)); continue; } else { - log2stderr("Error: 
Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg); + l2j_log( + "Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", + jb->pattern, + arg); return false; } } @@ -355,7 +364,7 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) { } #endif else if (strcmp(param, "--unmatched-key") == 0) - hashed_key_set(&jb->unmatched.key, value); + hashed_key_set(&jb->unmatched.key, value, -1); else if (strcmp(param, "--inject") == 0) { if (!parse_inject(jb, value, false)) return false; @@ -386,7 +395,10 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) { log_job_pattern_set(jb, arg, strlen(arg)); continue; } else { - log2stderr("Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", jb->pattern, arg); + l2j_log( + "Error: Multiple patterns detected. Specify only one pattern. The first is '%s', the second is '%s'", + jb->pattern, + arg); return false; } } @@ -395,7 +407,7 @@ bool log_job_command_line_parse_parameters(LOG_JOB *jb, int argc, char **argv) { // Check if a pattern is set and exactly one pattern is specified if (!jb->pattern) { - log2stderr("Warning: pattern not specified. Try the default config with: -c default"); + l2j_log("Warning: pattern not specified. 
Try the default config with: -c default"); log_job_command_line_help(argv[0]); return false; } diff --git a/src/collectors/log2journal/log2journal-pattern.c b/src/collectors/log2journal/log2journal-pattern.c index 4b7e9026b..158ac1129 100644 --- a/src/collectors/log2journal/log2journal-pattern.c +++ b/src/collectors/log2journal/log2journal-pattern.c @@ -18,13 +18,13 @@ void search_pattern_cleanup(SEARCH_PATTERN *sp) { sp->match_data = NULL; } - txt_cleanup(&sp->error); + txt_l2j_cleanup(&sp->error); } static void pcre2_error_message(SEARCH_PATTERN *sp, int rc, int pos) { char msg[1024]; pcre2_get_error_in_buffer(msg, sizeof(msg), rc, pos); - txt_replace(&sp->error, msg, strlen(msg)); + txt_l2j_set(&sp->error, msg, strlen(msg)); } static inline bool compile_pcre2(SEARCH_PATTERN *sp) { diff --git a/src/collectors/log2journal/log2journal-pcre2.c b/src/collectors/log2journal/log2journal-pcre2.c index 185e69108..77f804cc8 100644 --- a/src/collectors/log2journal/log2journal-pcre2.c +++ b/src/collectors/log2journal/log2journal-pcre2.c @@ -102,8 +102,15 @@ PCRE2_STATE *pcre2_parser_create(LOG_JOB *jb) { } void pcre2_parser_destroy(PCRE2_STATE *pcre2) { - if(pcre2) + if(pcre2) { + if(pcre2->re) + pcre2_code_free(pcre2->re); + + if(pcre2->match_data) + pcre2_match_data_free(pcre2->match_data); + freez(pcre2); + } } const char *pcre2_parser_error(PCRE2_STATE *pcre2) { diff --git a/src/collectors/log2journal/log2journal-rename.c b/src/collectors/log2journal/log2journal-rename.c index c6975779f..11b3d2178 100644 --- a/src/collectors/log2journal/log2journal-rename.c +++ b/src/collectors/log2journal/log2journal-rename.c @@ -9,13 +9,13 @@ void rename_cleanup(RENAME *rn) { bool log_job_rename_add(LOG_JOB *jb, const char *new_key, size_t new_key_len, const char *old_key, size_t old_key_len) { if(jb->renames.used >= MAX_RENAMES) { - log2stderr("Error: too many renames. You can rename up to %d fields.", MAX_RENAMES); + l2j_log("Error: too many renames. 
You can rename up to %d fields.", MAX_RENAMES); return false; } RENAME *rn = &jb->renames.array[jb->renames.used++]; - hashed_key_len_set(&rn->new_key, new_key, new_key_len); - hashed_key_len_set(&rn->old_key, old_key, old_key_len); + hashed_key_set(&rn->new_key, new_key, new_key_len); + hashed_key_set(&rn->old_key, old_key, old_key_len); return true; } diff --git a/src/collectors/log2journal/log2journal-replace.c b/src/collectors/log2journal/log2journal-replace.c index 7075d109d..66ba48d9f 100644 --- a/src/collectors/log2journal/log2journal-replace.c +++ b/src/collectors/log2journal/log2journal-replace.c @@ -26,7 +26,7 @@ static REPLACE_NODE *replace_pattern_add_node(REPLACE_NODE **head, bool is_varia if (!new_node) return NULL; - hashed_key_set(&new_node->name, text); + hashed_key_set(&new_node->name, text, -1); new_node->is_variable = is_variable; new_node->next = NULL; @@ -57,21 +57,21 @@ bool replace_pattern_set(REPLACE_PATTERN *rp, const char *pattern) { // Start of a variable const char *end = strchr(current, '}'); if (!end) { - log2stderr("Error: Missing closing brace in replacement pattern: %s", rp->pattern); + l2j_log("Error: Missing closing brace in replacement pattern: %s", rp->pattern); return false; } size_t name_length = end - current - 2; // Length of the variable name char *variable_name = strndupz(current + 2, name_length); if (!variable_name) { - log2stderr("Error: Memory allocation failed for variable name."); + l2j_log("Error: Memory allocation failed for variable name."); return false; } REPLACE_NODE *node = replace_pattern_add_node(&(rp->nodes), true, variable_name); if (!node) { freez(variable_name); - log2stderr("Error: Failed to add replacement node for variable."); + l2j_log("Error: Failed to add replacement node for variable."); return false; } freez(variable_name); @@ -88,14 +88,14 @@ bool replace_pattern_set(REPLACE_PATTERN *rp, const char *pattern) { size_t text_length = current - start; char *text = strndupz(start, text_length); if 
(!text) { - log2stderr("Error: Memory allocation failed for literal text."); + l2j_log("Error: Memory allocation failed for literal text."); return false; } REPLACE_NODE *node = replace_pattern_add_node(&(rp->nodes), false, text); if (!node) { freez(text); - log2stderr("Error: Failed to add replacement node for text."); + l2j_log("Error: Failed to add replacement node for text."); return false; } freez(text); diff --git a/src/collectors/log2journal/log2journal-rewrite.c b/src/collectors/log2journal/log2journal-rewrite.c index 112391bf0..0c9a8ddea 100644 --- a/src/collectors/log2journal/log2journal-rewrite.c +++ b/src/collectors/log2journal/log2journal-rewrite.c @@ -7,6 +7,7 @@ void rewrite_cleanup(REWRITE *rw) { if(rw->flags & RW_MATCH_PCRE2) search_pattern_cleanup(&rw->match_pcre2); + else if(rw->flags & RW_MATCH_NON_EMPTY) replace_pattern_cleanup(&rw->match_non_empty); @@ -16,19 +17,19 @@ void rewrite_cleanup(REWRITE *rw) { bool log_job_rewrite_add(LOG_JOB *jb, const char *key, RW_FLAGS flags, const char *search_pattern, const char *replace_pattern) { if(jb->rewrites.used >= MAX_REWRITES) { - log2stderr("Error: too many rewrites. You can add up to %d rewrite rules.", MAX_REWRITES); + l2j_log("Error: too many rewrites. 
You can add up to %d rewrite rules.", MAX_REWRITES); return false; } if((flags & (RW_MATCH_PCRE2|RW_MATCH_NON_EMPTY)) && (!search_pattern || !*search_pattern)) { - log2stderr("Error: rewrite for key '%s' does not specify a search pattern.", key); + l2j_log("Error: rewrite for key '%s' does not specify a search pattern.", key); return false; } REWRITE *rw = &jb->rewrites.array[jb->rewrites.used++]; rw->flags = flags; - hashed_key_set(&rw->key, key); + hashed_key_set(&rw->key, key, -1); if((flags & RW_MATCH_PCRE2) && !search_pattern_set(&rw->match_pcre2, search_pattern, strlen(search_pattern))) { rewrite_cleanup(rw); diff --git a/src/collectors/log2journal/log2journal-txt.h b/src/collectors/log2journal/log2journal-txt.h new file mode 100644 index 000000000..f68b85a3d --- /dev/null +++ b/src/collectors/log2journal/log2journal-txt.h @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_LOG2JOURNAL_TXT_H +#define NETDATA_LOG2JOURNAL_TXT_H + +#include "log2journal.h" + +// ---------------------------------------------------------------------------- +// A dynamically sized, reusable text buffer, +// allowing us to be fast (no allocations during iterations) while having the +// smallest possible allocations. + +typedef struct txt_l2j { + char *txt; + uint32_t size; + uint32_t len; +} TXT_L2J; + +static inline void txt_l2j_cleanup(TXT_L2J *t) { + if(!t) + return; + + if(t->txt) + freez(t->txt); + + t->txt = NULL; + t->size = 0; + t->len = 0; +} + +#define TXT_L2J_ALLOC_ALIGN 1024 + +static inline size_t txt_l2j_compute_new_size(size_t old_size, size_t required_size) { + size_t size = (required_size % TXT_L2J_ALLOC_ALIGN == 0) ? 
required_size : required_size + TXT_L2J_ALLOC_ALIGN;
+    size = (size / TXT_L2J_ALLOC_ALIGN) * TXT_L2J_ALLOC_ALIGN;
+
+    if(size < old_size * 2)
+        size = old_size * 2;
+
+    return size;
+}
+
+static inline void txt_l2j_resize(TXT_L2J *dst, size_t required_size, bool keep) {
+    if(required_size <= dst->size)
+        return;
+
+    size_t new_size = txt_l2j_compute_new_size(dst->size, required_size);
+
+    if(keep && dst->txt)
+        dst->txt = reallocz(dst->txt, new_size);
+    else {
+        txt_l2j_cleanup(dst);
+        dst->txt = mallocz(new_size);
+        dst->len = 0;
+    }
+
+    dst->size = new_size;
+}
+
+static inline void txt_l2j_set(TXT_L2J *dst, const char *s, int32_t len) {
+    if(!s || !*s || len == 0) {
+        s = "";
+        len = 0;
+    }
+
+    if(len == -1)
+        len = (int32_t)strlen(s);
+
+    txt_l2j_resize(dst, len + 1, false);
+    memcpy(dst->txt, s, len);
+    dst->txt[len] = '\0';
+    dst->len = len;
+}
+
+static inline void txt_l2j_append(TXT_L2J *dst, const char *s, int32_t len) {
+    if(!dst->txt || !dst->len)
+        txt_l2j_set(dst, s, len);
+
+    else {
+        if(len == -1)
+            len = (int32_t)strlen(s);
+
+        txt_l2j_resize(dst, dst->len + len + 1, true);
+        memcpy(&dst->txt[dst->len], s, len);
+        dst->len += len;
+        dst->txt[dst->len] = '\0';
+    }
+}
+
+#endif //NETDATA_LOG2JOURNAL_TXT_H
diff --git a/src/collectors/log2journal/log2journal-yaml.c b/src/collectors/log2journal/log2journal-yaml.c
index e73a469f5..53f83d623 100644
--- a/src/collectors/log2journal/log2journal-yaml.c
+++ b/src/collectors/log2journal/log2journal-yaml.c
@@ -280,6 +280,8 @@ static bool yaml_parse_constant_field_injection(yaml_parser_t *parser, LOG_JOB *
         goto cleanup;
     }
 
+    yaml_event_delete(&event);
+
     if (!yaml_parse(parser, &event) || event.type != YAML_SCALAR_EVENT) {
         yaml_error(parser, &event, "Expected scalar for constant field injection value");
         goto cleanup;
@@ -315,7 +317,7 @@ static bool yaml_parse_injection_mapping(yaml_parser_t *parser, LOG_JOB *jb, boo
     switch (event.type) {
         case YAML_SCALAR_EVENT:
             if (yaml_scalar_matches(&event, "key", strlen("key"))) {
-                errors += yaml_parse_constant_field_injection(parser, jb, unmatched);
+                errors += yaml_parse_constant_field_injection(parser, jb, unmatched) ? 1 : 0;
             } else {
                 yaml_error(parser, &event, "Unexpected scalar in injection mapping");
                 errors++;
@@ -396,7 +398,8 @@ static size_t yaml_parse_unmatched(yaml_parser_t *parser, LOG_JOB *jb) {
             errors++;
         } else {
             if (sub_event.type == YAML_SCALAR_EVENT) {
-                hashed_key_len_set(&jb->unmatched.key, (char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
+                hashed_key_set(
+                    &jb->unmatched.key, (char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
             } else {
                 yaml_error(parser, &sub_event, "expected a scalar value for 'key'");
                 errors++;
@@ -427,6 +430,149 @@ static size_t yaml_parse_unmatched(yaml_parser_t *parser, LOG_JOB *jb) {
     return errors;
 }
 
+static bool yaml_parse_scalar_boolean(yaml_parser_t *parser, bool def, const char *where, size_t *errors) {
+    bool rc = def;
+
+    yaml_event_t value_event;
+    if (!yaml_parse(parser, &value_event)) {
+        (*errors)++;
+        return rc;
+    }
+
+    if (value_event.type != YAML_SCALAR_EVENT) {
+        yaml_error(parser, &value_event, "Expected scalar for %s boolean", where);
+        (*errors)++;
+    }
+    else if(strncmp((char*)value_event.data.scalar.value, "yes", 3) == 0 ||
+            strncmp((char*)value_event.data.scalar.value, "true", 4) == 0)
+        rc = true;
+    else if(strncmp((char*)value_event.data.scalar.value, "no", 2) == 0 ||
+            strncmp((char*)value_event.data.scalar.value, "false", 5) == 0)
+        rc = false;
+    else {
+        yaml_error(parser, &value_event, "Expected scalar for %s boolean: invalid value %s", where, value_event.data.scalar.value);
+        rc = def;
+    }
+
+    yaml_event_delete(&value_event);
+    return rc;
+}
+
+static bool handle_rewrite_event(yaml_parser_t *parser, yaml_event_t *event,
+                                 char **key, char **search_pattern, char **replace_pattern,
+                                 RW_FLAGS *flags, bool *mapping_finished,
+                                 LOG_JOB *jb, size_t *errors) {
+    switch (event->type) {
+        case YAML_SCALAR_EVENT:
+            if (yaml_scalar_matches(event, "key", strlen("key"))) {
+                yaml_event_t value_event;
+                if (!yaml_parse(parser, &value_event)) {
+                    (*errors)++;
+                    return false;
+                }
+
+                if (value_event.type != YAML_SCALAR_EVENT) {
+                    yaml_error(parser, &value_event, "Expected scalar for rewrite key");
+                    (*errors)++;
+                } else {
+                    freez(*key);
+                    *key = strndupz((char *)value_event.data.scalar.value, value_event.data.scalar.length);
+                }
+                yaml_event_delete(&value_event);
+            }
+            else if (yaml_scalar_matches(event, "match", strlen("match"))) {
+                yaml_event_t value_event;
+                if (!yaml_parse(parser, &value_event)) {
+                    (*errors)++;
+                    return false;
+                }
+
+                if (value_event.type != YAML_SCALAR_EVENT) {
+                    yaml_error(parser, &value_event, "Expected scalar for rewrite match PCRE2 pattern");
+                    (*errors)++;
+                }
+                else {
+                    freez(*search_pattern);
+                    *flags |= RW_MATCH_PCRE2;
+                    *flags &= ~RW_MATCH_NON_EMPTY;
+                    *search_pattern = strndupz((char *)value_event.data.scalar.value, value_event.data.scalar.length);
+                }
+                yaml_event_delete(&value_event);
+            }
+            else if (yaml_scalar_matches(event, "not_empty", strlen("not_empty"))) {
+                yaml_event_t value_event;
+                if (!yaml_parse(parser, &value_event)) {
+                    (*errors)++;
+                    return false;
+                }
+
+                if (value_event.type != YAML_SCALAR_EVENT) {
+                    yaml_error(parser, &value_event, "Expected scalar for rewrite not empty condition");
+                    (*errors)++;
+                }
+                else {
+                    freez(*search_pattern);
+                    *flags |= RW_MATCH_NON_EMPTY;
+                    *flags &= ~RW_MATCH_PCRE2;
+                    *search_pattern = strndupz((char *)value_event.data.scalar.value, value_event.data.scalar.length);
+                }
+                yaml_event_delete(&value_event);
+            }
+            else if (yaml_scalar_matches(event, "value", strlen("value"))) {
+                yaml_event_t value_event;
+                if (!yaml_parse(parser, &value_event)) {
+                    (*errors)++;
+                    return false;
+                }
+
+                if (value_event.type != YAML_SCALAR_EVENT) {
+                    yaml_error(parser, &value_event, "Expected scalar for rewrite value");
+                    (*errors)++;
+                } else {
+                    freez(*replace_pattern);
+                    *replace_pattern = strndupz((char *)value_event.data.scalar.value, value_event.data.scalar.length);
+                }
+                yaml_event_delete(&value_event);
+            }
+            else if (yaml_scalar_matches(event, "stop", strlen("stop"))) {
+                if(yaml_parse_scalar_boolean(parser, true, "rewrite stop", errors))
+                    *flags &= ~RW_DONT_STOP;
+                else
+                    *flags |= RW_DONT_STOP;
+            }
+            else if (yaml_scalar_matches(event, "inject", strlen("inject"))) {
+                if(yaml_parse_scalar_boolean(parser, false, "rewrite inject", errors))
+                    *flags |= RW_INJECT;
+                else
+                    *flags &= ~RW_INJECT;
+            }
+            else {
+                yaml_error(parser, event, "Unexpected scalar in rewrite mapping");
+                (*errors)++;
+            }
+            break;
+
+        case YAML_MAPPING_END_EVENT:
+            if(*key) {
+                if (!log_job_rewrite_add(jb, *key, *flags, *search_pattern, *replace_pattern))
+                    (*errors)++;
+            }
+
+            freez(*key);
+            freez(*search_pattern);
+            freez(*replace_pattern);
+            *mapping_finished = true;
+            break;
+
+        default:
+            yaml_error(parser, event, "Unexpected event in rewrite mapping");
+            (*errors)++;
+            break;
+    }
+
+    return true;
+}
+
 static size_t yaml_parse_rewrites(yaml_parser_t *parser, LOG_JOB *jb) {
     size_t errors = 0;
 
@@ -457,120 +603,14 @@ static size_t yaml_parse_rewrites(yaml_parser_t *parser, LOG_JOB *jb) {
                     continue;
                 }
 
-                switch (sub_event.type) {
-                    case YAML_SCALAR_EVENT:
-                        if (yaml_scalar_matches(&sub_event, "key", strlen("key"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite key");
-                                errors++;
-                            } else {
-                                freez(key);
-                                key = strndupz((char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else if (yaml_scalar_matches(&sub_event, "match", strlen("match"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite match PCRE2 pattern");
-                                errors++;
-                            }
-                            else {
-                                if(search_pattern)
-                                    freez(search_pattern);
-                                flags |= RW_MATCH_PCRE2;
-                                flags &= ~RW_MATCH_NON_EMPTY;
-                                search_pattern = strndupz((char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else if (yaml_scalar_matches(&sub_event, "not_empty", strlen("not_empty"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite not empty condition");
-                                errors++;
-                            }
-                            else {
-                                if(search_pattern)
-                                    freez(search_pattern);
-                                flags |= RW_MATCH_NON_EMPTY;
-                                flags &= ~RW_MATCH_PCRE2;
-                                search_pattern = strndupz((char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else if (yaml_scalar_matches(&sub_event, "value", strlen("value"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite value");
-                                errors++;
-                            } else {
-                                freez(replace_pattern);
-                                replace_pattern = strndupz((char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else if (yaml_scalar_matches(&sub_event, "stop", strlen("stop"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite stop boolean");
-                                errors++;
-                            } else {
-                                if(strncmp((char*)sub_event.data.scalar.value, "no", 2) == 0 ||
-                                   strncmp((char*)sub_event.data.scalar.value, "false", 5) == 0)
-                                    flags |= RW_DONT_STOP;
-                                else
-                                    flags &= ~RW_DONT_STOP;
-
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else if (yaml_scalar_matches(&sub_event, "inject", strlen("inject"))) {
-                            if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                                yaml_error(parser, &sub_event, "Expected scalar for rewrite inject boolean");
-                                errors++;
-                            } else {
-                                if(strncmp((char*)sub_event.data.scalar.value, "yes", 3) == 0 ||
-                                   strncmp((char*)sub_event.data.scalar.value, "true", 4) == 0)
-                                    flags |= RW_INJECT;
-                                else
-                                    flags &= ~RW_INJECT;
-
-                                yaml_event_delete(&sub_event);
-                            }
-                        } else {
-                            yaml_error(parser, &sub_event, "Unexpected scalar in rewrite mapping");
-                            errors++;
-                        }
-                        break;
-
-                    case YAML_MAPPING_END_EVENT:
-                        if(key) {
-                            if (!log_job_rewrite_add(jb, key, flags, search_pattern, replace_pattern))
-                                errors++;
-                        }
-
-                        freez(key);
-                        key = NULL;
-
-                        freez(search_pattern);
-                        search_pattern = NULL;
-
-                        freez(replace_pattern);
-                        replace_pattern = NULL;
-
-                        flags = RW_NONE;
-
-                        mapping_finished = true;
-                        break;
-
-                    default:
-                        yaml_error(parser, &sub_event, "Unexpected event in rewrite mapping");
-                        errors++;
-                        break;
-                }
+                handle_rewrite_event(parser, &sub_event, &key,
+                                     &search_pattern, &replace_pattern,
+                                     &flags, &mapping_finished, jb, &errors);
 
                 yaml_event_delete(&sub_event);
             }
 
-            freez(replace_pattern);
-            replace_pattern = NULL;
-            freez(search_pattern);
-            search_pattern = NULL;
-            freez(key);
-            key = NULL;
-
-        } break;
+        }
 
         case YAML_SEQUENCE_END_EVENT:
             finished = true;
@@ -618,25 +658,36 @@ static size_t yaml_parse_renames(yaml_parser_t *parser, LOG_JOB *jb) {
         switch (sub_event.type) {
             case YAML_SCALAR_EVENT:
                 if (yaml_scalar_matches(&sub_event, "new_key", strlen("new_key"))) {
-                    if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                        yaml_error(parser, &sub_event, "Expected scalar for rename new_key");
+                    yaml_event_t value_event;
+
+                    if (!yaml_parse(parser, &value_event) || value_event.type != YAML_SCALAR_EVENT) {
+                        yaml_error(parser, &value_event, "Expected scalar for rename new_key");
                         errors++;
                     } else {
-                        hashed_key_len_set(&rn.new_key, (char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                        yaml_event_delete(&sub_event);
+                        hashed_key_set(
+                            &rn.new_key,
+                            (char *)value_event.data.scalar.value,
+                            value_event.data.scalar.length);
+                        yaml_event_delete(&value_event);
                     }
                 } else if (yaml_scalar_matches(&sub_event, "old_key", strlen("old_key"))) {
-                    if (!yaml_parse(parser, &sub_event) || sub_event.type != YAML_SCALAR_EVENT) {
-                        yaml_error(parser, &sub_event, "Expected scalar for rename old_key");
+                    yaml_event_t value_event;
+
+                    if (!yaml_parse(parser, &value_event) || value_event.type != YAML_SCALAR_EVENT) {
+                        yaml_error(parser, &value_event, "Expected scalar for rename old_key");
                        errors++;
                    } else {
-                        hashed_key_len_set(&rn.old_key, (char *)sub_event.data.scalar.value, sub_event.data.scalar.length);
-                        yaml_event_delete(&sub_event);
+                        hashed_key_set(
+                            &rn.old_key,
+                            (char *)value_event.data.scalar.value,
+                            value_event.data.scalar.length);
+                        yaml_event_delete(&value_event);
                     }
                 } else {
                     yaml_error(parser, &sub_event, "Unexpected scalar in rewrite mapping");
                     errors++;
                 }
+
                 break;
 
             case YAML_MAPPING_END_EVENT:
@@ -782,18 +833,22 @@ cleanup:
 
 bool yaml_parse_file(const char *config_file_path, LOG_JOB *jb) {
     if(!config_file_path || !*config_file_path) {
-        log2stderr("yaml configuration filename cannot be empty.");
+        l2j_log("yaml configuration filename cannot be empty.");
         return false;
     }
 
     FILE *fp = fopen(config_file_path, "r");
     if (!fp) {
-        log2stderr("Error opening config file: %s", config_file_path);
+        l2j_log("Error opening config file: %s", config_file_path);
         return false;
     }
 
     yaml_parser_t parser;
-    yaml_parser_initialize(&parser);
+    if (!yaml_parser_initialize(&parser)) {
+        fclose(fp);
+        return false;
+    }
+
     yaml_parser_set_input_file(&parser, fp);
 
     size_t errors = yaml_parse_initialized(&parser, jb);
diff --git a/src/collectors/log2journal/log2journal.c b/src/collectors/log2journal/log2journal.c
index 0fbba0b0c..769547bc1 100644
--- a/src/collectors/log2journal/log2journal.c
+++ b/src/collectors/log2journal/log2journal.c
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: GPL-3.0-or-later
 
 #include "log2journal.h"
+#include "libnetdata/required_dummies.h"
 
 // ----------------------------------------------------------------------------
 
@@ -73,10 +74,13 @@ static inline HASHED_KEY *get_key_from_hashtable(LOG_JOB *jb, HASHED_KEY *k) {
                 ht_key->flags |= HK_COLLISION_CHECKED;
 
                 if(strcmp(ht_key->key, k->key) != 0)
-                    log2stderr("Hashtable collision detected on key '%s' (hash %lx) and '%s' (hash %lx). "
-                               "Please file a bug report.", ht_key->key, (unsigned long) ht_key->hash, k->key
-                               , (unsigned long) k->hash
-                    );
+                    l2j_log(
+                        "Hashtable collision detected on key '%s' (hash %lx) and '%s' (hash %lx). "
+                        "Please file a bug report.",
+                        ht_key->key,
+                        (unsigned long)ht_key->hash,
+                        k->key,
+                        (unsigned long)k->hash);
             }
         }
         else {
@@ -97,8 +101,9 @@ static inline HASHED_KEY *get_key_from_hashtable(LOG_JOB *jb, HASHED_KEY *k) {
 
 static inline HASHED_KEY *get_key_from_hashtable_with_char_ptr(LOG_JOB *jb, const char *key) {
     HASHED_KEY find = {
-            .key = key,
-            .len = strlen(key),
+        .flags = HK_NONE,
+        .key = key,
+        .len = strlen(key),
     };
     find.hash = XXH3_64bits(key, find.len);
 
@@ -109,24 +114,29 @@ static inline HASHED_KEY *get_key_from_hashtable_with_char_ptr(LOG_JOB *jb, cons
 
 static inline void validate_key(LOG_JOB *jb __maybe_unused, HASHED_KEY *k) {
     if(k->len > JOURNAL_MAX_KEY_LEN)
-        log2stderr("WARNING: key '%s' has length %zu, which is more than %zu, the max systemd-journal allows",
-                   k->key, (size_t)k->len, (size_t)JOURNAL_MAX_KEY_LEN);
+        l2j_log(
+            "WARNING: key '%s' has length %zu, which is more than %zu, the max systemd-journal allows",
+            k->key,
+            (size_t)k->len,
+            (size_t)JOURNAL_MAX_KEY_LEN);
 
     for(size_t i = 0; i < k->len ;i++) {
         char c = k->key[i];
 
         if((c < 'A' || c > 'Z') && !isdigit(c) && c != '_') {
-            log2stderr("WARNING: key '%s' contains characters that are not allowed by systemd-journal.", k->key);
+            l2j_log("WARNING: key '%s' contains characters that are not allowed by systemd-journal.", k->key);
             break;
         }
     }
 
     if(isdigit(k->key[0]))
-        log2stderr("WARNING: key '%s' starts with a digit and may not be accepted by systemd-journal.", k->key);
+        l2j_log("WARNING: key '%s' starts with a digit and may not be accepted by systemd-journal.", k->key);
 
     if(k->key[0] == '_')
-        log2stderr("WARNING: key '%s' starts with an underscore, which makes it a systemd-journal trusted field. "
-                   "Such fields are accepted by systemd-journal-remote, but not by systemd-journald.", k->key);
+        l2j_log(
+            "WARNING: key '%s' starts with an underscore, which makes it a systemd-journal trusted field. "
+            "Such fields are accepted by systemd-journal-remote, but not by systemd-journald.",
+            k->key);
 }
 
 // ----------------------------------------------------------------------------
@@ -170,16 +180,16 @@ static inline void replace_evaluate(LOG_JOB *jb, HASHED_KEY *k, REPLACE_PATTERN
     for(REPLACE_NODE *node = rp->nodes; node != NULL; node = node->next) {
         if(node->is_variable) {
             if(hashed_keys_match(&node->name, &jb->line.key))
-                txt_expand_and_append(&ht_key->value, jb->line.trimmed, jb->line.trimmed_len);
+                txt_l2j_append(&ht_key->value, jb->line.trimmed, jb->line.trimmed_len);
 
             else {
                 HASHED_KEY *ktmp = get_key_from_hashtable_with_char_ptr(jb, node->name.key);
                 if(ktmp->value.len)
-                    txt_expand_and_append(&ht_key->value, ktmp->value.txt, ktmp->value.len);
+                    txt_l2j_append(&ht_key->value, ktmp->value.txt, ktmp->value.len);
             }
         }
         else
-            txt_expand_and_append(&ht_key->value, node->name.key, node->name.len);
+            txt_l2j_append(&ht_key->value, node->name.key, node->name.len);
     }
 }
 
@@ -202,26 +212,26 @@ static inline void replace_evaluate_from_pcre2(LOG_JOB *jb, HASHED_KEY *k, REPLA
                 PCRE2_SIZE end_offset = ovector[2 * group_number + 1];
                 PCRE2_SIZE length = end_offset - start_offset;
 
-                txt_expand_and_append(&jb->rewrites.tmp, k->value.txt + start_offset, length);
+                txt_l2j_append(&jb->rewrites.tmp, k->value.txt + start_offset, length);
             }
             else {
                 if(hashed_keys_match(&node->name, &jb->line.key))
-                    txt_expand_and_append(&jb->rewrites.tmp, jb->line.trimmed, jb->line.trimmed_len);
+                    txt_l2j_append(&jb->rewrites.tmp, jb->line.trimmed, jb->line.trimmed_len);
 
                 else {
                     HASHED_KEY *ktmp = get_key_from_hashtable_with_char_ptr(jb, node->name.key);
                     if(ktmp->value.len)
-                        txt_expand_and_append(&jb->rewrites.tmp, ktmp->value.txt, ktmp->value.len);
+                        txt_l2j_append(&jb->rewrites.tmp, ktmp->value.txt, ktmp->value.len);
                 }
             }
         }
         else {
-            txt_expand_and_append(&jb->rewrites.tmp, node->name.key, node->name.len);
+            txt_l2j_append(&jb->rewrites.tmp, node->name.key, node->name.len);
         }
     }
 
     // swap the values of the temporary TEXT and the key value
-    TEXT tmp = k->value;
+    TXT_L2J tmp = k->value;
     k->value = jb->rewrites.tmp;
     jb->rewrites.tmp = tmp;
 }
@@ -271,7 +281,7 @@ static inline HASHED_KEY *rename_key(LOG_JOB *jb, HASHED_KEY *k) {
 static inline void send_key_value_constant(LOG_JOB *jb __maybe_unused, HASHED_KEY *key, const char *value, size_t len) {
     HASHED_KEY *ht_key = get_key_from_hashtable(jb, key);
 
-    txt_replace(&ht_key->value, value, len);
+    txt_l2j_set(&ht_key->value, value, len);
     ht_key->flags |= HK_VALUE_FROM_LOG;
 
     // fprintf(stderr, "SET %s=%.*s\n", ht_key->key, (int)ht_key->value.len, ht_key->value.txt);
@@ -292,7 +302,7 @@ static inline void send_key_value_error(LOG_JOB *jb, HASHED_KEY *key, const char
 inline void log_job_send_extracted_key_value(LOG_JOB *jb, const char *key, const char *value, size_t len) {
     HASHED_KEY *ht_key = get_key_from_hashtable_with_char_ptr(jb, key);
     HASHED_KEY *nk = rename_key(jb, ht_key);
-    txt_replace(&nk->value, value, len);
+    txt_l2j_set(&nk->value, value, len);
     ht_key->flags |= HK_VALUE_FROM_LOG;
 
     // fprintf(stderr, "SET %s=%.*s\n", ht_key->key, (int)ht_key->value.len, ht_key->value.txt);
@@ -417,7 +427,7 @@ static inline bool jb_switched_filename(LOG_JOB *jb, const char *line, size_t le
         const char *end = strstr(line, " <==");
         while (*start == ' ') start++;
         if (*start != '\n' && *start != '\0' && end) {
-            txt_replace(&jb->filename.current, start, end - start);
+            txt_l2j_set(&jb->filename.current, start, end - start);
             return true;
         }
     }
@@ -486,7 +496,7 @@ int log_job_run(LOG_JOB *jb) {
     else if(strcmp(jb->pattern, "none") != 0) {
         pcre2 = pcre2_parser_create(jb);
         if(pcre2_has_error(pcre2)) {
-            log2stderr("%s", pcre2_parser_error(pcre2));
+            l2j_log("%s", pcre2_parser_error(pcre2));
             pcre2_parser_destroy(pcre2);
             return 1;
         }
@@ -515,11 +525,11 @@ int log_job_run(LOG_JOB *jb) {
 
             if(!line_is_matched) {
                 if(json)
-                    log2stderr("%s", json_parser_error(json));
+                    l2j_log("%s", json_parser_error(json));
                 else if(logfmt)
-                    log2stderr("%s", logfmt_parser_error(logfmt));
+                    l2j_log("%s", logfmt_parser_error(logfmt));
                 else if(pcre2)
-                    log2stderr("%s", pcre2_parser_error(pcre2));
+                    l2j_log("%s", pcre2_parser_error(pcre2));
 
                 if(!jb_send_unmatched_line(jb, line))
                     // just logging to stderr, not sending unmatched lines
diff --git a/src/collectors/log2journal/log2journal.h b/src/collectors/log2journal/log2journal.h
index 5bdf7276b..480c0598c 100644
--- a/src/collectors/log2journal/log2journal.h
+++ b/src/collectors/log2journal/log2journal.h
@@ -3,49 +3,16 @@
 #ifndef NETDATA_LOG2JOURNAL_H
 #define NETDATA_LOG2JOURNAL_H
 
-// only for PACKAGE_VERSION
-#include <config.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <dirent.h>
-#include <string.h>
-#include <stdbool.h>
-#include <string.h>
-#include <ctype.h>
-#include <math.h>
-#include <stdarg.h>
-#include <assert.h>
-
-// ----------------------------------------------------------------------------
-// compatibility
-
-#ifndef HAVE_STRNDUP
-// strndup() is not available on Windows
-static inline char *os_strndup( const char *s1, size_t n)
-{
-    char *copy= (char*)malloc( n+1 );
-    memcpy( copy, s1, n );
-    copy[n] = 0;
-    return copy;
-};
-#define strndup(s, n) os_strndup(s, n)
-#endif
-
-#if defined(HAVE_FUNC_ATTRIBUTE_FORMAT_GNU_PRINTF)
-#define PRINTFLIKE(f, a) __attribute__ ((format(gnu_printf, f, a)))
-#elif defined(HAVE_FUNC_ATTRIBUTE_FORMAT_PRINTF)
-#define PRINTFLIKE(f, a) __attribute__ ((format(printf, f, a)))
-#else
-#define PRINTFLIKE(f, a)
-#endif
+#include "libnetdata/libnetdata.h"
+#include "log2journal-txt.h"
+#include "log2journal-hashed-key.h"
 
 // ----------------------------------------------------------------------------
 // logging
 
 // enable the compiler to check for printf like errors on our log2stderr() function
-static inline void log2stderr(const char *format, ...) PRINTFLIKE(1, 2);
-static inline void log2stderr(const char *format, ...) {
+static inline void l2j_log(const char *format, ...) PRINTFLIKE(1, 2);
+static inline void l2j_log(const char *format, ...) {
     va_list args;
     va_start(args, format);
     vfprintf(stderr, format, args);
@@ -54,62 +21,6 @@ static inline void log2stderr(const char *format, ...) {
 }
 
 // ----------------------------------------------------------------------------
-// allocation functions abstraction
-
-static inline void *mallocz(size_t size) {
-    void *ptr = malloc(size);
-    if (!ptr) {
-        log2stderr("Fatal Error: Memory allocation failed. Requested size: %zu bytes.", size);
-        exit(EXIT_FAILURE);
-    }
-    return ptr;
-}
-
-static inline void *callocz(size_t elements, size_t size) {
-    void *ptr = calloc(elements, size);
-    if (!ptr) {
-        log2stderr("Fatal Error: Memory allocation failed. Requested size: %zu bytes.", elements * size);
-        exit(EXIT_FAILURE);
-    }
-    return ptr;
-}
-
-static inline void *reallocz(void *ptr, size_t size) {
-    void *new_ptr = realloc(ptr, size);
-    if (!new_ptr) {
-        log2stderr("Fatal Error: Memory reallocation failed. Requested size: %zu bytes.", size);
-        exit(EXIT_FAILURE);
-    }
-    return new_ptr;
-}
-
-static inline char *strdupz(const char *s) {
-    char *ptr = strdup(s);
-    if (!ptr) {
-        log2stderr("Fatal Error: Memory allocation failed in strdup.");
-        exit(EXIT_FAILURE);
-    }
-    return ptr;
-}
-
-static inline char *strndupz(const char *s, size_t n) {
-    char *ptr = strndup(s, n);
-    if (!ptr) {
-        log2stderr("Fatal Error: Memory allocation failed in strndup. Requested size: %zu bytes.", n);
-        exit(EXIT_FAILURE);
-    }
-    return ptr;
-}
-
-static inline void freez(void *ptr) {
-    if (ptr)
-        free(ptr);
-}
-
-// ----------------------------------------------------------------------------
-
-#define XXH_INLINE_ALL
-#include "libnetdata/xxhash.h"
 
 #define PCRE2_CODE_UNIT_WIDTH 8
 #include <pcre2.h>
@@ -121,15 +32,12 @@ static inline void freez(void *ptr) {
 // ----------------------------------------------------------------------------
 // hashtable for HASHED_KEY
 
-// cleanup hashtable defines
-#include "libnetdata/simple_hashtable_undef.h"
-
 struct hashed_key;
 static inline int compare_keys(struct hashed_key *k1, struct hashed_key *k2);
 #define SIMPLE_HASHTABLE_SORT_FUNCTION compare_keys
-#define SIMPLE_HASHTABLE_VALUE_TYPE struct hashed_key
+#define SIMPLE_HASHTABLE_VALUE_TYPE HASHED_KEY
 #define SIMPLE_HASHTABLE_NAME _KEY
-#include "libnetdata/simple_hashtable.h"
+#include "libnetdata/simple_hashtable/simple_hashtable.h"
 
 // ----------------------------------------------------------------------------
 
@@ -173,151 +81,12 @@ static inline size_t copy_to_buffer(char *dst, size_t dst_size, const char *src,
 }
 
 // ----------------------------------------------------------------------------
-// A dynamically sized, reusable text buffer,
-// allowing us to be fast (no allocations during iterations) while having the
-// smallest possible allocations.
-
-typedef struct txt {
-    char *txt;
-    uint32_t size;
-    uint32_t len;
-} TEXT;
-
-static inline void txt_cleanup(TEXT *t) {
-    if(!t)
-        return;
-
-    if(t->txt)
-        freez(t->txt);
-
-    t->txt = NULL;
-    t->size = 0;
-    t->len = 0;
-}
-
-static inline void txt_replace(TEXT *t, const char *s, size_t len) {
-    if(!s || !*s || len == 0) {
-        s = "";
-        len = 0;
-    }
-
-    if(len + 1 <= t->size) {
-        // the existing value allocation, fits our value
-
-        memcpy(t->txt, s, len);
-        t->txt[len] = '\0';
-        t->len = len;
-    }
-    else {
-        // no existing value allocation, or too small for our value
-        // cleanup and increase the buffer
-
-        txt_cleanup(t);
-
-        t->txt = strndupz(s, len);
-        t->size = len + 1;
-        t->len = len;
-    }
-}
-
-static inline void txt_expand_and_append(TEXT *t, const char *s, size_t len) {
-    if(len + 1 > (t->size - t->len)) {
-        size_t new_size = t->len + len + 1;
-        if(new_size < t->size * 2)
-            new_size = t->size * 2;
-
-        t->txt = reallocz(t->txt, new_size);
-        t->size = new_size;
-    }
-
-    char *copy_to = &t->txt[t->len];
-    memcpy(copy_to, s, len);
-    copy_to[len] = '\0';
-    t->len += len;
-}
-
-// ----------------------------------------------------------------------------
-
-typedef enum __attribute__((__packed__)) {
-    HK_NONE = 0,
-
-    // permanent flags - they are set once to optimize various decisions and lookups
-
-    HK_HASHTABLE_ALLOCATED = (1 << 0),  // this is key object allocated in the hashtable
-                                        // objects that do not have this, have a pointer to a key in the hashtable
-                                        // objects that have this, value a value allocated
-
-    HK_FILTERED = (1 << 1),             // we checked once if this key in filtered
-    HK_FILTERED_INCLUDED = (1 << 2),    // the result of the filtering was to include it in the output
-
-    HK_COLLISION_CHECKED = (1 << 3),    // we checked once for collision check of this key
-
-    HK_RENAMES_CHECKED = (1 << 4),      // we checked once if there are renames on this key
-    HK_HAS_RENAMES = (1 << 5),          // and we found there is a rename rule related to it
-
-    // ephemeral flags - they are unset at the end of each log line
-
-    HK_VALUE_FROM_LOG = (1 << 14),      // the value of this key has been read from the log (or from injection, duplication)
-    HK_VALUE_REWRITTEN = (1 << 15),     // the value of this key has been rewritten due to one of our rewrite rules
-
-} HASHED_KEY_FLAGS;
-
-typedef struct hashed_key {
-    const char *key;
-    uint32_t len;
-    HASHED_KEY_FLAGS flags;
-    XXH64_hash_t hash;
-    union {
-        struct hashed_key *hashtable_ptr;   // HK_HASHTABLE_ALLOCATED is not set
-        TEXT value;                         // HK_HASHTABLE_ALLOCATED is set
-    };
-} HASHED_KEY;
-
-static inline void hashed_key_cleanup(HASHED_KEY *k) {
-    if(k->key) {
-        freez((void *)k->key);
-        k->key = NULL;
-    }
-
-    if(k->flags & HK_HASHTABLE_ALLOCATED)
-        txt_cleanup(&k->value);
-    else
-        k->hashtable_ptr = NULL;
-}
-
-static inline void hashed_key_set(HASHED_KEY *k, const char *name) {
-    hashed_key_cleanup(k);
-
-    k->key = strdupz(name);
-    k->len = strlen(k->key);
-    k->hash = XXH3_64bits(k->key, k->len);
-    k->flags = HK_NONE;
-}
-
-static inline void hashed_key_len_set(HASHED_KEY *k, const char *name, size_t len) {
-    hashed_key_cleanup(k);
-
-    k->key = strndupz(name, len);
-    k->len = len;
-    k->hash = XXH3_64bits(k->key, k->len);
-    k->flags = HK_NONE;
-}
-
-static inline bool hashed_keys_match(HASHED_KEY *k1, HASHED_KEY *k2) {
-    return ((k1 == k2) || (k1->hash == k2->hash && strcmp(k1->key, k2->key) == 0));
-}
-
-static inline int compare_keys(struct hashed_key *k1, struct hashed_key *k2) {
-    return strcmp(k1->key, k2->key);
-}
-
-// ----------------------------------------------------------------------------
 
 typedef struct search_pattern {
     const char *pattern;
     pcre2_code *re;
     pcre2_match_data *match_data;
-    TEXT error;
+    TXT_L2J error;
 } SEARCH_PATTERN;
 
 void search_pattern_cleanup(SEARCH_PATTERN *sp);
@@ -416,7 +185,7 @@ typedef struct log_job {
     struct {
         bool last_line_was_empty;
         HASHED_KEY key;
-        TEXT current;
+        TXT_L2J current;
     } filename;
 
     struct {
@@ -435,7 +204,7 @@ typedef struct log_job {
 
     struct {
         uint32_t used;
        REWRITE array[MAX_REWRITES];
-        TEXT tmp;
+        TXT_L2J tmp;
     } rewrites;
 
     struct {
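The `TXT_L2J` buffer introduced by this patch grows by rounding the required size up to an allocation alignment and then to at least double the old size, so repeated `txt_l2j_append()` calls stay amortized O(1). Below is a minimal standalone sketch of that same strategy — it uses plain `malloc`/`realloc` instead of Netdata's `mallocz`/`reallocz` wrappers, and `ALLOC_ALIGN` is a placeholder (the real `TXT_L2J_ALLOC_ALIGN` constant is defined earlier in `log2journal-txt.h`, outside this hunk):

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define ALLOC_ALIGN 1024  /* placeholder; the real TXT_L2J_ALLOC_ALIGN may differ */

typedef struct {
    char *txt;
    uint32_t size;  /* allocated bytes */
    uint32_t len;   /* used bytes, excluding the terminating NUL */
} buf_t;

/* Round the required size up to the alignment, then grow by at least 2x,
   mirroring txt_l2j_compute_new_size() in the patch above. */
static size_t compute_new_size(size_t old_size, size_t required_size) {
    size_t size = (required_size % ALLOC_ALIGN == 0)
                      ? required_size
                      : required_size + ALLOC_ALIGN;
    size = (size / ALLOC_ALIGN) * ALLOC_ALIGN;

    if (size < old_size * 2)
        size = old_size * 2;

    return size;
}

/* Append s (len bytes) keeping the buffer NUL-terminated; error handling
   is omitted for brevity (the real code aborts on allocation failure). */
static void buf_append(buf_t *b, const char *s, size_t len) {
    if (b->len + len + 1 > b->size) {
        size_t new_size = compute_new_size(b->size, b->len + len + 1);
        b->txt = realloc(b->txt, new_size);
        b->size = (uint32_t)new_size;
    }
    memcpy(&b->txt[b->len], s, len);
    b->len += (uint32_t)len;
    b->txt[b->len] = '\0';
}
```

Because the buffer is kept between iterations (like `jb->rewrites.tmp` in the patch), steady-state processing performs no allocations at all once the buffer has reached its high-water mark.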
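The new `yaml_parse_scalar_boolean()` added by this patch centralizes the yes/true vs no/false scalar handling that was previously duplicated in the `stop` and `inject` branches. Its string matching can be sketched in isolation (`parse_bool_scalar` is a hypothetical helper name; note that, exactly as in the patched code, `strncmp()` matches by prefix, so a scalar like "yesterday" would also parse as true):

```c
#include <stdbool.h>
#include <string.h>

/* Returns true for yes/true, false for no/false, and `def` for anything
   else — the same fallback behavior yaml_parse_scalar_boolean() uses when
   it reports an invalid value. */
static bool parse_bool_scalar(const char *s, bool def) {
    if (strncmp(s, "yes", 3) == 0 || strncmp(s, "true", 4) == 0)
        return true;

    if (strncmp(s, "no", 2) == 0 || strncmp(s, "false", 5) == 0)
        return false;

    return def;
}
```

Extracting the comparison into one helper also fixes an asymmetry in the old code, where `stop` only tested for no/false and `inject` only tested for yes/true.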