summaryrefslogtreecommitdiffstats
path: root/aclk/schema-wrappers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/schema-wrappers/alarm_config.cc147
-rw-r--r--aclk/schema-wrappers/alarm_config.h69
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc248
-rw-r--r--aclk/schema-wrappers/alarm_stream.h134
-rw-r--r--aclk/schema-wrappers/chart_config.cc105
-rw-r--r--aclk/schema-wrappers/chart_config.h50
-rw-r--r--aclk/schema-wrappers/chart_stream.cc342
-rw-r--r--aclk/schema-wrappers/chart_stream.h121
-rw-r--r--aclk/schema-wrappers/connection.cc63
-rw-r--r--aclk/schema-wrappers/connection.h43
-rw-r--r--aclk/schema-wrappers/node_connection.cc37
-rw-r--r--aclk/schema-wrappers/node_connection.h29
-rw-r--r--aclk/schema-wrappers/node_creation.cc39
-rw-r--r--aclk/schema-wrappers/node_creation.h31
-rw-r--r--aclk/schema-wrappers/node_info.cc95
-rw-r--r--aclk/schema-wrappers/node_info.h69
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.cc15
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h20
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h17
19 files changed, 1674 insertions, 0 deletions
diff --git a/aclk/schema-wrappers/alarm_config.cc b/aclk/schema-wrappers/alarm_config.cc
new file mode 100644
index 000000000..56d7e6f39
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_config.cc
@@ -0,0 +1,147 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_config.h"
+
+#include "proto/alarm/v1/config.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg)
+{
+ freez(cfg->alarm);
+ freez(cfg->tmpl);
+ freez(cfg->on_chart);
+
+ freez(cfg->classification);
+ freez(cfg->type);
+ freez(cfg->component);
+
+ freez(cfg->os);
+ freez(cfg->hosts);
+ freez(cfg->plugin);
+ freez(cfg->module);
+ freez(cfg->charts);
+ freez(cfg->families);
+ freez(cfg->lookup);
+ freez(cfg->every);
+ freez(cfg->units);
+
+ freez(cfg->green);
+ freez(cfg->red);
+
+ freez(cfg->calculation_expr);
+ freez(cfg->warning_expr);
+ freez(cfg->critical_expr);
+
+ freez(cfg->recipient);
+ freez(cfg->exec);
+ freez(cfg->delay);
+ freez(cfg->repeat);
+ freez(cfg->info);
+ freez(cfg->options);
+ freez(cfg->host_labels);
+
+ freez(cfg->p_db_lookup_dimensions);
+ freez(cfg->p_db_lookup_method);
+ freez(cfg->p_db_lookup_options);
+}
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data)
+{
+ ProvideAlarmConfiguration msg;
+ AlarmConfiguration *cfg = msg.mutable_config();
+
+ msg.set_config_hash(data->cfg_hash);
+
+ if (data->cfg.alarm)
+ cfg->set_alarm(data->cfg.alarm);
+ if (data->cfg.tmpl)
+ cfg->set_template_(data->cfg.tmpl);
+ if(data->cfg.on_chart)
+ cfg->set_on_chart(data->cfg.on_chart);
+
+ if (data->cfg.classification)
+ cfg->set_classification(data->cfg.classification);
+ if (data->cfg.type)
+ cfg->set_type(data->cfg.type);
+ if (data->cfg.component)
+ cfg->set_component(data->cfg.component);
+
+ if (data->cfg.os)
+ cfg->set_os(data->cfg.os);
+ if (data->cfg.hosts)
+ cfg->set_hosts(data->cfg.hosts);
+ if (data->cfg.plugin)
+ cfg->set_plugin(data->cfg.plugin);
+ if(data->cfg.module)
+ cfg->set_module(data->cfg.module);
+ if(data->cfg.charts)
+ cfg->set_charts(data->cfg.charts);
+ if(data->cfg.families)
+ cfg->set_families(data->cfg.families);
+ if(data->cfg.lookup)
+ cfg->set_lookup(data->cfg.lookup);
+ if(data->cfg.every)
+ cfg->set_every(data->cfg.every);
+ if(data->cfg.units)
+ cfg->set_units(data->cfg.units);
+
+ if (data->cfg.green)
+ cfg->set_green(data->cfg.green);
+ if (data->cfg.red)
+ cfg->set_red(data->cfg.red);
+
+ if (data->cfg.calculation_expr)
+ cfg->set_calculation_expr(data->cfg.calculation_expr);
+ if (data->cfg.warning_expr)
+ cfg->set_warning_expr(data->cfg.warning_expr);
+ if (data->cfg.critical_expr)
+ cfg->set_critical_expr(data->cfg.critical_expr);
+
+ if (data->cfg.recipient)
+ cfg->set_recipient(data->cfg.recipient);
+ if (data->cfg.exec)
+ cfg->set_exec(data->cfg.exec);
+ if (data->cfg.delay)
+ cfg->set_delay(data->cfg.delay);
+ if (data->cfg.repeat)
+ cfg->set_repeat(data->cfg.repeat);
+ if (data->cfg.info)
+ cfg->set_info(data->cfg.info);
+ if (data->cfg.options)
+ cfg->set_options(data->cfg.options);
+ if (data->cfg.host_labels)
+ cfg->set_host_labels(data->cfg.host_labels);
+
+ cfg->set_p_db_lookup_after(data->cfg.p_db_lookup_after);
+ cfg->set_p_db_lookup_before(data->cfg.p_db_lookup_before);
+ if (data->cfg.p_db_lookup_dimensions)
+ cfg->set_p_db_lookup_dimensions(data->cfg.p_db_lookup_dimensions);
+ if (data->cfg.p_db_lookup_method)
+ cfg->set_p_db_lookup_method(data->cfg.p_db_lookup_method);
+ if (data->cfg.p_db_lookup_options)
+ cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options);
+ cfg->set_p_update_every(data->cfg.p_update_every);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+char *parse_send_alarm_configuration(const char *data, size_t len)
+{
+ SendAlarmConfiguration msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+ if (!msg.config_hash().c_str())
+ return NULL;
+ return strdupz(msg.config_hash().c_str());
+}
+
diff --git a/aclk/schema-wrappers/alarm_config.h b/aclk/schema-wrappers/alarm_config.h
new file mode 100644
index 000000000..157fbc60f
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_config.h
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_alarm_configuration {
+ char *alarm;
+ char *tmpl;
+ char *on_chart;
+
+ char *classification;
+ char *type;
+ char *component;
+
+ char *os;
+ char *hosts;
+ char *plugin;
+ char *module;
+ char *charts;
+ char *families;
+ char *lookup;
+ char *every;
+ char *units;
+
+ char *green;
+ char *red;
+
+ char *calculation_expr;
+ char *warning_expr;
+ char *critical_expr;
+
+ char *recipient;
+ char *exec;
+ char *delay;
+ char *repeat;
+ char *info;
+ char *options;
+ char *host_labels;
+
+ int32_t p_db_lookup_after;
+ int32_t p_db_lookup_before;
+ char *p_db_lookup_dimensions;
+ char *p_db_lookup_method;
+ char *p_db_lookup_options;
+ int32_t p_update_every;
+};
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg);
+
+struct provide_alarm_configuration {
+ char *cfg_hash;
+ struct aclk_alarm_configuration cfg;
+};
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data);
+char *parse_send_alarm_configuration(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H */
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
new file mode 100644
index 000000000..5868e5d67
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -0,0 +1,248 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_stream.h"
+
+#include "proto/alarm/v1/stream.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len)
+{
+ struct start_alarm_streaming ret;
+ memset(&ret, 0, sizeof(ret));
+
+ StartAlarmStreaming msg;
+
+ if (!msg.ParseFromArray(data, len))
+ return ret;
+
+ ret.node_id = strdupz(msg.node_id().c_str());
+ ret.batch_id = msg.batch_id();
+ ret.start_seq_id = msg.start_sequnce_id();
+
+ return ret;
+}
+
+char *parse_send_alarm_log_health(const char *data, size_t len)
+{
+ SendAlarmLogHealth msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+ return strdupz(msg.node_id().c_str());
+}
+
+char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data)
+{
+ AlarmLogHealth msg;
+ LogEntries *entries;
+
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+ msg.set_enabled(data->enabled);
+
+ switch (data->status) {
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE);
+ break;
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING);
+ break;
+ case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED:
+ msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED);
+ break;
+ default:
+ error("Unknown status of AlarmLogHealth LogEntry");
+ return NULL;
+ }
+
+ entries = msg.mutable_log_entries();
+ entries->set_first_sequence_id(data->log_entries.first_seq_id);
+ entries->set_last_sequence_id(data->log_entries.last_seq_id);
+
+ set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when());
+ set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when());
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status)
+{
+ switch (status) {
+ case aclk_alarm_status::ALARM_STATUS_NULL:
+ return alarms::v1::ALARM_STATUS_NULL;
+ case aclk_alarm_status::ALARM_STATUS_UNKNOWN:
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ case aclk_alarm_status::ALARM_STATUS_REMOVED:
+ return alarms::v1::ALARM_STATUS_REMOVED;
+ case aclk_alarm_status::ALARM_STATUS_NOT_A_NUMBER:
+ return alarms::v1::ALARM_STATUS_NOT_A_NUMBER;
+ case aclk_alarm_status::ALARM_STATUS_CLEAR:
+ return alarms::v1::ALARM_STATUS_CLEAR;
+ case aclk_alarm_status::ALARM_STATUS_WARNING:
+ return alarms::v1::ALARM_STATUS_WARNING;
+ case aclk_alarm_status::ALARM_STATUS_CRITICAL:
+ return alarms::v1::ALARM_STATUS_CRITICAL;
+ default:
+ error("Unknown alarm status");
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ }
+}
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry)
+{
+ //freez(entry->node_id);
+ //freez(entry->claim_id);
+
+ freez(entry->chart);
+ freez(entry->name);
+ freez(entry->family);
+
+ freez(entry->config_hash);
+
+ freez(entry->timezone);
+
+ freez(entry->exec_path);
+ freez(entry->conf_source);
+ freez(entry->command);
+
+ freez(entry->value_string);
+ freez(entry->old_value_string);
+
+ freez(entry->rendered_info);
+}
+
+static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto)
+{
+ proto->set_node_id(data->node_id);
+ proto->set_claim_id(data->claim_id);
+
+ proto->set_chart(data->chart);
+ proto->set_name(data->name);
+ if (data->family)
+ proto->set_family(data->family);
+
+ proto->set_batch_id(data->batch_id);
+ proto->set_sequence_id(data->sequence_id);
+ proto->set_when(data->when);
+
+ proto->set_config_hash(data->config_hash);
+
+ proto->set_utc_offset(data->utc_offset);
+ proto->set_timezone(data->timezone);
+
+ proto->set_exec_path(data->exec_path);
+ proto->set_conf_source(data->conf_source);
+ proto->set_command(data->command);
+
+ proto->set_duration(data->duration);
+ proto->set_non_clear_duration(data->non_clear_duration);
+
+
+ proto->set_status(aclk_alarm_status_to_proto(data->status));
+ proto->set_old_status(aclk_alarm_status_to_proto(data->old_status));
+ proto->set_delay(data->delay);
+ proto->set_delay_up_to_timestamp(data->delay_up_to_timestamp);
+
+ proto->set_last_repeat(data->last_repeat);
+ proto->set_silenced(data->silenced);
+
+ if (data->value_string)
+ proto->set_value_string(data->value_string);
+ if (data->old_value_string)
+ proto->set_old_value_string(data->old_value_string);
+
+ proto->set_value(data->value);
+ proto->set_old_value(data->old_value);
+
+ proto->set_updated(data->updated);
+
+ proto->set_rendered_info(data->rendered_info);
+}
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
+{
+ AlarmLogEntry le;
+
+ fill_alarm_log_entry(data, &le);
+
+ *len = PROTO_COMPAT_MSG_SIZE(le);
+ char *bin = (char*)mallocz(*len);
+ if (!le.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len)
+{
+ SendAlarmSnapshot msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ struct send_alarm_snapshot *ret = (struct send_alarm_snapshot*)callocz(1, sizeof(struct send_alarm_snapshot));
+ if (msg.claim_id().c_str())
+ ret->claim_id = strdupz(msg.claim_id().c_str());
+ if (msg.node_id().c_str())
+ ret->node_id = strdupz(msg.node_id().c_str());
+ ret->snapshot_id = msg.snapshot_id();
+ ret->sequence_id = msg.sequence_id();
+
+ return ret;
+}
+
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr)
+{
+ freez(ptr->claim_id);
+ freez(ptr->node_id);
+ freez(ptr);
+}
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data)
+{
+ AlarmSnapshot *msg = new AlarmSnapshot;
+ if (unlikely(!msg)) fatal("Cannot allocate memory for AlarmSnapshot");
+
+ msg->set_node_id(data->node_id);
+ msg->set_claim_id(data->claim_id);
+ msg->set_snapshot_id(data->snapshot_id);
+ msg->set_chunks(data->chunks);
+ msg->set_chunk(data->chunk);
+
+ // this is handled automatically by add_alarm_log_entry2snapshot function
+ msg->set_chunk_size(0);
+
+ return msg;
+}
+
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ AlarmLogEntry *alarm_log_entry = alarm_snapshot->add_alarms();
+
+ fill_alarm_log_entry(data, alarm_log_entry);
+
+ alarm_snapshot->set_chunk_size(alarm_snapshot->chunk_size() + 1);
+}
+
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(alarm_snapshot);
+ char *bin = (char*)mallocz(*len);
+ if (!alarm_snapshot->SerializeToArray(bin, *len)) {
+ delete alarm_snapshot;
+ return NULL;
+ }
+
+ delete alarm_snapshot;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h
new file mode 100644
index 000000000..2932bb192
--- /dev/null
+++ b/aclk/schema-wrappers/alarm_stream.h
@@ -0,0 +1,134 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum alarm_log_status_aclk {
+ ALARM_LOG_STATUS_UNSPECIFIED = 0,
+ ALARM_LOG_STATUS_RUNNING = 1,
+ ALARM_LOG_STATUS_IDLE = 2
+};
+
+struct alarm_log_entries {
+ int64_t first_seq_id;
+ struct timeval first_when;
+
+ int64_t last_seq_id;
+ struct timeval last_when;
+};
+
+struct alarm_log_health {
+ char *claim_id;
+ char *node_id;
+ int enabled;
+ enum alarm_log_status_aclk status;
+ struct alarm_log_entries log_entries;
+};
+
+struct start_alarm_streaming {
+ char *node_id;
+ uint64_t batch_id;
+ uint64_t start_seq_id;
+};
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len);
+char *parse_send_alarm_log_health(const char *data, size_t len);
+
+char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data);
+
+enum aclk_alarm_status {
+ ALARM_STATUS_NULL = 0,
+ ALARM_STATUS_UNKNOWN = 1,
+ ALARM_STATUS_REMOVED = 2,
+ ALARM_STATUS_NOT_A_NUMBER = 3,
+ ALARM_STATUS_CLEAR = 4,
+ ALARM_STATUS_WARNING = 5,
+ ALARM_STATUS_CRITICAL = 6
+};
+
+struct alarm_log_entry {
+ char *node_id;
+ char *claim_id;
+
+ char *chart;
+ char *name;
+ char *family;
+
+ uint64_t batch_id;
+ uint64_t sequence_id;
+ uint64_t when;
+
+ char *config_hash;
+
+ int32_t utc_offset;
+ char *timezone;
+
+ char *exec_path;
+ char *conf_source;
+ char *command;
+
+ uint32_t duration;
+ uint32_t non_clear_duration;
+
+ enum aclk_alarm_status status;
+ enum aclk_alarm_status old_status;
+ uint64_t delay;
+ uint64_t delay_up_to_timestamp;
+
+ uint64_t last_repeat;
+ int silenced;
+
+ char *value_string;
+ char *old_value_string;
+
+ double value;
+ double old_value;
+
+ // updated alarm entry, when the status of the alarm has been updated by a later entry
+ int updated;
+
+ // rendered_info
+ char *rendered_info;
+};
+
+struct send_alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ uint64_t snapshot_id;
+ uint64_t sequence_id;
+};
+
+struct alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ uint64_t snapshot_id;
+ uint32_t chunks;
+ uint32_t chunk;
+};
+
+typedef void* alarm_snapshot_proto_ptr_t;
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry);
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data);
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len);
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr);
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data);
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data);
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H */
diff --git a/aclk/schema-wrappers/chart_config.cc b/aclk/schema-wrappers/chart_config.cc
new file mode 100644
index 000000000..87e34e0df
--- /dev/null
+++ b/aclk/schema-wrappers/chart_config.cc
@@ -0,0 +1,105 @@
+#include "chart_config.h"
+
+#include "proto/chart/v1/config.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+void destroy_update_chart_config(struct update_chart_config *cfg)
+{
+ freez(cfg->claim_id);
+ freez(cfg->node_id);
+ freez(cfg->hashes);
+}
+
+void destroy_chart_config_updated(struct chart_config_updated *cfg)
+{
+ freez(cfg->type);
+ freez(cfg->family);
+ freez(cfg->context);
+ freez(cfg->title);
+ freez(cfg->plugin);
+ freez(cfg->module);
+ freez(cfg->units);
+ freez(cfg->config_hash);
+}
+
+struct update_chart_config parse_update_chart_config(const char *data, size_t len)
+{
+ chart::v1::UpdateChartConfigs cfgs;
+ update_chart_config res;
+ memset(&res, 0, sizeof(res));
+
+ if (!cfgs.ParseFromArray(data, len))
+ return res;
+
+ res.claim_id = strdupz(cfgs.claim_id().c_str());
+ res.node_id = strdupz(cfgs.node_id().c_str());
+
+ // to not do bazillion tiny allocations for individual strings
+ // we calculate how much memory we will need for all of them
+ // and allocate at once
+ int hash_count = cfgs.config_hashes_size();
+ size_t total_strlen = 0;
+ for (int i = 0; i < hash_count; i++)
+ total_strlen += cfgs.config_hashes(i).length();
+ total_strlen += hash_count; //null bytes
+
+ res.hashes = (char**)callocz( 1,
+ (hash_count+1) * sizeof(char*) + //char * array incl. terminating NULL at the end
+ total_strlen //strings themselves incl. 1 null byte each
+ );
+
+ char* dest = ((char*)res.hashes) + (hash_count + 1 /* NULL ptr */) * sizeof(char *);
+ // now copy them strings
+ // null bytes handled by callocz
+ for (int i = 0; i < hash_count; i++) {
+ strcpy(dest, cfgs.config_hashes(i).c_str());
+ res.hashes[i] = dest;
+ dest += strlen(dest) + 1 /* end string null */;
+ }
+
+ return res;
+}
+
+char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size)
+{
+ chart::v1::ChartConfigsUpdated configs;
+ for (int i = 0; i < list_size; i++) {
+ chart::v1::ChartConfigUpdated *config = configs.add_configs();
+ config->set_type(config_list[i].type);
+ if (config_list[i].family)
+ config->set_family(config_list[i].family);
+ config->set_context(config_list[i].context);
+ config->set_title(config_list[i].title);
+ config->set_priority(config_list[i].priority);
+ config->set_plugin(config_list[i].plugin);
+
+ if (config_list[i].module)
+ config->set_module(config_list[i].module);
+
+ switch (config_list[i].chart_type) {
+ case RRDSET_TYPE_LINE:
+ config->set_chart_type(chart::v1::LINE);
+ break;
+ case RRDSET_TYPE_AREA:
+ config->set_chart_type(chart::v1::AREA);
+ break;
+ case RRDSET_TYPE_STACKED:
+ config->set_chart_type(chart::v1::STACKED);
+ break;
+ default:
+ return NULL;
+ }
+
+ config->set_units(config_list[i].units);
+ config->set_config_hash(config_list[i].config_hash);
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(configs);
+ char *bin = (char*)mallocz(*len);
+ configs.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/chart_config.h b/aclk/schema-wrappers/chart_config.h
new file mode 100644
index 000000000..f08f76b61
--- /dev/null
+++ b/aclk/schema-wrappers/chart_config.h
@@ -0,0 +1,50 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct update_chart_config {
+ char *claim_id;
+ char *node_id;
+ char **hashes;
+};
+
+enum chart_config_chart_type {
+ LINE,
+ AREA,
+ STACKED
+};
+
+struct chart_config_updated {
+ char *type;
+ char *family;
+ char *context;
+ char *title;
+ uint64_t priority;
+ char *plugin;
+ char *module;
+ RRDSET_TYPE chart_type;
+ char *units;
+ char *config_hash;
+};
+
+void destroy_update_chart_config(struct update_chart_config *cfg);
+void destroy_chart_config_updated(struct chart_config_updated *cfg);
+
+struct update_chart_config parse_update_chart_config(const char *data, size_t len);
+
+char *generate_chart_configs_updated(size_t *len, const struct chart_config_updated *config_list, int list_size);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CHART_CONFIG_H */
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
new file mode 100644
index 000000000..7d820e533
--- /dev/null
+++ b/aclk/schema-wrappers/chart_stream.cc
@@ -0,0 +1,342 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "aclk/aclk_util.h"
+
+#include "proto/chart/v1/stream.pb.h"
+#include "chart_stream.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len)
+{
+ chart::v1::StreamChartsAndDimensions msg;
+ stream_charts_and_dims_t res;
+ memset(&res, 0, sizeof(res));
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdup(msg.node_id().c_str());
+ res.claim_id = strdup(msg.claim_id().c_str());
+ res.seq_id = msg.sequence_id();
+ res.batch_id = msg.batch_id();
+ set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at);
+
+ return res;
+}
+
+chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len)
+{
+ chart::v1::ChartsAndDimensionsAck msg;
+ chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 };
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdup(msg.node_id().c_str());
+ res.claim_id = strdup(msg.claim_id().c_str());
+ res.last_seq_id = msg.last_sequence_id();
+
+ return res;
+}
+
+char *generate_reset_chart_messages(size_t *len, chart_reset_t reset)
+{
+ chart::v1::ResetChartMessages msg;
+
+ msg.set_claim_id(reset.claim_id);
+ msg.set_node_id(reset.node_id);
+ switch (reset.reason) {
+ case DB_EMPTY:
+ msg.set_reason(chart::v1::ResetReason::DB_EMPTY);
+ break;
+ case SEQ_ID_NOT_EXISTS:
+ msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS);
+ break;
+ case TIMESTAMP_MISMATCH:
+ msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH);
+ break;
+ default:
+ return NULL;
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+void chart_instance_updated_destroy(struct chart_instance_updated *instance)
+{
+ freez((char*)instance->id);
+ freez((char*)instance->claim_id);
+
+ free_label_list(instance->label_head);
+
+ freez((char*)instance->config_hash);
+}
+
+static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update)
+{
+ google::protobuf::Map<std::string, std::string> *map;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+ struct label *label;
+
+ chart->set_id(update->id);
+ chart->set_claim_id(update->claim_id);
+ chart->set_node_id(update->node_id);
+ chart->set_name(update->name);
+
+ map = chart->mutable_chart_labels();
+ label = update->label_head;
+ while (label) {
+ map->insert({label->key, label->value});
+ label = label->next;
+ }
+
+ switch (update->memory_mode) {
+ case RRD_MEMORY_MODE_NONE:
+ chart->set_memory_mode(chart::v1::NONE);
+ break;
+ case RRD_MEMORY_MODE_RAM:
+ chart->set_memory_mode(chart::v1::RAM);
+ break;
+ case RRD_MEMORY_MODE_MAP:
+ chart->set_memory_mode(chart::v1::MAP);
+ break;
+ case RRD_MEMORY_MODE_SAVE:
+ chart->set_memory_mode(chart::v1::SAVE);
+ break;
+ case RRD_MEMORY_MODE_ALLOC:
+ chart->set_memory_mode(chart::v1::ALLOC);
+ break;
+ case RRD_MEMORY_MODE_DBENGINE:
+ chart->set_memory_mode(chart::v1::DB_ENGINE);
+ break;
+ default:
+ return 1;
+ break;
+ }
+
+ chart->set_update_every_interval(update->update_every);
+ chart->set_config_hash(update->config_hash);
+
+ pos = chart->mutable_position();
+ pos->set_sequence_id(update->position.sequence_id);
+ pos->set_previous_sequence_id(update->position.previous_sequence_id);
+ set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ return 0;
+}
+
+static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim)
+{
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ dim->set_id(c_dim->id);
+ dim->set_chart_id(c_dim->chart_id);
+ dim->set_node_id(c_dim->node_id);
+ dim->set_claim_id(c_dim->claim_id);
+ dim->set_name(c_dim->name);
+
+ set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at());
+ set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp());
+
+ pos = dim->mutable_position();
+ pos->set_sequence_id(c_dim->position.sequence_id);
+ pos->set_previous_sequence_id(c_dim->position.previous_sequence_id);
+ set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ return 0;
+}
+
+char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id)
+{
+ chart::v1::ChartsAndDimensionsUpdated msg;
+ chart::v1::ChartInstanceUpdated db_chart;
+ chart::v1::ChartDimensionUpdated db_dim;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ msg.set_batch_id(batch_id);
+
+ for (int i = 0; payloads[i]; i++) {
+ if (is_dim[i]) {
+ if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) {
+ error("[ACLK] Could not parse chart::v1::chart_dimension_updated");
+ return NULL;
+ }
+
+ pos = db_dim.mutable_position();
+ pos->set_sequence_id(new_positions[i].sequence_id);
+ pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
+ set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions();
+ *dim = db_dim;
+ } else {
+ if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) {
+ error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated");
+ return NULL;
+ }
+
+ pos = db_chart.mutable_position();
+ pos->set_sequence_id(new_positions[i].sequence_id);
+ pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
+ set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ chart::v1::ChartInstanceUpdated *chart = msg.add_charts();
+ *chart = db_chart;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+ chart::v1::ChartsAndDimensionsUpdated msg;
+
+ msg.set_batch_id(chart_batch_id);
+
+ for (int i = 0; payloads[i]; i++) {
+ chart::v1::ChartInstanceUpdated db_msg;
+ chart::v1::ChartInstanceUpdated *chart;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) {
+ error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated");
+ return NULL;
+ }
+
+ pos = db_msg.mutable_position();
+ pos->set_sequence_id(new_positions[i].sequence_id);
+ pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
+ set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ chart = msg.add_charts();
+ *chart = db_msg;
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions)
+{
+ chart::v1::ChartsAndDimensionsUpdated msg;
+
+ msg.set_batch_id(chart_batch_id);
+
+ for (int i = 0; payloads[i]; i++) {
+ chart::v1::ChartDimensionUpdated db_msg;
+ chart::v1::ChartDimensionUpdated *dim;
+ aclk_lib::v1::ACLKMessagePosition *pos;
+
+ if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) {
+ error("[ACLK] Could not parse chart::v1::chart_dimension_updated");
+ return NULL;
+ }
+
+ pos = db_msg.mutable_position();
+ pos->set_sequence_id(new_positions[i].sequence_id);
+ pos->set_previous_sequence_id(new_positions[i].previous_sequence_id);
+ set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at());
+
+ dim = msg.add_dimensions();
+ *dim = db_msg;
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update)
+{
+ chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated();
+
+ if (set_chart_instance_updated(chart, update))
+ return NULL;
+
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(chart);
+ char *bin = (char*)mallocz(*len);
+ chart->SerializeToArray(bin, *len);
+
+ delete chart;
+ return bin;
+}
+
+char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim)
+{
+ chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated();
+
+ if (set_chart_dim_updated(proto_dim, dim))
+ return NULL;
+
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim);
+ char *bin = (char*)mallocz(*len);
+ proto_dim->SerializeToArray(bin, *len);
+
+ delete proto_dim;
+ return bin;
+}
+
+using namespace google::protobuf;
+
+char *generate_retention_updated(size_t *len, struct retention_updated *data)
+{
+ chart::v1::RetentionUpdated msg;
+
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+
+ switch (data->memory_mode) {
+ case RRD_MEMORY_MODE_NONE:
+ msg.set_memory_mode(chart::v1::NONE);
+ break;
+ case RRD_MEMORY_MODE_RAM:
+ msg.set_memory_mode(chart::v1::RAM);
+ break;
+ case RRD_MEMORY_MODE_MAP:
+ msg.set_memory_mode(chart::v1::MAP);
+ break;
+ case RRD_MEMORY_MODE_SAVE:
+ msg.set_memory_mode(chart::v1::SAVE);
+ break;
+ case RRD_MEMORY_MODE_ALLOC:
+ msg.set_memory_mode(chart::v1::ALLOC);
+ break;
+ case RRD_MEMORY_MODE_DBENGINE:
+ msg.set_memory_mode(chart::v1::DB_ENGINE);
+ break;
+ default:
+ return NULL;
+ }
+
+ for (int i = 0; i < data->interval_duration_count; i++) {
+ Map<uint32, uint32> *map = msg.mutable_interval_durations();
+ map->insert({data->interval_durations[i].update_every, data->interval_durations[i].retention});
+ }
+
+ set_google_timestamp_from_timeval(data->rotation_timestamp, msg.mutable_rotation_timestamp());
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h
new file mode 100644
index 000000000..7a46ecd8e
--- /dev/null
+++ b/aclk/schema-wrappers/chart_stream.h
@@ -0,0 +1,121 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CHART_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CHART_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "database/rrd.h"
+
+typedef struct {
+ char* claim_id;
+ char* node_id;
+
+ uint64_t seq_id;
+ uint64_t batch_id;
+
+ struct timeval seq_id_created_at;
+} stream_charts_and_dims_t;
+
+stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len);
+
+typedef struct {
+ char* claim_id;
+ char* node_id;
+
+ uint64_t last_seq_id;
+} chart_and_dim_ack_t;
+
+chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len);
+
+enum chart_reset_reason {
+ DB_EMPTY,
+ SEQ_ID_NOT_EXISTS,
+ TIMESTAMP_MISMATCH
+};
+
+typedef struct {
+ char *claim_id;
+ char *node_id;
+
+ enum chart_reset_reason reason;
+} chart_reset_t;
+
+char *generate_reset_chart_messages(size_t *len, const chart_reset_t reset);
+
+struct aclk_message_position {
+ uint64_t sequence_id;
+ struct timeval seq_id_creation_time;
+ uint64_t previous_sequence_id;
+};
+
+struct chart_instance_updated {
+ const char *id;
+ const char *claim_id;
+ const char *node_id;
+ const char *name;
+
+ struct label *label_head;
+
+ RRD_MEMORY_MODE memory_mode;
+
+ uint32_t update_every;
+ const char * config_hash;
+
+ struct aclk_message_position position;
+};
+
+void chart_instance_updated_destroy(struct chart_instance_updated *instance);
+
+struct chart_dimension_updated {
+ const char *id;
+ const char *chart_id;
+ const char *node_id;
+ const char *claim_id;
+ const char *name;
+ struct timeval created_at;
+ struct timeval last_timestamp;
+ struct aclk_message_position position;
+};
+
+typedef struct {
+ struct chart_instance_updated *charts;
+ uint16_t chart_count;
+
+ struct chart_dimension_updated *dims;
+ uint16_t dim_count;
+
+ uint64_t batch_id;
+} charts_and_dims_updated_t;
+
+struct interval_duration {
+ uint32_t update_every;
+ uint32_t retention;
+};
+
+struct retention_updated {
+ char *claim_id;
+ char *node_id;
+
+ RRD_MEMORY_MODE memory_mode;
+
+ struct interval_duration *interval_durations;
+ int interval_duration_count;
+
+ struct timeval rotation_timestamp;
+};
+
+char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id);
+char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
+char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update);
+char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions);
+char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim);
+char *generate_retention_updated(size_t *len, struct retention_updated *data);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CHART_STREAM_H */
diff --git a/aclk/schema-wrappers/connection.cc b/aclk/schema-wrappers/connection.cc
new file mode 100644
index 000000000..e3bbfe31f
--- /dev/null
+++ b/aclk/schema-wrappers/connection.cc
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+using namespace agent::v1;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data)
+{
+ UpdateAgentConnection connupd;
+
+ connupd.set_claim_id(data->claim_id);
+ connupd.set_reachable(data->reachable);
+ connupd.set_session_id(data->session_id);
+
+ connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ *len = PROTO_COMPAT_MSG_SIZE(connupd);
+ char *msg = (char*)malloc(*len);
+ if (msg)
+ connupd.SerializeToArray(msg, *len);
+
+ return msg;
+}
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
+ DisconnectReq req;
+ struct disconnect_cmd *res;
+
+ if (!req.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct disconnect_cmd *)calloc(1, sizeof(struct disconnect_cmd));
+
+ if (!res)
+ return NULL;
+
+ res->reconnect_after_s = req.reconnect_after_seconds();
+ res->permaban = req.permaban();
+ res->error_code = req.error_code();
+ if (req.error_description().c_str()) {
+ res->error_description = strdup(req.error_description().c_str());
+ if (!res->error_description) {
+ free(res);
+ return NULL;
+ }
+ }
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/connection.h b/aclk/schema-wrappers/connection.h
new file mode 100644
index 000000000..8c223869a
--- /dev/null
+++ b/aclk/schema-wrappers/connection.h
@@ -0,0 +1,43 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_CONNECTION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char *claim_id;
+ unsigned int reachable:1;
+
+ int64_t session_id;
+
+ unsigned int lwt:1;
+
+// TODO in future optional fields
+// > 15 optional fields:
+// How long the system was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration system_uptime = 15;
+// How long the netdata agent was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration agent_uptime = 16;
+
+
+} update_agent_connection_t;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data);
+
+struct disconnect_cmd {
+ uint64_t reconnect_after_s;
+ int permaban;
+ uint32_t error_code;
+ char *error_description;
+};
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
new file mode 100644
index 000000000..0a4c8ece1
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "node_connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) {
+ nodeinstance::v1::UpdateNodeInstanceConnection msg;
+
+ if(data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+
+ msg.set_liveness(data->live);
+ msg.set_queryable(data->queryable);
+
+ msg.set_session_id(data->session_id);
+ msg.set_hops(data->hops);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = msg.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
new file mode 100644
index 000000000..3fd207213
--- /dev/null
+++ b/aclk/schema-wrappers/node_connection.h
@@ -0,0 +1,29 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char* claim_id;
+ const char* node_id;
+
+ unsigned int live:1;
+ unsigned int queryable:1;
+
+ int64_t session_id;
+
+ int32_t hops;
+} node_instance_connection_t;
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */
diff --git a/aclk/schema-wrappers/node_creation.cc b/aclk/schema-wrappers/node_creation.cc
new file mode 100644
index 000000000..c696bb27b
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.cc
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "node_creation.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <stdlib.h>
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data)
+{
+ nodeinstance::create::v1::CreateNodeInstance msg;
+
+ if (data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_machine_guid(data->machine_guid);
+ msg.set_hostname(data->hostname);
+ msg.set_hops(data->hops);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len)
+{
+ nodeinstance::create::v1::CreateNodeInstanceResult msg;
+ node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL };
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdup(msg.node_id().c_str());
+ res.machine_guid = strdup(msg.machine_guid().c_str());
+ return res;
+}
diff --git a/aclk/schema-wrappers/node_creation.h b/aclk/schema-wrappers/node_creation.h
new file mode 100644
index 000000000..71e45ef55
--- /dev/null
+++ b/aclk/schema-wrappers/node_creation.h
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char* claim_id;
+ const char* machine_guid;
+ const char* hostname;
+
+ int32_t hops;
+} node_instance_creation_t;
+
+typedef struct {
+ char *node_id;
+ char *machine_guid;
+} node_instance_creation_result_t;
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data);
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
new file mode 100644
index 000000000..f6638aa5f
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.cc
@@ -0,0 +1,95 @@
+#include "node_info.h"
+
+#include "proto/nodeinstance/info/v1/info.pb.h"
+
+#include "schema_wrapper_utils.h"
+
+static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
+{
+ struct label *label;
+ google::protobuf::Map<std::string, std::string> *map;
+
+ if (data->name)
+ info->set_name(data->name);
+
+ if (data->os)
+ info->set_os(data->os);
+ if (data->os_name)
+ info->set_os_name(data->os_name);
+ if (data->os_version)
+ info->set_os_version(data->os_version);
+
+ if (data->kernel_name)
+ info->set_kernel_name(data->kernel_name);
+ if (data->kernel_version)
+ info->set_kernel_version(data->kernel_version);
+
+ if (data->architecture)
+ info->set_architecture(data->architecture);
+
+ info->set_cpus(data->cpus);
+
+ if (data->cpu_frequency)
+ info->set_cpu_frequency(data->cpu_frequency);
+
+ if (data->memory)
+ info->set_memory(data->memory);
+
+ if (data->disk_space)
+ info->set_disk_space(data->disk_space);
+
+ if (data->version)
+ info->set_version(data->version);
+
+ if (data->release_channel)
+ info->set_release_channel(data->release_channel);
+
+ if (data->timezone)
+ info->set_timezone(data->timezone);
+
+ if (data->virtualization_type)
+ info->set_virtualization_type(data->virtualization_type);
+
+ if (data->container_type)
+ info->set_container_type(data->container_type);
+
+ if (data->custom_info)
+ info->set_custom_info(data->custom_info);
+
+ for (size_t i = 0; i < data->service_count; i++)
+ info->add_services(data->services[i]);
+
+ if (data->machine_guid)
+ info->set_machine_guid(data->machine_guid);
+
+ map = info->mutable_host_labels();
+ label = data->host_labels_head;
+ while (label) {
+ map->insert({label->key, label->value});
+ label = label->next;
+ }
+
+ return 0;
+}
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info)
+{
+ nodeinstance::info::v1::UpdateNodeInfo msg;
+
+ msg.set_node_id(info->node_id);
+ msg.set_claim_id(info->claim_id);
+
+ if (generate_node_info(msg.mutable_data(), &info->data))
+ return NULL;
+
+ set_google_timestamp_from_timeval(info->updated_at, msg.mutable_updated_at());
+ msg.set_machine_guid(info->machine_guid);
+ msg.set_child(info->child);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
new file mode 100644
index 000000000..4acb671a5
--- /dev/null
+++ b/aclk/schema-wrappers/node_info.h
@@ -0,0 +1,69 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_node_info {
+ char *name;
+
+ char *os;
+ char *os_name;
+ char *os_version;
+
+ char *kernel_name;
+ char *kernel_version;
+
+ char *architecture;
+
+ uint32_t cpus;
+
+ char *cpu_frequency;
+
+ char *memory;
+
+ char *disk_space;
+
+ char *version;
+
+ char *release_channel;
+
+ char *timezone;
+
+ char *virtualization_type;
+
+ char *container_type;
+
+ char *custom_info;
+
+ char **services;
+ size_t service_count;
+
+ char *machine_guid;
+
+ struct label *host_labels_head;
+};
+
+struct update_node_info {
+ char *node_id;
+ char *claim_id;
+ struct aclk_node_info data;
+ struct timeval updated_at;
+ char *machine_guid;
+ int child;
+};
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_INFO_H */
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc
new file mode 100644
index 000000000..b100e20c3
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "schema_wrapper_utils.h"
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts)
+{
+ ts->set_nanos(tv.tv_usec*1000);
+ ts->set_seconds(tv.tv_sec);
+}
+
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv)
+{
+ tv->tv_sec = ts.seconds();
+ tv->tv_usec = ts.nanos()/1000;
+}
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h
new file mode 100644
index 000000000..494855f82
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -0,0 +1,20 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SCHEMA_WRAPPER_UTILS_H
+#define SCHEMA_WRAPPER_UTILS_H
+
+#include <sys/time.h>
+#include <google/protobuf/timestamp.pb.h>
+
+#if GOOGLE_PROTOBUF_VERSION < 3001000
+#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize();
+#else
+#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong();
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong();
+#endif
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+
+#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
new file mode 100644
index 000000000..a3975fca3
--- /dev/null
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -0,0 +1,17 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+// utility header to include all the message wrappers at once
+
+#ifndef SCHEMA_WRAPPERS_H
+#define SCHEMA_WRAPPERS_H
+
+#include "connection.h"
+#include "node_connection.h"
+#include "node_creation.h"
+#include "chart_config.h"
+#include "chart_stream.h"
+#include "alarm_config.h"
+#include "alarm_stream.h"
+#include "node_info.h"
+
+#endif /* SCHEMA_WRAPPERS_H */