diff options
Diffstat (limited to 'aclk/schema-wrappers')
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.cc | 38 | ||||
-rw-r--r-- | aclk/schema-wrappers/agent_cmds.h | 27 | ||||
-rw-r--r-- | aclk/schema-wrappers/alarm_stream.cc | 80 | ||||
-rw-r--r-- | aclk/schema-wrappers/alarm_stream.h | 47 | ||||
-rw-r--r-- | aclk/schema-wrappers/proto_2_json.cc | 11 | ||||
-rw-r--r-- | aclk/schema-wrappers/schema_wrappers.h | 1 |
6 files changed, 122 insertions, 82 deletions
diff --git a/aclk/schema-wrappers/agent_cmds.cc b/aclk/schema-wrappers/agent_cmds.cc new file mode 100644 index 00000000..6950f402 --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.cc @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "proto/agent/v1/cmds.pb.h" + +#include "agent_cmds.h" + +#include "schema_wrapper_utils.h" + +using namespace agent::v1; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req) +{ + CancelPendingRequest msg_parsed; + + if (!msg_parsed.ParseFromArray(msg, msg_len)) { + error_report("Failed to parse CancelPendingRequest message"); + return 1; + } + + if (msg_parsed.request_id().c_str() == NULL) { + error_report("CancelPendingRequest message missing request_id"); + return 1; + } + req->request_id = strdupz(msg_parsed.request_id().c_str()); + + if (msg_parsed.trace_id().c_str()) + req->trace_id = strdupz(msg_parsed.trace_id().c_str()); + + set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp); + + return 0; +} + +void free_cancel_pending_req(struct aclk_cancel_pending_req *req) +{ + freez(req->request_id); + freez(req->trace_id); +} diff --git a/aclk/schema-wrappers/agent_cmds.h b/aclk/schema-wrappers/agent_cmds.h new file mode 100644 index 00000000..7e01f86c --- /dev/null +++ b/aclk/schema-wrappers/agent_cmds.h @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H +#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H + +#include "libnetdata/libnetdata.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct aclk_cancel_pending_req { + char *request_id; + + struct timeval timestamp; + + char *trace_id; +}; + +int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req); +void free_cancel_pending_req(struct aclk_cancel_pending_req *req); + +#ifdef __cplusplus +} +#endif + +#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */ diff --git a/aclk/schema-wrappers/alarm_stream.cc b/aclk/schema-wrappers/alarm_stream.cc index f6439330..af0b891c 100644 --- a/aclk/schema-wrappers/alarm_stream.cc +++ b/aclk/schema-wrappers/alarm_stream.cc @@ -21,57 +21,24 @@ struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_ return ret; ret.node_id = strdupz(msg.node_id().c_str()); - ret.batch_id = msg.batch_id(); - ret.start_seq_id = msg.start_sequnce_id(); + ret.resets = msg.resets(); return ret; } -char *parse_send_alarm_log_health(const char *data, size_t len) +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len) { - SendAlarmLogHealth msg; - if (!msg.ParseFromArray(data, len)) - return NULL; - return strdupz(msg.node_id().c_str()); -} - -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data) -{ - AlarmLogHealth msg; - LogEntries *entries; - - msg.set_claim_id(data->claim_id); - msg.set_node_id(data->node_id); - msg.set_enabled(data->enabled); - - switch (data->status) { - case alarm_log_status_aclk::ALARM_LOG_STATUS_IDLE: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_IDLE); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_RUNNING: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_RUNNING); - break; - case alarm_log_status_aclk::ALARM_LOG_STATUS_UNSPECIFIED: - msg.set_status(alarms::v1::ALARM_LOG_STATUS_UNSPECIFIED); - break; - default: - error("Unknown status of AlarmLogHealth LogEntry"); - return NULL; - } - - entries = msg.mutable_log_entries(); - entries->set_first_sequence_id(data->log_entries.first_seq_id); - entries->set_last_sequence_id(data->log_entries.last_seq_id); + struct send_alarm_checkpoint ret; + memset(&ret, 0, sizeof(ret)); - set_google_timestamp_from_timeval(data->log_entries.first_when, entries->mutable_first_when()); - set_google_timestamp_from_timeval(data->log_entries.last_when, entries->mutable_last_when()); + SendAlarmCheckpoint msg; + if (!msg.ParseFromArray(data, len)) + return ret; - *len = PROTO_COMPAT_MSG_SIZE(msg); - char *bin = (char*)mallocz(*len); - if (!msg.SerializeToArray(bin, *len)) - return NULL; + ret.node_id = strdupz(msg.node_id().c_str()); + ret.claim_id = strdupz(msg.claim_id().c_str()); - return bin; + return ret; } static alarms::v1::AlarmStatus aclk_alarm_status_to_proto(enum aclk_alarm_status status) @@ -131,8 +98,6 @@ static void fill_alarm_log_entry(struct alarm_log_entry *data, AlarmLogEntry *pr if (data->family) proto->set_family(data->family); - proto->set_batch_id(data->batch_id); - proto->set_sequence_id(data->sequence_id); proto->set_when(data->when); proto->set_config_hash(data->config_hash); @@ -187,6 +152,24 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data) return bin; } +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data) +{ + AlarmCheckpoint msg; + + msg.set_claim_id(data->claim_id); + msg.set_node_id(data->node_id); + msg.set_checksum(data->checksum); + + *len = PROTO_COMPAT_MSG_SIZE(msg); + char *bin = (char*)mallocz(*len); + if (!msg.SerializeToArray(bin, *len)) { + freez(bin); + return NULL; + } + + return bin; +} + struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len) { SendAlarmSnapshot msg; @@ -198,8 +181,8 @@ struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t l ret->claim_id = strdupz(msg.claim_id().c_str()); if (msg.node_id().c_str()) ret->node_id = strdupz(msg.node_id().c_str()); - ret->snapshot_id = msg.snapshot_id(); - ret->sequence_id = msg.sequence_id(); + if (msg.snapshot_uuid().c_str()) + ret->snapshot_uuid = strdupz(msg.snapshot_uuid().c_str()); return ret; } @@ -208,6 +191,7 @@ void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr) { freez(ptr->claim_id); freez(ptr->node_id); + freez(ptr->snapshot_uuid); freez(ptr); } @@ -218,7 +202,7 @@ alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot * msg->set_node_id(data->node_id); msg->set_claim_id(data->claim_id); - msg->set_snapshot_id(data->snapshot_id); + msg->set_snapshot_uuid(data->snapshot_uuid); msg->set_chunks(data->chunks); msg->set_chunk(data->chunk); diff --git a/aclk/schema-wrappers/alarm_stream.h b/aclk/schema-wrappers/alarm_stream.h index 63911da3..83e7c1bc 100644 --- a/aclk/schema-wrappers/alarm_stream.h +++ b/aclk/schema-wrappers/alarm_stream.h @@ -11,38 +11,12 @@ extern "C" { #endif -enum alarm_log_status_aclk { - ALARM_LOG_STATUS_UNSPECIFIED = 0, - ALARM_LOG_STATUS_RUNNING = 1, - ALARM_LOG_STATUS_IDLE = 2 -}; - -struct alarm_log_entries { - int64_t first_seq_id; - struct timeval first_when; - - int64_t last_seq_id; - struct timeval last_when; -}; - -struct alarm_log_health { - char *claim_id; - char *node_id; - int enabled; - enum alarm_log_status_aclk status; - struct alarm_log_entries log_entries; -}; - struct start_alarm_streaming { char *node_id; - uint64_t batch_id; - uint64_t start_seq_id; + bool resets; }; struct start_alarm_streaming parse_start_alarm_streaming(const char *data, size_t len); -char *parse_send_alarm_log_health(const char *data, size_t len); - -char *generate_alarm_log_health(size_t *len, struct alarm_log_health *data); enum aclk_alarm_status { ALARM_STATUS_NULL = 0, @@ -101,17 +75,27 @@ struct alarm_log_entry { char *chart_context; }; +struct send_alarm_checkpoint { + char *node_id; + char *claim_id; +}; + +struct alarm_checkpoint { + char *node_id; + char *claim_id; + char *checksum; +}; + struct send_alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; - uint64_t sequence_id; + char *snapshot_uuid; }; struct alarm_snapshot { char *node_id; char *claim_id; - uint64_t snapshot_id; + char *snapshot_uuid; uint32_t chunks; uint32_t chunk; }; @@ -125,6 +109,9 @@ char *generate_alarm_log_entry(size_t *len, struct alarm_log_entry *data); struct send_alarm_snapshot *parse_send_alarm_snapshot(const char *data, size_t len); void destroy_send_alarm_snapshot(struct send_alarm_snapshot *ptr); +struct send_alarm_checkpoint parse_send_alarm_checkpoint(const char *data, size_t len); +char *generate_alarm_checkpoint(size_t *len, struct alarm_checkpoint *data); + alarm_snapshot_proto_ptr_t generate_alarm_snapshot_proto(struct alarm_snapshot *data); void add_alarm_log_entry2snapshot(alarm_snapshot_proto_ptr_t snapshot, struct alarm_log_entry *data); char *generate_alarm_snapshot_bin(size_t *len, alarm_snapshot_proto_ptr_t snapshot); diff --git a/aclk/schema-wrappers/proto_2_json.cc b/aclk/schema-wrappers/proto_2_json.cc index 8853b2e0..85439651 100644 --- a/aclk/schema-wrappers/proto_2_json.cc +++ b/aclk/schema-wrappers/proto_2_json.cc @@ -11,6 +11,7 @@ #include "proto/nodeinstance/info/v1/info.pb.h" #include "proto/context/v1/stream.pb.h" #include "proto/context/v1/context.pb.h" +#include "proto/agent/v1/cmds.pb.h" #include "libnetdata/libnetdata.h" @@ -29,8 +30,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new nodeinstance::create::v1::CreateNodeInstance; if (!strcmp(msgname, "UpdateNodeInfo")) return new nodeinstance::info::v1::UpdateNodeInfo; - if (!strcmp(msgname, "AlarmLogHealth")) - return new alarms::v1::AlarmLogHealth; + if (!strcmp(msgname, "AlarmCheckpoint")) + return new alarms::v1::AlarmCheckpoint; if (!strcmp(msgname, "ProvideAlarmConfiguration")) return new alarms::v1::ProvideAlarmConfiguration; if (!strcmp(msgname, "AlarmSnapshot")) @@ -51,8 +52,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new agent::v1::SendNodeInstances; if (!strcmp(msgname, "StartAlarmStreaming")) return new alarms::v1::StartAlarmStreaming; - if (!strcmp(msgname, "SendAlarmLogHealth")) - return new alarms::v1::SendAlarmLogHealth; + if (!strcmp(msgname, "SendAlarmCheckpoint")) + return new alarms::v1::SendAlarmCheckpoint; if (!strcmp(msgname, "SendAlarmConfiguration")) return new alarms::v1::SendAlarmConfiguration; if (!strcmp(msgname, "SendAlarmSnapshot")) @@ -63,6 +64,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname) return new context::v1::ContextsCheckpoint; if (!strcmp(msgname, "StopStreamingContexts")) return new context::v1::StopStreamingContexts; + if (!strcmp(msgname, "CancelPendingRequest")) + return new agent::v1::CancelPendingRequest; return NULL; } diff --git a/aclk/schema-wrappers/schema_wrappers.h b/aclk/schema-wrappers/schema_wrappers.h index a96f7ea7..b651b884 100644 --- a/aclk/schema-wrappers/schema_wrappers.h +++ b/aclk/schema-wrappers/schema_wrappers.h @@ -14,5 +14,6 @@ #include "capability.h" #include "context_stream.h" #include "context.h" +#include "agent_cmds.h" #endif /* SCHEMA_WRAPPERS_H */ |