1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
// SPDX-License-Identifier: GPL-3.0-or-later
/** @file logsmanagement.c
* @brief This is the main file of the Netdata logs management project
*
* The aim of the project is to add the capability to collect, parse and
* query logs in the Netdata agent. For more information please refer
* to the project's [README](README.md) file.
*/
#include <uv.h>
#include "daemon/common.h"
#include "db_api.h"
#include "file_info.h"
#include "flb_plugin.h"
#include "functions.h"
#include "helper.h"
#include "libnetdata/required_dummies.h"
#include "logsmanag_config.h"
#include "rrd_api/rrd_api_stats.h"
#if defined(ENABLE_LOGSMANAGEMENT_TESTS)
#include "logsmanagement/unit_test/unit_test.h"
#endif
netdata_mutex_t stdout_mut = NETDATA_MUTEX_INITIALIZER;
bool logsmanagement_should_exit = false;
struct File_infos_arr *p_file_infos_arr = NULL;
static uv_loop_t *main_loop;
static struct {
uv_signal_t sig;
const int signum;
} signals[] = {
// Add here signals that will terminate the plugin
{.signum = SIGINT},
{.signum = SIGQUIT},
{.signum = SIGPIPE},
{.signum = SIGTERM}
};
static void signal_handler(uv_signal_t *handle, int signum __maybe_unused) {
UNUSED(handle);
debug_log("Signal received: %d\n", signum);
__atomic_store_n(&logsmanagement_should_exit, true, __ATOMIC_RELAXED);
}
static void on_walk_cleanup(uv_handle_t* handle, void* data){
UNUSED(data);
if (!uv_is_closing(handle))
uv_close(handle, NULL);
}
/**
* @brief The main function of the logs management plugin.
* @details Any static asserts are most likely going to be inluded here. After
* any initialisation routines, the default uv_loop_t is executed indefinitely.
*/
int main(int argc, char **argv) {
/* Static asserts */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
COMPILE_TIME_ASSERT(SAVE_BLOB_TO_DB_MIN <= SAVE_BLOB_TO_DB_MAX);
COMPILE_TIME_ASSERT(CIRCULAR_BUFF_DEFAULT_MAX_SIZE >= CIRCULAR_BUFF_MAX_SIZE_RANGE_MIN);
COMPILE_TIME_ASSERT(CIRCULAR_BUFF_DEFAULT_MAX_SIZE <= CIRCULAR_BUFF_MAX_SIZE_RANGE_MAX);
#pragma GCC diagnostic pop
clocks_init();
program_name = LOGS_MANAGEMENT_PLUGIN_STR;
nd_log_initialize_for_external_plugins(program_name);
// netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
// if(verify_netdata_host_prefix(true) == -1) exit(1);
int g_update_every = 0;
for(int i = 1; i < argc ; i++) {
if(isdigit(*argv[i]) && !g_update_every && str2i(argv[i]) > 0 && str2i(argv[i]) < 86400) {
g_update_every = str2i(argv[i]);
debug_log("new update_every received: %d", g_update_every);
}
else if(!strcmp("--unittest", argv[i])) {
#if defined(ENABLE_LOGSMANAGEMENT_TESTS)
exit(logs_management_unittest());
#else
collector_error("%s was not built with unit test support.", program_name);
#endif
}
else if(!strcmp("version", argv[i]) ||
!strcmp("-version", argv[i]) ||
!strcmp("--version", argv[i]) ||
!strcmp("-v", argv[i]) ||
!strcmp("-V", argv[i])) {
printf(NETDATA_VERSION"\n");
exit(0);
}
else if(!strcmp("-h", argv[i]) ||
!strcmp("--help", argv[i])) {
fprintf(stderr,
"\n"
" netdata %s %s\n"
" Copyright (C) 2023 Netdata Inc.\n"
" Released under GNU General Public License v3 or later.\n"
" All rights reserved.\n"
"\n"
" This program is the logs management plugin for netdata.\n"
"\n"
" Available command line options:\n"
"\n"
" --unittest run unit tests and exit\n"
"\n"
" -v\n"
" -V\n"
" --version print version and exit\n"
"\n"
" -h\n"
" --help print this message and exit\n"
"\n"
" For more information:\n"
" https://github.com/netdata/netdata/tree/master/src/collectors/logs-management.plugin\n"
"\n",
program_name,
NETDATA_VERSION
);
exit(1);
}
else
collector_error("%s(): ignoring parameter '%s'", __FUNCTION__, argv[i]);
}
Flb_socket_config_t *p_forward_in_config = NULL;
main_loop = mallocz(sizeof(uv_loop_t));
fatal_assert(uv_loop_init(main_loop) == 0);
flb_srvc_config_t flb_srvc_config = {
.flush = FLB_FLUSH_DEFAULT,
.http_listen = FLB_HTTP_LISTEN_DEFAULT,
.http_port = FLB_HTTP_PORT_DEFAULT,
.http_server = FLB_HTTP_SERVER_DEFAULT,
.log_path = "NULL",
.log_level = FLB_LOG_LEVEL_DEFAULT,
.coro_stack_size = FLB_CORO_STACK_SIZE_DEFAULT
};
p_file_infos_arr = callocz(1, sizeof(struct File_infos_arr));
if(logs_manag_config_load(&flb_srvc_config, &p_forward_in_config, g_update_every))
exit(1);
if(flb_init(flb_srvc_config, get_stock_config_dir(), g_logs_manag_config.sd_journal_field_prefix)){
collector_error("flb_init() failed - logs management will be disabled");
exit(1);
}
if(flb_add_fwd_input(p_forward_in_config))
collector_error("flb_add_fwd_input() failed - logs management forward input will be disabled");
/* Initialize logs management for each configuration section */
config_file_load(main_loop, p_forward_in_config, &flb_srvc_config, &stdout_mut);
if(p_file_infos_arr->count == 0){
collector_info("No valid configuration could be found for any log source - logs management will be disabled");
exit(1);
}
/* Run Fluent Bit engine
* NOTE: flb_run() ideally would be executed after db_init(), but in case of
* a db_init() failure, it is easier to call flb_stop_and_cleanup() rather
* than the other way round (i.e. cleaning up after db_init(), if flb_run()
* fails). */
if(flb_run()){
collector_error("flb_run() failed - logs management will be disabled");
exit(1);
}
if(db_init()){
collector_error("db_init() failed - logs management will be disabled");
exit(1);
}
uv_thread_t *p_stats_charts_thread_id = NULL;
const char *const netdata_internals_monitoring = getenv("NETDATA_INTERNALS_MONITORING");
if( netdata_internals_monitoring &&
*netdata_internals_monitoring &&
strcmp(netdata_internals_monitoring, "YES") == 0){
p_stats_charts_thread_id = mallocz(sizeof(uv_thread_t));
fatal_assert(0 == uv_thread_create(p_stats_charts_thread_id, stats_charts_init, &stdout_mut));
}
#if defined(__STDC_VERSION__)
debug_log( "__STDC_VERSION__: %ld", __STDC_VERSION__);
#else
debug_log( "__STDC_VERSION__ undefined");
#endif // defined(__STDC_VERSION__)
debug_log( "libuv version: %s", uv_version_string());
debug_log( "LZ4 version: %s", LZ4_versionString());
debug_log( "SQLITE version: " SQLITE_VERSION);
for(int i = 0; i < (int) (sizeof(signals) / sizeof(signals[0])); i++){
uv_signal_init(main_loop, &signals[i].sig);
uv_signal_start(&signals[i].sig, signal_handler, signals[i].signum);
}
struct functions_evloop_globals *wg = logsmanagement_func_facets_init(&logsmanagement_should_exit);
collector_info("%s setup completed successfully", program_name);
/* Run uvlib loop. */
while(!__atomic_load_n(&logsmanagement_should_exit, __ATOMIC_RELAXED))
uv_run(main_loop, UV_RUN_ONCE);
/* If there are valid log sources, there should always be valid handles */
collector_info("uv_run(main_loop, ...); no handles or requests - cleaning up...");
nd_log_limits_unlimited();
// TODO: Clean up stats charts memory
if(p_stats_charts_thread_id){
uv_thread_join(p_stats_charts_thread_id);
freez(p_stats_charts_thread_id);
}
uv_stop(main_loop);
flb_terminate();
flb_free_fwd_input_out_cb();
p_file_info_destroy_all();
uv_walk(main_loop, on_walk_cleanup, NULL);
while(0 != uv_run(main_loop, UV_RUN_ONCE));
if(uv_loop_close(main_loop))
m_assert(0, "uv_loop_close() result not 0");
freez(main_loop);
functions_evloop_cancel_threads(wg);
collector_info("logs management clean up done - exiting");
exit(0);
}
|