diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:30 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:30 +0000 |
commit | aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch) | |
tree | 941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /database/sqlite/sqlite_metadata.c | |
parent | Adding upstream version 1.37.1. (diff) | |
download | netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip |
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r-- | database/sqlite/sqlite_metadata.c | 428 |
1 files changed, 148 insertions, 280 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c index 4eb21215..35f928ff 100644 --- a/database/sqlite/sqlite_metadata.c +++ b/database/sqlite/sqlite_metadata.c @@ -4,9 +4,9 @@ // SQL statements -#define SQL_STORE_CLAIM_ID "insert into node_instance " \ - "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \ - "on conflict(host_id) do update set claim_id = excluded.claim_id;" +#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \ + "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \ + "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;" #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;" @@ -56,24 +56,13 @@ #define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying) #define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop -#define METADATA_MAX_TRANSACTION_BATCH (128) // Maximum commands to add in a transaction enum metadata_opcode { METADATA_DATABASE_NOOP = 0, METADATA_DATABASE_TIMER, - METADATA_ADD_CHART, - METADATA_ADD_CHART_LABEL, - METADATA_ADD_DIMENSION, METADATA_DEL_DIMENSION, - METADATA_ADD_DIMENSION_OPTION, - METADATA_ADD_HOST_SYSTEM_INFO, - METADATA_ADD_HOST_INFO, METADATA_STORE_CLAIM_ID, - METADATA_STORE_HOST_LABELS, - METADATA_STORE_BUFFER, - - METADATA_SKIP_TRANSACTION, // Dummy -- OPCODES less than this one can be in a tranasction - + METADATA_ADD_HOST_INFO, METADATA_SCAN_HOSTS, METADATA_MAINTENANCE, METADATA_SYNC_SHUTDOWN, @@ -105,14 +94,14 @@ typedef enum { struct metadata_wc { uv_thread_t thread; + uv_loop_t *loop; + uv_async_t async; + uv_timer_t timer_req; time_t check_metadata_after; time_t check_hosts_after; volatile unsigned queue_size; - uv_loop_t *loop; - uv_async_t async; METADATA_FLAG flags; uint64_t row_id; - uv_timer_t timer_req; struct completion init_complete; /* FIFO command queue */ uv_mutex_t cmd_mutex; @@ -339,7 +328,7 @@ static int sql_store_host_info(RRDHOST *host) if (unlikely(rc != SQLITE_OK)) goto bind_fail; - rc = sqlite3_bind_int(res, ++param, (int ) host->health_enabled); + rc = sqlite3_bind_int(res, ++param, (int ) host->health.health_enabled); if (unlikely(rc != SQLITE_OK)) goto bind_fail; @@ -383,7 +372,7 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host) if (unlikely(!system_info)) return NULL; - BUFFER *work_buffer = buffer_create(1024); + BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); struct query_build key_data = {.sql = work_buffer, .count = 0}; uuid_unparse_lower(host->host_uuid, key_data.uuid_str); @@ -418,49 +407,6 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host) /* - * Store set option for a dimension - */ -static int sql_set_dimension_option(uuid_t *dim_uuid, char *option) -{ - sqlite3_stmt *res = NULL; - int rc; - - if (unlikely(!db_meta)) { - if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) - return 0; - error_report("Database has not been initialized"); - return 1; - } - - rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0); - if (unlikely(rc != SQLITE_OK)) { - error_report("Failed to prepare statement to update dimension options"); - return 0; - }; - - rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - if (!option || !strcmp(option,"unhide")) - rc = sqlite3_bind_null(res, 1); - else - rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC); - if (unlikely(rc != SQLITE_OK)) - goto bind_fail; - - rc = execute_insert(res); - if (unlikely(rc != SQLITE_DONE)) - error_report("Failed to update dimension option, rc = %d", rc); - -bind_fail: - rc = sqlite3_finalize(res); - if (unlikely(rc != SQLITE_OK)) - error_report("Failed to finalize statement in update dimension options, rc = %d", rc); - return 0; -} - -/* * Store a chart in the database */ @@ -665,22 +611,26 @@ bind_fail: return 1; } -static bool dimension_can_be_deleted(uuid_t *dim_uuid) +static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused) { #ifdef ENABLE_DBENGINE - bool no_retention = true; - for (size_t tier = 0; tier < storage_tiers; tier++) { - if (!multidb_ctx[tier]) - continue; - time_t first_time_t = 0, last_time_t = 0; - if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t) == 0) { - if (first_time_t > 0) { - no_retention = false; - break; + if(dbengine_enabled) { + bool no_retention = true; + for (size_t tier = 0; tier < storage_tiers; tier++) { + if (!multidb_ctx[tier]) + continue; + time_t first_time_t = 0, last_time_t = 0; + if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t)) { + if (first_time_t > 0) { + no_retention = false; + break; + } } } + return no_retention; } - return no_retention; + else + return false; #else return false; #endif @@ -736,6 +686,16 @@ skip_run: error_report("Failed to finalize the prepared statement when reading dimensions"); } +static void cleanup_health_log(void) +{ + RRDHOST *host; + dfe_start_reentrant(rrdhost_root_index, host) { + if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) + continue; + sql_health_alarm_log_cleanup(host); + } + dfe_done(host); +} // // EVENT LOOP STARTS HERE @@ -817,7 +777,7 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd) (void) uv_async_send(&wc->async); } -static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadata_opcode *next_opcode) +static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc) { struct metadata_cmd ret; unsigned queue_size; @@ -828,7 +788,6 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadat memset(&ret, 0, sizeof(ret)); ret.opcode = METADATA_DATABASE_NOOP; ret.completion = NULL; - *next_opcode = METADATA_DATABASE_NOOP; } else { /* dequeue command */ ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; @@ -840,10 +799,6 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadat wc->cmd_queue.head + 1 : 0; } wc->queue_size = queue_size - 1; - if (wc->queue_size > 0) - *next_opcode = wc->cmd_queue.cmd_array[wc->cmd_queue.head].opcode; - else - *next_opcode = METADATA_DATABASE_NOOP; /* wake up producers */ uv_cond_signal(&wc->cmd_cond); } @@ -892,10 +847,16 @@ static void after_metadata_cleanup(uv_work_t *req, int status) struct metadata_wc *wc = req->data; metadata_flag_clear(wc, METADATA_FLAG_CLEANUP); } + static void start_metadata_cleanup(uv_work_t *req) { + register_libuv_worker_jobs(); + + worker_is_busy(UV_EVENT_METADATA_CLEANUP); struct metadata_wc *wc = req->data; check_dimension_metadata(wc); + cleanup_health_log(); + worker_is_idle(); } struct scan_metadata_payload { @@ -920,13 +881,13 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused) freez(data); } -static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { +static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) { RRDSET *st; int rc; bool more_to_do = false; uint32_t scan_count = 1; - BUFFER *work_buffer = buffer_create(1024); + BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); rrdset_foreach_reentrant(st, host) { if (scan_count == max_count) { @@ -934,6 +895,8 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { break; } if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) { + (*query_counter)++; + rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE); scan_count++; @@ -963,8 +926,15 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) { + (*query_counter)++; + rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE); + if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)) + rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); + else + rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); + rc = sql_store_dimension( &rd->metric_uuid, &rd->rrdset->chart_uuid, @@ -990,52 +960,119 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) { // Worker thread to scan hosts for pending metadata to store static void start_metadata_hosts(uv_work_t *req __maybe_unused) { + register_libuv_worker_jobs(); + RRDHOST *host; struct scan_metadata_payload *data = req->data; struct metadata_wc *wc = data->wc; + usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut; + internal_error(true, "METADATA: checking all hosts..."); + bool run_again = false; + worker_is_busy(UV_EVENT_METADATA_STORE); + + if (!data->max_count) + db_execute("BEGIN TRANSACTION;"); dfe_start_reentrant(rrdhost_root_index, host) { if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE)) continue; - internal_error(true, "METADATA: Scanning host %s", rrdhost_hostname(host)); + + size_t query_counter = 0; (void)query_counter; + usec_t started_ut = now_monotonic_usec(); (void)started_ut; + rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE); - if (unlikely(metadata_scan_host(host, data->max_count))) { + + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) { + rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS); + int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); + if (likely(rc == SQLITE_OK)) { + BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite); + struct query_build tmp = {.sql = work_buffer, .count = 0}; + uuid_unparse_lower(host->host_uuid, tmp.uuid_str); + rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); + db_execute(buffer_tostring(work_buffer)); + buffer_free(work_buffer); + query_counter++; + } + } + + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) { + rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID); + uuid_t uuid; + + if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid))) + store_claim_id(&host->host_uuid, &uuid); + else + store_claim_id(&host->host_uuid, NULL); + + query_counter++; + } + + if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) { + rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO); + + BUFFER *work_buffer = sql_store_host_system_info(host); + if(work_buffer) { + db_execute(buffer_tostring(work_buffer)); + buffer_free(work_buffer); + query_counter++; + } + + int rc = sql_store_host_info(host); + if (unlikely(rc)) + error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname)); + else + query_counter++; + } + + if (data->max_count) + db_execute("BEGIN TRANSACTION;"); + if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) { run_again = true; rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE); - internal_error(true,"METADATA: Rescheduling host %s to run; more charts to store", rrdhost_hostname(host)); + internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host)); } + if (data->max_count) + db_execute("COMMIT TRANSACTION;"); + + usec_t ended_ut = now_monotonic_usec(); (void)ended_ut; + internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms", + rrdhost_hostname(host), query_counter, + (double)(ended_ut - started_ut) / USEC_PER_MS); } dfe_done(host); + if (!data->max_count) + db_execute("COMMIT TRANSACTION;"); + + usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut; + internal_error(true, "METADATA: checking all hosts completed in %0.2f ms", + (double)(all_ended_ut - all_started_ut) / USEC_PER_MS); + if (unlikely(run_again)) wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE; else wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL; + worker_is_idle(); } static void metadata_event_loop(void *arg) { + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register("METASYNC"); worker_register_job_name(METADATA_DATABASE_NOOP, "noop"); worker_register_job_name(METADATA_DATABASE_TIMER, "timer"); - worker_register_job_name(METADATA_ADD_CHART, "add chart"); - worker_register_job_name(METADATA_ADD_CHART_LABEL, "add chart label"); - worker_register_job_name(METADATA_ADD_DIMENSION, "add dimension"); worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension"); - worker_register_job_name(METADATA_ADD_DIMENSION_OPTION, "dimension option"); - worker_register_job_name(METADATA_ADD_HOST_SYSTEM_INFO, "host system info"); - worker_register_job_name(METADATA_ADD_HOST_INFO, "host info"); worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id"); - worker_register_job_name(METADATA_STORE_HOST_LABELS, "host labels"); + worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info"); worker_register_job_name(METADATA_MAINTENANCE, "maintenance"); - int ret; uv_loop_t *loop; unsigned cmd_batch_size; struct metadata_wc *wc = arg; - enum metadata_opcode opcode, next_opcode; + enum metadata_opcode opcode; uv_work_t metadata_cleanup_worker; uv_thread_set_name_np(wc->thread, "METASYNC"); @@ -1073,20 +1110,12 @@ static void metadata_event_loop(void *arg) wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK; int shutdown = 0; - int in_transaction = 0; - int commands_in_transaction = 0; - // This can be used in the event loop for all opcodes (not workers) - BUFFER *work_buffer = buffer_create(1024); wc->row_id = 0; completion_mark_complete(&wc->init_complete); while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) { - RRDDIM *rd = NULL; - RRDSET *st = NULL; - RRDHOST *host = NULL; - DICTIONARY_ITEM *dict_item = NULL; - BUFFER *buffer = NULL; uuid_t *uuid; + RRDHOST *host = NULL; int rc; worker_is_idle(); @@ -1098,7 +1127,7 @@ static void metadata_event_loop(void *arg) if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE)) break; - cmd = metadata_deq_cmd(wc, &next_opcode); + cmd = metadata_deq_cmd(wc); opcode = cmd.opcode; if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) { @@ -1108,130 +1137,38 @@ static void metadata_event_loop(void *arg) ++cmd_batch_size; - // If we are not in transaction and this command is the same with the next ; start a transaction - if (!in_transaction && opcode < METADATA_SKIP_TRANSACTION && opcode == next_opcode) { - if (opcode != METADATA_DATABASE_NOOP) { - in_transaction = 1; - db_execute("BEGIN TRANSACTION;"); - } - } - - if (likely(in_transaction)) { - commands_in_transaction++; - } - if (likely(opcode != METADATA_DATABASE_NOOP)) - worker_is_busy(opcode); + worker_is_busy(opcode); switch (opcode) { case METADATA_DATABASE_NOOP: case METADATA_DATABASE_TIMER: break; - case METADATA_ADD_CHART: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - st = (RRDSET *) dictionary_acquired_item_value(dict_item); - - rc = sql_store_chart( - &st->chart_uuid, - &st->rrdhost->host_uuid, - string2str(st->parts.type), - string2str(st->parts.id), - string2str(st->parts.name), - rrdset_family(st), - rrdset_context(st), - rrdset_title(st), - rrdset_units(st), - rrdset_plugin_name(st), - rrdset_module_name(st), - st->priority, - st->update_every, - st->chart_type, - st->rrd_memory_mode, - st->entries); - - if (unlikely(rc)) - error_report("Failed to store chart %s", rrdset_id(st)); - - dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item); - break; - case METADATA_ADD_CHART_LABEL: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - st = (RRDSET *) dictionary_acquired_item_value(dict_item); - check_and_update_chart_labels(st, work_buffer); - dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item); - break; - case METADATA_ADD_DIMENSION: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - rd = (RRDDIM *) dictionary_acquired_item_value(dict_item); - - rc = sql_store_dimension( - &rd->metric_uuid, - &rd->rrdset->chart_uuid, - string2str(rd->id), - string2str(rd->name), - rd->multiplier, - rd->divisor, - rd->algorithm, - rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN)); - if (unlikely(rc)) - error_report("Failed to store dimension %s", rrddim_id(rd)); - - dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item); - break; case METADATA_DEL_DIMENSION: uuid = (uuid_t *) cmd.param[0]; if (likely(dimension_can_be_deleted(uuid))) delete_dimension_uuid(uuid); freez(uuid); break; - case METADATA_ADD_DIMENSION_OPTION: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - rd = (RRDDIM *) dictionary_acquired_item_value(dict_item); - rc = sql_set_dimension_option( - &rd->metric_uuid, rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN) ? "hidden" : NULL); - if (unlikely(rc)) - error_report("Failed to store dimension option for %s", string2str(rd->id)); - dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item); - break; - case METADATA_ADD_HOST_SYSTEM_INFO: - buffer = (BUFFER *) cmd.param[0]; - db_execute(buffer_tostring(buffer)); - buffer_free(buffer); - break; - case METADATA_ADD_HOST_INFO: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - host = (RRDHOST *) dictionary_acquired_item_value(dict_item); - rc = sql_store_host_info(host); - if (unlikely(rc)) - error_report("Failed to store host info in the database for %s", string2str(host->hostname)); - dictionary_acquired_item_release(rrdhost_root_index, dict_item); - break; case METADATA_STORE_CLAIM_ID: store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]); freez((void *) cmd.param[0]); freez((void *) cmd.param[1]); break; - case METADATA_STORE_HOST_LABELS: - dict_item = (DICTIONARY_ITEM * ) cmd.param[0]; - host = (RRDHOST *) dictionary_acquired_item_value(dict_item); - rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid); - - if (likely(rc == SQLITE_OK)) { - buffer_flush(work_buffer); - struct query_build tmp = {.sql = work_buffer, .count = 0}; - uuid_unparse_lower(host->host_uuid, tmp.uuid_str); - rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp); - db_execute(buffer_tostring(work_buffer)); - } - - dictionary_acquired_item_release(rrdhost_root_index, dict_item); + case METADATA_ADD_HOST_INFO: + host = (RRDHOST *) cmd.param[0]; + rc = sql_store_host_info(host); + if (unlikely(rc)) + error_report("Failed to store host info in the database for %s", string2str(host->hostname)); break; - case METADATA_SCAN_HOSTS: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS))) break; + if (unittest_running) + break; + struct scan_metadata_payload *data = mallocz(sizeof(*data)); data->request.data = data; data->wc = wc; @@ -1242,7 +1179,7 @@ static void metadata_event_loop(void *arg) cmd.completion = NULL; // Do not complete after launching worker (worker will do) } else - data->max_count = 1000; + data->max_count = 5000; metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS); if (unlikely( @@ -1255,11 +1192,6 @@ static void metadata_event_loop(void *arg) metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS); } break; - case METADATA_STORE_BUFFER: - buffer = (BUFFER *) cmd.param[0]; - db_execute(buffer_tostring(buffer)); - buffer_free(buffer); - break; case METADATA_MAINTENANCE: if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP))) break; @@ -1279,11 +1211,6 @@ static void metadata_event_loop(void *arg) default: break; } - if (in_transaction && (commands_in_transaction >= METADATA_MAX_TRANSACTION_BATCH || opcode != next_opcode)) { - in_transaction = 0; - db_execute("COMMIT TRANSACTION;"); - commands_in_transaction = 0; - } if (cmd.completion) completion_mark_complete(cmd.completion); @@ -1302,8 +1229,6 @@ static void metadata_event_loop(void *arg) uv_run(loop, UV_RUN_DEFAULT); uv_cond_destroy(&wc->cmd_cond); - /* uv_mutex_destroy(&wc->cmd_mutex); */ - //fatal_assert(0 == uv_loop_close(loop)); int rc; do { @@ -1313,7 +1238,6 @@ static void metadata_event_loop(void *arg) freez(loop); worker_unregister(); - buffer_free(work_buffer); info("METADATA: Shutting down event loop"); completion_mark_complete(&wc->init_complete); return; @@ -1408,50 +1332,6 @@ static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *p } // Public -void metaqueue_chart_update(RRDSET *st) -{ - const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id)); - queue_metadata_cmd(METADATA_ADD_CHART, acquired_st, NULL); -} - -// -// RD may not be collected, so we may store it needlessly -void metaqueue_dimension_update(RRDDIM *rd) -{ - const DICTIONARY_ITEM *acquired_rd = - dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id)); - - if (unlikely(rrdset_flag_check(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE))) { - metaqueue_chart_update(rd->rrdset); - rrdset_flag_clear(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE); - } - - queue_metadata_cmd(METADATA_ADD_DIMENSION, acquired_rd, NULL); -} - -void metaqueue_dimension_update_flags(RRDDIM *rd) -{ - const DICTIONARY_ITEM *acquired_rd = - dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id)); - queue_metadata_cmd(METADATA_ADD_DIMENSION_OPTION, acquired_rd, NULL); -} - -void metaqueue_host_update_system_info(RRDHOST *host) -{ - BUFFER *work_buffer = sql_store_host_system_info(host); - - if (unlikely(!work_buffer)) - return; - - queue_metadata_cmd(METADATA_ADD_HOST_SYSTEM_INFO, work_buffer, NULL); -} - -void metaqueue_host_update_info(const char *machine_guid) -{ - const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid); - queue_metadata_cmd(METADATA_ADD_HOST_INFO, acquired_host, NULL); -} - void metaqueue_delete_dimension_uuid(uuid_t *uuid) { if (unlikely(!metasync_worker.loop)) @@ -1477,24 +1357,13 @@ void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid) queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid); } -void metaqueue_store_host_labels(const char *machine_guid) +void metaqueue_host_update_info(RRDHOST *host) { - const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid); - queue_metadata_cmd(METADATA_STORE_HOST_LABELS, acquired_host, NULL); -} - -void metaqueue_buffer(BUFFER *buffer) -{ - queue_metadata_cmd(METADATA_STORE_BUFFER, buffer, NULL); -} - -void metaqueue_chart_labels(RRDSET *st) -{ - const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id)); - queue_metadata_cmd(METADATA_ADD_CHART_LABEL, acquired_st, NULL); + if (unlikely(!metasync_worker.loop)) + return; + queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL); } - // // unitests // @@ -1542,7 +1411,7 @@ static void *metadata_unittest_threads(void) tu.join = 0; for (int i = 0; i < threads_to_create; i++) { char buf[100 + 1]; - snprintf(buf, 100, "meta%d", i); + snprintf(buf, 100, "META[%d]", i); netdata_thread_create( &threads[i], buf, @@ -1558,7 +1427,6 @@ static void *metadata_unittest_threads(void) void *retval; netdata_thread_join(threads[i], &retval); } -// uv_async_send(&metasync_worker.async); sleep_usec(5 * USEC_PER_SEC); fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed); |