diff options
Diffstat (limited to 'aclk/schema-wrappers/chart_stream.cc')
-rw-r--r-- | aclk/schema-wrappers/chart_stream.cc | 337 |
1 files changed, 0 insertions, 337 deletions
diff --git a/aclk/schema-wrappers/chart_stream.cc b/aclk/schema-wrappers/chart_stream.cc deleted file mode 100644 index 54c940758..000000000 --- a/aclk/schema-wrappers/chart_stream.cc +++ /dev/null @@ -1,337 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "aclk/aclk_util.h" - -#include "proto/chart/v1/stream.pb.h" -#include "chart_stream.h" - -#include "schema_wrapper_utils.h" - -#include <sys/time.h> -#include <stdlib.h> - -stream_charts_and_dims_t parse_stream_charts_and_dims(const char *data, size_t len) -{ - chart::v1::StreamChartsAndDimensions msg; - stream_charts_and_dims_t res; - memset(&res, 0, sizeof(res)); - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.seq_id = msg.sequence_id(); - res.batch_id = msg.batch_id(); - set_timeval_from_google_timestamp(msg.seq_id_created_at(), &res.seq_id_created_at); - - return res; -} - -chart_and_dim_ack_t parse_chart_and_dimensions_ack(const char *data, size_t len) -{ - chart::v1::ChartsAndDimensionsAck msg; - chart_and_dim_ack_t res = { .claim_id = NULL, .node_id = NULL, .last_seq_id = 0 }; - - if (!msg.ParseFromArray(data, len)) - return res; - - res.node_id = strdup(msg.node_id().c_str()); - res.claim_id = strdup(msg.claim_id().c_str()); - res.last_seq_id = msg.last_sequence_id(); - - return res; -} - -char *generate_reset_chart_messages(size_t *len, chart_reset_t reset) -{ - chart::v1::ResetChartMessages msg; - - msg.set_claim_id(reset.claim_id); - msg.set_node_id(reset.node_id); - switch (reset.reason) { - case DB_EMPTY: - msg.set_reason(chart::v1::ResetReason::DB_EMPTY); - break; - case SEQ_ID_NOT_EXISTS: - msg.set_reason(chart::v1::ResetReason::SEQ_ID_NOT_EXISTS); - break; - case TIMESTAMP_MISMATCH: - msg.set_reason(chart::v1::ResetReason::TIMESTAMP_MISMATCH); - break; - default: - return NULL; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)malloc(*len); - if (bin) - msg.SerializeToArray(bin, *len); - - return bin; -} - -void chart_instance_updated_destroy(struct chart_instance_updated *instance) -{ - freez((char*)instance->id); - freez((char*)instance->claim_id); - - rrdlabels_destroy(instance->chart_labels); - - freez((char*)instance->config_hash); -} - -static int set_chart_instance_updated(chart::v1::ChartInstanceUpdated *chart, const struct chart_instance_updated *update) -{ - google::protobuf::Map<std::string, std::string> *map; - aclk_lib::v1::ACLKMessagePosition *pos; - - chart->set_id(update->id); - chart->set_claim_id(update->claim_id); - chart->set_node_id(update->node_id); - chart->set_name(update->name); - - map = chart->mutable_chart_labels(); - rrdlabels_walkthrough_read(update->chart_labels, label_add_to_map_callback, map); - - switch (update->memory_mode) { - case RRD_MEMORY_MODE_NONE: - chart->set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - chart->set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - chart->set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMORY_MODE_SAVE: - chart->set_memory_mode(chart::v1::SAVE); - break; - case RRD_MEMORY_MODE_ALLOC: - chart->set_memory_mode(chart::v1::ALLOC); - break; - case RRD_MEMORY_MODE_DBENGINE: - chart->set_memory_mode(chart::v1::DB_ENGINE); - break; - default: - return 1; - break; - } - - chart->set_update_every_interval(update->update_every); - chart->set_config_hash(update->config_hash); - - pos = chart->mutable_position(); - pos->set_sequence_id(update->position.sequence_id); - pos->set_previous_sequence_id(update->position.previous_sequence_id); - set_google_timestamp_from_timeval(update->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -static int set_chart_dim_updated(chart::v1::ChartDimensionUpdated *dim, const struct chart_dimension_updated *c_dim) -{ - aclk_lib::v1::ACLKMessagePosition *pos; - - dim->set_id(c_dim->id); - dim->set_chart_id(c_dim->chart_id); - dim->set_node_id(c_dim->node_id); - dim->set_claim_id(c_dim->claim_id); - dim->set_name(c_dim->name); - - set_google_timestamp_from_timeval(c_dim->created_at, dim->mutable_created_at()); - set_google_timestamp_from_timeval(c_dim->last_timestamp, dim->mutable_last_timestamp()); - - pos = dim->mutable_position(); - pos->set_sequence_id(c_dim->position.sequence_id); - pos->set_previous_sequence_id(c_dim->position.previous_sequence_id); - set_google_timestamp_from_timeval(c_dim->position.seq_id_creation_time, pos->mutable_seq_id_created_at()); - - return 0; -} - -char *generate_charts_and_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, int *is_dim, struct aclk_message_position *new_positions, uint64_t batch_id) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - chart::v1::ChartInstanceUpdated db_chart; - chart::v1::ChartDimensionUpdated db_dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - msg.set_batch_id(batch_id); - - for (int i = 0; payloads[i]; i++) { - if (is_dim[i]) { - if (!db_dim.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_dim.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartDimensionUpdated *dim = msg.add_dimensions(); - *dim = db_dim; - } else { - if (!db_chart.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_chart.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart::v1::ChartInstanceUpdated *chart = msg.add_charts(); - *chart = db_chart; - } - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_charts_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartInstanceUpdated db_msg; - chart::v1::ChartInstanceUpdated *chart; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::ChartInstanceUpdated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - chart = msg.add_charts(); - *chart = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_dimensions_updated(size_t *len, char **payloads, size_t *payload_sizes, struct aclk_message_position *new_positions) -{ - chart::v1::ChartsAndDimensionsUpdated msg; - - msg.set_batch_id(chart_batch_id); - - for (int i = 0; payloads[i]; i++) { - chart::v1::ChartDimensionUpdated db_msg; - chart::v1::ChartDimensionUpdated *dim; - aclk_lib::v1::ACLKMessagePosition *pos; - - if (!db_msg.ParseFromArray(payloads[i], payload_sizes[i])) { - error("[ACLK] Could not parse chart::v1::chart_dimension_updated"); - return NULL; - } - - pos = db_msg.mutable_position(); - pos->set_sequence_id(new_positions[i].sequence_id); - pos->set_previous_sequence_id(new_positions[i].previous_sequence_id); - set_google_timestamp_from_timeval(new_positions[i].seq_id_creation_time, pos->mutable_seq_id_created_at()); - - dim = msg.add_dimensions(); - *dim = db_msg; - } - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} - -char *generate_chart_instance_updated(size_t *len, const struct chart_instance_updated *update) -{ - chart::v1::ChartInstanceUpdated *chart = new chart::v1::ChartInstanceUpdated(); - - if (set_chart_instance_updated(chart, update)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(chart); - char *bin = (char*)mallocz(*len); - chart->SerializeToArray(bin, *len); - - delete chart; - return bin; -} - -char *generate_chart_dimension_updated(size_t *len, const struct chart_dimension_updated *dim) -{ - chart::v1::ChartDimensionUpdated *proto_dim = new chart::v1::ChartDimensionUpdated(); - - if (set_chart_dim_updated(proto_dim, dim)) - return NULL; - - *len = PROTO_COMPAT_MSG_SIZE_PTR(proto_dim); - char *bin = (char*)mallocz(*len); - proto_dim->SerializeToArray(bin, *len); - - delete proto_dim; - return bin; -} - -using namespace google::protobuf; - -char *generate_retention_updated(size_t *len, struct retention_updated *data) -{ - chart::v1::RetentionUpdated msg; - - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - - switch (data->memory_mode) { - case RRD_MEMORY_MODE_NONE: - msg.set_memory_mode(chart::v1::NONE); - break; - case RRD_MEMORY_MODE_RAM: - msg.set_memory_mode(chart::v1::RAM); - break; - case RRD_MEMORY_MODE_MAP: - msg.set_memory_mode(chart::v1::MAP); - break; - case RRD_MEMORY_MODE_SAVE: - msg.set_memory_mode(chart::v1::SAVE); - break; - case RRD_MEMORY_MODE_ALLOC: - msg.set_memory_mode(chart::v1::ALLOC); - break; - case RRD_MEMORY_MODE_DBENGINE: - msg.set_memory_mode(chart::v1::DB_ENGINE); - break; - default: - return NULL; - } - - for (int i = 0; i < data->interval_duration_count; i++) { - Map<uint32, uint32> *map = msg.mutable_interval_durations(); - map->insert({data->interval_durations[i].update_every, data->interval_durations[i].retention}); - } - - set_google_timestamp_from_timeval(data->rotation_timestamp, msg.mutable_rotation_timestamp()); - - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - msg.SerializeToArray(bin, *len); - - return bin; -} |