diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/README.md | 100 | ||||
-rw-r--r-- | streaming/receiver.c | 48 | ||||
-rw-r--r-- | streaming/replication.c | 4 | ||||
-rw-r--r-- | streaming/rrdpush.c | 126 | ||||
-rw-r--r-- | streaming/rrdpush.h | 22 | ||||
-rw-r--r-- | streaming/sender.c | 95 |
6 files changed, 304 insertions, 91 deletions
diff --git a/streaming/README.md b/streaming/README.md index 370186ac..a27167bc 100644 --- a/streaming/README.md +++ b/streaming/README.md @@ -30,36 +30,36 @@ node**. This file is automatically generated by Netdata the first time it is sta #### `[stream]` section -| Setting | Default | Description | -| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `enabled` | `no` | Whether this node streams metrics to any parent. Change to `yes` to enable streaming. | -| [`destination`](#destination) | ` ` | A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`. [Read more →](#destination) | -| `ssl skip certificate verification` | `yes` | If you want to accept self-signed or expired certificates, set to `yes` and uncomment. | -| `CApath` | `/etc/ssl/certs/` | The directory where known certificates are found. Defaults to OpenSSL's default path. | -| `CAfile` | `/etc/ssl/certs/cert.pem` | Add a parent node certificate to the list of known certificates in `CAPath`. | -| `api key` | ` ` | The `API_KEY` to use as the child node. | -| `timeout seconds` | `60` | The timeout to connect and send metrics to a parent. | -| `default port` | `19999` | The port to use if `destination` does not specify one. | -| [`send charts matching`](#send-charts-matching) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed. [Read more →](#send-charts-matching) | -| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. | -| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. | -| `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. | +| Setting | Default | Description | +|-------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `enabled` | `no` | Whether this node streams metrics to any parent. Change to `yes` to enable streaming. | +| [`destination`](#destination) | | A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`. [Read more →](#destination) | +| `ssl skip certificate verification` | `yes` | If you want to accept self-signed or expired certificates, set to `yes` and uncomment. | +| `CApath` | `/etc/ssl/certs/` | The directory where known certificates are found. Defaults to OpenSSL's default path. | +| `CAfile` | `/etc/ssl/certs/cert.pem` | Add a parent node certificate to the list of known certificates in `CAPath`. | +| `api key` | | The `API_KEY` to use as the child node. | +| `timeout seconds` | `60` | The timeout to connect and send metrics to a parent. | +| `default port` | `19999` | The port to use if `destination` does not specify one. | +| [`send charts matching`](#send-charts-matching) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed. [Read more →](#send-charts-matching) | +| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. | +| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. | +| `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. | ### `[API_KEY]` and `[MACHINE_GUID]` sections -| Setting | Default | Description | -| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `enabled` | `no` | Whether this API KEY enabled or disabled. | -| [`allow from`](#allow-from) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that will stream metrics using this API key. [Read more →](#allow-from) | -| `default history` | `3600` | The default amount of child metrics history to retain when using the `save`, `map`, or `ram` memory modes. | -| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) | -| `health enabled by default` | `auto` | Whether alarms and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alarms when the child is connected. `yes` enables alarms always, and `no` disables alarms. | -| `default postpone alarms on connect seconds` | `60` | Postpone alarms and notifications for a period of time after the child connects. | -| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. | -| `default proxy enabled` | ` ` | Route metrics through a proxy. | -| `default proxy destination` | ` ` | Space-separated list of `IP:PORT` for proxies. | -| `default proxy api key` | ` ` | The `API_KEY` of the proxy. | -| `default send charts matching` | `*` | See [`send charts matching`](#send-charts-matching). | +| Setting | Default | Description | +|-----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `enabled` | `no` | Whether this API KEY enabled or disabled. | +| [`allow from`](#allow-from) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that will stream metrics using this API key. [Read more →](#allow-from) | +| `default history` | `3600` | The default amount of child metrics history to retain when using the `save`, `map`, or `ram` memory modes. | +| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) | +| `health enabled by default` | `auto` | Whether alerts and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alerts when the child is connected. `yes` enables alerts always, and `no` disables alerts. | +| `default postpone alarms on connect seconds` | `60` | Postpone alerts and notifications for a period of time after the child connects. | +| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. | +| `default proxy enabled` | | Route metrics through a proxy. | +| `default proxy destination` | | Space-separated list of `IP:PORT` for proxies. | +| `default proxy api key` | | The `API_KEY` of the proxy. | +| `default send charts matching` | `*` | See [`send charts matching`](#send-charts-matching). | #### `destination` @@ -145,24 +145,24 @@ cache size` and `dbengine multihost disk space` settings in the `[global]` secti ### `netdata.conf` -| Setting | Default | Description | -| :----------------------------------------- | :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| **`[global]` section** | | | -| `memory mode` | `dbengine` | Determines the [database type](https://github.com/netdata/netdata/blob/master/database/README.md) to be used on that node. Other options settings include `none`, `ram`, `save`, and `map`. `none` disables the database at this host. This also disables alarms and notifications, as those can't run without a database. | -| **`[web]` section** | | | +| Setting | Default | Description | +|--------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `[global]` section | | | +| `memory mode` | `dbengine` | Determines the [database type](https://github.com/netdata/netdata/blob/master/database/README.md) to be used on that node. Other options settings include `none`, `ram`, `save`, and `map`. `none` disables the database at this host. This also disables alerts and notifications, as those can't run without a database. | +| `[web]` section | | | | `mode` | `static-threaded` | Determines the [web server](https://github.com/netdata/netdata/blob/master/web/server/README.md) type. The other option is `none`, which disables the dashboard, API, and registry. | -| `accept a streaming request every seconds` | `0` | Set a limit on how often a parent node accepts streaming requests from child nodes. `0` equals no limit. If this is set, you may see `... too busy to accept new streaming request. Will be allowed in X secs` in Netdata's `error.log`. | +| `accept a streaming request every seconds` | `0` | Set a limit on how often a parent node accepts streaming requests from child nodes. `0` equals no limit. If this is set, you may see `... too busy to accept new streaming request. Will be allowed in X secs` in Netdata's `error.log`. | ### Basic use cases This is an overview of how the main options can be combined: -| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|exporting|alarms|dashboard| -|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:| -| headless collector|`none`|`none`|`yes`|only for `data source = as collected`|not possible|no| -| headless proxy|`none`|not `none`|`yes`|only for `data source = as collected`|not possible|no| -| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes| -| central netdata|not `none`|not `none`|`no`|possible|possible|yes| +| target | memory<br/>mode | web<br/>mode | stream<br/>enabled | exporting | alerts | dashboard | +|--------------------|:---------------:|:------------:|:------------------:|:-------------------------------------:|:------------:|:---------:| +| headless collector | `none` | `none` | `yes` | only for `data source = as collected` | not possible | no | +| headless proxy | `none` | not `none` | `yes` | only for `data source = as collected` | not possible | no | +| proxy with db | not `none` | not `none` | `yes` | possible | possible | yes | +| central netdata | not `none` | not `none` | `no` | possible | possible | yes | ### Per-child settings @@ -170,7 +170,7 @@ While the `[API_KEY]` section applies settings for any child node using that key with the `[MACHINE_GUID]` section. For example, the metrics streamed from only the child node with `MACHINE_GUID` are saved in memory, not using the -default `dbengine` as specified by the `API_KEY`, and alarms are disabled. +default `dbengine` as specified by the `API_KEY`, and alerts are disabled. ```conf [API_KEY] @@ -261,12 +261,12 @@ To enable stream compression: ``` -| Parent | Stream compression | Child | -|----------------------|--------------------|----------------------| -| Supported & Enabled | compressed | Supported & Enabled | -| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled | -| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported | -| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported | +| Parent | Stream compression | Child | +|--------------------------------------|--------------------|--------------------------------------| +| Supported & Enabled | compressed | Supported & Enabled | +| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled | +| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported | +| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported | In case of parents with multiple children you can select which streams will be compressed by using the same configuration under the `[API_KEY]`, `[MACHINE_GUID]` section. @@ -383,7 +383,7 @@ following configurations: parameter (default is no). | Parent TLS enabled | Parent port SSL | Child TLS | Child SSL Ver. | Behavior | -| :----------------- | :--------------- | :-------- | :------------- | :--------------------------------------------------------------------------------------------------------------------------------------- | +|:-------------------|:-----------------|:----------|:---------------|:-----------------------------------------------------------------------------------------------------------------------------------------| | No | - | No | no | Legacy behavior. The parent-child stream is unencrypted. | | Yes | force | No | no | The parent rejects the child connection. | | Yes | -/optional | No | no | The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes) | @@ -396,7 +396,7 @@ A proxy is a node that receives metrics from a child, then streams them onward t configure it as a receiving and a sending Netdata at the same time. Netdata proxies may or may not maintain a database for the metrics passing through them. When they maintain a database, -they can also run health checks (alarms and notifications) for the remote host that is streaming the metrics. +they can also run health checks (alerts and notifications) for the remote host that is streaming the metrics. In the following example, the proxy receives metrics from a child node using the `API_KEY` of `66666666-7777-8888-9999-000000000000`, then stores metrics using `dbengine`. It then uses the `API_KEY` of @@ -431,7 +431,7 @@ On the parent, set the following in `stream.conf`: # do not save child metrics on disk default memory = ram - # alarms checks, only while the child is connected + # alerts checks, only while the child is connected health enabled by default = auto ``` @@ -449,7 +449,7 @@ On the child nodes, set the following in `stream.conf`: api key = 11111111-2222-3333-4444-555555555555 ``` -In addition, edit `netdata.conf` on each child node to disable the database and alarms. +In addition, edit `netdata.conf` on each child node to disable the database and alerts. ```bash [global] diff --git a/streaming/receiver.c b/streaming/receiver.c index 3ff022e9..10ef8b7d 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) { +inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { + buffer_need_bytes(dst, reader->read_len - reader->pos + 2); + size_t start = reader->pos; char *ss = &reader->read_buffer[start]; char *se = &reader->read_buffer[reader->read_len]; - char *ds = dst; - char *de = &dst[dst_size - 2]; + char *ds = &dst->buffer[dst->len]; + char *de = &ds[dst->size - dst->len - 2]; if(ss >= se) { *ds = '\0'; reader->pos = 0; reader->read_len = 0; reader->read_buffer[reader->read_len] = '\0'; - return NULL; + return false; } // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') + while(ss < se && ds < de && *ss != '\n') { *ds++ = *ss++; + dst->len++; + } // if we have a newline, return the buffer if(ss < se && ds < de && *ss == '\n') { // newline found in the r->read_buffer *ds++ = *ss++; // copy the newline too - *ds = '\0'; + dst->len++; - reader->pos = ss - reader->read_buffer; - return dst; - } - - // if the destination is full, oops! - if(ds == de) { - netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); *ds = '\0'; + reader->pos = ss - reader->read_buffer; - return dst; + return true; } - // no newline found in the r->read_buffer - // move everything to the beginning - memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start); - reader->read_len -= (int)start; - reader->read_buffer[reader->read_len] = '\0'; - *ds = '\0'; reader->pos = 0; - return NULL; + reader->read_len = 0; + reader->read_buffer[reader->read_len] = '\0'; + return false; } bool plugin_is_enabled(struct plugind *cd); @@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i buffered_reader_init(&rpt->reader); - char buffer[PLUGINSD_LINE_MAX + 2] = ""; + BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); while(!receiver_should_stop(rpt)) { - if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) { + if(!buffered_reader_next_line(&rpt->reader, buffer)) { bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { @@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i continue; } - if (unlikely(parser_action(parser, buffer))) { - internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + if (unlikely(parser_action(parser, buffer->buffer))) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } - } + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + buffer_free(buffer); result = parser->user.data_collections_count; // free parser with the pop function diff --git a/streaming/replication.c b/streaming/replication.c index 0e5a0b40..ffb6b3de 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -1838,7 +1838,7 @@ static void replication_main_cleanup(void *ptr) { } freez(replication_globals.main_thread.threads_ptrs); replication_globals.main_thread.threads_ptrs = NULL; - __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); + __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED); aral_destroy(replication_globals.aral_rse); replication_globals.aral_rse = NULL; @@ -1867,7 +1867,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { if(--threads) { replication_globals.main_thread.threads = threads; replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *)); - __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED); + __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED); for(int i = 0; i < threads ;i++) { char tag[NETDATA_THREAD_TAG_MAX + 1]; diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 67c43e41..e8c46a02 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { STREAM_CAP_BINARY | STREAM_CAP_INTERPOLATED | STREAM_HAS_COMPRESSION | +#ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | +#endif (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | (ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) | 0; @@ -465,6 +468,46 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) { *rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, }; } +// TODO enable this macro before release +#define bail_if_no_cap(cap) \ + if(unlikely(!stream_has_capability(host->sender, cap))) { \ + return; \ + } + +#define dyncfg_check_can_push(host) \ + if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \ + return; \ + bail_if_no_cap(STREAM_CAP_DYNCFG) + +// assumes job is locked and acquired!!! +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); + + job->dirty = 0; +} + +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) { RRDHOST *host = st->rrdhost; @@ -489,6 +532,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } + if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { + BUFFER *wb = sender_start(host->sender); + rrd_functions_expose_global_rrdpush(host, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + } + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED); @@ -553,6 +602,78 @@ void rrdpush_send_global_functions(RRDHOST *host) { sender_thread_buffer_free(); } +void rrdpush_send_dyncfg(RRDHOST *host) { + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + DICTIONARY *plugins_dict = host->configurable_plugins; + + struct configurable_plugin *plug; + dfe_start_read(plugins_dict, plug) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name); + struct module *mod; + dfe_start_read(plug->modules, mod) { + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type)); + struct job *job; + dfe_start_read(mod->jobs, job) { + pthread_mutex_lock(&job->lock); + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags); + buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state); + if (job->reason) + buffer_sprintf(wb, " \"%s\"", job->reason); + buffer_sprintf(wb, "\n"); + job->dirty = 0; + pthread_mutex_unlock(&job->lock); + } dfe_done(job); + } dfe_done(mod); + } + dfe_done(plug); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type)); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags) +{ + dyncfg_check_can_push(host); + + BUFFER *wb = sender_start(host->sender); + + buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags); + + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + + sender_thread_buffer_free(); +} + void rrdpush_send_claimed_id(RRDHOST *host) { if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM)) return; @@ -588,7 +709,7 @@ int connect_to_one_of_destinations( if(d->postpone_reconnection_until > now) continue; - netdata_log_info( + internal_error(true, "STREAM %s: connecting to '%s' (default port: %d)...", rrdhost_hostname(host), string2str(d->destination), @@ -1166,6 +1287,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri // another receiver is already connected // try again later +#ifdef NETDATA_INTERNAL_CHECKS char msg[200 + 1]; snprintfz(msg, 200, "multiple connections for same host, " @@ -1176,6 +1298,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri rpt, msg, "ALREADY CONNECTED"); +#endif // Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up buffer_flush(w->response.data); @@ -1280,6 +1403,7 @@ static struct { { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, { STREAM_CAP_IEEE754, "IEEE754" }, { STREAM_CAP_DATA_WITH_ML, "ML" }, + { STREAM_CAP_DYNCFG, "DYN_CFG" }, { 0 , NULL }, }; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 73bd438c..c3c14233 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -6,6 +6,7 @@ #include "libnetdata/libnetdata.h" #include "daemon/common.h" #include "web/server/web_client.h" +#include "database/rrdfunctions.h" #include "database/rrd.h" #define CONNECTED_TO_SIZE 100 @@ -44,6 +45,7 @@ typedef enum { STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit + STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit @@ -231,6 +233,13 @@ typedef enum __attribute__((packed)) { SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; +struct function_payload_state { + BUFFER *payload; + char *txid; + char *fn_name; + char *timeout; +}; + struct sender_state { RRDHOST *host; pid_t tid; // the thread id of the sender, from gettid() @@ -260,6 +269,9 @@ struct sender_state { int rrdpush_sender_pipe[2]; // collector to sender thread signaling int rrdpush_sender_socket; + int receiving_function_payload; + struct function_payload_state function_payload; // state when receiving function with payload + uint16_t hops; #ifdef ENABLE_RRDPUSH_COMPRESSION @@ -356,7 +368,7 @@ struct buffered_reader { char read_buffer[PLUGINSD_LINE_MAX + 1]; }; -char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size); +bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst); static inline void buffered_reader_init(struct buffered_reader *reader) { reader->read_buffer[0] = '\0'; reader->read_len = 0; @@ -481,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr); void rrdpush_send_host_labels(RRDHOST *host); void rrdpush_send_claimed_id(RRDHOST *host); void rrdpush_send_global_functions(RRDHOST *host); +void rrdpush_send_dyncfg(RRDHOST *host); #define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended #define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended @@ -763,4 +776,11 @@ typedef struct rrdhost_status { void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s); bool rrdhost_state_cloud_emulation(RRDHOST *host); +void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job); +void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name); + +void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name); +void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type); +void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x + #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 76843518..c37b158b 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -377,6 +377,7 @@ struct { const char *error; int worker_job_id; time_t postpone_reconnect_seconds; + bool prevent_log; } stream_responses[] = { { .response = START_STREAMING_PROMPT_VN, @@ -413,6 +414,7 @@ struct { .error = "remote server rejected this stream, the host we are trying to stream is its localhost", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour + .prevent_log = true, }, { .response = START_STREAMING_ERROR_ALREADY_STREAMING, @@ -422,6 +424,7 @@ struct { .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes + .prevent_log = true, }, { .response = START_STREAMING_ERROR_NOT_PERMITTED, @@ -469,6 +472,7 @@ struct { .error = "remote node response is not understood, is it Netdata?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute + .prevent_log = false, } }; @@ -498,6 +502,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } + bool prevent_log = stream_responses[i].prevent_log; const char *error = stream_responses[i].error; int worker_job_id = stream_responses[i].worker_job_id; time_t delay = stream_responses[i].postpone_reconnect_seconds; @@ -509,13 +514,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender char buf[LOG_DATE_LENGTH]; log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); - netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); + + if(prevent_log) + internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); + else + netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; } -static bool rrdpush_sender_connect_ssl(struct sender_state *s) { +static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { #ifdef ENABLE_HTTPS RRDHOST *host = s->host; bool ssl_required = host->destination && host->destination->ssl; @@ -924,12 +934,13 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS); sender_thread_buffer_free(); - internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).", + internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).", rrdhost_hostname(s->host), s->connected_to, string2str(tmp->transaction), buffer_strlen(func_wb), now_realtime_usec() - tmp->received_ut); } + string_freez(tmp->transaction); buffer_free(func_wb); freez(tmp); @@ -944,6 +955,14 @@ void execute_commands(struct sender_state *s) { while( start < end && (newline = strchr(start, '\n')) ) { *newline = '\0'; + if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) { + if (buffer_strlen(s->function_payload.payload) != 0) + buffer_strcat(s->function_payload.payload, "\n"); + buffer_strcat(s->function_payload.payload, start); + start = newline + 1; + continue; + } + netdata_log_access("STREAM: %d from '%s' for host '%s': %s", gettid(), s->connected_to, rrdhost_hostname(s->host), start); @@ -954,12 +973,12 @@ void execute_commands(struct sender_state *s) { const char *keyword = get_word(words, num_words, 0); - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", @@ -979,12 +998,65 @@ void execute_commands(struct sender_state *s) { tmp->transaction = string_strdupz(transaction); BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); - int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp); + char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL; + int code = rrd_function_run(s->host, wb, timeout, function, false, transaction, + stream_execute_function_callback, tmp, NULL, NULL, payload); + if(code != HTTP_RESP_OK) { - rrd_call_function_error(wb, "Failed to route request to collector", code); + if (!buffer_strlen(wb)) + rrd_call_function_error(wb, "Failed to route request to collector", code); + stream_execute_function_callback(wb, code, tmp); } } + + if (s->receiving_function_payload) { + s->receiving_function_payload = false; + + buffer_free(s->function_payload.payload); + freez(s->function_payload.txid); + freez(s->function_payload.timeout); + freez(s->function_payload.fn_name); + + memset(&s->function_payload, 0, sizeof(struct function_payload_state)); + } + } + else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + if (s->receiving_function_payload) { + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + s->receiving_function_payload = false; + buffer_free(s->function_payload.payload); + s->function_payload.payload = NULL; + + // TODO send error response + } + + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + rrdhost_hostname(s->host), s->connected_to, + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + + s->receiving_function_payload = true; + s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); + + s->function_payload.txid = strdupz(get_word(words, num_words, 1)); + s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); + } + else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + + char *transaction = get_word(words, num_words, 1); + if(transaction && *transaction) + rrd_function_cancel(transaction); } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); @@ -1179,7 +1251,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { freez(s); } -void rrdpush_initialize_ssl_ctx(RRDHOST *host) { +void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) { #ifdef ENABLE_HTTPS static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER; spinlock_lock(&sp); @@ -1321,6 +1393,7 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_send_claimed_id(s->host); rrdpush_send_host_labels(s->host); rrdpush_send_global_functions(s->host); + rrdpush_send_dyncfg(s->host); s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); |