summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/README.md100
-rw-r--r--streaming/receiver.c48
-rw-r--r--streaming/replication.c4
-rw-r--r--streaming/rrdpush.c126
-rw-r--r--streaming/rrdpush.h22
-rw-r--r--streaming/sender.c95
6 files changed, 304 insertions, 91 deletions
diff --git a/streaming/README.md b/streaming/README.md
index 370186ac..a27167bc 100644
--- a/streaming/README.md
+++ b/streaming/README.md
@@ -30,36 +30,36 @@ node**. This file is automatically generated by Netdata the first time it is sta
#### `[stream]` section
-| Setting | Default | Description |
-| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `enabled` | `no` | Whether this node streams metrics to any parent. Change to `yes` to enable streaming. |
-| [`destination`](#destination) | ` ` | A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`. [Read more →](#destination) |
-| `ssl skip certificate verification` | `yes` | If you want to accept self-signed or expired certificates, set to `yes` and uncomment. |
-| `CApath` | `/etc/ssl/certs/` | The directory where known certificates are found. Defaults to OpenSSL's default path. |
-| `CAfile` | `/etc/ssl/certs/cert.pem` | Add a parent node certificate to the list of known certificates in `CAPath`. |
-| `api key` | ` ` | The `API_KEY` to use as the child node. |
-| `timeout seconds` | `60` | The timeout to connect and send metrics to a parent. |
-| `default port` | `19999` | The port to use if `destination` does not specify one. |
-| [`send charts matching`](#send-charts-matching) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed. [Read more →](#send-charts-matching) |
-| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. |
-| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. |
-| `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. |
+| Setting | Default | Description |
+|-------------------------------------------------|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `enabled` | `no` | Whether this node streams metrics to any parent. Change to `yes` to enable streaming. |
+| [`destination`](#destination) | | A space-separated list of parent nodes to attempt to stream to, with the first available parent receiving metrics, using the following format: `[PROTOCOL:]HOST[%INTERFACE][:PORT][:SSL]`. [Read more →](#destination) |
+| `ssl skip certificate verification` | `yes` | If you want to accept self-signed or expired certificates, set to `yes` and uncomment. |
+| `CApath` | `/etc/ssl/certs/` | The directory where known certificates are found. Defaults to OpenSSL's default path. |
+| `CAfile` | `/etc/ssl/certs/cert.pem` | Add a parent node certificate to the list of known certificates in `CAPath`. |
+| `api key` | | The `API_KEY` to use as the child node. |
+| `timeout seconds` | `60` | The timeout to connect and send metrics to a parent. |
+| `default port` | `19999` | The port to use if `destination` does not specify one. |
+| [`send charts matching`](#send-charts-matching) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) to filter which charts are streamed. [Read more →](#send-charts-matching) |
+| `buffer size bytes` | `10485760` | The size of the buffer to use when sending metrics. The default `10485760` equals a buffer of 10MB, which is good for 60 seconds of data. Increase this if you expect latencies higher than that. The buffer is flushed on reconnect. |
+| `reconnect delay seconds` | `5` | How long to wait until retrying to connect to the parent node. |
+| `initial clock resync iterations` | `60` | Sync the clock of charts for how many seconds when starting. |
### `[API_KEY]` and `[MACHINE_GUID]` sections
-| Setting | Default | Description |
-| :---------------------------------------------- | :------------------------ | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `enabled` | `no` | Whether this API KEY enabled or disabled. |
-| [`allow from`](#allow-from) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that will stream metrics using this API key. [Read more →](#allow-from) |
-| `default history` | `3600` | The default amount of child metrics history to retain when using the `save`, `map`, or `ram` memory modes. |
-| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) |
-| `health enabled by default` | `auto` | Whether alarms and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alarms when the child is connected. `yes` enables alarms always, and `no` disables alarms. |
-| `default postpone alarms on connect seconds` | `60` | Postpone alarms and notifications for a period of time after the child connects. |
-| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. |
-| `default proxy enabled` | ` ` | Route metrics through a proxy. |
-| `default proxy destination` | ` ` | Space-separated list of `IP:PORT` for proxies. |
-| `default proxy api key` | ` ` | The `API_KEY` of the proxy. |
-| `default send charts matching` | `*` | See [`send charts matching`](#send-charts-matching). |
+| Setting | Default | Description |
+|-----------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `enabled` | `no` | Whether this API KEY enabled or disabled. |
+| [`allow from`](#allow-from) | `*` | A space-separated list of [Netdata simple patterns](https://github.com/netdata/netdata/blob/master/libnetdata/simple_pattern/README.md) matching the IPs of nodes that will stream metrics using this API key. [Read more →](#allow-from) |
+| `default history` | `3600` | The default amount of child metrics history to retain when using the `save`, `map`, or `ram` memory modes. |
+| [`default memory mode`](#default-memory-mode) | `ram` | The [database](https://github.com/netdata/netdata/blob/master/database/README.md) to use for all nodes using this `API_KEY`. Valid settings are `dbengine`, `map`, `save`, `ram`, or `none`. [Read more →](#default-memory-mode) |
+| `health enabled by default` | `auto` | Whether alerts and notifications should be enabled for nodes using this `API_KEY`. `auto` enables alerts when the child is connected. `yes` enables alerts always, and `no` disables alerts. |
+| `default postpone alarms on connect seconds` | `60` | Postpone alerts and notifications for a period of time after the child connects. |
+| `default health log history` | `432000` | History of health log events (in seconds) kept in the database. |
+| `default proxy enabled` | | Route metrics through a proxy. |
+| `default proxy destination` | | Space-separated list of `IP:PORT` for proxies. |
+| `default proxy api key` | | The `API_KEY` of the proxy. |
+| `default send charts matching` | `*` | See [`send charts matching`](#send-charts-matching). |
#### `destination`
@@ -145,24 +145,24 @@ cache size` and `dbengine multihost disk space` settings in the `[global]` secti
### `netdata.conf`
-| Setting | Default | Description |
-| :----------------------------------------- | :---------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| **`[global]` section** | | |
-| `memory mode` | `dbengine` | Determines the [database type](https://github.com/netdata/netdata/blob/master/database/README.md) to be used on that node. Other options settings include `none`, `ram`, `save`, and `map`. `none` disables the database at this host. This also disables alarms and notifications, as those can't run without a database. |
-| **`[web]` section** | | |
+| Setting | Default | Description |
+|--------------------------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `[global]` section | | |
+| `memory mode` | `dbengine` | Determines the [database type](https://github.com/netdata/netdata/blob/master/database/README.md) to be used on that node. Other options settings include `none`, `ram`, `save`, and `map`. `none` disables the database at this host. This also disables alerts and notifications, as those can't run without a database. |
+| `[web]` section | | |
| `mode` | `static-threaded` | Determines the [web server](https://github.com/netdata/netdata/blob/master/web/server/README.md) type. The other option is `none`, which disables the dashboard, API, and registry. |
-| `accept a streaming request every seconds` | `0` | Set a limit on how often a parent node accepts streaming requests from child nodes. `0` equals no limit. If this is set, you may see `... too busy to accept new streaming request. Will be allowed in X secs` in Netdata's `error.log`. |
+| `accept a streaming request every seconds` | `0` | Set a limit on how often a parent node accepts streaming requests from child nodes. `0` equals no limit. If this is set, you may see `... too busy to accept new streaming request. Will be allowed in X secs` in Netdata's `error.log`. |
### Basic use cases
This is an overview of how the main options can be combined:
-| target|memory<br/>mode|web<br/>mode|stream<br/>enabled|exporting|alarms|dashboard|
-|------|:-------------:|:----------:|:----------------:|:-----:|:----:|:-------:|
-| headless collector|`none`|`none`|`yes`|only for `data source = as collected`|not possible|no|
-| headless proxy|`none`|not `none`|`yes`|only for `data source = as collected`|not possible|no|
-| proxy with db|not `none`|not `none`|`yes`|possible|possible|yes|
-| central netdata|not `none`|not `none`|`no`|possible|possible|yes|
+| target | memory<br/>mode | web<br/>mode | stream<br/>enabled | exporting | alerts | dashboard |
+|--------------------|:---------------:|:------------:|:------------------:|:-------------------------------------:|:------------:|:---------:|
+| headless collector | `none` | `none` | `yes` | only for `data source = as collected` | not possible | no |
+| headless proxy | `none` | not `none` | `yes` | only for `data source = as collected` | not possible | no |
+| proxy with db | not `none` | not `none` | `yes` | possible | possible | yes |
+| central netdata | not `none` | not `none` | `no` | possible | possible | yes |
### Per-child settings
@@ -170,7 +170,7 @@ While the `[API_KEY]` section applies settings for any child node using that key
with the `[MACHINE_GUID]` section.
For example, the metrics streamed from only the child node with `MACHINE_GUID` are saved in memory, not using the
-default `dbengine` as specified by the `API_KEY`, and alarms are disabled.
+default `dbengine` as specified by the `API_KEY`, and alerts are disabled.
```conf
[API_KEY]
@@ -261,12 +261,12 @@ To enable stream compression:
```
-| Parent | Stream compression | Child |
-|----------------------|--------------------|----------------------|
-| Supported & Enabled | compressed | Supported & Enabled |
-| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled |
-| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported |
-| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported |
+| Parent | Stream compression | Child |
+|--------------------------------------|--------------------|--------------------------------------|
+| Supported & Enabled | compressed | Supported & Enabled |
+| (Supported & Disabled)/Not supported | uncompressed | Supported & Enabled |
+| Supported & Enabled | uncompressed | (Supported & Disabled)/Not supported |
+| (Supported & Disabled)/Not supported | uncompressed | (Supported & Disabled)/Not supported |
In case of parents with multiple children you can select which streams will be compressed by using the same configuration under the `[API_KEY]`, `[MACHINE_GUID]` section.
@@ -383,7 +383,7 @@ following configurations:
parameter (default is no).
| Parent TLS enabled | Parent port SSL | Child TLS | Child SSL Ver. | Behavior |
-| :----------------- | :--------------- | :-------- | :------------- | :--------------------------------------------------------------------------------------------------------------------------------------- |
+|:-------------------|:-----------------|:----------|:---------------|:-----------------------------------------------------------------------------------------------------------------------------------------|
| No | - | No | no | Legacy behavior. The parent-child stream is unencrypted. |
| Yes | force | No | no | The parent rejects the child connection. |
| Yes | -/optional | No | no | The parent-child stream is unencrypted (expected situation for legacy child nodes and newer parent nodes) |
@@ -396,7 +396,7 @@ A proxy is a node that receives metrics from a child, then streams them onward t
configure it as a receiving and a sending Netdata at the same time.
Netdata proxies may or may not maintain a database for the metrics passing through them. When they maintain a database,
-they can also run health checks (alarms and notifications) for the remote host that is streaming the metrics.
+they can also run health checks (alerts and notifications) for the remote host that is streaming the metrics.
In the following example, the proxy receives metrics from a child node using the `API_KEY` of
`66666666-7777-8888-9999-000000000000`, then stores metrics using `dbengine`. It then uses the `API_KEY` of
@@ -431,7 +431,7 @@ On the parent, set the following in `stream.conf`:
# do not save child metrics on disk
default memory = ram
- # alarms checks, only while the child is connected
+ # alerts checks, only while the child is connected
health enabled by default = auto
```
@@ -449,7 +449,7 @@ On the child nodes, set the following in `stream.conf`:
api key = 11111111-2222-3333-4444-555555555555
```
-In addition, edit `netdata.conf` on each child node to disable the database and alarms.
+In addition, edit `netdata.conf` on each child node to disable the database and alerts.
```bash
[global]
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 3ff022e9..10ef8b7d 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) {
+inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
+ buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
+
size_t start = reader->pos;
char *ss = &reader->read_buffer[start];
char *se = &reader->read_buffer[reader->read_len];
- char *ds = dst;
- char *de = &dst[dst_size - 2];
+ char *ds = &dst->buffer[dst->len];
+ char *de = &ds[dst->size - dst->len - 2];
if(ss >= se) {
*ds = '\0';
reader->pos = 0;
reader->read_len = 0;
reader->read_buffer[reader->read_len] = '\0';
- return NULL;
+ return false;
}
// copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n')
+ while(ss < se && ds < de && *ss != '\n') {
*ds++ = *ss++;
+ dst->len++;
+ }
// if we have a newline, return the buffer
if(ss < se && ds < de && *ss == '\n') {
// newline found in the r->read_buffer
*ds++ = *ss++; // copy the newline too
- *ds = '\0';
+ dst->len++;
- reader->pos = ss - reader->read_buffer;
- return dst;
- }
-
- // if the destination is full, oops!
- if(ds == de) {
- netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
*ds = '\0';
+
reader->pos = ss - reader->read_buffer;
- return dst;
+ return true;
}
- // no newline found in the r->read_buffer
- // move everything to the beginning
- memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start);
- reader->read_len -= (int)start;
- reader->read_buffer[reader->read_len] = '\0';
- *ds = '\0';
reader->pos = 0;
- return NULL;
+ reader->read_len = 0;
+ reader->read_buffer[reader->read_len] = '\0';
+ return false;
}
bool plugin_is_enabled(struct plugind *cd);
@@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
buffered_reader_init(&rpt->reader);
- char buffer[PLUGINSD_LINE_MAX + 2] = "";
+ BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
while(!receiver_should_stop(rpt)) {
- if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) {
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
if(unlikely(!have_new_data)) {
@@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
continue;
}
- if (unlikely(parser_action(parser, buffer))) {
- internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+ if (unlikely(parser_action(parser, buffer->buffer))) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
break;
}
- }
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
result = parser->user.data_collections_count;
// free parser with the pop function
diff --git a/streaming/replication.c b/streaming/replication.c
index 0e5a0b40..ffb6b3de 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1838,7 +1838,7 @@ static void replication_main_cleanup(void *ptr) {
}
freez(replication_globals.main_thread.threads_ptrs);
replication_globals.main_thread.threads_ptrs = NULL;
- __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);
aral_destroy(replication_globals.aral_rse);
replication_globals.aral_rse = NULL;
@@ -1867,7 +1867,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
if(--threads) {
replication_globals.main_thread.threads = threads;
replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
- __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
+ __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);
for(int i = 0; i < threads ;i++) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 67c43e41..e8c46a02 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -96,6 +96,9 @@ STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_HAS_COMPRESSION |
+#ifdef NETDATA_TEST_DYNCFG
+ STREAM_CAP_DYNCFG |
+#endif
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
(ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
@@ -465,6 +468,46 @@ void rrdset_push_metrics_finished(RRDSET_STREAM_BUFFER *rsb, RRDSET *st) {
*rsb = (RRDSET_STREAM_BUFFER){ .wb = NULL, };
}
+// TODO enable this macro before release
+#define bail_if_no_cap(cap) \
+ if(unlikely(!stream_has_capability(host->sender, cap))) { \
+ return; \
+ }
+
+#define dyncfg_check_can_push(host) \
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host))) \
+ return; \
+ bail_if_no_cap(STREAM_CAP_DYNCFG)
+
+// assumes job is locked and acquired!!!
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d\n", plugin_name, module_name, job->name, job_status2str(job->status), job->state);
+ if (job->reason)
+ buffer_sprintf(wb, " \"%s\"", job->reason);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+
+ job->dirty = 0;
+}
+
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DELETE_JOB " %s %s %s\n", plugin_name, module_name, job_name);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock_time) {
RRDHOST *host = st->rrdhost;
@@ -489,6 +532,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
+ if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
+ BUFFER *wb = sender_start(host->sender);
+ rrd_functions_expose_global_rrdpush(host, wb);
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+ }
+
RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
@@ -553,6 +602,78 @@ void rrdpush_send_global_functions(RRDHOST *host) {
sender_thread_buffer_free();
}
+void rrdpush_send_dyncfg(RRDHOST *host) {
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ DICTIONARY *plugins_dict = host->configurable_plugins;
+
+ struct configurable_plugin *plug;
+ dfe_start_read(plugins_dict, plug) {
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plug->name);
+ struct module *mod;
+ dfe_start_read(plug->modules, mod) {
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plug->name, mod->name, module_type2str(mod->type));
+ struct job *job;
+ dfe_start_read(mod->jobs, job) {
+ pthread_mutex_lock(&job->lock);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plug->name, mod->name, job->name, job_type2str(job->type), job->flags);
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPORT_JOB_STATUS " %s %s %s %s %d", plug->name, mod->name, job->name, job_status2str(job->status), job->state);
+ if (job->reason)
+ buffer_sprintf(wb, " \"%s\"", job->reason);
+ buffer_sprintf(wb, "\n");
+ job->dirty = 0;
+ pthread_mutex_unlock(&job->lock);
+ } dfe_done(job);
+ } dfe_done(mod);
+ }
+ dfe_done(plug);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_ENABLE " %s\n", plugin_name);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE " %s %s %s\n", plugin_name, module_name, module_type2str(type));
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags)
+{
+ dyncfg_check_can_push(host);
+
+ BUFFER *wb = sender_start(host->sender);
+
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB " %s %s %s %s %"PRIu32"\n", plugin_name, module_name, job_name, job_type2str(type), flags);
+
+ sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
+
+ sender_thread_buffer_free();
+}
+
void rrdpush_send_claimed_id(RRDHOST *host) {
if(!stream_has_capability(host->sender, STREAM_CAP_CLAIM))
return;
@@ -588,7 +709,7 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- netdata_log_info(
+ internal_error(true,
"STREAM %s: connecting to '%s' (default port: %d)...",
rrdhost_hostname(host),
string2str(d->destination),
@@ -1166,6 +1287,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// another receiver is already connected
// try again later
+#ifdef NETDATA_INTERNAL_CHECKS
char msg[200 + 1];
snprintfz(msg, 200,
"multiple connections for same host, "
@@ -1176,6 +1298,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt,
msg,
"ALREADY CONNECTED");
+#endif
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
@@ -1280,6 +1403,7 @@ static struct {
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
{ STREAM_CAP_DATA_WITH_ML, "ML" },
+ { STREAM_CAP_DYNCFG, "DYN_CFG" },
{ 0 , NULL },
};
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 73bd438c..c3c14233 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -6,6 +6,7 @@
#include "libnetdata/libnetdata.h"
#include "daemon/common.h"
#include "web/server/web_client.h"
+#include "database/rrdfunctions.h"
#include "database/rrd.h"
#define CONNECTED_TO_SIZE 100
@@ -44,6 +45,7 @@ typedef enum {
STREAM_CAP_INTERPOLATED = (1 << 14), // streaming supports interpolated streaming of values
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
+ STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
@@ -231,6 +233,13 @@ typedef enum __attribute__((packed)) {
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
+struct function_payload_state {
+ BUFFER *payload;
+ char *txid;
+ char *fn_name;
+ char *timeout;
+};
+
struct sender_state {
RRDHOST *host;
pid_t tid; // the thread id of the sender, from gettid()
@@ -260,6 +269,9 @@ struct sender_state {
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
+ int receiving_function_payload;
+ struct function_payload_state function_payload; // state when receiving function with payload
+
uint16_t hops;
#ifdef ENABLE_RRDPUSH_COMPRESSION
@@ -356,7 +368,7 @@ struct buffered_reader {
char read_buffer[PLUGINSD_LINE_MAX + 1];
};
-char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
static inline void buffered_reader_init(struct buffered_reader *reader) {
reader->read_buffer[0] = '\0';
reader->read_len = 0;
@@ -481,6 +493,7 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_send_claimed_id(RRDHOST *host);
void rrdpush_send_global_functions(RRDHOST *host);
+void rrdpush_send_dyncfg(RRDHOST *host);
#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
@@ -763,4 +776,11 @@ typedef struct rrdhost_status {
void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s);
bool rrdhost_state_cloud_emulation(RRDHOST *host);
+void rrdpush_send_job_status_update(RRDHOST *host, const char *plugin_name, const char *module_name, struct job *job);
+void rrdpush_send_job_deleted(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name);
+
+void rrdpush_send_dyncfg_enable(RRDHOST *host, const char *plugin_name);
+void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, const char *module_name, enum module_type type);
+void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);//x
+
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 76843518..c37b158b 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -377,6 +377,7 @@ struct {
const char *error;
int worker_job_id;
time_t postpone_reconnect_seconds;
+ bool prevent_log;
} stream_responses[] = {
{
.response = START_STREAMING_PROMPT_VN,
@@ -413,6 +414,7 @@ struct {
.error = "remote server rejected this stream, the host we are trying to stream is its localhost",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
+ .prevent_log = true,
},
{
.response = START_STREAMING_ERROR_ALREADY_STREAMING,
@@ -422,6 +424,7 @@ struct {
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .prevent_log = true,
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
@@ -469,6 +472,7 @@ struct {
.error = "remote node response is not understood, is it Netdata?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .prevent_log = false,
}
};
@@ -498,6 +502,7 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
+ bool prevent_log = stream_responses[i].prevent_log;
const char *error = stream_responses[i].error;
int worker_job_id = stream_responses[i].worker_job_id;
time_t delay = stream_responses[i].postpone_reconnect_seconds;
@@ -509,13 +514,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
char buf[LOG_DATE_LENGTH];
log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
- netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
+
+ if(prevent_log)
+ internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
+ else
+ netdata_log_error("STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
}
-static bool rrdpush_sender_connect_ssl(struct sender_state *s) {
+static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
#ifdef ENABLE_HTTPS
RRDHOST *host = s->host;
bool ssl_required = host->destination && host->destination->ssl;
@@ -924,12 +934,13 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
sender_commit(s, wb, STREAM_TRAFFIC_TYPE_FUNCTIONS);
sender_thread_buffer_free();
- internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %llu usec).",
+ internal_error(true, "STREAM %s [send to %s] FUNCTION transaction %s sending back response (%zu bytes, %"PRIu64" usec).",
rrdhost_hostname(s->host), s->connected_to,
string2str(tmp->transaction),
buffer_strlen(func_wb),
now_realtime_usec() - tmp->received_ut);
}
+
string_freez(tmp->transaction);
buffer_free(func_wb);
freez(tmp);
@@ -944,6 +955,14 @@ void execute_commands(struct sender_state *s) {
while( start < end && (newline = strchr(start, '\n')) ) {
*newline = '\0';
+ if (s->receiving_function_payload && unlikely(strcmp(start, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) != 0)) {
+ if (buffer_strlen(s->function_payload.payload) != 0)
+ buffer_strcat(s->function_payload.payload, "\n");
+ buffer_strcat(s->function_payload.payload, start);
+ start = newline + 1;
+ continue;
+ }
+
netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
gettid(), s->connected_to, rrdhost_hostname(s->host), start);
@@ -954,12 +973,12 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
@@ -979,12 +998,65 @@ void execute_commands(struct sender_state *s) {
tmp->transaction = string_strdupz(transaction);
BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions);
- int code = rrd_call_function_async(s->host, wb, timeout, function, stream_execute_function_callback, tmp);
+ char *payload = s->receiving_function_payload ? (char *)buffer_tostring(s->function_payload.payload) : NULL;
+ int code = rrd_function_run(s->host, wb, timeout, function, false, transaction,
+ stream_execute_function_callback, tmp, NULL, NULL, payload);
+
if(code != HTTP_RESP_OK) {
- rrd_call_function_error(wb, "Failed to route request to collector", code);
+ if (!buffer_strlen(wb))
+ rrd_call_function_error(wb, "Failed to route request to collector", code);
+
stream_execute_function_callback(wb, code, tmp);
}
}
+
+ if (s->receiving_function_payload) {
+ s->receiving_function_payload = false;
+
+ buffer_free(s->function_payload.payload);
+ freez(s->function_payload.txid);
+ freez(s->function_payload.timeout);
+ freez(s->function_payload.fn_name);
+
+ memset(&s->function_payload, 0, sizeof(struct function_payload_state));
+ }
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ if (s->receiving_function_payload) {
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ s->receiving_function_payload = false;
+ buffer_free(s->function_payload.payload);
+ s->function_payload.payload = NULL;
+
+ // TODO send error response
+ }
+
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ rrdhost_hostname(s->host), s->connected_to,
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+
+ s->receiving_function_payload = true;
+ s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
+
+ s->function_payload.txid = strdupz(get_word(words, num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
+ }
+ else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+
+ char *transaction = get_word(words, num_words, 1);
+ if(transaction && *transaction)
+ rrd_function_cancel(transaction);
}
else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
@@ -1179,7 +1251,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
freez(s);
}
-void rrdpush_initialize_ssl_ctx(RRDHOST *host) {
+void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
#ifdef ENABLE_HTTPS
static SPINLOCK sp = NETDATA_SPINLOCK_INITIALIZER;
spinlock_lock(&sp);
@@ -1321,6 +1393,7 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_send_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
rrdpush_send_global_functions(s->host);
+ rrdpush_send_dyncfg(s->host);
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);