summaryrefslogtreecommitdiffstats
path: root/src/aclk/schema-wrappers
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/schema-wrappers')
-rw-r--r--src/aclk/schema-wrappers/agent_cmds.cc38
-rw-r--r--src/aclk/schema-wrappers/agent_cmds.h27
-rw-r--r--src/aclk/schema-wrappers/alarm_config.cc140
-rw-r--r--src/aclk/schema-wrappers/alarm_config.h71
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.cc221
-rw-r--r--src/aclk/schema-wrappers/alarm_stream.h128
-rw-r--r--src/aclk/schema-wrappers/capability.cc11
-rw-r--r--src/aclk/schema-wrappers/capability.h24
-rw-r--r--src/aclk/schema-wrappers/connection.cc72
-rw-r--r--src/aclk/schema-wrappers/connection.h47
-rw-r--r--src/aclk/schema-wrappers/context.cc125
-rw-r--r--src/aclk/schema-wrappers/context.h53
-rw-r--r--src/aclk/schema-wrappers/context_stream.cc42
-rw-r--r--src/aclk/schema-wrappers/context_stream.h36
-rw-r--r--src/aclk/schema-wrappers/node_connection.cc46
-rw-r--r--src/aclk/schema-wrappers/node_connection.h32
-rw-r--r--src/aclk/schema-wrappers/node_creation.cc39
-rw-r--r--src/aclk/schema-wrappers/node_creation.h31
-rw-r--r--src/aclk/schema-wrappers/node_info.cc136
-rw-r--r--src/aclk/schema-wrappers/node_info.h79
-rw-r--r--src/aclk/schema-wrappers/proto_2_json.cc88
-rw-r--r--src/aclk/schema-wrappers/proto_2_json.h18
-rw-r--r--src/aclk/schema-wrappers/schema_wrapper_utils.cc22
-rw-r--r--src/aclk/schema-wrappers/schema_wrapper_utils.h24
-rw-r--r--src/aclk/schema-wrappers/schema_wrappers.h19
25 files changed, 1569 insertions, 0 deletions
diff --git a/src/aclk/schema-wrappers/agent_cmds.cc b/src/aclk/schema-wrappers/agent_cmds.cc
new file mode 100644
index 000000000..6950f4025
--- /dev/null
+++ b/src/aclk/schema-wrappers/agent_cmds.cc
@@ -0,0 +1,38 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/cmds.pb.h"
+
+#include "agent_cmds.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace agent::v1;
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req)
+{
+ CancelPendingRequest msg_parsed;
+
+ if (!msg_parsed.ParseFromArray(msg, msg_len)) {
+ error_report("Failed to parse CancelPendingRequest message");
+ return 1;
+ }
+
+ if (msg_parsed.request_id().c_str() == NULL) {
+ error_report("CancelPendingRequest message missing request_id");
+ return 1;
+ }
+ req->request_id = strdupz(msg_parsed.request_id().c_str());
+
+ if (msg_parsed.trace_id().c_str())
+ req->trace_id = strdupz(msg_parsed.trace_id().c_str());
+
+ set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp);
+
+ return 0;
+}
+
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req)
+{
+ freez(req->request_id);
+ freez(req->trace_id);
+}
diff --git a/src/aclk/schema-wrappers/agent_cmds.h b/src/aclk/schema-wrappers/agent_cmds.h
new file mode 100644
index 000000000..7e01f86c5
--- /dev/null
+++ b/src/aclk/schema-wrappers/agent_cmds.h
@@ -0,0 +1,27 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+
+#include "libnetdata/libnetdata.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_cancel_pending_req {
+ char *request_id;
+
+ struct timeval timestamp;
+
+ char *trace_id;
+};
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req);
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */
diff --git a/src/aclk/schema-wrappers/alarm_config.cc b/src/aclk/schema-wrappers/alarm_config.cc
new file mode 100644
index 000000000..64d28f324
--- /dev/null
+++ b/src/aclk/schema-wrappers/alarm_config.cc
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_config.h"
+
+#include "proto/alarm/v1/config.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg)
+{
+ freez(cfg->alarm);
+ freez(cfg->tmpl);
+ freez(cfg->on_chart);
+ freez(cfg->classification);
+ freez(cfg->type);
+ freez(cfg->component);
+ freez(cfg->os);
+ freez(cfg->hosts);
+ freez(cfg->plugin);
+ freez(cfg->module);
+ freez(cfg->charts);
+ freez(cfg->lookup);
+ freez(cfg->every);
+ freez(cfg->units);
+ freez(cfg->green);
+ freez(cfg->red);
+ freez(cfg->calculation_expr);
+ freez(cfg->warning_expr);
+ freez(cfg->critical_expr);
+ freez(cfg->recipient);
+ freez(cfg->exec);
+ freez(cfg->delay);
+ freez(cfg->repeat);
+ freez(cfg->info);
+ freez(cfg->options);
+ freez(cfg->host_labels);
+ freez(cfg->p_db_lookup_dimensions);
+ freez(cfg->p_db_lookup_method);
+ freez(cfg->p_db_lookup_options);
+ freez(cfg->chart_labels);
+ freez(cfg->summary);
+}
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data)
+{
+ ProvideAlarmConfiguration msg;
+ AlarmConfiguration *cfg = msg.mutable_config();
+
+ msg.set_config_hash(data->cfg_hash);
+
+ if (data->cfg.alarm)
+ cfg->set_alarm(data->cfg.alarm);
+ if (data->cfg.tmpl)
+ cfg->set_template_(data->cfg.tmpl);
+ if(data->cfg.on_chart)
+ cfg->set_on_chart(data->cfg.on_chart);
+ if (data->cfg.classification)
+ cfg->set_classification(data->cfg.classification);
+ if (data->cfg.type)
+ cfg->set_type(data->cfg.type);
+ if (data->cfg.component)
+ cfg->set_component(data->cfg.component);
+ if (data->cfg.os)
+ cfg->set_os(data->cfg.os);
+ if (data->cfg.hosts)
+ cfg->set_hosts(data->cfg.hosts);
+ if (data->cfg.plugin)
+ cfg->set_plugin(data->cfg.plugin);
+ if(data->cfg.module)
+ cfg->set_module(data->cfg.module);
+ if(data->cfg.charts)
+ cfg->set_charts(data->cfg.charts);
+ if(data->cfg.lookup)
+ cfg->set_lookup(data->cfg.lookup);
+ if(data->cfg.every)
+ cfg->set_every(data->cfg.every);
+ if(data->cfg.units)
+ cfg->set_units(data->cfg.units);
+ if (data->cfg.green)
+ cfg->set_green(data->cfg.green);
+ if (data->cfg.red)
+ cfg->set_red(data->cfg.red);
+ if (data->cfg.calculation_expr)
+ cfg->set_calculation_expr(data->cfg.calculation_expr);
+ if (data->cfg.warning_expr)
+ cfg->set_warning_expr(data->cfg.warning_expr);
+ if (data->cfg.critical_expr)
+ cfg->set_critical_expr(data->cfg.critical_expr);
+ if (data->cfg.recipient)
+ cfg->set_recipient(data->cfg.recipient);
+ if (data->cfg.exec)
+ cfg->set_exec(data->cfg.exec);
+ if (data->cfg.delay)
+ cfg->set_delay(data->cfg.delay);
+ if (data->cfg.repeat)
+ cfg->set_repeat(data->cfg.repeat);
+ if (data->cfg.info)
+ cfg->set_info(data->cfg.info);
+ if (data->cfg.options)
+ cfg->set_options(data->cfg.options);
+ if (data->cfg.host_labels)
+ cfg->set_host_labels(data->cfg.host_labels);
+
+ cfg->set_p_db_lookup_after(data->cfg.p_db_lookup_after);
+ cfg->set_p_db_lookup_before(data->cfg.p_db_lookup_before);
+ if (data->cfg.p_db_lookup_dimensions)
+ cfg->set_p_db_lookup_dimensions(data->cfg.p_db_lookup_dimensions);
+ if (data->cfg.p_db_lookup_method)
+ cfg->set_p_db_lookup_method(data->cfg.p_db_lookup_method);
+ if (data->cfg.p_db_lookup_options)
+ cfg->set_p_db_lookup_options(data->cfg.p_db_lookup_options);
+ cfg->set_p_update_every(data->cfg.p_update_every);
+
+ if (data->cfg.chart_labels)
+ cfg->set_chart_labels(data->cfg.chart_labels);
+ if (data->cfg.summary)
+ cfg->set_summary(data->cfg.summary);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len))
+ return NULL;
+
+ return bin;
+}
+
+char *parse_send_alarm_configuration(const char *data, size_t len)
+{
+ SendAlarmConfiguration msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+ if (!msg.config_hash().c_str())
+ return NULL;
+ return strdupz(msg.config_hash().c_str());
+}
+
diff --git a/src/aclk/schema-wrappers/alarm_config.h b/src/aclk/schema-wrappers/alarm_config.h
new file mode 100644
index 000000000..3c9a5d9a8
--- /dev/null
+++ b/src/aclk/schema-wrappers/alarm_config.h
@@ -0,0 +1,71 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_alarm_configuration {
+ char *alarm;
+ char *tmpl;
+ char *on_chart;
+
+ char *classification;
+ char *type;
+ char *component;
+
+ char *os;
+ char *hosts;
+ char *plugin;
+ char *module;
+ char *charts;
+ char *lookup;
+ char *every;
+ char *units;
+
+ char *green;
+ char *red;
+
+ char *calculation_expr;
+ char *warning_expr;
+ char *critical_expr;
+
+ char *recipient;
+ char *exec;
+ char *delay;
+ char *repeat;
+ char *info;
+ char *options;
+ char *host_labels;
+
+ int32_t p_db_lookup_after;
+ int32_t p_db_lookup_before;
+ char *p_db_lookup_dimensions;
+ char *p_db_lookup_method;
+ char *p_db_lookup_options;
+ int32_t p_update_every;
+
+ char *chart_labels;
+ char *summary;
+};
+
+void destroy_aclk_alarm_configuration(struct aclk_alarm_configuration *cfg);
+
+struct provide_alarm_configuration {
+ char *cfg_hash;
+ struct aclk_alarm_configuration cfg;
+};
+
+char *generate_provide_alarm_configuration(size_t *len, struct provide_alarm_configuration *data);
+char *parse_send_alarm_configuration(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_CONFIG_H */
diff --git a/src/aclk/schema-wrappers/alarm_stream.cc b/src/aclk/schema-wrappers/alarm_stream.cc
new file mode 100644
index 000000000..29d80e39e
--- /dev/null
+++ b/src/aclk/schema-wrappers/alarm_stream.cc
@@ -0,0 +1,221 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "alarm_stream.h"
+
+#include "proto/alarm/v1/stream.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace alarms::v1;
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len)
+{
+ struct start_alarm_streaming ret;
+ memset(&ret, 0, sizeof(ret));
+
+ StartAlarmStreaming msg;
+
+ if (!msg.ParseFromArray(data, len))
+ return ret;
+
+ ret.node_id = strdupz(msg.node_id().c_str());
+ ret.resets = msg.resets();
+
+ return ret;
+}
+
+struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len)
+{
+ struct send_alarm_checkpoint ret;
+ memset(&ret, 0, sizeof(ret));
+
+ SendAlarmCheckpoint msg;
+ if (!msg.ParseFromArray(data, len))
+ return ret;
+
+ ret.node_id = strdupz(msg.node_id().c_str());
+ ret.claim_id = strdupz(msg.claim_id().c_str());
+
+ return ret;
+}
+
+static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status)
+{
+ switch (status) {
+ case aclk_alarm_status::ALARM_STATUS_NULL:
+ return alarms::v1::ALARM_STATUS_NULL;
+ case aclk_alarm_status::ALARM_STATUS_UNKNOWN:
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ case aclk_alarm_status::ALARM_STATUS_REMOVED:
+ return alarms::v1::ALARM_STATUS_REMOVED;
+ case aclk_alarm_status::ALARM_STATUS_NOT_A_NUMBER:
+ return alarms::v1::ALARM_STATUS_NOT_A_NUMBER;
+ case aclk_alarm_status::ALARM_STATUS_CLEAR:
+ return alarms::v1::ALARM_STATUS_CLEAR;
+ case aclk_alarm_status::ALARM_STATUS_WARNING:
+ return alarms::v1::ALARM_STATUS_WARNING;
+ case aclk_alarm_status::ALARM_STATUS_CRITICAL:
+ return alarms::v1::ALARM_STATUS_CRITICAL;
+ default:
+ netdata_log_error("Unknown alarm status");
+ return alarms::v1::ALARM_STATUS_UNKNOWN;
+ }
+}
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry)
+{
+ freez(entry->chart);
+ freez(entry->name);
+ freez(entry->config_hash);
+ freez(entry->timezone);
+ freez(entry->exec_path);
+ freez(entry->conf_source);
+ freez(entry->command);
+ freez(entry->value_string);
+ freez(entry->old_value_string);
+ freez(entry->rendered_info);
+ freez(entry->chart_context);
+ freez(entry->transition_id);
+ freez(entry->chart_name);
+ freez(entry->summary);
+}
+
+static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto)
+{
+ proto->set_node_id(data->node_id);
+ proto->set_claim_id(data->claim_id);
+ proto->set_chart(data->chart);
+ proto->set_name(data->name);
+ proto->set_when(data->when);
+ proto->set_config_hash(data->config_hash);
+ proto->set_utc_offset(data->utc_offset);
+ proto->set_timezone(data->timezone);
+ proto->set_exec_path(data->exec_path);
+ proto->set_conf_source(data->conf_source);
+ proto->set_command(data->command);
+ proto->set_duration(data->duration);
+ proto->set_non_clear_duration(data->non_clear_duration);
+ proto->set_status(aclk_alarm_status_to_proto(data->status));
+ proto->set_old_status(aclk_alarm_status_to_proto(data->old_status));
+ proto->set_delay(data->delay);
+ proto->set_delay_up_to_timestamp(data->delay_up_to_timestamp);
+ proto->set_last_repeat(data->last_repeat);
+ proto->set_silenced(data->silenced);
+
+ if (data->value_string)
+ proto->set_value_string(data->value_string);
+ if (data->old_value_string)
+ proto->set_old_value_string(data->old_value_string);
+
+ proto->set_value(data->value);
+ proto->set_old_value(data->old_value);
+ proto->set_updated(data->updated);
+ proto->set_rendered_info(data->rendered_info);
+ proto->set_chart_context(data->chart_context);
+ proto->set_event_id(data->event_id);
+ proto->set_transition_id(data->transition_id);
+ proto->set_chart_name(data->chart_name);
+ proto->set_summary(data->summary);
+}
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
+{
+ AlarmLogEntry le;
+
+ fill_alarm_log_entry(data, &le);
+
+ *len = PROTO_COMPAT_MSG_SIZE(le);
+ char *bin = (char*)mallocz(*len);
+ if (!le.SerializeToArray(bin, *len)) {
+ freez(bin);
+ return NULL;
+ }
+
+ return bin;
+}
+
+char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data)
+{
+ AlarmCheckpoint msg;
+
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+ msg.set_checksum(data->checksum);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (!msg.SerializeToArray(bin, *len)) {
+ freez(bin);
+ return NULL;
+ }
+
+ return bin;
+}
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len)
+{
+ SendAlarmSnapshot msg;
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ struct send_alarm_snapshot *ret = (struct send_alarm_snapshot*)callocz(1, sizeof(struct send_alarm_snapshot));
+ if (msg.claim_id().c_str())
+ ret->claim_id = strdupz(msg.claim_id().c_str());
+ if (msg.node_id().c_str())
+ ret->node_id = strdupz(msg.node_id().c_str());
+ if (msg.snapshot_uuid().c_str())
+ ret->snapshot_uuid = strdupz(msg.snapshot_uuid().c_str());
+
+ return ret;
+}
+
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr)
+{
+ freez(ptr->claim_id);
+ freez(ptr->node_id);
+ freez(ptr->snapshot_uuid);
+ freez(ptr);
+}
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data)
+{
+ AlarmSnapshot *msg = new AlarmSnapshot;
+ if (unlikely(!msg)) fatal("Cannot allocate memory for AlarmSnapshot");
+
+ msg->set_node_id(data->node_id);
+ msg->set_claim_id(data->claim_id);
+ msg->set_snapshot_uuid(data->snapshot_uuid);
+ msg->set_chunks(data->chunks);
+ msg->set_chunk(data->chunk);
+
+ // this is handled automatically by add_alarm_log_entry2snapshot function
+ msg->set_chunk_size(0);
+
+ return msg;
+}
+
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ AlarmLogEntry *alarm_log_entry = alarm_snapshot->add_alarms();
+
+ fill_alarm_log_entry(data, alarm_log_entry);
+
+ alarm_snapshot->set_chunk_size(alarm_snapshot->chunk_size() + 1);
+}
+
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot)
+{
+ AlarmSnapshot *alarm_snapshot = (AlarmSnapshot *)snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(alarm_snapshot);
+ char *bin = (char*)mallocz(*len);
+ if (!alarm_snapshot->SerializeToArray(bin, *len)) {
+ delete alarm_snapshot;
+ return NULL;
+ }
+
+ delete alarm_snapshot;
+ return bin;
+}
diff --git a/src/aclk/schema-wrappers/alarm_stream.h b/src/aclk/schema-wrappers/alarm_stream.h
new file mode 100644
index 000000000..3c81ff445
--- /dev/null
+++ b/src/aclk/schema-wrappers/alarm_stream.h
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H
+
+#include <stdlib.h>
+
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct start_alarm_streaming {
+ char *node_id;
+ bool resets;
+};
+
+struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len);
+
+enum aclk_alarm_status {
+ ALARM_STATUS_NULL = 0,
+ ALARM_STATUS_UNKNOWN = 1,
+ ALARM_STATUS_REMOVED = 2,
+ ALARM_STATUS_NOT_A_NUMBER = 3,
+ ALARM_STATUS_CLEAR = 4,
+ ALARM_STATUS_WARNING = 5,
+ ALARM_STATUS_CRITICAL = 6
+};
+
+struct alarm_log_entry {
+ char *node_id;
+ char *claim_id;
+
+ char *chart;
+ char *name;
+ char *family;
+
+ uint64_t batch_id;
+ uint64_t sequence_id;
+ uint64_t when;
+
+ char *config_hash;
+
+ int32_t utc_offset;
+ char *timezone;
+
+ char *exec_path;
+ char *conf_source;
+ char *command;
+
+ uint32_t duration;
+ uint32_t non_clear_duration;
+
+ enum aclk_alarm_status status;
+ enum aclk_alarm_status old_status;
+ uint64_t delay;
+ uint64_t delay_up_to_timestamp;
+
+ uint64_t last_repeat;
+ int silenced;
+
+ char *value_string;
+ char *old_value_string;
+
+ double value;
+ double old_value;
+
+ // updated alarm entry, when the status of the alarm has been updated by a later entry
+ int updated;
+
+ // rendered_info
+ char *rendered_info;
+
+ char *chart_context;
+ char *chart_name;
+
+ uint64_t event_id;
+ char *transition_id;
+ char *summary;
+};
+
+struct send_alarm_checkpoint {
+ char *node_id;
+ char *claim_id;
+};
+
+struct alarm_checkpoint {
+ char *node_id;
+ char *claim_id;
+ char *checksum;
+};
+
+struct send_alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ char *snapshot_uuid;
+};
+
+struct alarm_snapshot {
+ char *node_id;
+ char *claim_id;
+ char *snapshot_uuid;
+ uint32_t chunks;
+ uint32_t chunk;
+};
+
+typedef void* alarm_snapshot_proto_ptr_t;
+
+void destroy_alarm_log_entry(struct alarm_log_entry *entry);
+
+char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data);
+
+struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len);
+void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr);
+
+struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len);
+char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data);
+
+alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data);
+void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data);
+char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_ALARM_STREAM_H */
diff --git a/src/aclk/schema-wrappers/capability.cc b/src/aclk/schema-wrappers/capability.cc
new file mode 100644
index 000000000..af45740a9
--- /dev/null
+++ b/src/aclk/schema-wrappers/capability.cc
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/aclk/v1/lib.pb.h"
+
+#include "capability.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa) {
+ proto_capa->set_name(c_capa->name);
+ proto_capa->set_enabled(c_capa->enabled);
+ proto_capa->set_version(c_capa->version);
+}
diff --git a/src/aclk/schema-wrappers/capability.h b/src/aclk/schema-wrappers/capability.h
new file mode 100644
index 000000000..c6085a44b
--- /dev/null
+++ b/src/aclk/schema-wrappers/capability.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_CAPABILITY_H
+#define ACLK_SCHEMA_CAPABILITY_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct capability {
+ const char *name;
+ uint32_t version;
+ int enabled;
+};
+
+#ifdef __cplusplus
+}
+
+#include "proto/aclk/v1/lib.pb.h"
+
+void capability_set(aclk_lib::v1::Capability *proto_capa, const struct capability *c_capa);
+#endif
+
+#endif /* ACLK_SCHEMA_CAPABILITY_H */
diff --git a/src/aclk/schema-wrappers/connection.cc b/src/aclk/schema-wrappers/connection.cc
new file mode 100644
index 000000000..20b40ece2
--- /dev/null
+++ b/src/aclk/schema-wrappers/connection.cc
@@ -0,0 +1,72 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+using namespace agent::v1;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data)
+{
+ UpdateAgentConnection connupd;
+
+ connupd.set_claim_id(data->claim_id);
+ connupd.set_reachable(data->reachable);
+ connupd.set_session_id(data->session_id);
+
+ connupd.set_update_source((data->lwt) ? CONNECTION_UPDATE_SOURCE_LWT : CONNECTION_UPDATE_SOURCE_AGENT);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = connupd.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ if (data->capabilities) {
+ const struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = connupd.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(connupd);
+ char *msg = (char*)mallocz(*len);
+ if (msg)
+ connupd.SerializeToArray(msg, *len);
+
+ return msg;
+}
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len) {
+ DisconnectReq req;
+ struct disconnect_cmd *res;
+
+ if (!req.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct disconnect_cmd *)callocz(1, sizeof(struct disconnect_cmd));
+
+ if (!res)
+ return NULL;
+
+ res->reconnect_after_s = req.reconnect_after_seconds();
+ res->permaban = req.permaban();
+ res->error_code = req.error_code();
+ if (req.error_description().c_str()) {
+ res->error_description = strdupz(req.error_description().c_str());
+ if (!res->error_description) {
+ freez(res);
+ return NULL;
+ }
+ }
+
+ return res;
+}
diff --git a/src/aclk/schema-wrappers/connection.h b/src/aclk/schema-wrappers/connection.h
new file mode 100644
index 000000000..0356c7d78
--- /dev/null
+++ b/src/aclk/schema-wrappers/connection.h
@@ -0,0 +1,47 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_CONNECTION_H
+
+#include "capability.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char *claim_id;
+ unsigned int reachable:1;
+
+ int64_t session_id;
+
+ unsigned int lwt:1;
+
+ const struct capability *capabilities;
+
+// TODO in future optional fields
+// > 15 optional fields:
+// How long the system was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration system_uptime = 15;
+// How long the netdata agent was running until connection (only applicable when reachable=true)
+// google.protobuf.Duration agent_uptime = 16;
+
+
+} update_agent_connection_t;
+
+char *generate_update_agent_connection(size_t *len, const update_agent_connection_t *data);
+
+struct disconnect_cmd {
+ uint64_t reconnect_after_s;
+ int permaban;
+ uint32_t error_code;
+ char *error_description;
+};
+
+struct disconnect_cmd *parse_disconnect_cmd(const char *data, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONNECTION_H */
diff --git a/src/aclk/schema-wrappers/context.cc b/src/aclk/schema-wrappers/context.cc
new file mode 100644
index 000000000..b04c9d20c
--- /dev/null
+++ b/src/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/src/aclk/schema-wrappers/context.h b/src/aclk/schema-wrappers/context.h
new file mode 100644
index 000000000..cbb7701a8
--- /dev/null
+++ b/src/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/src/aclk/schema-wrappers/context_stream.cc b/src/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 000000000..3bb1956cb
--- /dev/null
+++ b/src/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/src/aclk/schema-wrappers/context_stream.h b/src/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 000000000..8c691d2cc
--- /dev/null
+++ b/src/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/src/aclk/schema-wrappers/node_connection.cc b/src/aclk/schema-wrappers/node_connection.cc
new file mode 100644
index 000000000..db1fa6449
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_connection.cc
@@ -0,0 +1,46 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "node_connection.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <sys/time.h>
+#include <stdlib.h>
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data) {
+ nodeinstance::v1::UpdateNodeInstanceConnection msg;
+
+ if(data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_node_id(data->node_id);
+
+ msg.set_liveness(data->live);
+ msg.set_queryable(data->queryable);
+
+ msg.set_session_id(data->session_id);
+ msg.set_hops(data->hops);
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ google::protobuf::Timestamp *timestamp = msg.mutable_updated_at();
+ timestamp->set_seconds(tv.tv_sec);
+ timestamp->set_nanos(tv.tv_usec * 1000);
+
+ if (data->capabilities) {
+ const struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/src/aclk/schema-wrappers/node_connection.h b/src/aclk/schema-wrappers/node_connection.h
new file mode 100644
index 000000000..dac0d8fe0
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_connection.h
@@ -0,0 +1,32 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+
+#include "capability.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char* claim_id;
+ const char* node_id;
+
+ unsigned int live:1;
+ unsigned int queryable:1;
+
+ int64_t session_id;
+
+ int32_t hops;
+ const struct capability *capabilities;
+} node_instance_connection_t;
+
+char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H */
diff --git a/src/aclk/schema-wrappers/node_creation.cc b/src/aclk/schema-wrappers/node_creation.cc
new file mode 100644
index 000000000..5ad25b7e5
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_creation.cc
@@ -0,0 +1,39 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "node_creation.h"
+
+#include "schema_wrapper_utils.h"
+
+#include <stdlib.h>
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data)
+{
+ nodeinstance::create::v1::CreateNodeInstance msg;
+
+ if (data->claim_id)
+ msg.set_claim_id(data->claim_id);
+ msg.set_machine_guid(data->machine_guid);
+ msg.set_hostname(data->hostname);
+ msg.set_hops(data->hops);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len)
+{
+ nodeinstance::create::v1::CreateNodeInstanceResult msg;
+ node_instance_creation_result_t res = { .node_id = NULL, .machine_guid = NULL };
+
+ if (!msg.ParseFromArray(data, len))
+ return res;
+
+ res.node_id = strdupz(msg.node_id().c_str());
+ res.machine_guid = strdupz(msg.machine_guid().c_str());
+ return res;
+}
diff --git a/src/aclk/schema-wrappers/node_creation.h b/src/aclk/schema-wrappers/node_creation.h
new file mode 100644
index 000000000..7a8c7f7c7
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_creation.h
@@ -0,0 +1,31 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+#define ACLK_SCHEMA_WRAPPER_NODE_CREATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ const char *claim_id;
+ const char *machine_guid;
+ const char *hostname;
+
+ int32_t hops;
+} node_instance_creation_t;
+
+typedef struct {
+ char *node_id;
+ char *machine_guid;
+} node_instance_creation_result_t;
+
+char *generate_node_instance_creation(size_t *len, const node_instance_creation_t *data);
+node_instance_creation_result_t parse_create_node_instance_result(const char *data, size_t len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_CREATION_H */
diff --git a/src/aclk/schema-wrappers/node_info.cc b/src/aclk/schema-wrappers/node_info.cc
new file mode 100644
index 000000000..5e321f688
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_info.cc
@@ -0,0 +1,136 @@
+#include "node_info.h"
+
+#include "proto/nodeinstance/info/v1/info.pb.h"
+
+#include "schema_wrapper_utils.h"
+
+static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
+{
+ google::protobuf::Map<std::string, std::string> *map;
+
+ if (data->name)
+ info->set_name(data->name);
+
+ if (data->os)
+ info->set_os(data->os);
+ if (data->os_name)
+ info->set_os_name(data->os_name);
+ if (data->os_version)
+ info->set_os_version(data->os_version);
+
+ if (data->kernel_name)
+ info->set_kernel_name(data->kernel_name);
+ if (data->kernel_version)
+ info->set_kernel_version(data->kernel_version);
+
+ if (data->architecture)
+ info->set_architecture(data->architecture);
+
+ info->set_cpus(data->cpus);
+
+ if (data->cpu_frequency)
+ info->set_cpu_frequency(data->cpu_frequency);
+
+ if (data->memory)
+ info->set_memory(data->memory);
+
+ if (data->disk_space)
+ info->set_disk_space(data->disk_space);
+
+ if (data->version)
+ info->set_version(data->version);
+
+ if (data->release_channel)
+ info->set_release_channel(data->release_channel);
+
+ if (data->timezone)
+ info->set_timezone(data->timezone);
+
+ if (data->virtualization_type)
+ info->set_virtualization_type(data->virtualization_type);
+
+ if (data->container_type)
+ info->set_container_type(data->container_type);
+
+ if (data->custom_info)
+ info->set_custom_info(data->custom_info);
+
+ if (data->machine_guid)
+ info->set_machine_guid(data->machine_guid);
+
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = info->mutable_ml_info();
+ ml_info->set_ml_capable(data->ml_info.ml_capable);
+ ml_info->set_ml_enabled(data->ml_info.ml_enabled);
+
+ map = info->mutable_host_labels();
+ rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map);
+ return 0;
+}
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info)
+{
+ nodeinstance::info::v1::UpdateNodeInfo msg;
+
+ msg.set_node_id(info->node_id);
+ msg.set_claim_id(info->claim_id);
+
+ if (generate_node_info(msg.mutable_data(), &info->data))
+ return NULL;
+
+ set_google_timestamp_from_timeval(info->updated_at, msg.mutable_updated_at());
+ msg.set_machine_guid(info->machine_guid);
+ msg.set_child(info->child);
+
+ nodeinstance::info::v1::MachineLearningInfo *ml_info = msg.mutable_ml_info();
+ ml_info->set_ml_capable(info->ml_info.ml_capable);
+ ml_info->set_ml_enabled(info->ml_info.ml_enabled);
+
+ struct capability *capa;
+ if (info->node_capabilities) {
+ capa = info->node_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+ if (info->node_instance_capabilities) {
+ capa = info->node_instance_capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.mutable_node_instance_info()->add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors)
+{
+ nodeinstance::info::v1::UpdateNodeCollectors msg;
+
+ msg.set_node_id(upd_node_collectors->node_id);
+ msg.set_claim_id(upd_node_collectors->claim_id);
+
+ void *colls;
+ dfe_start_read(upd_node_collectors->node_collectors, colls) {
+ struct collector_info *c =(struct collector_info *)colls;
+ nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors();
+ col->set_plugin(c->plugin);
+ col->set_module(c->module);
+ }
+ dfe_done(colls);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)mallocz(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/src/aclk/schema-wrappers/node_info.h b/src/aclk/schema-wrappers/node_info.h
new file mode 100644
index 000000000..4f57601df
--- /dev/null
+++ b/src/aclk/schema-wrappers/node_info.h
@@ -0,0 +1,79 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
+
+#include <stdlib.h>
+#include <stdint.h>
+
+#include "capability.h"
+#include "database/rrd.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct machine_learning_info {
+ bool ml_capable;
+ bool ml_enabled;
+};
+
+struct aclk_node_info {
+ const char *name;
+
+ const char *os;
+ const char *os_name;
+ const char *os_version;
+ const char *kernel_name;
+ const char *kernel_version;
+ const char *architecture;
+ uint32_t cpus;
+ const char *cpu_frequency;
+ const char *memory;
+ const char *disk_space;
+ const char *version;
+ const char *release_channel;
+ const char *timezone;
+ const char *virtualization_type;
+ const char *container_type;
+ const char *custom_info;
+ const char *machine_guid;
+
+ RRDLABELS *host_labels_ptr;
+ struct machine_learning_info ml_info;
+};
+
+struct update_node_info {
+ char *node_id;
+ char *claim_id;
+ struct aclk_node_info data;
+ struct timeval updated_at;
+ char *machine_guid;
+ int child;
+
+ struct machine_learning_info ml_info;
+
+ struct capability *node_capabilities;
+ struct capability *node_instance_capabilities;
+};
+
+struct collector_info {
+ const char *module;
+ const char *plugin;
+};
+
+struct update_node_collectors {
+ char *claim_id;
+ char *node_id;
+ DICTIONARY *node_collectors;
+};
+
+char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_NODE_INFO_H */
diff --git a/src/aclk/schema-wrappers/proto_2_json.cc b/src/aclk/schema-wrappers/proto_2_json.cc
new file mode 100644
index 000000000..854396510
--- /dev/null
+++ b/src/aclk/schema-wrappers/proto_2_json.cc
@@ -0,0 +1,88 @@
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "proto/alarm/v1/config.pb.h"
+#include "proto/alarm/v1/stream.pb.h"
+#include "proto/aclk/v1/lib.pb.h"
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
+#include "proto/agent/v1/cmds.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "proto_2_json.h"
+
+using namespace google::protobuf::util;
+
+static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
+{
+//tx side
+ if (!strcmp(msgname, "UpdateAgentConnection"))
+ return new agent::v1::UpdateAgentConnection;
+ if (!strcmp(msgname, "UpdateNodeInstanceConnection"))
+ return new nodeinstance::v1::UpdateNodeInstanceConnection;
+ if (!strcmp(msgname, "CreateNodeInstance"))
+ return new nodeinstance::create::v1::CreateNodeInstance;
+ if (!strcmp(msgname, "UpdateNodeInfo"))
+ return new nodeinstance::info::v1::UpdateNodeInfo;
+ if (!strcmp(msgname, "AlarmCheckpoint"))
+ return new alarms::v1::AlarmCheckpoint;
+ if (!strcmp(msgname, "ProvideAlarmConfiguration"))
+ return new alarms::v1::ProvideAlarmConfiguration;
+ if (!strcmp(msgname, "AlarmSnapshot"))
+ return new alarms::v1::AlarmSnapshot;
+ if (!strcmp(msgname, "AlarmLogEntry"))
+ return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
+
+//rx side
+ if (!strcmp(msgname, "CreateNodeInstanceResult"))
+ return new nodeinstance::create::v1::CreateNodeInstanceResult;
+ if (!strcmp(msgname, "SendNodeInstances"))
+ return new agent::v1::SendNodeInstances;
+ if (!strcmp(msgname, "StartAlarmStreaming"))
+ return new alarms::v1::StartAlarmStreaming;
+ if (!strcmp(msgname, "SendAlarmCheckpoint"))
+ return new alarms::v1::SendAlarmCheckpoint;
+ if (!strcmp(msgname, "SendAlarmConfiguration"))
+ return new alarms::v1::SendAlarmConfiguration;
+ if (!strcmp(msgname, "SendAlarmSnapshot"))
+ return new alarms::v1::SendAlarmSnapshot;
+ if (!strcmp(msgname, "DisconnectReq"))
+ return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
+ if (!strcmp(msgname, "CancelPendingRequest"))
+ return new agent::v1::CancelPendingRequest;
+
+ return NULL;
+}
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname)
+{
+ google::protobuf::Message *msg = msg_name_to_protomsg(msgname);
+ if (msg == NULL)
+ return strdupz("Don't know this message type by name.");
+
+ if (!msg->ParseFromArray(protobin, len))
+ return strdupz("Can't parse this message. Malformed or wrong parser used.");
+
+ JsonPrintOptions options;
+
+ std::string output;
+ google::protobuf::util::MessageToJsonString(*msg, &output, options);
+ delete msg;
+ return strdupz(output.c_str());
+}
diff --git a/src/aclk/schema-wrappers/proto_2_json.h b/src/aclk/schema-wrappers/proto_2_json.h
new file mode 100644
index 000000000..3bd98478c
--- /dev/null
+++ b/src/aclk/schema-wrappers/proto_2_json.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef PROTO_2_JSON_H
+#define PROTO_2_JSON_H
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROTO_2_JSON_H */
diff --git a/src/aclk/schema-wrappers/schema_wrapper_utils.cc b/src/aclk/schema-wrappers/schema_wrapper_utils.cc
new file mode 100644
index 000000000..96a4b9bf1
--- /dev/null
+++ b/src/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -0,0 +1,22 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "schema_wrapper_utils.h"
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts)
+{
+ ts->set_nanos(tv.tv_usec*1000);
+ ts->set_seconds(tv.tv_sec);
+}
+
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv)
+{
+ tv->tv_sec = ts.seconds();
+ tv->tv_usec = ts.nanos()/1000;
+}
+
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls __maybe_unused, void *data)
+{
+ auto map = (google::protobuf::Map<std::string, std::string> *)data;
+ map->insert({name, value});
+ return 1;
+}
diff --git a/src/aclk/schema-wrappers/schema_wrapper_utils.h b/src/aclk/schema-wrappers/schema_wrapper_utils.h
new file mode 100644
index 000000000..693a4ce5f
--- /dev/null
+++ b/src/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -0,0 +1,24 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef SCHEMA_WRAPPER_UTILS_H
+#define SCHEMA_WRAPPER_UTILS_H
+
+#include "database/rrd.h"
+
+#include <sys/time.h>
+#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/map.h>
+
+#if GOOGLE_PROTOBUF_VERSION < 3001000
+#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize()
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) (size_t)msg->ByteSize()
+#else
+#define PROTO_COMPAT_MSG_SIZE(msg) msg.ByteSizeLong()
+#define PROTO_COMPAT_MSG_SIZE_PTR(msg) msg->ByteSizeLong()
+#endif
+
+void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
+void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data);
+
+#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/src/aclk/schema-wrappers/schema_wrappers.h b/src/aclk/schema-wrappers/schema_wrappers.h
new file mode 100644
index 000000000..b651b8845
--- /dev/null
+++ b/src/aclk/schema-wrappers/schema_wrappers.h
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+// utility header to include all the message wrappers at once
+
+#ifndef SCHEMA_WRAPPERS_H
+#define SCHEMA_WRAPPERS_H
+
+#include "connection.h"
+#include "node_connection.h"
+#include "node_creation.h"
+#include "alarm_config.h"
+#include "alarm_stream.h"
+#include "node_info.h"
+#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
+#include "agent_cmds.h"
+
+#endif /* SCHEMA_WRAPPERS_H */