summaryrefslogtreecommitdiffstats
path: root/aclk/aclk_query.c
diff options
context:
space:
mode:
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r--aclk/aclk_query.c130
1 files changed, 44 insertions, 86 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index de970fc3..981c0196 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
-typedef struct aclk_query_handler {
- aclk_query_type_t type;
- char *name; // for logging purposes
- int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query);
-} aclk_query_handler;
-
-static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_send_info_metadata(query_thr->client,
- !query->data.metadata_info.initial_on_connect,
- query->data.metadata_info.host);
- return 0;
-}
-
-static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_send_alarm_metadata(query_thr->client,
- !query->data.metadata_info.initial_on_connect);
- return 0;
-}
-
static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url)
{
usec_t t;
@@ -277,84 +256,63 @@ cleanup:
return retval;
}
-static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name);
- return 0;
-}
-
-static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- aclk_alarm_state_msg(query_thr->client, query->data.alarm_update);
- // aclk_alarm_state_msg frees the json object including the header it generates
- query->data.alarm_update = NULL;
- return 0;
-}
-
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
{
// this will be simplified when legacy support is removed
aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name);
return 0;
}
-#endif
-
-aclk_query_handler aclk_query_handlers[] = {
- { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 },
- { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query },
- { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata },
- { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata },
- { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query },
- { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata },
-#ifdef ENABLE_NEW_CLOUD_PROTOCOL
- { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg },
- { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg },
- { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg },
- { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg },
- { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg },
- { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg },
- { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg },
- { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg },
- { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg },
- { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg },
-#endif
- { .type = UNKNOWN, .name = NULL, .fnc = NULL }
-};
const char *aclk_query_get_name(aclk_query_type_t qt)
{
- aclk_query_handler *ptr = aclk_query_handlers;
- while (ptr->type != UNKNOWN) {
- if (ptr->type == qt)
- return ptr->name;
- ptr++;
+ switch (qt) {
+ case HTTP_API_V2: return "http_api_request_v2";
+ case REGISTER_NODE: return "register_node";
+ case NODE_STATE_UPDATE: return "node_state_update";
+ case CHART_DIMS_UPDATE: return "chart_and_dim_update";
+ case CHART_CONFIG_UPDATED: return "chart_config_updated";
+ case CHART_RESET: return "reset_chart_messages";
+ case RETENTION_UPDATED: return "update_retention_info";
+ case UPDATE_NODE_INFO: return "update_node_info";
+ case ALARM_LOG_HEALTH: return "alarm_log_health";
+ case ALARM_PROVIDE_CFG: return "provide_alarm_config";
+ case ALARM_SNAPSHOT: return "alarm_snapshot";
+ case UPDATE_NODE_COLLECTORS: return "update_node_collectors";
+ case PROTO_BIN_MESSAGE: return "generic_binary_proto_message";
+ default:
+ error_report("Unknown query type used %d", (int) qt);
+ return "unknown";
}
- return "unknown";
}
static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query)
-{
- for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
- if (aclk_query_handlers[i].type == query->type) {
- worker_is_busy(i);
-
- debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
- aclk_query_handlers[i].fnc(query_thr, query);
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.queries_dispatched++;
- aclk_queries_per_thread[query_thr->idx]++;
- aclk_metrics_per_sample.queries_per_type[query->type]++;
- ACLK_STATS_UNLOCK;
- }
- aclk_query_free(query);
+{
+ if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) {
+ error_report("Unknown query in query queue. %u", query->type);
+ aclk_query_free(query);
+ return;
+ }
- worker_is_idle();
- return;
- }
+ worker_is_busy(query->type);
+ if (query->type == HTTP_API_V2) {
+ debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\"");
+ http_api_v2(query_thr, query);
+ } else {
+ debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name);
+ send_bin_msg(query_thr, query);
}
- fatal("Unknown query in query queue. %u", query->type);
+
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ aclk_metrics_per_sample.queries_dispatched++;
+ aclk_queries_per_thread[query_thr->idx]++;
+ aclk_metrics_per_sample.queries_per_type[query->type]++;
+ ACLK_STATS_UNLOCK;
+ }
+
+ aclk_query_free(query);
+
+ worker_is_idle();
}
/* Processes messages from queue. Compete for work with other threads
@@ -370,8 +328,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
static void worker_aclk_register(void) {
worker_register("ACLKQUERY");
- for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
- worker_register_job_name(i, aclk_query_handlers[i].name);
+ for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) {
+ worker_register_job_name(i, aclk_query_get_name(i));
}
}