summaryrefslogtreecommitdiffstats
path: root/aclk/schema-wrappers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--aclk/schema-wrappers/alarm_stream.cc3
-rw-r--r--aclk/schema-wrappers/alarm_stream.h2
-rw-r--r--aclk/schema-wrappers/chart_stream.cc9
-rw-r--r--aclk/schema-wrappers/chart_stream.h2
-rw-r--r--aclk/schema-wrappers/context.cc125
-rw-r--r--aclk/schema-wrappers/context.h53
-rw-r--r--aclk/schema-wrappers/context_stream.cc42
-rw-r--r--aclk/schema-wrappers/context_stream.h36
-rw-r--r--aclk/schema-wrappers/node_connection.cc9
-rw-r--r--aclk/schema-wrappers/node_connection.h3
-rw-r--r--aclk/schema-wrappers/node_info.cc35
-rw-r--r--aclk/schema-wrappers/node_info.h21
-rw-r--r--aclk/schema-wrappers/proto_2_json.cc101
-rw-r--r--aclk/schema-wrappers/proto_2_json.h18
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.cc7
-rw-r--r--aclk/schema-wrappers/schema_wrapper_utils.h4
-rw-r--r--aclk/schema-wrappers/schema_wrappers.h2
17 files changed, 449 insertions, 23 deletions
diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc
index 338e512d8..f64393300 100644
--- a/aclk/schema-wrappers/alarm_stream.cc
+++ b/aclk/schema-wrappers/alarm_stream.cc
@@ -118,6 +118,7 @@ void destroy_alarm_log_entry(struct alarm_log_entry *entry)
freez(entry->old_value_string);
freez(entry->rendered_info);
+ freez(entry->chart_context);
}
static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *proto)
@@ -166,6 +167,8 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr
proto->set_updated(data->updated);
proto->set_rendered_info(data->rendered_info);
+
+ proto->set_chart_context(data->chart_context);
}
char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data)
diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h
index 2932bb192..63911da3f 100644
--- a/aclk/schema-wrappers/alarm_stream.h
+++ b/aclk/schema-wrappers/alarm_stream.h
@@ -97,6 +97,8 @@ struct alarm_log_entry {
// rendered_info
char *rendered_info;
+
+ char *chart_context;
};
struct send_alarm_snapshot {
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc
index 7d820e533..54c940758 100644
--- a/aclk/schema-wrappers/chart_stream.cc
+++ b/aclk/schema-wrappers/chart_stream.cc
@@ -76,7 +76,7 @@ void chart_instance_updated_destroy(struct chart_instance_updated *instance)
freez((char*)instance->id);
freez((char*)instance->claim_id);
- free_label_list(instance->label_head);
+ rrdlabels_destroy(instance->chart_labels);
freez((char*)instance->config_hash);
}
@@ -85,7 +85,6 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co
{
google::protobuf::Map<std::string, std::string> *map;
aclk_lib::v1::ACLKMessagePosition *pos;
- struct label *label;
chart->set_id(update->id);
chart->set_claim_id(update->claim_id);
@@ -93,11 +92,7 @@ static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, co
chart->set_name(update->name);
map = chart->mutable_chart_labels();
- label = update->label_head;
- while (label) {
- map->insert({label->key, label->value});
- label = label->next;
- }
+ rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map);
switch (update->memory_mode) {
case RRD_MEMORY_MODE_NONE:
diff --git a/aclk/schema-wrappers/chart_stream.h b/aclk/schema-wrappers/chart_stream.h
index 7a46ecd8e..904866868 100644
--- a/aclk/schema-wrappers/chart_stream.h
+++ b/aclk/schema-wrappers/chart_stream.h
@@ -57,7 +57,7 @@ struct chart_instance_updated {
const char *node_id;
const char *name;
- struct label *label_head;
+ DICTIONARY *chart_labels;
RRD_MEMORY_MODE memory_mode;
diff --git a/aclk/schema-wrappers/context.cc b/aclk/schema-wrappers/context.cc
new file mode 100644
index 000000000..b04c9d20c
--- /dev/null
+++ b/aclk/schema-wrappers/context.cc
@@ -0,0 +1,125 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "schema_wrapper_utils.h"
+
+#include "context.h"
+
+using namespace context::v1;
+
+// ContextsSnapshot
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version)
+{
+ ContextsSnapshot *ctxs_snap = new ContextsSnapshot;
+
+ if (ctxs_snap == NULL)
+ fatal("Cannot allocate ContextsSnapshot object. OOM");
+
+ ctxs_snap->set_claim_id(claim_id);
+ ctxs_snap->set_node_id(node_id);
+ ctxs_snap->set_version(version);
+
+ return ctxs_snap;
+}
+
+void contexts_snapshot_delete(contexts_snapshot_t snapshot)
+{
+ delete (ContextsSnapshot *)snapshot;
+}
+
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version)
+{
+ ((ContextsSnapshot *)ctxs_snapshot)->set_version(version);
+}
+
+static void fill_ctx_updated(ContextUpdated *ctx, struct context_updated *c_ctx)
+{
+ ctx->set_id(c_ctx->id);
+ ctx->set_version(c_ctx->version);
+ ctx->set_first_entry(c_ctx->first_entry);
+ ctx->set_last_entry(c_ctx->last_entry);
+ ctx->set_deleted(c_ctx->deleted);
+ ctx->set_title(c_ctx->title);
+ ctx->set_priority(c_ctx->priority);
+ ctx->set_chart_type(c_ctx->chart_type);
+ ctx->set_units(c_ctx->units);
+ ctx->set_family(c_ctx->family);
+}
+
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ ContextUpdated *ctx = ctxs_snap->add_contexts();
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len)
+{
+ ContextsSnapshot *ctxs_snap = (ContextsSnapshot *)ctxs_snapshot;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_snap);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_snap->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_snap;
+ return NULL;
+ }
+
+ delete ctxs_snap;
+ return bin;
+}
+
+// ContextsUpdated
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at)
+{
+ ContextsUpdated *ctxs_updated = new ContextsUpdated;
+
+ if (ctxs_updated == NULL)
+ fatal("Cannot allocate ContextsUpdated object. OOM");
+
+ ctxs_updated->set_claim_id(claim_id);
+ ctxs_updated->set_node_id(node_id);
+ ctxs_updated->set_version_hash(version_hash);
+ ctxs_updated->set_created_at(created_at);
+
+ return ctxs_updated;
+}
+
+void contexts_updated_delete(contexts_updated_t ctxs_updated)
+{
+ delete (ContextsUpdated *)ctxs_updated;
+}
+
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash)
+{
+ ((ContextsUpdated *)ctxs_updated)->set_version_hash(version_hash);
+}
+
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ ContextUpdated *ctx = ctxs_update->add_contextupdates();
+
+ if (ctx == NULL)
+ fatal("Cannot allocate ContextUpdated object. OOM");
+
+ fill_ctx_updated(ctx, ctx_update);
+}
+
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len)
+{
+ ContextsUpdated *ctxs_update = (ContextsUpdated *)ctxs_updated;
+ *len = PROTO_COMPAT_MSG_SIZE_PTR(ctxs_update);
+ char *bin = (char*)mallocz(*len);
+ if (!ctxs_update->SerializeToArray(bin, *len)) {
+ freez(bin);
+ delete ctxs_update;
+ return NULL;
+ }
+
+ delete ctxs_update;
+ return bin;
+}
diff --git a/aclk/schema-wrappers/context.h b/aclk/schema-wrappers/context.h
new file mode 100644
index 000000000..cbb7701a8
--- /dev/null
+++ b/aclk/schema-wrappers/context.h
@@ -0,0 +1,53 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_H
+
+#include <stdint.h>
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* contexts_updated_t;
+typedef void* contexts_snapshot_t;
+
+struct context_updated {
+ // context id
+ const char *id;
+
+ uint64_t version;
+
+ uint64_t first_entry;
+ uint64_t last_entry;
+
+ int deleted;
+
+ const char *title;
+ uint64_t priority;
+ const char *chart_type;
+ const char *units;
+ const char *family;
+};
+
+// ContextS Snapshot related
+contexts_snapshot_t contexts_snapshot_new(const char *claim_id, const char *node_id, uint64_t version);
+void contexts_snapshot_delete(contexts_snapshot_t ctxs_snapshot);
+void contexts_snapshot_set_version(contexts_snapshot_t ctxs_snapshot, uint64_t version);
+void contexts_snapshot_add_ctx_update(contexts_snapshot_t ctxs_snapshot, struct context_updated *ctx_update);
+char *contexts_snapshot_2bin(contexts_snapshot_t ctxs_snapshot, size_t *len);
+
+// ContextS Updated related
+contexts_updated_t contexts_updated_new(const char *claim_id, const char *node_id, uint64_t version_hash, uint64_t created_at);
+void contexts_updated_delete(contexts_updated_t ctxs_updated);
+void contexts_updated_update_version_hash(contexts_updated_t ctxs_updated, uint64_t version_hash);
+void contexts_updated_add_ctx_update(contexts_updated_t ctxs_updated, struct context_updated *ctx_update);
+char *contexts_updated_2bin(contexts_updated_t ctxs_updated, size_t *len);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_H */
diff --git a/aclk/schema-wrappers/context_stream.cc b/aclk/schema-wrappers/context_stream.cc
new file mode 100644
index 000000000..3bb1956cb
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.cc
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/context/v1/stream.pb.h"
+
+#include "context_stream.h"
+
+#include "libnetdata/libnetdata.h"
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len)
+{
+ context::v1::StopStreamingContexts msg;
+
+ struct stop_streaming_ctxs *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct stop_streaming_ctxs *)callocz(1, sizeof(struct stop_streaming_ctxs));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+
+ return res;
+}
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len)
+{
+ context::v1::ContextsCheckpoint msg;
+
+ struct ctxs_checkpoint *res;
+
+ if (!msg.ParseFromArray(data, len))
+ return NULL;
+
+ res = (struct ctxs_checkpoint *)callocz(1, sizeof(struct ctxs_checkpoint));
+
+ res->claim_id = strdupz(msg.claim_id().c_str());
+ res->node_id = strdupz(msg.node_id().c_str());
+ res->version_hash = msg.version_hash();
+
+ return res;
+}
diff --git a/aclk/schema-wrappers/context_stream.h b/aclk/schema-wrappers/context_stream.h
new file mode 100644
index 000000000..8c691d2cc
--- /dev/null
+++ b/aclk/schema-wrappers/context_stream.h
@@ -0,0 +1,36 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+#define ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct stop_streaming_ctxs {
+ char *claim_id;
+ char *node_id;
+ // we omit reason as there is only one defined at this point
+ // as soon as there is more than one defined in StopStreaminContextsReason
+ // we should add it
+ // 0 - RATE_LIMIT_EXCEEDED
+};
+
+struct stop_streaming_ctxs *parse_stop_streaming_ctxs(const char *data, size_t len);
+
+struct ctxs_checkpoint {
+ char *claim_id;
+ char *node_id;
+
+ uint64_t version_hash;
+};
+
+struct ctxs_checkpoint *parse_ctxs_checkpoint(const char *data, size_t len);
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPER_CONTEXT_STREAM_H */
diff --git a/aclk/schema-wrappers/node_connection.cc b/aclk/schema-wrappers/node_connection.cc
index 0a4c8ece1..a6ca8ef98 100644
--- a/aclk/schema-wrappers/node_connection.cc
+++ b/aclk/schema-wrappers/node_connection.cc
@@ -28,6 +28,15 @@ char *generate_node_instance_connection(size_t *len, const node_instance_connect
timestamp->set_seconds(tv.tv_sec);
timestamp->set_nanos(tv.tv_usec * 1000);
+ if (data->capabilities) {
+ struct capability *capa = data->capabilities;
+ while (capa->name) {
+ aclk_lib::v1::Capability *proto_capa = msg.add_capabilities();
+ capability_set(proto_capa, capa);
+ capa++;
+ }
+ }
+
*len = PROTO_COMPAT_MSG_SIZE(msg);
char *bin = (char*)malloc(*len);
if (bin)
diff --git a/aclk/schema-wrappers/node_connection.h b/aclk/schema-wrappers/node_connection.h
index 3fd207213..c27729d15 100644
--- a/aclk/schema-wrappers/node_connection.h
+++ b/aclk/schema-wrappers/node_connection.h
@@ -3,6 +3,8 @@
#ifndef ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
#define ACLK_SCHEMA_WRAPPER_NODE_CONNECTION_H
+#include "capability.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -17,6 +19,7 @@ typedef struct {
int64_t session_id;
int32_t hops;
+ struct capability *capabilities;
} node_instance_connection_t;
char *generate_node_instance_connection(size_t *len, const node_instance_connection_t *data);
diff --git a/aclk/schema-wrappers/node_info.cc b/aclk/schema-wrappers/node_info.cc
index f66985246..2a05ddaba 100644
--- a/aclk/schema-wrappers/node_info.cc
+++ b/aclk/schema-wrappers/node_info.cc
@@ -6,7 +6,6 @@
static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct aclk_node_info *data)
{
- struct label *label;
google::protobuf::Map<std::string, std::string> *map;
if (data->name)
@@ -56,9 +55,6 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
if (data->custom_info)
info->set_custom_info(data->custom_info);
- for (size_t i = 0; i < data->service_count; i++)
- info->add_services(data->services[i]);
-
if (data->machine_guid)
info->set_machine_guid(data->machine_guid);
@@ -67,12 +63,7 @@ static int generate_node_info(nodeinstance::info::v1::NodeInfo *info, struct acl
ml_info->set_ml_enabled(data->ml_info.ml_enabled);
map = info->mutable_host_labels();
- label = data->host_labels_head;
- while (label) {
- map->insert({label->key, label->value});
- label = label->next;
- }
-
+ rrdlabels_walkthrough_read(data->host_labels_ptr, label_add_to_map_callback, map);
return 0;
}
@@ -119,3 +110,27 @@ char *generate_update_node_info_message(size_t *len, struct update_node_info *in
return bin;
}
+
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *upd_node_collectors)
+{
+ nodeinstance::info::v1::UpdateNodeCollectors msg;
+
+ msg.set_node_id(upd_node_collectors->node_id);
+ msg.set_claim_id(upd_node_collectors->claim_id);
+
+ void *colls;
+ dfe_start_read(upd_node_collectors->node_collectors, colls) {
+ struct collector_info *c =(struct collector_info *)colls;
+ nodeinstance::info::v1::CollectorInfo *col = msg.add_collectors();
+ col->set_plugin(c->plugin);
+ col->set_module(c->module);
+ }
+ dfe_done(colls);
+
+ *len = PROTO_COMPAT_MSG_SIZE(msg);
+ char *bin = (char*)malloc(*len);
+ if (bin)
+ msg.SerializeToArray(bin, *len);
+
+ return bin;
+}
diff --git a/aclk/schema-wrappers/node_info.h b/aclk/schema-wrappers/node_info.h
index e67f3e1da..e8ac2d7c6 100644
--- a/aclk/schema-wrappers/node_info.h
+++ b/aclk/schema-wrappers/node_info.h
@@ -4,9 +4,10 @@
#define ACLK_SCHEMA_WRAPPER_NODE_INFO_H
#include <stdlib.h>
+#include <stdint.h>
-#include "database/rrd.h"
#include "capability.h"
+#include "database/rrd.h"
#ifdef __cplusplus
extern "C" {
@@ -49,12 +50,9 @@ struct aclk_node_info {
char *custom_info;
- char **services;
- size_t service_count;
-
char *machine_guid;
- struct label *host_labels_head;
+ DICTIONARY *host_labels_ptr;
struct machine_learning_info ml_info;
};
@@ -73,8 +71,21 @@ struct update_node_info {
struct capability *node_instance_capabilities;
};
+struct collector_info {
+ char *module;
+ char *plugin;
+};
+
+struct update_node_collectors {
+ char *claim_id;
+ char *node_id;
+ DICTIONARY *node_collectors;
+};
+
char *generate_update_node_info_message(size_t *len, struct update_node_info *info);
+char *generate_update_node_collectors_message(size_t *len, struct update_node_collectors *collectors);
+
#ifdef __cplusplus
}
#endif
diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc
new file mode 100644
index 000000000..0e473eb6c
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.cc
@@ -0,0 +1,101 @@
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "proto/alarm/v1/config.pb.h"
+#include "proto/alarm/v1/stream.pb.h"
+#include "proto/aclk/v1/lib.pb.h"
+#include "proto/chart/v1/config.pb.h"
+#include "proto/chart/v1/stream.pb.h"
+#include "proto/agent/v1/connection.pb.h"
+#include "proto/agent/v1/disconnect.pb.h"
+#include "proto/nodeinstance/connection/v1/connection.pb.h"
+#include "proto/nodeinstance/create/v1/creation.pb.h"
+#include "proto/nodeinstance/info/v1/info.pb.h"
+#include "proto/context/v1/stream.pb.h"
+#include "proto/context/v1/context.pb.h"
+
+#include "libnetdata/libnetdata.h"
+
+#include "proto_2_json.h"
+
+using namespace google::protobuf::util;
+
+static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
+{
+//tx side
+ if (!strcmp(msgname, "UpdateAgentConnection"))
+ return new agent::v1::UpdateAgentConnection;
+ if (!strcmp(msgname, "UpdateNodeInstanceConnection"))
+ return new nodeinstance::v1::UpdateNodeInstanceConnection;
+ if (!strcmp(msgname, "CreateNodeInstance"))
+ return new nodeinstance::create::v1::CreateNodeInstance;
+ if (!strcmp(msgname, "ChartsAndDimensionsUpdated"))
+ return new chart::v1::ChartsAndDimensionsUpdated;
+ if (!strcmp(msgname, "ChartConfigsUpdated"))
+ return new chart::v1::ChartConfigsUpdated;
+ if (!strcmp(msgname, "ResetChartMessages"))
+ return new chart::v1::ResetChartMessages;
+ if (!strcmp(msgname, "RetentionUpdated"))
+ return new chart::v1::RetentionUpdated;
+ if (!strcmp(msgname, "UpdateNodeInfo"))
+ return new nodeinstance::info::v1::UpdateNodeInfo;
+ if (!strcmp(msgname, "AlarmLogHealth"))
+ return new alarms::v1::AlarmLogHealth;
+ if (!strcmp(msgname, "ProvideAlarmConfiguration"))
+ return new alarms::v1::ProvideAlarmConfiguration;
+ if (!strcmp(msgname, "AlarmSnapshot"))
+ return new alarms::v1::AlarmSnapshot;
+ if (!strcmp(msgname, "AlarmLogEntry"))
+ return new alarms::v1::AlarmLogEntry;
+ if (!strcmp(msgname, "UpdateNodeCollectors"))
+ return new nodeinstance::info::v1::UpdateNodeCollectors;
+ if (!strcmp(msgname, "ContextsUpdated"))
+ return new context::v1::ContextsUpdated;
+ if (!strcmp(msgname, "ContextsSnapshot"))
+ return new context::v1::ContextsSnapshot;
+
+//rx side
+ if (!strcmp(msgname, "CreateNodeInstanceResult"))
+ return new nodeinstance::create::v1::CreateNodeInstanceResult;
+ if (!strcmp(msgname, "SendNodeInstances"))
+ return new agent::v1::SendNodeInstances;
+ if (!strcmp(msgname, "StreamChartsAndDimensions"))
+ return new chart::v1::StreamChartsAndDimensions;
+ if (!strcmp(msgname, "ChartsAndDimensionsAck"))
+ return new chart::v1::ChartsAndDimensionsAck;
+ if (!strcmp(msgname, "UpdateChartConfigs"))
+ return new chart::v1::UpdateChartConfigs;
+ if (!strcmp(msgname, "StartAlarmStreaming"))
+ return new alarms::v1::StartAlarmStreaming;
+ if (!strcmp(msgname, "SendAlarmLogHealth"))
+ return new alarms::v1::SendAlarmLogHealth;
+ if (!strcmp(msgname, "SendAlarmConfiguration"))
+ return new alarms::v1::SendAlarmConfiguration;
+ if (!strcmp(msgname, "SendAlarmSnapshot"))
+ return new alarms::v1::SendAlarmSnapshot;
+ if (!strcmp(msgname, "DisconnectReq"))
+ return new agent::v1::DisconnectReq;
+ if (!strcmp(msgname, "ContextsCheckpoint"))
+ return new context::v1::ContextsCheckpoint;
+ if (!strcmp(msgname, "StopStreamingContexts"))
+ return new context::v1::StopStreamingContexts;
+
+ return NULL;
+}
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname)
+{
+ google::protobuf::Message *msg = msg_name_to_protomsg(msgname);
+ if (msg == NULL)
+ return strdupz("Don't know this message type by name.");
+
+ if (!msg->ParseFromArray(protobin, len))
+ return strdupz("Can't parse this message. Malformed or wrong parser used.");
+
+ JsonPrintOptions options;
+
+ std::string output;
+ google::protobuf::util::MessageToJsonString(*msg, &output, options);
+ delete msg;
+ return strdupz(output.c_str());
+}
diff --git a/aclk/schema-wrappers/proto_2_json.h b/aclk/schema-wrappers/proto_2_json.h
new file mode 100644
index 000000000..3bd98478c
--- /dev/null
+++ b/aclk/schema-wrappers/proto_2_json.h
@@ -0,0 +1,18 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef PROTO_2_JSON_H
+#define PROTO_2_JSON_H
+
+#include <sys/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+char *protomsg_to_json(const void *protobin, size_t len, const char *msgname);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROTO_2_JSON_H */
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.cc b/aclk/schema-wrappers/schema_wrapper_utils.cc
index b100e20c3..6573e6299 100644
--- a/aclk/schema-wrappers/schema_wrapper_utils.cc
+++ b/aclk/schema-wrappers/schema_wrapper_utils.cc
@@ -13,3 +13,10 @@ void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, st
tv->tv_sec = ts.seconds();
tv->tv_usec = ts.nanos()/1000;
}
+
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ (void)ls;
+ auto map = (google::protobuf::Map<std::string, std::string> *)data;
+ map->insert({name, value});
+ return 1;
+}
diff --git a/aclk/schema-wrappers/schema_wrapper_utils.h b/aclk/schema-wrappers/schema_wrapper_utils.h
index 494855f82..2815d0f20 100644
--- a/aclk/schema-wrappers/schema_wrapper_utils.h
+++ b/aclk/schema-wrappers/schema_wrapper_utils.h
@@ -3,8 +3,11 @@
#ifndef SCHEMA_WRAPPER_UTILS_H
#define SCHEMA_WRAPPER_UTILS_H
+#include "database/rrd.h"
+
#include <sys/time.h>
#include <google/protobuf/timestamp.pb.h>
+#include <google/protobuf/map.h>
#if GOOGLE_PROTOBUF_VERSION < 3001000
#define PROTO_COMPAT_MSG_SIZE(msg) (size_t)msg.ByteSize();
@@ -16,5 +19,6 @@
void set_google_timestamp_from_timeval(struct timeval tv, google::protobuf::Timestamp *ts);
void set_timeval_from_google_timestamp(const google::protobuf::Timestamp &ts, struct timeval *tv);
+int label_add_to_map_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data);
#endif /* SCHEMA_WRAPPER_UTILS_H */
diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h
index a3248a69b..26412cacc 100644
--- a/aclk/schema-wrappers/schema_wrappers.h
+++ b/aclk/schema-wrappers/schema_wrappers.h
@@ -14,5 +14,7 @@
#include "alarm_stream.h"
#include "node_info.h"
#include "capability.h"
+#include "context_stream.h"
+#include "context.h"
#endif /* SCHEMA_WRAPPERS_H */