summaryrefslogtreecommitdiffstats
path: root/database/sqlite/sqlite_metadata.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2023-02-06 16:11:30 +0000
commitaa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch)
tree941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /database/sqlite/sqlite_metadata.c
parentAdding upstream version 1.37.1. (diff)
downloadnetdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz
netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/sqlite/sqlite_metadata.c')
-rw-r--r--database/sqlite/sqlite_metadata.c428
1 files changed, 148 insertions, 280 deletions
diff --git a/database/sqlite/sqlite_metadata.c b/database/sqlite/sqlite_metadata.c
index 4eb212152..35f928ffa 100644
--- a/database/sqlite/sqlite_metadata.c
+++ b/database/sqlite/sqlite_metadata.c
@@ -4,9 +4,9 @@
// SQL statements
-#define SQL_STORE_CLAIM_ID "insert into node_instance " \
- "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \
- "on conflict(host_id) do update set claim_id = excluded.claim_id;"
+#define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \
+ "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
+ "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"
#define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
@@ -56,24 +56,13 @@
#define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying)
#define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop
-#define METADATA_MAX_TRANSACTION_BATCH (128) // Maximum commands to add in a transaction
enum metadata_opcode {
METADATA_DATABASE_NOOP = 0,
METADATA_DATABASE_TIMER,
- METADATA_ADD_CHART,
- METADATA_ADD_CHART_LABEL,
- METADATA_ADD_DIMENSION,
METADATA_DEL_DIMENSION,
- METADATA_ADD_DIMENSION_OPTION,
- METADATA_ADD_HOST_SYSTEM_INFO,
- METADATA_ADD_HOST_INFO,
METADATA_STORE_CLAIM_ID,
- METADATA_STORE_HOST_LABELS,
- METADATA_STORE_BUFFER,
-
- METADATA_SKIP_TRANSACTION, // Dummy -- OPCODES less than this one can be in a tranasction
-
+ METADATA_ADD_HOST_INFO,
METADATA_SCAN_HOSTS,
METADATA_MAINTENANCE,
METADATA_SYNC_SHUTDOWN,
@@ -105,14 +94,14 @@ typedef enum {
struct metadata_wc {
uv_thread_t thread;
+ uv_loop_t *loop;
+ uv_async_t async;
+ uv_timer_t timer_req;
time_t check_metadata_after;
time_t check_hosts_after;
volatile unsigned queue_size;
- uv_loop_t *loop;
- uv_async_t async;
METADATA_FLAG flags;
uint64_t row_id;
- uv_timer_t timer_req;
struct completion init_complete;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
@@ -339,7 +328,7 @@ static int sql_store_host_info(RRDHOST *host)
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
- rc = sqlite3_bind_int(res, ++param, (int ) host->health_enabled);
+ rc = sqlite3_bind_int(res, ++param, (int ) host->health.health_enabled);
if (unlikely(rc != SQLITE_OK))
goto bind_fail;
@@ -383,7 +372,7 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host)
if (unlikely(!system_info))
return NULL;
- BUFFER *work_buffer = buffer_create(1024);
+ BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
struct query_build key_data = {.sql = work_buffer, .count = 0};
uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
@@ -418,49 +407,6 @@ static BUFFER *sql_store_host_system_info(RRDHOST *host)
/*
- * Store set option for a dimension
- */
-static int sql_set_dimension_option(uuid_t *dim_uuid, char *option)
-{
- sqlite3_stmt *res = NULL;
- int rc;
-
- if (unlikely(!db_meta)) {
- if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
- return 0;
- error_report("Database has not been initialized");
- return 1;
- }
-
- rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0);
- if (unlikely(rc != SQLITE_OK)) {
- error_report("Failed to prepare statement to update dimension options");
- return 0;
- };
-
- rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK))
- goto bind_fail;
-
- if (!option || !strcmp(option,"unhide"))
- rc = sqlite3_bind_null(res, 1);
- else
- rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC);
- if (unlikely(rc != SQLITE_OK))
- goto bind_fail;
-
- rc = execute_insert(res);
- if (unlikely(rc != SQLITE_DONE))
- error_report("Failed to update dimension option, rc = %d", rc);
-
-bind_fail:
- rc = sqlite3_finalize(res);
- if (unlikely(rc != SQLITE_OK))
- error_report("Failed to finalize statement in update dimension options, rc = %d", rc);
- return 0;
-}
-
-/*
* Store a chart in the database
*/
@@ -665,22 +611,26 @@ bind_fail:
return 1;
}
-static bool dimension_can_be_deleted(uuid_t *dim_uuid)
+static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
{
#ifdef ENABLE_DBENGINE
- bool no_retention = true;
- for (size_t tier = 0; tier < storage_tiers; tier++) {
- if (!multidb_ctx[tier])
- continue;
- time_t first_time_t = 0, last_time_t = 0;
- if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t) == 0) {
- if (first_time_t > 0) {
- no_retention = false;
- break;
+ if(dbengine_enabled) {
+ bool no_retention = true;
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ if (!multidb_ctx[tier])
+ continue;
+ time_t first_time_t = 0, last_time_t = 0;
+ if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t)) {
+ if (first_time_t > 0) {
+ no_retention = false;
+ break;
+ }
}
}
+ return no_retention;
}
- return no_retention;
+ else
+ return false;
#else
return false;
#endif
@@ -736,6 +686,16 @@ skip_run:
error_report("Failed to finalize the prepared statement when reading dimensions");
}
+static void cleanup_health_log(void)
+{
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
+ continue;
+ sql_health_alarm_log_cleanup(host);
+ }
+ dfe_done(host);
+}
//
// EVENT LOOP STARTS HERE
@@ -817,7 +777,7 @@ static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
(void) uv_async_send(&wc->async);
}
-static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadata_opcode *next_opcode)
+static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
{
struct metadata_cmd ret;
unsigned queue_size;
@@ -828,7 +788,6 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadat
memset(&ret, 0, sizeof(ret));
ret.opcode = METADATA_DATABASE_NOOP;
ret.completion = NULL;
- *next_opcode = METADATA_DATABASE_NOOP;
} else {
/* dequeue command */
ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
@@ -840,10 +799,6 @@ static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadat
wc->cmd_queue.head + 1 : 0;
}
wc->queue_size = queue_size - 1;
- if (wc->queue_size > 0)
- *next_opcode = wc->cmd_queue.cmd_array[wc->cmd_queue.head].opcode;
- else
- *next_opcode = METADATA_DATABASE_NOOP;
/* wake up producers */
uv_cond_signal(&wc->cmd_cond);
}
@@ -892,10 +847,16 @@ static void after_metadata_cleanup(uv_work_t *req, int status)
struct metadata_wc *wc = req->data;
metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
}
+
static void start_metadata_cleanup(uv_work_t *req)
{
+ register_libuv_worker_jobs();
+
+ worker_is_busy(UV_EVENT_METADATA_CLEANUP);
struct metadata_wc *wc = req->data;
check_dimension_metadata(wc);
+ cleanup_health_log();
+ worker_is_idle();
}
struct scan_metadata_payload {
@@ -920,13 +881,13 @@ static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
freez(data);
}
-static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) {
+static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) {
RRDSET *st;
int rc;
bool more_to_do = false;
uint32_t scan_count = 1;
- BUFFER *work_buffer = buffer_create(1024);
+ BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
rrdset_foreach_reentrant(st, host) {
if (scan_count == max_count) {
@@ -934,6 +895,8 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) {
break;
}
if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
+ (*query_counter)++;
+
rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
scan_count++;
@@ -963,8 +926,15 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) {
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) {
+ (*query_counter)++;
+
rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE);
+ if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
+ rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
+ else
+ rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
+
rc = sql_store_dimension(
&rd->metric_uuid,
&rd->rrdset->chart_uuid,
@@ -990,52 +960,119 @@ static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) {
// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
{
+ register_libuv_worker_jobs();
+
RRDHOST *host;
struct scan_metadata_payload *data = req->data;
struct metadata_wc *wc = data->wc;
+ usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
+ internal_error(true, "METADATA: checking all hosts...");
+
bool run_again = false;
+ worker_is_busy(UV_EVENT_METADATA_STORE);
+
+ if (!data->max_count)
+ db_execute("BEGIN TRANSACTION;");
dfe_start_reentrant(rrdhost_root_index, host) {
if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
continue;
- internal_error(true, "METADATA: Scanning host %s", rrdhost_hostname(host));
+
+ size_t query_counter = 0; (void)query_counter;
+ usec_t started_ut = now_monotonic_usec(); (void)started_ut;
+
rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
- if (unlikely(metadata_scan_host(host, data->max_count))) {
+
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
+ int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
+ if (likely(rc == SQLITE_OK)) {
+ BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
+ struct query_build tmp = {.sql = work_buffer, .count = 0};
+ uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
+ rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
+ db_execute(buffer_tostring(work_buffer));
+ buffer_free(work_buffer);
+ query_counter++;
+ }
+ }
+
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
+ uuid_t uuid;
+
+ if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
+ store_claim_id(&host->host_uuid, &uuid);
+ else
+ store_claim_id(&host->host_uuid, NULL);
+
+ query_counter++;
+ }
+
+ if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
+
+ BUFFER *work_buffer = sql_store_host_system_info(host);
+ if(work_buffer) {
+ db_execute(buffer_tostring(work_buffer));
+ buffer_free(work_buffer);
+ query_counter++;
+ }
+
+ int rc = sql_store_host_info(host);
+ if (unlikely(rc))
+ error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname));
+ else
+ query_counter++;
+ }
+
+ if (data->max_count)
+ db_execute("BEGIN TRANSACTION;");
+ if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) {
run_again = true;
rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
- internal_error(true,"METADATA: Rescheduling host %s to run; more charts to store", rrdhost_hostname(host));
+ internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
}
+ if (data->max_count)
+ db_execute("COMMIT TRANSACTION;");
+
+ usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
+ internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
+ rrdhost_hostname(host), query_counter,
+ (double)(ended_ut - started_ut) / USEC_PER_MS);
}
dfe_done(host);
+ if (!data->max_count)
+ db_execute("COMMIT TRANSACTION;");
+
+ usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
+ internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
+ (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
+
if (unlikely(run_again))
wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
else
wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
+ worker_is_idle();
}
static void metadata_event_loop(void *arg)
{
+ service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
worker_register("METASYNC");
worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
- worker_register_job_name(METADATA_ADD_CHART, "add chart");
- worker_register_job_name(METADATA_ADD_CHART_LABEL, "add chart label");
- worker_register_job_name(METADATA_ADD_DIMENSION, "add dimension");
worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
- worker_register_job_name(METADATA_ADD_DIMENSION_OPTION, "dimension option");
- worker_register_job_name(METADATA_ADD_HOST_SYSTEM_INFO, "host system info");
- worker_register_job_name(METADATA_ADD_HOST_INFO, "host info");
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
- worker_register_job_name(METADATA_STORE_HOST_LABELS, "host labels");
+ worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
-
int ret;
uv_loop_t *loop;
unsigned cmd_batch_size;
struct metadata_wc *wc = arg;
- enum metadata_opcode opcode, next_opcode;
+ enum metadata_opcode opcode;
uv_work_t metadata_cleanup_worker;
uv_thread_set_name_np(wc->thread, "METASYNC");
@@ -1073,20 +1110,12 @@ static void metadata_event_loop(void *arg)
wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
int shutdown = 0;
- int in_transaction = 0;
- int commands_in_transaction = 0;
- // This can be used in the event loop for all opcodes (not workers)
- BUFFER *work_buffer = buffer_create(1024);
wc->row_id = 0;
completion_mark_complete(&wc->init_complete);
while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
- RRDDIM *rd = NULL;
- RRDSET *st = NULL;
- RRDHOST *host = NULL;
- DICTIONARY_ITEM *dict_item = NULL;
- BUFFER *buffer = NULL;
uuid_t *uuid;
+ RRDHOST *host = NULL;
int rc;
worker_is_idle();
@@ -1098,7 +1127,7 @@ static void metadata_event_loop(void *arg)
if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
break;
- cmd = metadata_deq_cmd(wc, &next_opcode);
+ cmd = metadata_deq_cmd(wc);
opcode = cmd.opcode;
if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
@@ -1108,130 +1137,38 @@ static void metadata_event_loop(void *arg)
++cmd_batch_size;
- // If we are not in transaction and this command is the same with the next ; start a transaction
- if (!in_transaction && opcode < METADATA_SKIP_TRANSACTION && opcode == next_opcode) {
- if (opcode != METADATA_DATABASE_NOOP) {
- in_transaction = 1;
- db_execute("BEGIN TRANSACTION;");
- }
- }
-
- if (likely(in_transaction)) {
- commands_in_transaction++;
- }
-
if (likely(opcode != METADATA_DATABASE_NOOP))
- worker_is_busy(opcode);
+ worker_is_busy(opcode);
switch (opcode) {
case METADATA_DATABASE_NOOP:
case METADATA_DATABASE_TIMER:
break;
- case METADATA_ADD_CHART:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- st = (RRDSET *) dictionary_acquired_item_value(dict_item);
-
- rc = sql_store_chart(
- &st->chart_uuid,
- &st->rrdhost->host_uuid,
- string2str(st->parts.type),
- string2str(st->parts.id),
- string2str(st->parts.name),
- rrdset_family(st),
- rrdset_context(st),
- rrdset_title(st),
- rrdset_units(st),
- rrdset_plugin_name(st),
- rrdset_module_name(st),
- st->priority,
- st->update_every,
- st->chart_type,
- st->rrd_memory_mode,
- st->entries);
-
- if (unlikely(rc))
- error_report("Failed to store chart %s", rrdset_id(st));
-
- dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item);
- break;
- case METADATA_ADD_CHART_LABEL:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- st = (RRDSET *) dictionary_acquired_item_value(dict_item);
- check_and_update_chart_labels(st, work_buffer);
- dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item);
- break;
- case METADATA_ADD_DIMENSION:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- rd = (RRDDIM *) dictionary_acquired_item_value(dict_item);
-
- rc = sql_store_dimension(
- &rd->metric_uuid,
- &rd->rrdset->chart_uuid,
- string2str(rd->id),
- string2str(rd->name),
- rd->multiplier,
- rd->divisor,
- rd->algorithm,
- rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
- if (unlikely(rc))
- error_report("Failed to store dimension %s", rrddim_id(rd));
-
- dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item);
- break;
case METADATA_DEL_DIMENSION:
uuid = (uuid_t *) cmd.param[0];
if (likely(dimension_can_be_deleted(uuid)))
delete_dimension_uuid(uuid);
freez(uuid);
break;
- case METADATA_ADD_DIMENSION_OPTION:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- rd = (RRDDIM *) dictionary_acquired_item_value(dict_item);
- rc = sql_set_dimension_option(
- &rd->metric_uuid, rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN) ? "hidden" : NULL);
- if (unlikely(rc))
- error_report("Failed to store dimension option for %s", string2str(rd->id));
- dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item);
- break;
- case METADATA_ADD_HOST_SYSTEM_INFO:
- buffer = (BUFFER *) cmd.param[0];
- db_execute(buffer_tostring(buffer));
- buffer_free(buffer);
- break;
- case METADATA_ADD_HOST_INFO:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- host = (RRDHOST *) dictionary_acquired_item_value(dict_item);
- rc = sql_store_host_info(host);
- if (unlikely(rc))
- error_report("Failed to store host info in the database for %s", string2str(host->hostname));
- dictionary_acquired_item_release(rrdhost_root_index, dict_item);
- break;
case METADATA_STORE_CLAIM_ID:
store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]);
freez((void *) cmd.param[0]);
freez((void *) cmd.param[1]);
break;
- case METADATA_STORE_HOST_LABELS:
- dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
- host = (RRDHOST *) dictionary_acquired_item_value(dict_item);
- rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
-
- if (likely(rc == SQLITE_OK)) {
- buffer_flush(work_buffer);
- struct query_build tmp = {.sql = work_buffer, .count = 0};
- uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
- rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
- db_execute(buffer_tostring(work_buffer));
- }
-
- dictionary_acquired_item_release(rrdhost_root_index, dict_item);
+ case METADATA_ADD_HOST_INFO:
+ host = (RRDHOST *) cmd.param[0];
+ rc = sql_store_host_info(host);
+ if (unlikely(rc))
+ error_report("Failed to store host info in the database for %s", string2str(host->hostname));
break;
-
case METADATA_SCAN_HOSTS:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
break;
+ if (unittest_running)
+ break;
+
struct scan_metadata_payload *data = mallocz(sizeof(*data));
data->request.data = data;
data->wc = wc;
@@ -1242,7 +1179,7 @@ static void metadata_event_loop(void *arg)
cmd.completion = NULL; // Do not complete after launching worker (worker will do)
}
else
- data->max_count = 1000;
+ data->max_count = 5000;
metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
if (unlikely(
@@ -1255,11 +1192,6 @@ static void metadata_event_loop(void *arg)
metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
}
break;
- case METADATA_STORE_BUFFER:
- buffer = (BUFFER *) cmd.param[0];
- db_execute(buffer_tostring(buffer));
- buffer_free(buffer);
- break;
case METADATA_MAINTENANCE:
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
break;
@@ -1279,11 +1211,6 @@ static void metadata_event_loop(void *arg)
default:
break;
}
- if (in_transaction && (commands_in_transaction >= METADATA_MAX_TRANSACTION_BATCH || opcode != next_opcode)) {
- in_transaction = 0;
- db_execute("COMMIT TRANSACTION;");
- commands_in_transaction = 0;
- }
if (cmd.completion)
completion_mark_complete(cmd.completion);
@@ -1302,8 +1229,6 @@ static void metadata_event_loop(void *arg)
uv_run(loop, UV_RUN_DEFAULT);
uv_cond_destroy(&wc->cmd_cond);
- /* uv_mutex_destroy(&wc->cmd_mutex); */
- //fatal_assert(0 == uv_loop_close(loop));
int rc;
do {
@@ -1313,7 +1238,6 @@ static void metadata_event_loop(void *arg)
freez(loop);
worker_unregister();
- buffer_free(work_buffer);
info("METADATA: Shutting down event loop");
completion_mark_complete(&wc->init_complete);
return;
@@ -1408,50 +1332,6 @@ static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *p
}
// Public
-void metaqueue_chart_update(RRDSET *st)
-{
- const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id));
- queue_metadata_cmd(METADATA_ADD_CHART, acquired_st, NULL);
-}
-
-//
-// RD may not be collected, so we may store it needlessly
-void metaqueue_dimension_update(RRDDIM *rd)
-{
- const DICTIONARY_ITEM *acquired_rd =
- dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id));
-
- if (unlikely(rrdset_flag_check(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE))) {
- metaqueue_chart_update(rd->rrdset);
- rrdset_flag_clear(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE);
- }
-
- queue_metadata_cmd(METADATA_ADD_DIMENSION, acquired_rd, NULL);
-}
-
-void metaqueue_dimension_update_flags(RRDDIM *rd)
-{
- const DICTIONARY_ITEM *acquired_rd =
- dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id));
- queue_metadata_cmd(METADATA_ADD_DIMENSION_OPTION, acquired_rd, NULL);
-}
-
-void metaqueue_host_update_system_info(RRDHOST *host)
-{
- BUFFER *work_buffer = sql_store_host_system_info(host);
-
- if (unlikely(!work_buffer))
- return;
-
- queue_metadata_cmd(METADATA_ADD_HOST_SYSTEM_INFO, work_buffer, NULL);
-}
-
-void metaqueue_host_update_info(const char *machine_guid)
-{
- const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid);
- queue_metadata_cmd(METADATA_ADD_HOST_INFO, acquired_host, NULL);
-}
-
void metaqueue_delete_dimension_uuid(uuid_t *uuid)
{
if (unlikely(!metasync_worker.loop))
@@ -1477,24 +1357,13 @@ void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid)
queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid);
}
-void metaqueue_store_host_labels(const char *machine_guid)
+void metaqueue_host_update_info(RRDHOST *host)
{
- const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid);
- queue_metadata_cmd(METADATA_STORE_HOST_LABELS, acquired_host, NULL);
-}
-
-void metaqueue_buffer(BUFFER *buffer)
-{
- queue_metadata_cmd(METADATA_STORE_BUFFER, buffer, NULL);
-}
-
-void metaqueue_chart_labels(RRDSET *st)
-{
- const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id));
- queue_metadata_cmd(METADATA_ADD_CHART_LABEL, acquired_st, NULL);
+ if (unlikely(!metasync_worker.loop))
+ return;
+ queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
}
-
//
// unitests
//
@@ -1542,7 +1411,7 @@ static void *metadata_unittest_threads(void)
tu.join = 0;
for (int i = 0; i < threads_to_create; i++) {
char buf[100 + 1];
- snprintf(buf, 100, "meta%d", i);
+ snprintf(buf, 100, "META[%d]", i);
netdata_thread_create(
&threads[i],
buf,
@@ -1558,7 +1427,6 @@ static void *metadata_unittest_threads(void)
void *retval;
netdata_thread_join(threads[i], &retval);
}
-// uv_async_send(&metasync_worker.async);
sleep_usec(5 * USEC_PER_SEC);
fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed);