diff options
Diffstat (limited to 'aclk/aclk_query.c')
-rw-r--r-- | aclk/aclk_query.c | 130 |
1 files changed, 44 insertions, 86 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index de970fc3..981c0196 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -13,27 +13,6 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait) #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait) -typedef struct aclk_query_handler { - aclk_query_type_t type; - char *name; // for logging purposes - int(*fnc)(struct aclk_query_thread *query_thr, aclk_query_t query); -} aclk_query_handler; - -static int info_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_info_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect, - query->data.metadata_info.host); - return 0; -} - -static int alarms_metadata(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_send_alarm_metadata(query_thr->client, - !query->data.metadata_info.initial_on_connect); - return 0; -} - static usec_t aclk_web_api_v1_request(RRDHOST *host, struct web_client *w, char *url) { usec_t t; @@ -277,84 +256,63 @@ cleanup: return retval; } -static int chart_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_chart_msg(query_thr->client, query->data.chart_add_del.host, query->data.chart_add_del.chart_name); - return 0; -} - -static int alarm_state_update_query(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - aclk_alarm_state_msg(query_thr->client, query->data.alarm_update); - // aclk_alarm_state_msg frees the json object including the header it generates - query->data.alarm_update = NULL; - return 0; -} - -#ifdef ENABLE_NEW_CLOUD_PROTOCOL static int send_bin_msg(struct aclk_query_thread *query_thr, aclk_query_t query) { // this will be simplified when legacy support is removed aclk_send_bin_message_subtopic_pid(query_thr->client, query->data.bin_payload.payload, query->data.bin_payload.size, query->data.bin_payload.topic, query->data.bin_payload.msg_name); return 0; } -#endif - -aclk_query_handler aclk_query_handlers[] = { - { .type = HTTP_API_V2, .name = "http_api_request_v2", .fnc = http_api_v2 }, - { .type = ALARM_STATE_UPDATE, .name = "alarm_state_update", .fnc = alarm_state_update_query }, - { .type = METADATA_INFO, .name = "info_metadata", .fnc = info_metadata }, - { .type = METADATA_ALARMS, .name = "alarms_metadata", .fnc = alarms_metadata }, - { .type = CHART_NEW, .name = "chart_new", .fnc = chart_query }, - { .type = CHART_DEL, .name = "chart_delete", .fnc = info_metadata }, -#ifdef ENABLE_NEW_CLOUD_PROTOCOL - { .type = REGISTER_NODE, .name = "register_node", .fnc = send_bin_msg }, - { .type = NODE_STATE_UPDATE, .name = "node_state_update", .fnc = send_bin_msg }, - { .type = CHART_DIMS_UPDATE, .name = "chart_and_dim_update", .fnc = send_bin_msg }, - { .type = CHART_CONFIG_UPDATED, .name = "chart_config_updated", .fnc = send_bin_msg }, - { .type = CHART_RESET, .name = "reset_chart_messages", .fnc = send_bin_msg }, - { .type = RETENTION_UPDATED, .name = "update_retention_info", .fnc = send_bin_msg }, - { .type = UPDATE_NODE_INFO, .name = "update_node_info", .fnc = send_bin_msg }, - { .type = ALARM_LOG_HEALTH, .name = "alarm_log_health", .fnc = send_bin_msg }, - { .type = ALARM_PROVIDE_CFG, .name = "provide_alarm_config", .fnc = send_bin_msg }, - { .type = ALARM_SNAPSHOT, .name = "alarm_snapshot", .fnc = send_bin_msg }, -#endif - { .type = UNKNOWN, .name = NULL, .fnc = NULL } -}; const char *aclk_query_get_name(aclk_query_type_t qt) { - aclk_query_handler *ptr = aclk_query_handlers; - while (ptr->type != UNKNOWN) { - if (ptr->type == qt) - return ptr->name; - ptr++; + switch (qt) { + case HTTP_API_V2: return "http_api_request_v2"; + case REGISTER_NODE: return "register_node"; + case NODE_STATE_UPDATE: return "node_state_update"; + case CHART_DIMS_UPDATE: return "chart_and_dim_update"; + case CHART_CONFIG_UPDATED: return "chart_config_updated"; + case CHART_RESET: return "reset_chart_messages"; + case RETENTION_UPDATED: return "update_retention_info"; + case UPDATE_NODE_INFO: return "update_node_info"; + case ALARM_LOG_HEALTH: return "alarm_log_health"; + case ALARM_PROVIDE_CFG: return "provide_alarm_config"; + case ALARM_SNAPSHOT: return "alarm_snapshot"; + case UPDATE_NODE_COLLECTORS: return "update_node_collectors"; + case PROTO_BIN_MESSAGE: return "generic_binary_proto_message"; + default: + error_report("Unknown query type used %d", (int) qt); + return "unknown"; } - return "unknown"; } static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_query_t query) -{ - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - if (aclk_query_handlers[i].type == query->type) { - worker_is_busy(i); - - debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name); - aclk_query_handlers[i].fnc(query_thr, query); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.queries_dispatched++; - aclk_queries_per_thread[query_thr->idx]++; - aclk_metrics_per_sample.queries_per_type[query->type]++; - ACLK_STATS_UNLOCK; - } - aclk_query_free(query); +{ + if (query->type == UNKNOWN || query->type >= ACLK_QUERY_TYPE_COUNT) { + error_report("Unknown query in query queue. %u", query->type); + aclk_query_free(query); + return; + } - worker_is_idle(); - return; - } + worker_is_busy(query->type); + if (query->type == HTTP_API_V2) { + debug(D_ACLK, "Processing Queued Message of type: \"http_api_request_v2\""); + http_api_v2(query_thr, query); + } else { + debug(D_ACLK, "Processing Queued Message of type: \"%s\"", query->data.bin_payload.msg_name); + send_bin_msg(query_thr, query); } - fatal("Unknown query in query queue. %u", query->type); + + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_metrics_per_sample.queries_dispatched++; + aclk_queries_per_thread[query_thr->idx]++; + aclk_metrics_per_sample.queries_per_type[query->type]++; + ACLK_STATS_UNLOCK; + } + + aclk_query_free(query); + + worker_is_idle(); } /* Processes messages from queue. Compete for work with other threads @@ -370,8 +328,8 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr) static void worker_aclk_register(void) { worker_register("ACLKQUERY"); - for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) { - worker_register_job_name(i, aclk_query_handlers[i].name); + for (int i = 1; i < ACLK_QUERY_TYPE_COUNT; i++) { + worker_register_job_name(i, aclk_query_get_name(i)); } } |