diff options
Diffstat (limited to 'collectors/systemd-journal.plugin')
-rw-r--r-- | collectors/systemd-journal.plugin/README.md | 673 | ||||
-rw-r--r-- | collectors/systemd-journal.plugin/systemd-journal.c | 2664 |
2 files changed, 3100 insertions, 237 deletions
diff --git a/collectors/systemd-journal.plugin/README.md b/collectors/systemd-journal.plugin/README.md index e69de29bb..51aa1b7cd 100644 --- a/collectors/systemd-journal.plugin/README.md +++ b/collectors/systemd-journal.plugin/README.md @@ -0,0 +1,673 @@ + +# `systemd` journal plugin + +[KEY FEATURES](#key-features) | [JOURNAL SOURCES](#journal-sources) | [JOURNAL FIELDS](#journal-fields) | +[PLAY MODE](#play-mode) | [FULL TEXT SEARCH](#full-text-search) | [PERFORMANCE](#query-performance) | +[CONFIGURATION](#configuration-and-maintenance) | [FAQ](#faq) + +The `systemd` journal plugin by Netdata makes viewing, exploring and analyzing `systemd` journal logs simple and +efficient. +It automatically discovers available journal sources, allows advanced filtering, offers interactive visual +representations and supports exploring the logs of both individual servers and the logs on infrastructure wide +journal centralization servers. + +![image](https://github.com/netdata/netdata/assets/2662304/691b7470-ec56-430c-8b81-0c9e49012679) + +## Key features + +- Works on both **individual servers** and **journal centralization servers**. +- Supports `persistent` and `volatile` journals. +- Supports `system`, `user`, `namespaces` and `remote` journals. +- Allows filtering on **any journal field** or **field value**, for any time-frame. +- Allows **full text search** (`grep`) on all journal fields, for any time-frame. +- Provides a **histogram** for log entries over time, with a break down per field-value, for any field and any + time-frame. +- Works directly on journal files, without any other third-party components. +- Supports coloring log entries, the same way `journalctl` does. +- In PLAY mode provides the same experience as `journalctl -f`, showing new log entries immediately after they are + received. + +### Prerequisites + +`systemd-journal.plugin` is a Netdata Function Plugin. + +To protect your privacy, as with all Netdata Functions, a free Netdata Cloud user account is required to access it. +For more information check [this discussion](https://github.com/netdata/netdata/discussions/16136). + +### Limitations + +#### Plugin availability + +The following are limitations related to the availability of the plugin: + +- This plugin is not available when Netdata is installed in a container. The problem is that `libsystemd` is not + available in Alpine Linux (there is a `libsystemd`, but it is a dummy that returns failure on all calls). We plan to + change this, by shipping Netdata containers based on Debian. +- For the same reason (lack of `systemd` support for Alpine Linux), the plugin is not available on `static` builds of + Netdata (which are based on `muslc`, not `glibc`). +- On old systemd systems (like Centos 7), the plugin runs always in "full data query" mode, which makes it slower. The + reason, is that systemd API is missing some important calls we need to use the field indexes of `systemd` journal. + However, when running in this mode, the plugin offers also negative matches on the data (like filtering for all logs + that do not have set some field), and this is the reason "full data query" mode is also offered as an option even on + newer versions of `systemd`. + +To use the plugin, install one of our native distribution packages, or install it from source. + +#### `systemd` journal features + +The following are limitations related to the features of `systemd` journal: + +- This plugin does not support binary field values. `systemd` journal has the ability to assign fields with binary data. + This plugin assumes all fields contain text values (text in this context includes numbers). +- This plugin does not support multiple values per field for any given log entry. `systemd` journal has the ability to + accept the same field key, multiple times, with multiple values on a single log entry. This plugin will present the + last value and ignore the others for this log entry. +- This plugin will only read journal files located in `/var/log/journal` or `/run/log/journal`. `systemd-remote` has the + ability to store journal files anywhere (user configured). If journal files are not located in `/var/log/journal` + or `/run/log/journal` (and any of their subdirectories), the plugin will not find them. + +Other than the above, this plugin supports all features of `systemd` journals. + +## Journal Sources + +The plugin automatically detects the available journal sources, based on the journal files available in +`/var/log/journal` (persistent logs) and `/run/log/journal` (volatile logs). + +![journal-sources](https://github.com/netdata/netdata/assets/2662304/28e63a3e-6809-4586-b3b0-80755f340e31) + +The plugin, by default, merges all journal sources together, to provide a unified view of all log messages available. + +> To improve query performance, we recommend selecting the relevant journal source, before doing more analysis on the +> logs. + +### `system` journals + +`system` journals are the default journals available on all `systemd` based systems. + +`system` journals contain: + +- kernel log messages (via `kmsg`), +- audit records, originating from the kernel audit subsystem, +- messages received by `systemd-journald` via `syslog`, +- messages received via the standard output and error of service units, +- structured messages received via the native journal API. + +### `user` journals + +Unlike `journalctl`, the Netdata plugin allows viewing, exploring and querying the journal files of **all users**. + +By default, each user, with a UID outside the range of system users (0 - 999), dynamic service users, +and the nobody user (65534), will get their own set of `user` journal files. For more information about +this policy check [Users, Groups, UIDs and GIDs on systemd Systems](https://systemd.io/UIDS-GIDS/). + +Keep in mind that `user` journals are merged with the `system` journals when they are propagated to a journal +centralization server. So, at the centralization server, the `remote` journals contain both the `system` and `user` +journals of the sender. + +### `namespaces` journals + +The plugin auto-detects the namespaces available and provides a list of all namespaces at the "sources" list on the UI. + +Journal namespaces are both a mechanism for logically isolating the log stream of projects consisting +of one or more services from the rest of the system and a mechanism for improving performance. + +`systemd` service units may be assigned to a specific journal namespace through the `LogNamespace=` unit file setting. + +Keep in mind that namespaces require special configuration to be propagated to a journal centralization server. +This makes them a little more difficult to handle, from the administration perspective. + +### `remote` journals + +Remote journals are created by `systemd-journal-remote`. This `systemd` feature allows creating logs centralization +points within your infrastructure, based exclusively on `systemd`. + +Usually `remote` journals are named by the IP of the server sending these logs. The Netdata plugin automatically +extracts these IPs and performs a reverse DNS lookup to find their hostnames. When this is successful, +`remote` journals are named by the hostnames of the origin servers. + +For information about configuring a journals' centralization server, +check [this FAQ item](#how-do-i-configure-a-journals-centralization-server). + +## Journal Fields + +`systemd` journals are designed to support multiple fields per log entry. The power of `systemd` journals is that, +unlike other log management systems, it supports dynamic and variable fields for each log message, +while all fields and their values are indexed for fast querying. + +This means that each application can log messages annotated with its own unique fields and values, and `systemd` +journals will automatically index all of them, without any configuration or manual action. + +For a description of the most frequent fields found in `systemd` journals, check `man systemd.journal-fields`. + +Fields found in the journal files are automatically added to the UI in multiple places to help you explore +and filter the data. + +The plugin automatically enriches certain fields to make them more user-friendly: + +- `_BOOT_ID`: the hex value is annotated with the timestamp of the first message encountered for this boot id. +- `PRIORITY`: the numeric value is replaced with the human-readable name of each priority. +- `SYSLOG_FACILITY`: the encoded value is replaced with the human-readable name of each facility. +- `ERRNO`: the numeric value is annotated with the short name of each value. +- `_UID` `_AUDIT_LOGINUID`, `_SYSTEMD_OWNER_UID`, `OBJECT_UID`, `OBJECT_SYSTEMD_OWNER_UID`, `OBJECT_AUDIT_LOGINUID`: + the local user database is consulted to annotate them with usernames. +- `_GID`, `OBJECT_GID`: the local group database is consulted to annotate them with group names. +- `_CAP_EFFECTIVE`: the encoded value is annotated with a human-readable list of the linux capabilities. +- `_SOURCE_REALTIME_TIMESTAMP`: the numeric value is annotated with human-readable datetime in UTC. + +The values of all other fields are presented as found in the journals. + +> IMPORTANT: +> The UID and GID annotations are added during presentation and are taken from the server running the plugin. +> For `remote` sources, the names presented may not reflect the actual user and group names on the origin server. +> The numeric value will still be visible though, as-is on the origin server. + +The annotations are not searchable with full-text search. They are only added for the presentation of the fields. + +### Journal fields as columns in the table + +All journal fields available in the journal files are offered as columns on the UI. Use the gear button above the table: + +![image](https://github.com/netdata/netdata/assets/2662304/cd75fb55-6821-43d4-a2aa-033792c7f7ac) + +### Journal fields as additional info to each log entry + +When you click a log line, the `info` sidebar will open on the right of the screen, to provide the full list of fields +related to this log line. You can close this `info` sidebar, by selecting the filter icon at its top. + +![image](https://github.com/netdata/netdata/assets/2662304/3207794c-a61b-444c-8ffe-6c07cbc90ae2) + +### Journal fields as filters + +The plugin presents a select list of fields as filters to the query, with counters for each of the possible values +for the field. This list can used to quickly check which fields and values are available for the entire time-frame +of the query. + +Internally the plugin has: + +1. A white-list of fields, to be presented as filters. +2. A black-list of fields, to prevent them from becoming filters. This list includes fields with a very high + cardinality, like timestamps, unique message ids, etc. This is mainly for protecting the server's performance, + to avoid building in memory indexes for the fields that almost each of their values is unique. + +Keep in mind that the values presented in the filters, and their sorting is affected by the "full data queries" +setting: + +![image](https://github.com/netdata/netdata/assets/2662304/ac710d46-07c2-487b-8ce3-e7f767b9ae0f) + +When "full data queries" is off, empty values are hidden and cannot be selected. This is due to a limitation of +`libsystemd` that does not allow negative or empty matches. Also, values with zero counters may appear in the list. + +When "full data queries" is on, Netdata is applying all filtering to the data (not `libsystemd`), but this means +that all the data of the entire time-frame, without any filtering applied, have to be read by the plugin to prepare +the response required. So, "full data queries" can be significantly slower over long time-frames. + +### Journal fields as histogram sources + +The plugin presents a histogram of the number of log entries across time. + +The data source of this histogram can be any of the fields that are available as filters. +For each of the values this field has, across the entire time-frame of the query, the histogram will get corresponding +dimensions, showing the number of log entries, per value, over time. + +The granularity of the histogram is adjusted automatically to have about 150 columns visible on screen. + +The histogram presented by the plugin is interactive: + +- **Zoom**, either with the global date-time picker, or the zoom tool in the histogram's toolbox. +- **Pan**, either with global date-time picker, or by dragging with the mouse the chart to the left or the right. +- **Click**, to quickly jump to the highlighted point in time in the log entries. + +![image](https://github.com/netdata/netdata/assets/2662304/d3dcb1d1-daf4-49cf-9663-91b5b3099c2d) + +## PLAY mode + +The plugin supports PLAY mode, to continuously update the screen with new log entries found in the journal files. +Just hit the "play" button at the top of the Netdata dashboard screen. + +On centralized log servers, PLAY mode provides a unified view of all the new logs encountered across the entire +infrastructure, +from all hosts sending logs to the central logs server via `systemd-remote`. + +## Full-text search + +The plugin supports searching for any text on all fields of the log entries. + +Full text search is combined with the selected filters. + +The text box accepts asterisks `*` as wildcards. So, `a*b*c` means match anything that contains `a`, then `b` and +then `c` with anything between them. + +## Query performance + +Journal files are designed to be accessed by multiple readers and one writer, concurrently. + +Readers (like this Netdata plugin), open the journal files and `libsystemd`, behind the scenes, maps regions +of the files into memory, to satisfy each query. + +On logs aggregation servers, the performance of the queries depend on the following factors: + +1. The **number of files** involved in each query. + + This is why we suggest to select a source when possible. + +2. The **speed of the disks** hosting the journal files. + + Journal files perform a lot of reading while querying, so the fastest the disks, the faster the query will finish. + +3. The **memory available** for caching parts of the files. + + Increased memory will help the kernel cache the most frequently used parts of the journal files, avoiding disk I/O + and speeding up queries. + +4. The **number of filters** applied. + + Queries are significantly faster when just a few filters are selected. + +In general, for a faster experience, **keep a low number of rows within the visible timeframe**. + +Even on long timeframes, selecting a couple of filters that will result in a **few dozen thousand** log entries +will provide fast / rapid responses, usually less than a second. To the contrary, viewing timeframes with **millions +of entries** may result in longer delays. + +The plugin aborts journal queries when your browser cancels inflight requests. This allows you to work on the UI +while there are background queries running. + +At the time of this writing, this Netdata plugin is about 25-30 times faster than `journalctl` on queries that access +multiple journal files, over long time-frames. + +During the development of this plugin, we submitted, to `systemd`, a number of patches to improve `journalctl` +performance by a factor of 14: + +- https://github.com/systemd/systemd/pull/29365 +- https://github.com/systemd/systemd/pull/29366 +- https://github.com/systemd/systemd/pull/29261 + +However, even after these patches are merged, `journalctl` will still be 2x slower than this Netdata plugin, +on multi-journal queries. + +The problem lies in the way `libsystemd` handles multi-journal file queries. To overcome this problem, +the Netdata plugin queries each file individually and it then it merges the results to be returned. +This is transparent, thanks to the `facets` library in `libnetdata` that handles on-the-fly indexing, filtering, +and searching of any dataset, independently of its source. + +## Configuration and maintenance + +This Netdata plugin does not require any configuration or maintenance. + +## FAQ + +### Can I use this plugin on journals' centralization servers? + +Yes. You can centralize your logs using `systemd-journal-remote`, and then install Netdata +on this logs centralization server to explore the logs of all your infrastructure. + +This plugin will automatically provide multi-node views of your logs and also give you the ability to combine the logs +of multiple servers, as you see fit. + +Check [configuring a logs centralization server](#configuring-a-journals-centralization-server). + +### Can I use this plugin from a parent Netdata? + +Yes. When your nodes are connected to a Netdata parent, all their functions are available +via the parent's UI. So, from the parent UI, you can access the functions of all your nodes. + +Keep in mind that to protect your privacy, in order to access Netdata functions, you need a +free Netdata Cloud account. + +### Is any of my data exposed to Netdata Cloud from this plugin? + +No. When you access the agent directly, none of your data passes through Netdata Cloud. +You need a free Netdata Cloud account only to verify your identity and enable the use of +Netdata Functions. Once this is done, all the data flow directly from your Netdata agent +to your web browser. + +Also check [this discussion](https://github.com/netdata/netdata/discussions/16136). + +When you access Netdata via `https://app.netdata.cloud`, your data travel via Netdata Cloud, +but they are not stored in Netdata Cloud. This is to allow you access your Netdata agents from +anywhere. All communication from/to Netdata Cloud is encrypted. + +### What are `volatile` and `persistent` journals? + +`systemd` `journald` allows creating both `volatile` journals in a `tmpfs` ram drive, +and `persistent` journals stored on disk. + +`volatile` journals are particularly useful when the system monitored is sensitive to +disk I/O, or does not have any writable disks at all. + +For more information check `man systemd-journald`. + +### I centralize my logs with Loki. Why to use Netdata for my journals? + +`systemd` journals have almost infinite cardinality at their labels and all of them are indexed, +even if every single message has unique fields and values. + +When you send `systemd` journal logs to Loki, even if you use the `relabel_rules` argument to +`loki.source.journal` with a JSON format, you need to specify which of the fields from journald +you want inherited by Loki. This means you need to know the most important fields beforehand. +At the same time you loose all the flexibility `systemd` journal provides: +**indexing on all fields and all their values**. + +Loki generally assumes that all logs are like a table. All entries in a stream share the same +fields. But journald does exactly the opposite. Each log entry is unique and may have its own unique fields. + +So, Loki and `systemd-journal` are good for different use cases. + +`systemd-journal` already runs in your systems. You use it today. It is there inside all your systems +collecting the system and applications logs. And for its use case, it has advantages over other +centralization solutions. So, why not use it? + +### Is it worth to build a `systemd` logs centralization server? + +Yes. It is simple, fast and the software to do it is already in your systems. + +For application and system logs, `systemd` journal is ideal and the visibility you can get +by centralizing your system logs and the use of this Netdata plugin, is unparalleled. + +### How do I configure a journals' centralization server? + +A short summary to get journal server running can be found below. +There are two strategies you can apply, when it comes down to a centralized server for `systemd` journal logs. + +1. _Active sources_, where the centralized server fetches the logs from each individual server +2. _Passive sources_, where the centralized server accepts a log stream from an individual server. + +For more options and reference to documentation, check `man systemd-journal-remote` and `man systemd-journal-upload`. + +#### _passive_ journals' centralization without encryption + +> ℹ️ _passive_ is a journal server that waits for clients to push their metrics to it. + +> ⚠️ **IMPORTANT** +> These instructions will copy your logs to a central server, without any encryption or authorization. +> DO NOT USE THIS ON NON-TRUSTED NETWORKS. + +##### _passive_ server, without encryption + +On the centralization server install `systemd-journal-remote`: + +```sh +# change this according to your distro +sudo apt-get install systemd-journal-remote +``` + +Make sure the journal transfer protocol is `http`: + +```sh +sudo cp /lib/systemd/system/systemd-journal-remote.service /etc/systemd/system/ + +# edit it to make sure it says: +# --listen-http=-3 +# not: +# --listen-https=-3 +sudo nano /etc/systemd/system/systemd-journal-remote.service + +# reload systemd +sudo systemctl daemon-reload +``` + +Optionally, if you want to change the port (the default is `19532`), edit `systemd-journal-remote.socket` + +```sh +# edit the socket file +sudo systemctl edit systemd-journal-remote.socket +``` + +and add the following lines into the instructed place, and choose your desired port; save and exit. + +```sh +[Socket] +ListenStream=<DESIRED_PORT> +``` + +Finally, enable it, so that it will start automatically upon receiving a connection: + +``` +# enable systemd-journal-remote +sudo systemctl enable --now systemd-journal-remote.socket +sudo systemctl enable systemd-journal-remote.service +``` + +`systemd-journal-remote` is now listening for incoming journals from remote hosts. + +##### _passive_ client, without encryption + +On the clients, install `systemd-journal-remote`: + +```sh +# change this according to your distro +sudo apt-get install systemd-journal-remote +``` + +Edit `/etc/systemd/journal-upload.conf` and set the IP address and the port of the server, like so: + +``` +[Upload] +URL=http://centralization.server.ip:19532 +``` + +Edit `systemd-journal-upload`, and add `Restart=always` to make sure the client will keep trying to push logs, even if the server is temporarily not there, like this: + +```sh +sudo systemctl edit systemd-journal-upload +``` + +At the top, add: + +``` +[Service] +Restart=always +``` + +Enable and start `systemd-journal-upload`, like this: + +```sh +sudo systemctl enable systemd-journal-upload +sudo systemctl start systemd-journal-upload +``` + +##### verify it works + +To verify the central server is receiving logs, run this on the central server: + +```sh +sudo ls -l /var/log/journal/remote/ +``` + +You should see new files from the client's IP. + +Also, `systemctl status systemd-journal-remote` should show something like this: + +``` +systemd-journal-remote.service - Journal Remote Sink Service + Loaded: loaded (/etc/systemd/system/systemd-journal-remote.service; indirect; preset: disabled) + Active: active (running) since Sun 2023-10-15 14:29:46 EEST; 2h 24min ago +TriggeredBy: ● systemd-journal-remote.socket + Docs: man:systemd-journal-remote(8) + man:journal-remote.conf(5) + Main PID: 2118153 (systemd-journal) + Status: "Processing requests..." + Tasks: 1 (limit: 154152) + Memory: 2.2M + CPU: 71ms + CGroup: /system.slice/systemd-journal-remote.service + └─2118153 /usr/lib/systemd/systemd-journal-remote --listen-http=-3 --output=/var/log/journal/remote/ +``` + +Note the `status: "Processing requests..."` and the PID under `CGroup`. + +On the client `systemctl status systemd-journal-upload` should show something like this: + +``` +● systemd-journal-upload.service - Journal Remote Upload Service + Loaded: loaded (/lib/systemd/system/systemd-journal-upload.service; enabled; vendor preset: disabled) + Drop-In: /etc/systemd/system/systemd-journal-upload.service.d + └─override.conf + Active: active (running) since Sun 2023-10-15 10:39:04 UTC; 3h 17min ago + Docs: man:systemd-journal-upload(8) + Main PID: 4169 (systemd-journal) + Status: "Processing input..." + Tasks: 1 (limit: 13868) + Memory: 3.5M + CPU: 1.081s + CGroup: /system.slice/systemd-journal-upload.service + └─4169 /lib/systemd/systemd-journal-upload --save-state +``` + +Note the `Status: "Processing input..."` and the PID under `CGroup`. + +#### _passive_ journals' centralization with encryption using self-signed certificates + +> ℹ️ _passive_ is a journal server that waits for clients to push their metrics to it. + +##### _passive_ server, with encryption and self-singed certificates + +On the centralization server install `systemd-journal-remote` and `openssl`: + +```sh +# change this according to your distro +sudo apt-get install systemd-journal-remote openssl +``` + +Make sure the journal transfer protocol is `https`: + +```sh +sudo cp /lib/systemd/system/systemd-journal-remote.service /etc/systemd/system/ + +# edit it to make sure it says: +# --listen-https=-3 +# not: +# --listen-http=-3 +sudo nano /etc/systemd/system/systemd-journal-remote.service + +# reload systemd +sudo systemctl daemon-reload +``` + +Optionally, if you want to change the port (the default is `19532`), edit `systemd-journal-remote.socket` + +```sh +# edit the socket file +sudo systemctl edit systemd-journal-remote.socket +``` + +and add the following lines into the instructed place, and choose your desired port; save and exit. + +```sh +[Socket] +ListenStream=<DESIRED_PORT> +``` + +Finally, enable it, so that it will start automatically upon receiving a connection: + +```sh +# enable systemd-journal-remote +sudo systemctl enable --now systemd-journal-remote.socket +sudo systemctl enable systemd-journal-remote.service +``` + +`systemd-journal-remote` is now listening for incoming journals from remote hosts. + +Use [this script](https://gist.github.com/ktsaou/d62b8a6501cf9a0da94f03cbbb71c5c7) to create a self-signed certificates authority and certificates for all your servers. + +```sh +wget -O systemd-journal-self-signed-certs.sh "https://gist.githubusercontent.com/ktsaou/d62b8a6501cf9a0da94f03cbbb71c5c7/raw/c346e61e0a66f45dc4095d254bd23917f0a01bd0/systemd-journal-self-signed-certs.sh" +chmod 755 systemd-journal-self-signed-certs.sh +``` + +Edit the script and at its top, set your settings: + +```sh +# The directory to save the generated certificates (and everything about this certificate authority). +# This is only used on the node generating the certificates (usually on the journals server). +DIR="/etc/ssl/systemd-journal-remote" + +# The journals centralization server name (the CN of the server certificate). +SERVER="server-hostname" + +# All the DNS names or IPs this server is reachable at (the certificate will include them). +# Journal clients can use any of them to connect to this server. +# systemd-journal-upload validates its URL= hostname, against this list. +SERVER_ALIASES=("DNS:server-hostname1" "DNS:server-hostname2" "IP:1.2.3.4" "IP:10.1.1.1" "IP:172.16.1.1") + +# All the names of the journal clients that will be sending logs to the server (the CNs of their certificates). +# These names are used by systemd-journal-remote to name the journal files in /var/log/journal/remote/. +# Also the remote hosts will be presented using these names on Netdata dashboards. +CLIENTS=("vm1" "vm2" "vm3" "add_as_may_as_needed") +``` + +Then run the script: + +```sh +sudo ./systemd-journal-self-signed-certs.sh +``` + +The script will create the directory `/etc/ssl/systemd-journal-remote` and in it you will find all the certificates needed. + +There will also be files named `runme-on-XXX.sh`. There will be 1 script for the server and 1 script for each of the clients. You can copy and paste (or `scp`) these scripts on your server and each of your clients and run them as root: + +```sh +scp /etc/ssl/systemd-journal-remote/runme-on-XXX.sh XXX:/tmp/ +``` + +Once the above is done, `ssh` to each server/client and do: + +```sh +sudo bash /tmp/runme-on-XXX.sh +``` + +The scripts install the needed certificates, fix their file permissions to be accessible by systemd-journal-remote/upload, change `/etc/systemd/journal-remote.conf` (on the server) or `/etc/systemd/journal-upload.conf` on the clients and restart the relevant services. + + +##### _passive_ client, with encryption and self-singed certificates + +On the clients, install `systemd-journal-remote`: + +```sh +# change this according to your distro +sudo apt-get install systemd-journal-remote +``` + +Edit `/etc/systemd/journal-upload.conf` and set the IP address and the port of the server, like so: + +``` +[Upload] +URL=https://centralization.server.ip:19532 +``` + +Make sure that `centralization.server.ip` is one of the `SERVER_ALIASES` when you created the certificates. + +Edit `systemd-journal-upload`, and add `Restart=always` to make sure the client will keep trying to push logs, even if the server is temporarily not there, like this: + +```sh +sudo systemctl edit systemd-journal-upload +``` + +At the top, add: + +``` +[Service] +Restart=always +``` + +Enable and start `systemd-journal-upload`, like this: + +```sh +sudo systemctl enable systemd-journal-upload +``` + +Copy the relevant `runme-on-XXX.sh` script as described on server setup and run it: + +```sh +sudo bash /tmp/runme-on-XXX.sh +``` + + +#### Limitations when using a logs centralization server + +As of this writing `namespaces` support by `systemd` is limited: + +- Docker containers cannot log to namespaces. Check [this issue](https://github.com/moby/moby/issues/41879). +- `systemd-journal-upload` automatically uploads `system` and `user` journals, but not `namespaces` journals. For this + you need to spawn a `systemd-journal-upload` per namespace. + diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 304ff244a..c2bd98e7d 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -5,29 +5,110 @@ * GPL v3+ */ -// TODO - 1) MARKDOC - #include "collectors/all.h" #include "libnetdata/libnetdata.h" #include "libnetdata/required_dummies.h" -#ifndef SD_JOURNAL_ALL_NAMESPACES -#define JOURNAL_NAMESPACE SD_JOURNAL_LOCAL_ONLY -#else -#define JOURNAL_NAMESPACE SD_JOURNAL_ALL_NAMESPACES -#endif - +#include <linux/capability.h> #include <systemd/sd-journal.h> #include <syslog.h> +/* + * TODO + * + * _UDEV_DEVLINK is frequently set more than once per field - support multi-value faces + * + */ + + +// ---------------------------------------------------------------------------- +// fstat64 overloading to speed up libsystemd +// https://github.com/systemd/systemd/pull/29261 + +#define ND_SD_JOURNAL_OPEN_FLAGS (0) + +#include <dlfcn.h> +#include <sys/stat.h> + +#define FSTAT_CACHE_MAX 1024 +struct fdstat64_cache_entry { + bool enabled; + bool updated; + int err_no; + struct stat64 stat; + int ret; + size_t cached_count; + size_t session; +}; + +struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 }; +static __thread size_t fstat_thread_calls = 0; +static __thread size_t fstat_thread_cached_responses = 0; +static __thread bool enable_thread_fstat = false; +static __thread size_t fstat_caching_thread_session = 0; +static size_t fstat_caching_global_session = 0; + +static void fstat_cache_enable_on_thread(void) { + fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE); + enable_thread_fstat = true; +} + +static void fstat_cache_disable_on_thread(void) { + fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE); + enable_thread_fstat = false; +} + +int fstat64(int fd, struct stat64 *buf) { + static int (*real_fstat)(int, struct stat64 *) = NULL; + if (!real_fstat) + real_fstat = dlsym(RTLD_NEXT, "fstat64"); + + fstat_thread_calls++; + + if(fd >= 0 && fd < FSTAT_CACHE_MAX) { + if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) { + fstat64_cache[fd].session = fstat_caching_thread_session; + fstat64_cache[fd].enabled = true; + fstat64_cache[fd].updated = false; + } + + if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) { + fstat_thread_cached_responses++; + errno = fstat64_cache[fd].err_no; + *buf = fstat64_cache[fd].stat; + fstat64_cache[fd].cached_count++; + return fstat64_cache[fd].ret; + } + } + + int ret = real_fstat(fd, buf); + + if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) { + fstat64_cache[fd].ret = ret; + fstat64_cache[fd].updated = true; + fstat64_cache[fd].err_no = errno; + fstat64_cache[fd].stat = *buf; + fstat64_cache[fd].session = fstat_caching_thread_session; + } + + return ret; +} + +// ---------------------------------------------------------------------------- + #define FACET_MAX_VALUE_LENGTH 8192 +#define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64 #define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries." #define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal" -#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 30 +#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60 #define SYSTEMD_JOURNAL_MAX_PARAMS 100 -#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600) +#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 +#define SYSTEMD_JOURNAL_WORKER_THREADS 5 + +#define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency +#define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -35,147 +116,1540 @@ #define JOURNAL_PARAMETER_ANCHOR "anchor" #define JOURNAL_PARAMETER_LAST "last" #define JOURNAL_PARAMETER_QUERY "query" +#define JOURNAL_PARAMETER_FACETS "facets" +#define JOURNAL_PARAMETER_HISTOGRAM "histogram" +#define JOURNAL_PARAMETER_DIRECTION "direction" +#define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since" +#define JOURNAL_PARAMETER_DATA_ONLY "data_only" +#define JOURNAL_PARAMETER_SOURCE "source" +#define JOURNAL_PARAMETER_INFO "info" +#define JOURNAL_PARAMETER_ID "id" +#define JOURNAL_PARAMETER_PROGRESS "progress" +#define JOURNAL_PARAMETER_SLICE "slice" +#define JOURNAL_PARAMETER_DELTA "delta" +#define JOURNAL_PARAMETER_TAIL "tail" + +#define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE" +#define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS" + +#define JOURNAL_DEFAULT_SLICE_MODE true +#define JOURNAL_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD #define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL -#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS NULL + +#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \ + "*MESSAGE*" \ + "|*_RAW" \ + "|*_USEC" \ + "|*_NSEC" \ + "|*TIMESTAMP*" \ + "|*_ID" \ + "|*_ID_*" \ + "|__*" \ + "" + #define SYSTEMD_KEYS_INCLUDED_IN_FACETS \ - "_TRANSPORT" \ - "|SYSLOG_IDENTIFIER" \ - "|SYSLOG_FACILITY" \ + \ + /* --- USER JOURNAL FIELDS --- */ \ + \ + /* "|MESSAGE" */ \ + /* "|MESSAGE_ID" */ \ "|PRIORITY" \ - "|_HOSTNAME" \ - "|_RUNTIME_SCOPE" \ - "|_PID" \ + "|CODE_FILE" \ + /* "|CODE_LINE" */ \ + "|CODE_FUNC" \ + "|ERRNO" \ + /* "|INVOCATION_ID" */ \ + /* "|USER_INVOCATION_ID" */ \ + "|SYSLOG_FACILITY" \ + "|SYSLOG_IDENTIFIER" \ + /* "|SYSLOG_PID" */ \ + /* "|SYSLOG_TIMESTAMP" */ \ + /* "|SYSLOG_RAW" */ \ + /* "!DOCUMENTATION" */ \ + /* "|TID" */ \ + "|UNIT" \ + "|USER_UNIT" \ + "|UNIT_RESULT" /* undocumented */ \ + \ + \ + /* --- TRUSTED JOURNAL FIELDS --- */ \ + \ + /* "|_PID" */ \ "|_UID" \ "|_GID" \ - "|_SYSTEMD_UNIT" \ - "|_SYSTEMD_SLICE" \ - "|_SYSTEMD_USER_SLICE" \ "|_COMM" \ "|_EXE" \ + /* "|_CMDLINE" */ \ + "|_CAP_EFFECTIVE" \ + /* "|_AUDIT_SESSION" */ \ + "|_AUDIT_LOGINUID" \ "|_SYSTEMD_CGROUP" \ + "|_SYSTEMD_SLICE" \ + "|_SYSTEMD_UNIT" \ "|_SYSTEMD_USER_UNIT" \ - "|USER_UNIT" \ - "|UNIT" \ + "|_SYSTEMD_USER_SLICE" \ + "|_SYSTEMD_SESSION" \ + "|_SYSTEMD_OWNER_UID" \ + "|_SELINUX_CONTEXT" \ + /* "|_SOURCE_REALTIME_TIMESTAMP" */ \ + "|_BOOT_ID" \ + "|_MACHINE_ID" \ + /* "|_SYSTEMD_INVOCATION_ID" */ \ + "|_HOSTNAME" \ + "|_TRANSPORT" \ + "|_STREAM_ID" \ + /* "|LINE_BREAK" */ \ + "|_NAMESPACE" \ + "|_RUNTIME_SCOPE" \ + \ + \ + /* --- KERNEL JOURNAL FIELDS --- */ \ + \ + /* "|_KERNEL_DEVICE" */ \ + "|_KERNEL_SUBSYSTEM" \ + /* "|_UDEV_SYSNAME" */ \ + "|_UDEV_DEVNODE" \ + /* "|_UDEV_DEVLINK" */ \ + \ + \ + /* --- LOGGING ON BEHALF --- */ \ + \ + "|OBJECT_UID" \ + "|OBJECT_GID" \ + "|OBJECT_COMM" \ + "|OBJECT_EXE" \ + /* "|OBJECT_CMDLINE" */ \ + /* "|OBJECT_AUDIT_SESSION" */ \ + "|OBJECT_AUDIT_LOGINUID" \ + "|OBJECT_SYSTEMD_CGROUP" \ + "|OBJECT_SYSTEMD_SESSION" \ + "|OBJECT_SYSTEMD_OWNER_UID" \ + "|OBJECT_SYSTEMD_UNIT" \ + "|OBJECT_SYSTEMD_USER_UNIT" \ + \ + \ + /* --- CORE DUMPS --- */ \ + \ + "|COREDUMP_COMM" \ + "|COREDUMP_UNIT" \ + "|COREDUMP_USER_UNIT" \ + "|COREDUMP_SIGNAL_NAME" \ + "|COREDUMP_CGROUP" \ + \ + \ + /* --- DOCKER --- */ \ + \ + "|CONTAINER_ID" \ + /* "|CONTAINER_ID_FULL" */ \ + "|CONTAINER_NAME" \ + "|CONTAINER_TAG" \ + "|IMAGE_NAME" /* undocumented */ \ + /* "|CONTAINER_PARTIAL_MESSAGE" */ \ + \ "" -static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; static bool plugin_should_exit = false; -DICTIONARY *uids = NULL; -DICTIONARY *gids = NULL; +// ---------------------------------------------------------------------------- +typedef enum { + ND_SD_JOURNAL_NO_FILE_MATCHED, + ND_SD_JOURNAL_FAILED_TO_OPEN, + ND_SD_JOURNAL_FAILED_TO_SEEK, + ND_SD_JOURNAL_TIMED_OUT, + ND_SD_JOURNAL_OK, + ND_SD_JOURNAL_NOT_MODIFIED, + ND_SD_JOURNAL_CANCELLED, +} ND_SD_JOURNAL_STATUS; + +typedef enum { + SDJF_ALL = 0, + SDJF_LOCAL = (1 << 0), + SDJF_REMOTE = (1 << 1), + SDJF_SYSTEM = (1 << 2), + SDJF_USER = (1 << 3), + SDJF_NAMESPACE = (1 << 4), + SDJF_OTHER = (1 << 5), +} SD_JOURNAL_FILE_SOURCE_TYPE; + +typedef struct function_query_status { + bool *cancelled; // a pointer to the cancelling boolean + usec_t stop_monotonic_ut; + + usec_t started_monotonic_ut; + + // request + SD_JOURNAL_FILE_SOURCE_TYPE source_type; + STRING *source; + usec_t after_ut; + usec_t before_ut; + + struct { + usec_t start_ut; + usec_t stop_ut; + } anchor; + + FACETS_ANCHOR_DIRECTION direction; + size_t entries; + usec_t if_modified_since; + bool delta; + bool tail; + bool data_only; + bool slice; + size_t filters; + usec_t last_modified; + const char *query; + const char *histogram; + + // per file progress info + size_t cached_count; + + // progress statistics + usec_t matches_setup_ut; + size_t rows_useful; + size_t rows_read; + size_t bytes_read; + size_t files_matched; + size_t file_working; +} FUNCTION_QUERY_STATUS; + +struct journal_file { + const char *filename; + size_t filename_len; + STRING *source; + SD_JOURNAL_FILE_SOURCE_TYPE source_type; + usec_t file_last_modified_ut; + usec_t msg_first_ut; + usec_t msg_last_ut; + usec_t last_scan_ut; + size_t size; + bool logged_failure; + usec_t max_journal_vs_realtime_delta_ut; +}; + +static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) { + netdata_log_error("ERROR: %s, on query " + "timeframe [%"PRIu64" - %"PRIu64"], " + "anchor [%"PRIu64" - %"PRIu64"], " + "if_modified_since %"PRIu64", " + "data_only:%s, delta:%s, tail:%s, direction:%s" + , msg + , fqs->after_ut, fqs->before_ut + , fqs->anchor.start_ut, fqs->anchor.stop_ut + , fqs->if_modified_since + , fqs->data_only ? "true" : "false" + , fqs->delta ? "true" : "false" + , fqs->tail ? "tail" : "false" + , fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward"); +} -// ---------------------------------------------------------------------------- +static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) { + if(sd_journal_seek_realtime_usec(j, timestamp) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp); + if(sd_journal_seek_tail(j) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); + return false; + } + } + + return true; +} + +#define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP" -int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t stop_monotonic_ut) { - sd_journal *j; - int r; +static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) { + const char *k = data; + const char *equal = strchr(k, '='); + if(unlikely(!equal)) + return false; - // Open the system journal for reading - r = sd_journal_open(&j, JOURNAL_NAMESPACE); - if (r < 0) - return HTTP_RESP_INTERNAL_SERVER_ERROR; + size_t kl = equal - k; + + const char *v = ++equal; + size_t vl = data_length - kl - 1; + + *key = k; + *key_length = kl; + *value = v; + *value_length = vl; + + return true; +} + +static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) { + const void *data; + size_t length, bytes = 0; + + facets_add_key_value_length(facets, JOURNAL_KEY_ND_JOURNAL_FILE, sizeof(JOURNAL_KEY_ND_JOURNAL_FILE) - 1, jf->filename, jf->filename_len); + + SD_JOURNAL_FOREACH_DATA(j, data, length) { + const char *key, *value; + size_t key_length, value_length; + + if(!parse_journal_field(data, length, &key, &key_length, &value, &value_length)) + continue; + +#ifdef NETDATA_INTERNAL_CHECKS + usec_t origin_journal_ut = *msg_ut; +#endif + if(unlikely(key_length == sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1 && + memcmp(key, JD_SOURCE_REALTIME_TIMESTAMP, sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1) == 0)) { + usec_t ut = str2ull(value, NULL); + if(ut && ut < *msg_ut) { + usec_t delta = *msg_ut - ut; + *msg_ut = ut; + + if(delta > JOURNAL_VS_REALTIME_DELTA_MAX_UT) + delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; + + // update max_journal_vs_realtime_delta_ut if the delta increased + usec_t expected = jf->max_journal_vs_realtime_delta_ut; + do { + if(delta <= expected) + break; + } while(!__atomic_compare_exchange_n(&jf->max_journal_vs_realtime_delta_ut, &expected, delta, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + internal_error(delta > expected, + "increased max_journal_vs_realtime_delta_ut from %"PRIu64" to %"PRIu64", " + "journal %"PRIu64", actual %"PRIu64" (delta %"PRIu64")" + , expected, delta, origin_journal_ut, *msg_ut, origin_journal_ut - (*msg_ut)); + } + } + + bytes += length; + facets_add_key_value_length(facets, key, key_length, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + } + + return bytes; +} + +#define FUNCTION_PROGRESS_UPDATE_ROWS(rows_read, rows) __atomic_fetch_add(&(rows_read), rows, __ATOMIC_RELAXED) +#define FUNCTION_PROGRESS_UPDATE_BYTES(bytes_read, bytes) __atomic_fetch_add(&(bytes_read), bytes, __ATOMIC_RELAXED) +#define FUNCTION_PROGRESS_EVERY_ROWS (1ULL << 13) +#define FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS (1ULL << 7) + +static inline ND_SD_JOURNAL_STATUS check_stop(const bool *cancelled, const usec_t *stop_monotonic_ut) { + if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) { + internal_error(true, "Function has been cancelled"); + return ND_SD_JOURNAL_CANCELLED; + } + + if(now_monotonic_usec() > __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED)) { + internal_error(true, "Function timed out"); + return ND_SD_JOURNAL_TIMED_OUT; + } + + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED); + + usec_t start_ut = ((fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->before_ut) + anchor_delta; + usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut; + bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + + if(!netdata_systemd_journal_seek_to(j, start_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + usec_t earliest_msg_ut = 0; + size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; + size_t bytes = 0, last_bytes = 0; + + usec_t last_usec_from = 0; + usec_t last_usec_to = 0; + + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; facets_rows_begin(facets); + while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) { + usec_t msg_ut = 0; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) { + errors_no_timestamp++; + continue; + } + + if(unlikely(msg_ut > earliest_msg_ut)) + earliest_msg_ut = msg_ut; - bool timed_out = false; - size_t row_counter = 0; - sd_journal_seek_realtime_usec(j, before_ut); - SD_JOURNAL_FOREACH_BACKWARDS(j) { - row_counter++; + if (unlikely(msg_ut > start_ut)) + continue; + + if (unlikely(msg_ut < stop_ut)) + break; - uint64_t msg_ut; - sd_journal_get_realtime_usec(j, &msg_ut); - if (msg_ut < after_ut) + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = --last_usec_from; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t oldest = facets_row_oldest_ut(facets); + if(oldest && msg_ut < (oldest - anchor_delta)) break; + } - const void *data; - size_t length; - SD_JOURNAL_FOREACH_DATA(j, data, length) { - const char *key = data; - const char *equal = strchr(key, '='); - if(unlikely(!equal)) - continue; + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; - const char *value = ++equal; - size_t key_length = value - key; // including '\0' + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; - char key_copy[key_length]; - memcpy(key_copy, key, key_length - 1); - key_copy[key_length - 1] = '\0'; + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } - size_t value_length = length - key_length; // without '\0' - facets_add_key_value_length(facets, key_copy, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); - } + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + + fqs->rows_useful += rows_useful; + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(earliest_msg_ut > fqs->last_modified) + fqs->last_modified = earliest_msg_ut; + + return status; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED); + + usec_t start_ut = (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->after_ut; + usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta; + bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + + if(!netdata_systemd_journal_seek_to(j, start_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + usec_t earliest_msg_ut = 0; + size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; + size_t bytes = 0, last_bytes = 0; + + usec_t last_usec_from = 0; + usec_t last_usec_to = 0; + + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; + + facets_rows_begin(facets); + while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) { + usec_t msg_ut = 0; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) { + errors_no_timestamp++; + continue; + } + + if(likely(msg_ut > earliest_msg_ut)) + earliest_msg_ut = msg_ut; - facets_row_finished(facets, msg_ut); + if (unlikely(msg_ut < start_ut)) + continue; - if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { - timed_out = true; + if (unlikely(msg_ut > stop_ut)) + break; + + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = ++last_usec_to; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t newest = facets_row_newest_ut(facets); + if(newest && msg_ut > (newest + anchor_delta)) break; + } + + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; + + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; + + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } + + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + + fqs->rows_useful += rows_useful; + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(earliest_msg_ut > fqs->last_modified) + fqs->last_modified = earliest_msg_ut; + + return status; +} + +bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) { + // return true, if data have been modified since the timestamp + + if(!last_modified || !seek_to) + return false; + + if(!netdata_systemd_journal_seek_to(j, seek_to)) + return false; + + usec_t first_msg_ut = 0; + while (sd_journal_previous(j) > 0) { + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) + continue; + + first_msg_ut = msg_ut; + break; + } + + return first_msg_ut != last_modified; +} + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS +static bool netdata_systemd_filtering_by_journal(sd_journal *j, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) { + const char *field = NULL; + const void *data = NULL; + size_t data_length; + size_t added_keys = 0; + size_t failures = 0; + size_t filters_added = 0; + + SD_JOURNAL_FOREACH_FIELD(j, field) { + bool interesting; + + if(fqs->data_only) + interesting = facets_key_name_is_filter(facets, field); + else + interesting = facets_key_name_is_facet(facets, field); + + if(interesting) { + if(sd_journal_query_unique(j, field) >= 0) { + bool added_this_key = false; + size_t added_values = 0; + + SD_JOURNAL_FOREACH_UNIQUE(j, data, data_length) { + const char *key, *value; + size_t key_length, value_length; + + if(!parse_journal_field(data, data_length, &key, &key_length, &value, &value_length)) + continue; + + facets_add_possible_value_name_to_key(facets, key, key_length, value, value_length); + + if(!facets_key_name_value_length_is_selected(facets, key, key_length, value, value_length)) + continue; + + if(added_keys && !added_this_key) { + if(sd_journal_add_conjunction(j) < 0) + failures++; + + added_this_key = true; + added_keys++; + } + else if(added_values) + if(sd_journal_add_disjunction(j) < 0) + failures++; + + if(sd_journal_add_match(j, data, data_length) < 0) + failures++; + + added_values++; + filters_added++; + } } } + } + + if(failures) { + log_fqs(fqs, "failed to setup journal filter, will run the full query."); + sd_journal_flush_matches(j); + return true; + } + + return filters_added ? true : false; +} +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS + +static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file( + const char *filename, BUFFER *wb, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + sd_journal *j = NULL; + errno = 0; + + fstat_cache_enable_on_thread(); + + const char *paths[2] = { + [0] = filename, + [1] = NULL, + }; + + if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { + fstat_cache_disable_on_thread(); + return ND_SD_JOURNAL_FAILED_TO_OPEN; + } + + ND_SD_JOURNAL_STATUS status; + bool matches_filters = true; + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + if(fqs->slice) { + usec_t started = now_monotonic_usec(); + + matches_filters = netdata_systemd_filtering_by_journal(j, facets, fqs) || !fqs->filters; + usec_t ended = now_monotonic_usec(); + + fqs->matches_setup_ut += (ended - started); + } +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS + + if(matches_filters) { + if(fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD) + status = netdata_systemd_journal_query_forward(j, wb, facets, jf, fqs); + else + status = netdata_systemd_journal_query_backward(j, wb, facets, jf, fqs); + } + else + status = ND_SD_JOURNAL_NO_FILE_MATCHED; + + sd_journal_close(j); + fstat_cache_disable_on_thread(); + + return status; +} + +// ---------------------------------------------------------------------------- +// journal files registry + +#define VAR_LOG_JOURNAL_MAX_DEPTH 10 +#define MAX_JOURNAL_DIRECTORIES 100 + +struct journal_directory { + char *path; + bool logged_failure; +}; + +static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 }; +static DICTIONARY *journal_files_registry = NULL; +static DICTIONARY *used_hashes_registry = NULL; + +static usec_t systemd_journal_session = 0; + +static void buffer_json_journal_versions(BUFFER *wb) { + buffer_json_member_add_object(wb, "versions"); + { + buffer_json_member_add_uint64(wb, "sources", + systemd_journal_session + dictionary_version(journal_files_registry)); + } + buffer_json_object_close(wb); +} + +static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) { + fstat_cache_enable_on_thread(); + + const char *files[2] = { + [0] = filename, + [1] = NULL, + }; + + sd_journal *j = NULL; + if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { + fstat_cache_disable_on_thread(); + + if(!jf->logged_failure) { + netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename); + jf->logged_failure = true; + } + + jf->msg_first_ut = 0; + jf->msg_last_ut = jf->file_last_modified_ut; + return; + } + + usec_t first_ut = 0, last_ut = 0; + + if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) { + internal_error(true, "cannot find the timestamp of the first message in '%s'", filename); + first_ut = 0; + } + + if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) { + internal_error(true, "cannot find the timestamp of the last message in '%s'", filename); + last_ut = jf->file_last_modified_ut; + } sd_journal_close(j); + fstat_cache_disable_on_thread(); + + if(first_ut > last_ut) { + internal_error(true, "timestamps are flipped in file '%s'", filename); + usec_t t = first_ut; + first_ut = last_ut; + last_ut = t; + } + + jf->msg_first_ut = first_ut; + jf->msg_last_ut = last_ut; +} + +static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) { + char buf[max_len]; + size_t len; + char *dst = buf; + + if(prefix) { + len = strlen(prefix); + memcpy(buf, prefix, len); + dst = &buf[len]; + max_len -= len; + } + + len = e - s; + if(len >= max_len) + len = max_len - 1; + memcpy(dst, s, len); + dst[len] = '\0'; + buf[max_len - 1] = '\0'; + + for(size_t i = 0; buf[i] ;i++) + if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':') + buf[i] = '_'; + + return string_strdupz(buf); +} + +static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { + struct journal_file *jf = value; + jf->filename = dictionary_acquired_item_name(item); + jf->filename_len = strlen(jf->filename); + + // based on the filename + // decide the source to show to the user + const char *s = strrchr(jf->filename, '/'); + if(s) { + if(strstr(jf->filename, "/remote/")) + jf->source_type = SDJF_REMOTE; + else { + const char *t = s - 1; + while(t >= jf->filename && *t != '.' && *t != '/') + t--; + + if(t >= jf->filename && *t == '.') { + jf->source_type = SDJF_NAMESPACE; + jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-"); + } + else + jf->source_type = SDJF_LOCAL; + } + + if(strncmp(s, "/system", 7) == 0) + jf->source_type |= SDJF_SYSTEM; + + else if(strncmp(s, "/user", 5) == 0) + jf->source_type |= SDJF_USER; + + else if(strncmp(s, "/remote-", 8) == 0) { + jf->source_type |= SDJF_REMOTE; + + s = &s[8]; // skip "/remote-" + + char *e = strchr(s, '@'); + if(!e) + e = strstr(s, ".journal"); + + if(e) { + const char *d = s; + for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ; + if(d == e) { + // a valid IP address + char ip[e - s + 1]; + memcpy(ip, s, e - s); + ip[e - s] = '\0'; + char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN]; + if(ip_to_hostname(ip, buf, sizeof(buf))) + jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + else { + internal_error(true, "Cannot find the hostname for IP '%s'", ip); + jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + } + } + else + jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + } + else + jf->source_type |= SDJF_OTHER; + } + else + jf->source_type |= SDJF_OTHER; + } + else + jf->source_type = SDJF_LOCAL | SDJF_OTHER; + + journal_file_update_msg_ut(jf->filename, jf); + + internal_error(true, + "found journal file '%s', type %d, source '%s', " + "file modified: %"PRIu64", " + "msg {first: %"PRIu64", last: %"PRIu64"}", + jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "<unset>", + jf->file_last_modified_ut, + jf->msg_first_ut, jf->msg_last_ut); +} + +static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) { + struct journal_file *jf = old_value; + struct journal_file *njf = new_value; + + if(njf->last_scan_ut > jf->last_scan_ut) + jf->last_scan_ut = njf->last_scan_ut; + + if(njf->file_last_modified_ut > jf->file_last_modified_ut) { + jf->file_last_modified_ut = njf->file_last_modified_ut; + jf->size = njf->size; + + const char *filename = dictionary_acquired_item_name(item); + journal_file_update_msg_ut(filename, jf); + +// internal_error(true, +// "updated journal file '%s', type %d, " +// "file modified: %"PRIu64", " +// "msg {first: %"PRIu64", last: %"PRIu64"}", +// filename, jf->source_type, +// jf->file_last_modified_ut, +// jf->msg_first_ut, jf->msg_last_ut); + } + + return false; +} + +#define SDJF_SOURCE_ALL_NAME "all" +#define SDJF_SOURCE_LOCAL_NAME "all-local-logs" +#define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs" +#define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs" +#define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized" +#define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces" +#define SDJF_SOURCE_REMOTES_NAME "all-remote-systems" + +struct journal_file_source { + usec_t first_ut; + usec_t last_ut; + size_t count; + uint64_t size; +}; + +static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) { + if(size > 1024ULL * 1024 * 1024 * 1024) + snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0); + else if(size > 1024ULL * 1024 * 1024) + snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0); + else if(size > 1024ULL * 1024) + snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0); + else if(size > 1024ULL) + snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0); + else + snprintfz(dst, dst_len, "%"PRIu64" B", size); +} + +#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \ + if((remaining) > (duration)) { \ + uint64_t _count = (remaining) / (duration); \ + uint64_t _rem = (remaining) - (_count * (duration)); \ + (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \ + (remaining) = _rem; \ + (printed) = true; \ + } \ +} while(0) + +static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) { + if(duration_s < 0) + duration_s = -duration_s; + + size_t pos = 0; + dst[0] = 0 ; + + bool printed = false; + print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed); + print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed); + print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed); + print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed); + print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed); + print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed); +} + +static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) { + struct journal_file_source *jfs = entry; + BUFFER *wb = data; + + const char *name = dictionary_acquired_item_name(item); + + buffer_json_add_array_item_object(wb); + { + char size_for_humans[100]; + human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans)); + + char duration_for_humans[1024]; + human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC), + duration_for_humans, sizeof(duration_for_humans)); + + char info[1024]; + snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s", + jfs->count, size_for_humans, duration_for_humans); + + buffer_json_member_add_string(wb, "id", name); + buffer_json_member_add_string(wb, "name", name); + buffer_json_member_add_string(wb, "pill", size_for_humans); + buffer_json_member_add_string(wb, "info", info); + } + buffer_json_object_close(wb); // options object + + return 1; +} + +static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) { + struct journal_file_source *jfs = old_value, *njfs = new_value; + jfs->count += njfs->count; + jfs->size += njfs->size; + + if(njfs->first_ut && njfs->first_ut < jfs->first_ut) + jfs->first_ut = njfs->first_ut; + + if(njfs->last_ut && njfs->last_ut > jfs->last_ut) + jfs->last_ut = njfs->last_ut; + + return false; +} + +static void available_journal_file_sources_to_json_array(BUFFER *wb) { + DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL); + + struct journal_file_source t = { 0 }; + + struct journal_file *jf; + dfe_start_read(journal_files_registry, jf) { + t.first_ut = jf->msg_first_ut; + t.last_ut = jf->msg_last_ut; + t.count = 1; + t.size = jf->size; + + dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t)); + + if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE)) + dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE)) + dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t)); + if(jf->source) + dictionary_set(dict, string2str(jf->source), &t, sizeof(t)); + } + dfe_done(jf); + + dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb); + + dictionary_destroy(dict); +} + +static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { + struct journal_file *jf = value; (void)jf; + const char *filename = dictionary_acquired_item_name(item); (void)filename; + + string_freez(jf->source); + internal_error(true, "removed journal file '%s'", filename); +} + +void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) { + static const char *ext = ".journal"; + static const size_t ext_len = sizeof(".journal") - 1; + + if (depth > VAR_LOG_JOURNAL_MAX_DEPTH) + return; + + DIR *dir; + struct dirent *entry; + struct stat info; + char absolute_path[FILENAME_MAX]; + + // Open the directory. + if ((dir = opendir(dirname)) == NULL) { + if(errno != ENOENT && errno != ENOTDIR) + netdata_log_error("Cannot opendir() '%s'", dirname); + return; + } + + // Read each entry in the directory. + while ((entry = readdir(dir)) != NULL) { + snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name); + if (stat(absolute_path, &info) != 0) { + netdata_log_error("Failed to stat() '%s", absolute_path); + continue; + } + + if (S_ISDIR(info.st_mode)) { + // If entry is a directory, call traverse recursively. + if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) + journal_directory_scan(absolute_path, depth + 1, last_scan_ut); + + } + else if (S_ISREG(info.st_mode)) { + // If entry is a regular file, check if it ends with .journal. + char *filename = entry->d_name; + size_t len = strlen(filename); + + if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) { + struct journal_file t = { + .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC, + .last_scan_ut = last_scan_ut, + .size = info.st_size, + .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT, + }; + dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t)); + } + } + } + + closedir(dir); +} + +static void journal_files_registry_update() { + usec_t scan_ut = now_monotonic_usec(); + + for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) { + if(!journal_directories[i].path) + break; + + journal_directory_scan(journal_directories[i].path, 0, scan_ut); + } + + struct journal_file *jf; + dfe_start_write(journal_files_registry, jf) { + if(jf->last_scan_ut < scan_ut) + dictionary_del(journal_files_registry, jf_dfe.name); + } + dfe_done(jf); +} + +// ---------------------------------------------------------------------------- + +static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) && + (!fqs->source || fqs->source == jf->source)) { + + usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; + usec_t first_ut = jf->msg_first_ut; + usec_t last_ut = jf->msg_last_ut + anchor_delta; + + if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut) + return true; + } + + return false; +} + +static int journal_file_dict_items_backward_compar(const void *a, const void *b) { + const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b; + struct journal_file *jfa = dictionary_acquired_item_value(*ad); + struct journal_file *jfb = dictionary_acquired_item_value(*bd); + + if(jfa->msg_last_ut < jfb->msg_last_ut) + return 1; + + if(jfa->msg_last_ut > jfb->msg_last_ut) + return -1; + + if(jfa->msg_first_ut < jfb->msg_first_ut) + return 1; + + if(jfa->msg_first_ut > jfb->msg_first_ut) + return -1; + + return 0; +} + +static int journal_file_dict_items_forward_compar(const void *a, const void *b) { + return -journal_file_dict_items_backward_compar(a, b); +} + +static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) { + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED; + struct journal_file *jf; + + fqs->files_matched = 0; + fqs->file_working = 0; + fqs->rows_useful = 0; + fqs->rows_read = 0; + fqs->bytes_read = 0; + + size_t files_used = 0; + size_t files_max = dictionary_entries(journal_files_registry); + const DICTIONARY_ITEM *file_items[files_max]; + + // count the files + bool files_are_newer = false; + dfe_start_read(journal_files_registry, jf) { + if(!jf_is_mine(jf, fqs)) + continue; + + file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item); + + if(jf->msg_last_ut > fqs->if_modified_since) + files_are_newer = true; + } + dfe_done(jf); + + fqs->files_matched = files_used; + + if(fqs->if_modified_since && !files_are_newer) { + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + } + + // sort the files, so that they are optimal for facets + if(files_used >= 2) { + if (fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD) + qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *), + journal_file_dict_items_backward_compar); + else + qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *), + journal_file_dict_items_forward_compar); + } + + bool partial = false; + usec_t started_ut; + usec_t ended_ut = now_monotonic_usec(); + + buffer_json_member_add_array(wb, "_journal_files"); + for(size_t f = 0; f < files_used ;f++) { + const char *filename = dictionary_acquired_item_name(file_items[f]); + jf = dictionary_acquired_item_value(file_items[f]); + + if(!jf_is_mine(jf, fqs)) + continue; + + fqs->file_working++; + fqs->cached_count = 0; + + size_t fs_calls = fstat_thread_calls; + size_t fs_cached = fstat_thread_cached_responses; + size_t rows_useful = fqs->rows_useful; + size_t rows_read = fqs->rows_read; + size_t bytes_read = fqs->bytes_read; + size_t matches_setup_ut = fqs->matches_setup_ut; + + ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs); + + rows_useful = fqs->rows_useful - rows_useful; + rows_read = fqs->rows_read - rows_read; + bytes_read = fqs->bytes_read - bytes_read; + matches_setup_ut = fqs->matches_setup_ut - matches_setup_ut; + fs_calls = fstat_thread_calls - fs_calls; + fs_cached = fstat_thread_cached_responses - fs_cached; + + started_ut = ended_ut; + ended_ut = now_monotonic_usec(); + usec_t duration_ut = ended_ut - started_ut; + + buffer_json_add_array_item_object(wb); // journal file + { + // information about the file + buffer_json_member_add_string(wb, "_filename", filename); + buffer_json_member_add_uint64(wb, "_source_type", jf->source_type); + buffer_json_member_add_string(wb, "_source", string2str(jf->source)); + buffer_json_member_add_uint64(wb, "_last_modified_ut", jf->file_last_modified_ut); + buffer_json_member_add_uint64(wb, "_msg_first_ut", jf->msg_first_ut); + buffer_json_member_add_uint64(wb, "_msg_last_ut", jf->msg_last_ut); + buffer_json_member_add_uint64(wb, "_journal_vs_realtime_delta_ut", jf->max_journal_vs_realtime_delta_ut); + + // information about the current use of the file + buffer_json_member_add_uint64(wb, "duration_ut", ended_ut - started_ut); + buffer_json_member_add_uint64(wb, "rows_read", rows_read); + buffer_json_member_add_uint64(wb, "rows_useful", rows_useful); + buffer_json_member_add_double(wb, "rows_per_second", (double) rows_read / (double) duration_ut * (double) USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "bytes_read", bytes_read); + buffer_json_member_add_double(wb, "bytes_per_second", (double) bytes_read / (double) duration_ut * (double) USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut); + buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls); + buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached); + } + buffer_json_object_close(wb); // journal file + + bool stop = false; + switch(tmp_status) { + case ND_SD_JOURNAL_OK: + case ND_SD_JOURNAL_NO_FILE_MATCHED: + status = (status == ND_SD_JOURNAL_OK) ? ND_SD_JOURNAL_OK : tmp_status; + break; + + case ND_SD_JOURNAL_FAILED_TO_OPEN: + case ND_SD_JOURNAL_FAILED_TO_SEEK: + partial = true; + if(status == ND_SD_JOURNAL_NO_FILE_MATCHED) + status = tmp_status; + break; + + case ND_SD_JOURNAL_CANCELLED: + case ND_SD_JOURNAL_TIMED_OUT: + partial = true; + stop = true; + status = tmp_status; + break; + + case ND_SD_JOURNAL_NOT_MODIFIED: + internal_fatal(true, "this should never be returned here"); + break; + } + + if(stop) + break; + } + buffer_json_array_close(wb); // _journal_files + + // release the files + for(size_t f = 0; f < files_used ;f++) + dictionary_acquired_item_release(journal_files_registry, file_items[f]); + + switch (status) { + case ND_SD_JOURNAL_OK: + if(fqs->if_modified_since && !fqs->rows_useful) { + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + } + break; + + case ND_SD_JOURNAL_TIMED_OUT: + case ND_SD_JOURNAL_NO_FILE_MATCHED: + break; + + case ND_SD_JOURNAL_CANCELLED: + buffer_flush(wb); + return HTTP_RESP_CLIENT_CLOSED_REQUEST; + + case ND_SD_JOURNAL_NOT_MODIFIED: + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + + default: + case ND_SD_JOURNAL_FAILED_TO_OPEN: + case ND_SD_JOURNAL_FAILED_TO_SEEK: + buffer_flush(wb); + return HTTP_RESP_INTERNAL_SERVER_ERROR; + } buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); - buffer_json_member_add_boolean(wb, "partial", timed_out); + buffer_json_member_add_boolean(wb, "partial", partial); buffer_json_member_add_string(wb, "type", "table"); - buffer_json_member_add_time_t(wb, "update_every", 1); - buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); - facets_report(facets, wb); + if(!fqs->data_only) { + buffer_json_member_add_time_t(wb, "update_every", 1); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + } + + if(!fqs->data_only || fqs->tail) + buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified); + + facets_sort_and_reorder_keys(facets); + facets_report(facets, wb, used_hashes_registry); + + buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0)); - buffer_json_member_add_time_t(wb, "expires", now_realtime_sec()); + buffer_json_member_add_object(wb, "_fstat_caching"); + { + buffer_json_member_add_uint64(wb, "calls", fstat_thread_calls); + buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses); + } + buffer_json_object_close(wb); // _fstat_caching buffer_json_finalize(wb); return HTTP_RESP_OK; } -static void systemd_journal_function_help(const char *transaction) { - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, +static void netdata_systemd_journal_function_help(const char *transaction) { + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s / %s\n" "\n" "%s\n" "\n" - "The following filters are supported:\n" + "The following parameters are supported:\n" "\n" - " help\n" + " "JOURNAL_PARAMETER_HELP"\n" " Shows this help message.\n" "\n" - " before:TIMESTAMP\n" + " "JOURNAL_PARAMETER_INFO"\n" + " Request initial configuration information about the plugin.\n" + " The key entity returned is the required_params array, which includes\n" + " all the available systemd journal sources.\n" + " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n" + "\n" + " "JOURNAL_PARAMETER_ID":STRING\n" + " Caller supplied unique ID of the request.\n" + " This can be used later to request a progress report of the query.\n" + " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n" + "\n" + " "JOURNAL_PARAMETER_PROGRESS"\n" + " Request a progress report (the `id` of a running query is required).\n" + " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n" + "\n" + " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n" + " Quickly respond with data requested, without generating a\n" + " `histogram`, `facets` counters and `items`.\n" + "\n" + " "JOURNAL_PARAMETER_DELTA":true or "JOURNAL_PARAMETER_DELTA":false\n" + " When doing data only queries, include deltas for histogram, facets and items.\n" + "\n" + " "JOURNAL_PARAMETER_TAIL":true or "JOURNAL_PARAMETER_TAIL":false\n" + " When doing data only queries, respond with the newest messages,\n" + " and up to the anchor, but calculate deltas (if requested) for\n" + " the duration [anchor - before].\n" + "\n" + " "JOURNAL_PARAMETER_SLICE":true or "JOURNAL_PARAMETER_SLICE":false\n" + " When it is turned on, the plugin is executing filtering via libsystemd,\n" + " utilizing all the available indexes of the journal files.\n" + " When it is off, only the time constraint is handled by libsystemd and\n" + " all filtering is done by the plugin.\n" + " The default is: %s\n" + "\n" + " "JOURNAL_PARAMETER_SOURCE":SOURCE\n" + " Query only the specified journal sources.\n" + " Do an `"JOURNAL_PARAMETER_INFO"` query to find the sources.\n" + "\n" + " "JOURNAL_PARAMETER_BEFORE":TIMESTAMP_IN_SECONDS\n" " Absolute or relative (to now) timestamp in seconds, to start the query.\n" " The query is always executed from the most recent to the oldest log entry.\n" " If not given the default is: now.\n" "\n" - " after:TIMESTAMP\n" + " "JOURNAL_PARAMETER_AFTER":TIMESTAMP_IN_SECONDS\n" " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n" " If not given, the default is %d.\n" "\n" - " last:ITEMS\n" + " "JOURNAL_PARAMETER_LAST":ITEMS\n" " The number of items to return.\n" " The default is %d.\n" "\n" - " anchor:NUMBER\n" - " The `timestamp` of the item last received, to return log entries after that.\n" - " If not given, the query will return the top `ITEMS` from the most recent.\n" + " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n" + " Return items relative to this timestamp.\n" + " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n" + "\n" + " "JOURNAL_PARAMETER_DIRECTION":forward or "JOURNAL_PARAMETER_DIRECTION":backward\n" + " When set to `backward` (default) the items returned are the newest before the\n" + " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_BEFORE"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n" + " When set to `forward` the items returned are the oldest after the\n" + " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_AFTER"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n" + " The default is: %s\n" + "\n" + " "JOURNAL_PARAMETER_QUERY":SIMPLE_PATTERN\n" + " Do a full text search to find the log entries matching the pattern given.\n" + " The plugin is searching for matches on all fields of the database.\n" + "\n" + " "JOURNAL_PARAMETER_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n" + " Each successful response, includes a `last_modified` field.\n" + " By providing the timestamp to the `"JOURNAL_PARAMETER_IF_MODIFIED_SINCE"` parameter,\n" + " the plugin will return 200 with a successful response, or 304 if the source has not\n" + " been modified since that timestamp.\n" + "\n" + " "JOURNAL_PARAMETER_HISTOGRAM":facet_id\n" + " Use the given `facet_id` for the histogram.\n" + " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n" + "\n" + " "JOURNAL_PARAMETER_FACETS":facet_id1,facet_id2,facet_id3,...\n" + " Add the given facets to the list of fields for which analysis is required.\n" + " The plugin will offer both a histogram and facet value counters for its values.\n" + " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n" "\n" " facet_id:value_id1,value_id2,value_id3,...\n" " Apply filters to the query, based on the facet IDs returned.\n" " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n" "\n" - "Filters can be combined. Each filter can be given only one time.\n" , program_name , SYSTEMD_JOURNAL_FUNCTION_NAME , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION + , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY + , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward" ); - pluginsd_function_result_end_to_stdout(); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + netdata_mutex_unlock(&stdout_mutex); + + buffer_free(wb); } +const char *errno_map[] = { + [1] = "1 (EPERM)", // "Operation not permitted", + [2] = "2 (ENOENT)", // "No such file or directory", + [3] = "3 (ESRCH)", // "No such process", + [4] = "4 (EINTR)", // "Interrupted system call", + [5] = "5 (EIO)", // "Input/output error", + [6] = "6 (ENXIO)", // "No such device or address", + [7] = "7 (E2BIG)", // "Argument list too long", + [8] = "8 (ENOEXEC)", // "Exec format error", + [9] = "9 (EBADF)", // "Bad file descriptor", + [10] = "10 (ECHILD)", // "No child processes", + [11] = "11 (EAGAIN)", // "Resource temporarily unavailable", + [12] = "12 (ENOMEM)", // "Cannot allocate memory", + [13] = "13 (EACCES)", // "Permission denied", + [14] = "14 (EFAULT)", // "Bad address", + [15] = "15 (ENOTBLK)", // "Block device required", + [16] = "16 (EBUSY)", // "Device or resource busy", + [17] = "17 (EEXIST)", // "File exists", + [18] = "18 (EXDEV)", // "Invalid cross-device link", + [19] = "19 (ENODEV)", // "No such device", + [20] = "20 (ENOTDIR)", // "Not a directory", + [21] = "21 (EISDIR)", // "Is a directory", + [22] = "22 (EINVAL)", // "Invalid argument", + [23] = "23 (ENFILE)", // "Too many open files in system", + [24] = "24 (EMFILE)", // "Too many open files", + [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device", + [26] = "26 (ETXTBSY)", // "Text file busy", + [27] = "27 (EFBIG)", // "File too large", + [28] = "28 (ENOSPC)", // "No space left on device", + [29] = "29 (ESPIPE)", // "Illegal seek", + [30] = "30 (EROFS)", // "Read-only file system", + [31] = "31 (EMLINK)", // "Too many links", + [32] = "32 (EPIPE)", // "Broken pipe", + [33] = "33 (EDOM)", // "Numerical argument out of domain", + [34] = "34 (ERANGE)", // "Numerical result out of range", + [35] = "35 (EDEADLK)", // "Resource deadlock avoided", + [36] = "36 (ENAMETOOLONG)", // "File name too long", + [37] = "37 (ENOLCK)", // "No locks available", + [38] = "38 (ENOSYS)", // "Function not implemented", + [39] = "39 (ENOTEMPTY)", // "Directory not empty", + [40] = "40 (ELOOP)", // "Too many levels of symbolic links", + [42] = "42 (ENOMSG)", // "No message of desired type", + [43] = "43 (EIDRM)", // "Identifier removed", + [44] = "44 (ECHRNG)", // "Channel number out of range", + [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized", + [46] = "46 (EL3HLT)", // "Level 3 halted", + [47] = "47 (EL3RST)", // "Level 3 reset", + [48] = "48 (ELNRNG)", // "Link number out of range", + [49] = "49 (EUNATCH)", // "Protocol driver not attached", + [50] = "50 (ENOCSI)", // "No CSI structure available", + [51] = "51 (EL2HLT)", // "Level 2 halted", + [52] = "52 (EBADE)", // "Invalid exchange", + [53] = "53 (EBADR)", // "Invalid request descriptor", + [54] = "54 (EXFULL)", // "Exchange full", + [55] = "55 (ENOANO)", // "No anode", + [56] = "56 (EBADRQC)", // "Invalid request code", + [57] = "57 (EBADSLT)", // "Invalid slot", + [59] = "59 (EBFONT)", // "Bad font file format", + [60] = "60 (ENOSTR)", // "Device not a stream", + [61] = "61 (ENODATA)", // "No data available", + [62] = "62 (ETIME)", // "Timer expired", + [63] = "63 (ENOSR)", // "Out of streams resources", + [64] = "64 (ENONET)", // "Machine is not on the network", + [65] = "65 (ENOPKG)", // "Package not installed", + [66] = "66 (EREMOTE)", // "Object is remote", + [67] = "67 (ENOLINK)", // "Link has been severed", + [68] = "68 (EADV)", // "Advertise error", + [69] = "69 (ESRMNT)", // "Srmount error", + [70] = "70 (ECOMM)", // "Communication error on send", + [71] = "71 (EPROTO)", // "Protocol error", + [72] = "72 (EMULTIHOP)", // "Multihop attempted", + [73] = "73 (EDOTDOT)", // "RFS specific error", + [74] = "74 (EBADMSG)", // "Bad message", + [75] = "75 (EOVERFLOW)", // "Value too large for defined data type", + [76] = "76 (ENOTUNIQ)", // "Name not unique on network", + [77] = "77 (EBADFD)", // "File descriptor in bad state", + [78] = "78 (EREMCHG)", // "Remote address changed", + [79] = "79 (ELIBACC)", // "Can not access a needed shared library", + [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library", + [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted", + [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries", + [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly", + [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character", + [85] = "85 (ERESTART)", // "Interrupted system call should be restarted", + [86] = "86 (ESTRPIPE)", // "Streams pipe error", + [87] = "87 (EUSERS)", // "Too many users", + [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket", + [89] = "89 (EDESTADDRREQ)", // "Destination address required", + [90] = "90 (EMSGSIZE)", // "Message too long", + [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket", + [92] = "92 (ENOPROTOOPT)", // "Protocol not available", + [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported", + [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported", + [95] = "95 (ENOTSUP)", // "Operation not supported", + [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported", + [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol", + [98] = "98 (EADDRINUSE)", // "Address already in use", + [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address", + [100] = "100 (ENETDOWN)", // "Network is down", + [101] = "101 (ENETUNREACH)", // "Network is unreachable", + [102] = "102 (ENETRESET)", // "Network dropped connection on reset", + [103] = "103 (ECONNABORTED)", // "Software caused connection abort", + [104] = "104 (ECONNRESET)", // "Connection reset by peer", + [105] = "105 (ENOBUFS)", // "No buffer space available", + [106] = "106 (EISCONN)", // "Transport endpoint is already connected", + [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected", + [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown", + [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice", + [110] = "110 (ETIMEDOUT)", // "Connection timed out", + [111] = "111 (ECONNREFUSED)", // "Connection refused", + [112] = "112 (EHOSTDOWN)", // "Host is down", + [113] = "113 (EHOSTUNREACH)", // "No route to host", + [114] = "114 (EALREADY)", // "Operation already in progress", + [115] = "115 (EINPROGRESS)", // "Operation now in progress", + [116] = "116 (ESTALE)", // "Stale file handle", + [117] = "117 (EUCLEAN)", // "Structure needs cleaning", + [118] = "118 (ENOTNAM)", // "Not a XENIX named type file", + [119] = "119 (ENAVAIL)", // "No XENIX semaphores available", + [120] = "120 (EISNAM)", // "Is a named type file", + [121] = "121 (EREMOTEIO)", // "Remote I/O error", + [122] = "122 (EDQUOT)", // "Disk quota exceeded", + [123] = "123 (ENOMEDIUM)", // "No medium found", + [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type", + [125] = "125 (ECANCELED)", // "Operation canceled", + [126] = "126 (ENOKEY)", // "Required key not available", + [127] = "127 (EKEYEXPIRED)", // "Key has expired", + [128] = "128 (EKEYREVOKED)", // "Key has been revoked", + [129] = "129 (EKEYREJECTED)", // "Key was rejected by service", + [130] = "130 (EOWNERDEAD)", // "Owner died", + [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable", + [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill", + [133] = "133 (EHWPOISON)", // "Memory page has hardware error", +}; + static const char *syslog_facility_to_name(int facility) { switch (facility) { case LOG_FAC(LOG_KERN): return "kern"; @@ -216,31 +1690,57 @@ static const char *syslog_priority_to_name(int priority) { } } +static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) { + // same to + // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501 + // function get_log_colors() + + FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY"); + if(!priority_rkv || priority_rkv->empty) + return FACET_ROW_SEVERITY_NORMAL; + + int priority = str2i(buffer_tostring(priority_rkv->wb)); + + if(priority <= LOG_ERR) + return FACET_ROW_SEVERITY_CRITICAL; + + else if (priority <= LOG_WARNING) + return FACET_ROW_SEVERITY_WARNING; + + else if(priority <= LOG_NOTICE) + return FACET_ROW_SEVERITY_NOTICE; + + else if(priority >= LOG_DEBUG) + return FACET_ROW_SEVERITY_DEBUG; + + return FACET_ROW_SEVERITY_NORMAL; +} + static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { - struct passwd pw, *result; - char tmp[1024 + 1]; + static __thread char tmp[1024 + 1]; + struct passwd pw, *result = NULL; - if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL) - return NULL; + if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name)) + snprintfz(buffer, buffer_size - 1, "%u", uid); + else + snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name); - strncpy(buffer, pw.pw_name, buffer_size - 1); - buffer[buffer_size - 1] = '\0'; // Null-terminate just in case return buffer; } static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { - struct group grp, *result; - char tmp[1024 + 1]; + static __thread char tmp[1024]; + struct group grp, *result = NULL; - if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL) - return NULL; + if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name)) + snprintfz(buffer, buffer_size - 1, "%u", gid); + else + snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name); - strncpy(buffer, grp.gr_name, buffer_size - 1); - buffer[buffer_size - 1] = '\0'; // Null-terminate just in case return buffer; } -static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int facility = str2i(buffer_tostring(wb)); @@ -252,7 +1752,10 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu } } -static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int priority = str2i(buffer_tostring(wb)); @@ -264,141 +1767,663 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU } } -static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int uid = str2i(buffer_tostring(wb)); - const char *name = uid_to_username(uid, buf, 1024); - if (!name) - name = v; + unsigned err_no = str2u(buffer_tostring(wb)); + if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) { + const char *name = errno_map[err_no]; + if(name) { + buffer_flush(wb); + buffer_strcat(wb, name); + } + } + } +} + +// ---------------------------------------------------------------------------- +// UID and GID transformation + +#define UID_GID_HASHTABLE_SIZE 10000 + +struct word_t2str_hashtable_entry { + struct word_t2str_hashtable_entry *next; + Word_t hash; + size_t len; + char str[]; +}; + +struct word_t2str_hashtable { + SPINLOCK spinlock; + size_t size; + struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE]; +}; + +struct word_t2str_hashtable uid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable gid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) { + size_t slot = hash % ht->size; + struct word_t2str_hashtable_entry **e = &ht->hashtable[slot]; + + while(*e && (*e)->hash != hash) + e = &((*e)->next); + + return e; +} + +const char *uid_to_username_cached(uid_t uid, size_t *length) { + spinlock_lock(&uid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid); + if(!(*e)) { + static __thread char buf[1024]; + const char *name = uid_to_username(uid, buf, sizeof(buf)); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = uid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&uid_hashtable.spinlock); - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); + *length = (*e)->len; + return (*e)->str; +} + +const char *gid_to_groupname_cached(gid_t gid, size_t *length) { + spinlock_lock(&gid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid); + if(!(*e)) { + static __thread char buf[1024]; + const char *name = gid_to_groupname(gid, buf, sizeof(buf)); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = gid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&gid_hashtable.spinlock); + + *length = (*e)->len; + return (*e)->str; +} + +DICTIONARY *boot_ids_to_first_ut = NULL; + +static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + const char *boot_id = buffer_tostring(wb); + if(*boot_id && isxdigit(*boot_id)) { + usec_t ut = UINT64_MAX; + usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id); + if(!p_ut) { + struct journal_file *jf; + dfe_start_read(journal_files_registry, jf) { + const char *files[2] = { + [0] = jf_dfe.name, + [1] = NULL, + }; + + sd_journal *j = NULL; + if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) + continue; + + char m[100]; + size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id); + usec_t t_ut = 0; + if(sd_journal_add_match(j, m, len) < 0 || + sd_journal_seek_head(j) < 0 || + sd_journal_next(j) < 0 || + sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) { + sd_journal_close(j); + continue; + } + + if(t_ut < ut) + ut = t_ut; + + sd_journal_close(j); + } + dfe_done(jf); + + dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut)); } + else + ut = *p_ut; + + if(ut != UINT64_MAX) { + time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC); + struct tm tm; + char buffer[30]; + + gmtime_r(×tamp_sec, &tm); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); + + switch(scope) { + default: + case FACETS_TRANSFORM_DATA: + case FACETS_TRANSFORM_VALUE: + buffer_sprintf(wb, " (%s UTC) ", buffer); + break; + + case FACETS_TRANSFORM_FACET: + case FACETS_TRANSFORM_FACET_SORT: + case FACETS_TRANSFORM_HISTOGRAM: + buffer_flush(wb); + buffer_sprintf(wb, "%s UTC", buffer); + break; + } + } + } +} - buffer_flush(wb); - buffer_strcat(wb, sv); +static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uid_t uid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = uid_to_username_cached(uid, &len); + buffer_contents_replace(wb, name, len); } } -static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int gid = str2i(buffer_tostring(wb)); - const char *name = gid_to_groupname(gid, buf, 1024); - if (!name) - name = v; + gid_t gid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = gid_to_groupname_cached(gid, &len); + buffer_contents_replace(wb, name, len); + } +} - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); +const char *linux_capabilities[] = { + [CAP_CHOWN] = "CHOWN", + [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE", + [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH", + [CAP_FOWNER] = "FOWNER", + [CAP_FSETID] = "FSETID", + [CAP_KILL] = "KILL", + [CAP_SETGID] = "SETGID", + [CAP_SETUID] = "SETUID", + [CAP_SETPCAP] = "SETPCAP", + [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE", + [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE", + [CAP_NET_BROADCAST] = "NET_BROADCAST", + [CAP_NET_ADMIN] = "NET_ADMIN", + [CAP_NET_RAW] = "NET_RAW", + [CAP_IPC_LOCK] = "IPC_LOCK", + [CAP_IPC_OWNER] = "IPC_OWNER", + [CAP_SYS_MODULE] = "SYS_MODULE", + [CAP_SYS_RAWIO] = "SYS_RAWIO", + [CAP_SYS_CHROOT] = "SYS_CHROOT", + [CAP_SYS_PTRACE] = "SYS_PTRACE", + [CAP_SYS_PACCT] = "SYS_PACCT", + [CAP_SYS_ADMIN] = "SYS_ADMIN", + [CAP_SYS_BOOT] = "SYS_BOOT", + [CAP_SYS_NICE] = "SYS_NICE", + [CAP_SYS_RESOURCE] = "SYS_RESOURCE", + [CAP_SYS_TIME] = "SYS_TIME", + [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG", + [CAP_MKNOD] = "MKNOD", + [CAP_LEASE] = "LEASE", + [CAP_AUDIT_WRITE] = "AUDIT_WRITE", + [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL", + [CAP_SETFCAP] = "SETFCAP", + [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE", + [CAP_MAC_ADMIN] = "MAC_ADMIN", + [CAP_SYSLOG] = "SYSLOG", + [CAP_WAKE_ALARM] = "WAKE_ALARM", + [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND", + [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ", + [38 /*CAP_PERFMON*/] = "PERFMON", + [39 /*CAP_BPF*/] = "BPF", + [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE", +}; + +static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16); + if(cap) { + buffer_fast_strcat(wb, " (", 2); + for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) { + if (linux_capabilities[i] && (cap & (1ULL << i))) { + + if (added) + buffer_fast_strcat(wb, " | ", 3); + + buffer_strcat(wb, linux_capabilities[i]); + added++; + } + } + buffer_fast_strcat(wb, ")", 1); } + } +} - buffer_flush(wb); - buffer_strcat(wb, sv); +static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uint64_t ut = str2ull(buffer_tostring(wb), NULL); + if(ut) { + time_t timestamp_sec = ut / USEC_PER_SEC; + struct tm tm; + char buffer[30]; + + gmtime_r(×tamp_sec, &tm); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); + buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC); + } } } -static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { +// ---------------------------------------------------------------------------- + +static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; - FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER"); - const char *identifier = syslog_identifier_rkv ? buffer_tostring(syslog_identifier_rkv->wb) : FACET_VALUE_UNSET; + const char *identifier = NULL; + FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME"); + if(container_name_rkv && !container_name_rkv->empty) + identifier = buffer_tostring(container_name_rkv->wb); - if(strcmp(identifier, FACET_VALUE_UNSET) == 0) { - FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM"); - identifier = comm_rkv ? buffer_tostring(comm_rkv->wb) : FACET_VALUE_UNSET; + if(!identifier) { + FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER"); + if(syslog_identifier_rkv && !syslog_identifier_rkv->empty) + identifier = buffer_tostring(syslog_identifier_rkv->wb); + + if(!identifier) { + FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM"); + if(comm_rkv && !comm_rkv->empty) + identifier = buffer_tostring(comm_rkv->wb); + } } buffer_flush(rkv->wb); - if(strcmp(pid, FACET_VALUE_UNSET) == 0) - buffer_strcat(rkv->wb, identifier); + if(!identifier) + buffer_strcat(rkv->wb, FACET_VALUE_UNSET); else buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid); buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); } -static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { - char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS); +static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) { + buffer_json_add_array_item_object(json_array); + buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); + buffer_json_object_close(json_array); +} + +DICTIONARY *function_query_status_dict = NULL; + +static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) { + if(!progress_id || !(*progress_id)) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id"); + netdata_mutex_unlock(&stdout_mutex); + return; + } + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id); + + if(!item) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here"); + netdata_mutex_unlock(&stdout_mutex); + return; + } + + FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item); + + usec_t now_monotonic_ut = now_monotonic_usec(); + if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut) + fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC; + + usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut; + + size_t files_matched = fqs->files_matched; + size_t file_working = fqs->file_working; + if(file_working > files_matched) + files_matched = file_working; + + size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED); + size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED); - BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); - buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); + buffer_json_member_add_string(wb, "type", "table"); + buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut); + buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched); + char msg[1024 + 1]; + snprintfz(msg, 1024, + "Read %zu rows (%0.0f rows/s), " + "data %0.1f MB (%0.1f MB/s), " + "file %zu of %zu", + rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC, + (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0, + file_working, files_matched + ); + buffer_json_member_add_string(wb, "message", msg); + buffer_json_finalize(wb); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb); + netdata_mutex_unlock(&stdout_mutex); - FACETS *facets = facets_create(50, 0, FACETS_OPTION_ALL_KEYS_FTS, + dictionary_acquired_item_release(function_query_status_dict, item); +} + +static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { + fstat_thread_calls = 0; + fstat_thread_cached_responses = 0; + journal_files_registry_update(); + + BUFFER *wb = buffer_create(0, NULL); + buffer_flush(wb); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); + + usec_t now_monotonic_ut = now_monotonic_usec(); + FUNCTION_QUERY_STATUS tmp_fqs = { + .cancelled = cancelled, + .started_monotonic_ut = now_monotonic_ut, + .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC), + }; + FUNCTION_QUERY_STATUS *fqs = NULL; + const DICTIONARY_ITEM *fqs_item = NULL; + + FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS, SYSTEMD_ALWAYS_VISIBLE_KEYS, SYSTEMD_KEYS_INCLUDED_IN_FACETS, SYSTEMD_KEYS_EXCLUDED_FROM_FACETS); + facets_accepted_param(facets, JOURNAL_PARAMETER_INFO); + facets_accepted_param(facets, JOURNAL_PARAMETER_SOURCE); facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER); facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE); facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR); + facets_accepted_param(facets, JOURNAL_PARAMETER_DIRECTION); facets_accepted_param(facets, JOURNAL_PARAMETER_LAST); facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY); + facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS); + facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM); + facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE); + facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY); + facets_accepted_param(facets, JOURNAL_PARAMETER_ID); + facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS); + facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA); + facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL); + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE); +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS // register the fields in the order you want them on the dashboard - facets_register_dynamic_key(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS, - systemd_journal_dynamic_row_id, NULL); + facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL); + + facets_register_key_name(facets, "_HOSTNAME", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + + facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS, + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, + netdata_systemd_journal_dynamic_row_id, NULL); + + facets_register_key_name(facets, "MESSAGE", + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | + FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + +// facets_register_dynamic_key_name(facets, "MESSAGE", +// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT | +// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, +// netdata_systemd_journal_rich_message, NULL); - facets_register_key(facets, "MESSAGE", - FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_MAIN_TEXT|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS); + facets_register_key_name_transformation(facets, "PRIORITY", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_priority, NULL); - facets_register_key_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_priority, NULL); + facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_syslog_facility, NULL); - facets_register_key_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_syslog_facility, NULL); + facets_register_key_name_transformation(facets, "ERRNO", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_errno, NULL); - facets_register_key(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); - facets_register_key(facets, "UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); - facets_register_key(facets, "USER_UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); + facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE, + FACET_KEY_OPTION_NEVER_FACET); - facets_register_key_transformation(facets, "_UID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_uid, uids); + facets_register_key_name(facets, "SYSLOG_IDENTIFIER", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); - facets_register_key_transformation(facets, "_GID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_gid, gids); + facets_register_key_name(facets, "UNIT", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + facets_register_key_name(facets, "USER_UNIT", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + + facets_register_key_name_transformation(facets, "_BOOT_ID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_boot_id, NULL); + + facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_GID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_gid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_GID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_gid, NULL); + + facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_cap_effective, NULL); + + facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_timestamp_usec, NULL); + + // ------------------------------------------------------------------------ + // parse the parameters + + bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false; time_t after_s = 0, before_s = 0; usec_t anchor = 0; + usec_t if_modified_since = 0; size_t last = 0; + FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION; const char *query = NULL; + const char *chart = NULL; + const char *source = NULL; + const char *progress_id = NULL; + SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL; + size_t filters = 0; - buffer_json_member_add_object(wb, "request"); - buffer_json_member_add_object(wb, "filters"); + buffer_json_member_add_object(wb, "_request"); + char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS); for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) { - const char *keyword = get_word(words, num_words, i); + char *keyword = get_word(words, num_words, i); if(!keyword) break; if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) { - systemd_journal_function_help(transaction); + netdata_systemd_journal_function_help(transaction); goto cleanup; } - else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", strlen(JOURNAL_PARAMETER_AFTER ":")) == 0) { - after_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_AFTER ":")]); + else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) { + info = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", strlen(JOURNAL_PARAMETER_BEFORE ":")) == 0) { - before_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_BEFORE ":")]); + else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) { + progress = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", strlen(JOURNAL_PARAMETER_ANCHOR ":")) == 0) { - anchor = str2ull(&keyword[strlen(JOURNAL_PARAMETER_ANCHOR ":")], NULL); + else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + delta = false; + else + delta = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", strlen(JOURNAL_PARAMETER_LAST ":")) == 0) { - last = str2ul(&keyword[strlen(JOURNAL_PARAMETER_LAST ":")]); + else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + tail = false; + else + tail = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", strlen(JOURNAL_PARAMETER_QUERY ":")) == 0) { - query= &keyword[strlen(JOURNAL_PARAMETER_QUERY ":")]; + else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + data_only = false; + else + data_only = true; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + slice = false; + else + slice = true; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) { + char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1]; + + if(*id) + progress_id = id; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { + source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; + + if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) { + source_type = SDJF_ALL; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) { + source_type = SDJF_LOCAL; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) { + source_type = SDJF_REMOTE; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) { + source_type = SDJF_NAMESPACE; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_SYSTEM; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_USER; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_OTHER; + source = NULL; + } + else { + source_type = SDJF_ALL; + // else, match the source, whatever it is + } + } + else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) { + after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1) == 0) { + before_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) { + if_modified_since = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1) == 0) { + anchor = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1], NULL); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_DIRECTION ":", sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1) == 0) { + direction = strcasecmp(&keyword[sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1], "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", sizeof(JOURNAL_PARAMETER_LAST ":") - 1) == 0) { + last = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_LAST ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", sizeof(JOURNAL_PARAMETER_QUERY ":") - 1) == 0) { + query= &keyword[sizeof(JOURNAL_PARAMETER_QUERY ":") - 1]; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_HISTOGRAM ":", sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1) == 0) { + chart = &keyword[sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1]; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_FACETS ":", sizeof(JOURNAL_PARAMETER_FACETS ":") - 1) == 0) { + char *value = &keyword[sizeof(JOURNAL_PARAMETER_FACETS ":") - 1]; + if(*value) { + buffer_json_member_add_array(wb, JOURNAL_PARAMETER_FACETS); + + while(value) { + char *sep = strchr(value, ','); + if(sep) + *sep++ = '\0'; + + facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER); + buffer_json_add_array_item_string(wb, value); + + value = sep; + } + + buffer_json_array_close(wb); // JOURNAL_PARAMETER_FACETS + } } else { char *value = strchr(keyword, ':'); @@ -412,8 +2437,9 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(sep) *sep++ = '\0'; - facets_register_facet_filter(facets, keyword, value, FACET_KEY_OPTION_REORDER); + facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER); buffer_json_add_array_item_string(wb, value); + filters++; value = sep; } @@ -423,18 +2449,31 @@ static void function_systemd_journal(const char *transaction, char *function, ch } } - buffer_json_object_close(wb); // filters + // ------------------------------------------------------------------------ + // put this request into the progress db + + if(progress_id && *progress_id) { + fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs)); + fqs = dictionary_acquired_item_value(fqs_item); + } + else { + // no progress id given, proceed without registering our progress in the dictionary + fqs = &tmp_fqs; + fqs_item = NULL; + } + + // ------------------------------------------------------------------------ + // validate parameters - time_t expires = now_realtime_sec() + 1; - time_t now_s; + time_t now_s = now_realtime_sec(); + time_t expires = now_s + 1; if(!after_s && !before_s) { - now_s = now_realtime_sec(); before_s = now_s; after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION; } else - rrdr_relative_window_to_absolute(&after_s, &before_s, &now_s, false); + rrdr_relative_window_to_absolute(&after_s, &before_s, now_s); if(after_s > before_s) { time_t tmp = after_s; @@ -448,85 +2487,179 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(!last) last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY; - buffer_json_member_add_time_t(wb, "after", after_s); - buffer_json_member_add_time_t(wb, "before", before_s); - buffer_json_member_add_uint64(wb, "anchor", anchor); - buffer_json_member_add_uint64(wb, "last", last); - buffer_json_member_add_string(wb, "query", query); - buffer_json_member_add_time_t(wb, "timeout", timeout); - buffer_json_object_close(wb); // request - - facets_set_items(facets, last); - facets_set_anchor(facets, anchor); - facets_set_query(facets, query); - int response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, - now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); - if(response != HTTP_RESP_OK) { - pluginsd_function_json_error(transaction, response, "failed"); - goto cleanup; + // ------------------------------------------------------------------------ + // set query time-frame, anchors and direction + + fqs->after_ut = after_s * USEC_PER_SEC; + fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1; + fqs->if_modified_since = if_modified_since; + fqs->data_only = data_only; + fqs->delta = (fqs->data_only) ? delta : false; + fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false; + fqs->source = string_strdupz(source); + fqs->source_type = source_type; + fqs->entries = last; + fqs->last_modified = 0; + fqs->filters = filters; + fqs->query = (query && *query) ? query : NULL; + fqs->histogram = (chart && *chart) ? chart : NULL; + fqs->direction = direction; + fqs->anchor.start_ut = anchor; + fqs->anchor.stop_ut = 0; + + if(fqs->anchor.start_ut && fqs->tail) { + // a tail request + // we need the top X entries from BEFORE + // but, we need to calculate the facets and the + // histogram up to the anchor + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = anchor; } - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires); - fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); + if(anchor && anchor < fqs->after_ut) { + log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor"); + anchor = 0; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = 0; + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + } + else if(anchor > fqs->before_ut) { + log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor"); + anchor = 0; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = 0; + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + } - pluginsd_function_result_end_to_stdout(); + facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction); -cleanup: - facets_destroy(facets); - buffer_free(wb); -} + facets_set_additional_options(facets, + ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) | + ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0)); -static void *reader_main(void *arg __maybe_unused) { - char buffer[PLUGINSD_LINE_MAX + 1]; + // ------------------------------------------------------------------------ + // set the rest of the query parameters - char *s = NULL; - while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS); + facets_set_items(facets, fqs->entries); + facets_set_query(facets, fqs->query); - const char *keyword = get_word(words, num_words, 0); +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + fqs->slice = slice; + if(slice) + facets_enable_slice_mode(facets); +#else + fqs->slice = false; +#endif - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + if(fqs->histogram) + facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut); + else + facets_set_timeframe_and_histogram_by_name(facets, "PRIORITY", fqs->after_ut, fqs->before_ut); - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT; - netdata_mutex_lock(&mutex); + // ------------------------------------------------------------------------ + // complete the request object + + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source)); + buffer_json_member_add_uint64(wb, "source_type", fqs->source_type); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "if_modified_since", fqs->if_modified_since); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_ANCHOR, anchor); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_DIRECTION, fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward"); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_LAST, fqs->entries); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_QUERY, fqs->query); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_HISTOGRAM, fqs->histogram); + buffer_json_object_close(wb); // request - if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0) - function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else - pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin."); + buffer_json_journal_versions(wb); - fflush(stdout); - netdata_mutex_unlock(&mutex); + // ------------------------------------------------------------------------ + // run the request + + int response; + + if(info) { + facets_accepted_parameters_to_json_array(facets, wb, false); + buffer_json_member_add_array(wb, "required_params"); + { + buffer_json_add_array_item_object(wb); + { + buffer_json_member_add_string(wb, "id", "source"); + buffer_json_member_add_string(wb, "name", "source"); + buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query"); + buffer_json_member_add_string(wb, "type", "select"); + buffer_json_member_add_array(wb, "options"); + { + available_journal_file_sources_to_json_array(wb); + } + buffer_json_array_close(wb); // options array } + buffer_json_object_close(wb); // required params object } - else - netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); + buffer_json_array_close(wb); // required_params array + + facets_table_config(wb); + + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); + buffer_json_member_add_string(wb, "type", "table"); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + buffer_json_finalize(wb); + response = HTTP_RESP_OK; + goto output; } - if(!s || feof(stdin) || ferror(stdin)) { - plugin_should_exit = true; - netdata_log_error("Received error on stdin."); + if(progress) { + function_systemd_journal_progress(wb, transaction, progress_id); + goto cleanup; } - exit(1); + response = netdata_systemd_journal_query(wb, facets, fqs); + + // ------------------------------------------------------------------------ + // cleanup query params + + string_freez(fqs->source); + fqs->source = NULL; + + // ------------------------------------------------------------------------ + // handle error response + + if(response != HTTP_RESP_OK) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, response, "failed"); + netdata_mutex_unlock(&stdout_mutex); + goto cleanup; + } + +output: + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb); + netdata_mutex_unlock(&stdout_mutex); + +cleanup: + facets_destroy(facets); + buffer_free(wb); + + if(fqs_item) { + dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item)); + dictionary_acquired_item_release(function_query_status_dict, fqs_item); + dictionary_garbage_collect(function_query_status_dict); + } } +// ---------------------------------------------------------------------------- + int main(int argc __maybe_unused, char **argv __maybe_unused) { stderror = stderr; clocks_init(); @@ -540,44 +2673,104 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { error_log_errors_per_period = 100; error_log_throttle_period = 3600; - // initialize the threads - netdata_threads_init_for_external_plugins(0); // set the default threads stack size here + log_set_global_severity_for_external_plugins(); + + netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); + if(verify_netdata_host_prefix() == -1) exit(1); + + // ------------------------------------------------------------------------ + // setup the journal directories + + unsigned d = 0; + + journal_directories[d++].path = strdupz("/var/log/journal"); + journal_directories[d++].path = strdupz("/run/log/journal"); + + if(*netdata_configured_host_prefix) { + char path[PATH_MAX]; + snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix); + journal_directories[d++].path = strdupz(path); + snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix); + journal_directories[d++].path = strdupz(path); + } + + // terminate the list + journal_directories[d].path = NULL; + + // ------------------------------------------------------------------------ + + function_query_status_dict = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(FUNCTION_QUERY_STATUS)); + + // ------------------------------------------------------------------------ + // initialize the used hashes files registry + + used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + + + // ------------------------------------------------------------------------ + // initialize the journal files registry + + systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC; + + journal_files_registry = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct journal_file)); + + dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL); + dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL); + dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL); + + boot_ids_to_first_ut = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(usec_t)); + + journal_files_registry_update(); - uids = dictionary_create(0); - gids = dictionary_create(0); // ------------------------------------------------------------------------ // debug if(argc == 2 && strcmp(argv[1], "debug") == 0) { - char buf[] = "systemd-journal after:-86400 before:0 last:500"; - function_systemd_journal("123", buf, "", 0, 30); + bool cancelled = false; + char buf[] = "systemd-journal after:-16000000 before:0 last:1"; + // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU"; + // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403"; + function_systemd_journal("123", buf, 600, &cancelled); exit(1); } // ------------------------------------------------------------------------ + // the event loop for functions + + struct functions_evloop_globals *wg = + functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit); + + functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal, + SYSTEMD_JOURNAL_DEFAULT_TIMEOUT); - netdata_thread_t reader_thread; - netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL); // ------------------------------------------------------------------------ time_t started_t = now_monotonic_sec(); - size_t iteration; + size_t iteration = 0; usec_t step = 1000 * USEC_PER_MS; bool tty = isatty(fileno(stderr)) == 1; - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n", SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); heartbeat_t hb; heartbeat_init(&hb); - for(iteration = 0; 1 ; iteration++) { - netdata_mutex_unlock(&mutex); + while(!plugin_should_exit) { + iteration++; + + netdata_mutex_unlock(&stdout_mutex); heartbeat_next(&hb, step); - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); if(!tty) fprintf(stdout, "\n"); @@ -589,8 +2782,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { break; } - dictionary_destroy(uids); - dictionary_destroy(gids); - exit(0); } |