diff options
Diffstat (limited to '')
-rw-r--r-- | database/rrdhost.c | 422 |
1 files changed, 346 insertions, 76 deletions
diff --git a/database/rrdhost.c b/database/rrdhost.c index 69e4beabf..235d64b1e 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -42,6 +42,7 @@ bool is_storage_engine_shared(STORAGE_INSTANCE *engine __maybe_unused) { } RRDHOST *find_host_by_node_id(char *node_id) { + uuid_t node_uuid; if (unlikely(!node_id || uuid_parse(node_id, node_uuid))) return NULL; @@ -79,7 +80,7 @@ static inline void rrdhost_init() { } RRDHOST_ACQUIRED *rrdhost_find_and_acquire(const char *machine_guid) { - debug(D_RRD_CALLS, "rrdhost_find_and_acquire() host %s", machine_guid); + netdata_log_debug(D_RRD_CALLS, "rrdhost_find_and_acquire() host %s", machine_guid); return (RRDHOST_ACQUIRED *)dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid); } @@ -115,7 +116,8 @@ static inline RRDHOST *rrdhost_index_add_by_guid(RRDHOST *host) { rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); else { rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); - error("RRDHOST: %s() host with machine guid '%s' is already indexed", __FUNCTION__, host->machine_guid); + netdata_log_error("RRDHOST: %s() host with machine guid '%s' is already indexed", + __FUNCTION__, host->machine_guid); } return host; @@ -124,7 +126,8 @@ static inline RRDHOST *rrdhost_index_add_by_guid(RRDHOST *host) { static void rrdhost_index_del_by_guid(RRDHOST *host) { if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID)) { if(!dictionary_del(rrdhost_root_index, host->machine_guid)) - error("RRDHOST: %s() failed to delete machine guid '%s' from index", __FUNCTION__, host->machine_guid); + netdata_log_error("RRDHOST: %s() failed to delete machine guid '%s' from index", + __FUNCTION__, host->machine_guid); rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); } @@ -145,7 +148,8 @@ static inline void rrdhost_index_del_hostname(RRDHOST *host) { if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_HOSTNAME)) { if(!dictionary_del(rrdhost_root_index_hostname, rrdhost_hostname(host))) - error("RRDHOST: %s() failed to delete hostname '%s' from index", __FUNCTION__, rrdhost_hostname(host)); + netdata_log_error("RRDHOST: %s() failed to delete hostname '%s' from index", + __FUNCTION__, rrdhost_hostname(host)); rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME); } @@ -299,10 +303,11 @@ static RRDHOST *rrdhost_create( int is_localhost, bool archived ) { - debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid); + netdata_log_debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid); if(memory_mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled) { - error("memory mode 'dbengine' is not enabled, but host '%s' is configured for it. Falling back to 'alloc'", hostname); + netdata_log_error("memory mode 'dbengine' is not enabled, but host '%s' is configured for it. Falling back to 'alloc'", + hostname); memory_mode = RRD_MEMORY_MODE_ALLOC; } @@ -348,8 +353,8 @@ int is_legacy = 1; case RRD_MEMORY_MODE_MAP: case RRD_MEMORY_MODE_SAVE: case RRD_MEMORY_MODE_RAM: - if(host->rrdpush_seconds_to_replicate > host->rrd_history_entries * host->rrd_update_every) - host->rrdpush_seconds_to_replicate = host->rrd_history_entries * host->rrd_update_every; + if(host->rrdpush_seconds_to_replicate > (time_t) host->rrd_history_entries * (time_t) host->rrd_update_every) + host->rrdpush_seconds_to_replicate = (time_t) host->rrd_history_entries * (time_t) host->rrd_update_every; break; case RRD_MEMORY_MODE_DBENGINE: @@ -386,7 +391,7 @@ int is_legacy = 1; (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_legacy))) { int r = mkdir(host->cache_dir, 0775); if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), host->cache_dir); + netdata_log_error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), host->cache_dir); } } @@ -412,7 +417,7 @@ int is_legacy = 1; ret = mkdir(dbenginepath, 0775); if (ret != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), dbenginepath); + netdata_log_error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), dbenginepath); else ret = 0; // succeed @@ -453,9 +458,8 @@ int is_legacy = 1; } if (ret) { // check legacy or multihost initialization success - error( - "Host '%s': cannot initialize host with machine guid '%s'. Failed to initialize DB engine at '%s'.", - rrdhost_hostname(host), host->machine_guid, host->cache_dir); + netdata_log_error("Host '%s': cannot initialize host with machine guid '%s'. Failed to initialize DB engine at '%s'.", + rrdhost_hostname(host), host->machine_guid, host->cache_dir); rrd_wrlock(); rrdhost_free___while_having_rrd_wrlock(host, true); @@ -503,7 +507,8 @@ int is_legacy = 1; RRDHOST *t = rrdhost_index_add_by_guid(host); if(t != host) { - error("Host '%s': cannot add host with machine guid '%s' to index. It already exists as host '%s' with machine guid '%s'.", rrdhost_hostname(host), host->machine_guid, rrdhost_hostname(t), t->machine_guid); + netdata_log_error("Host '%s': cannot add host with machine guid '%s' to index. It already exists as host '%s' with machine guid '%s'.", + rrdhost_hostname(host), host->machine_guid, rrdhost_hostname(t), t->machine_guid); rrdhost_free___while_having_rrd_wrlock(host, true); rrd_unlock(); return NULL; @@ -520,7 +525,7 @@ int is_legacy = 1; // ------------------------------------------------------------------------ - info("Host '%s' (at registry as '%s') with guid '%s' initialized" + netdata_log_info("Host '%s' (at registry as '%s') with guid '%s' initialized" ", os '%s'" ", timezone '%s'" ", tags '%s'" @@ -528,7 +533,7 @@ int is_legacy = 1; ", program_version '%s'" ", update every %d" ", memory mode %s" - ", history entries %ld" + ", history entries %d" ", streaming %s" " (to '%s' with api key '%s')" ", health %s" @@ -593,7 +598,7 @@ static void rrdhost_update(RRDHOST *host { UNUSED(guid); - netdata_spinlock_lock(&host->rrdhost_update_lock); + spinlock_lock(&host->rrdhost_update_lock); host->health.health_enabled = (mode == RRD_MEMORY_MODE_NONE) ? 0 : health_enabled; @@ -611,34 +616,44 @@ static void rrdhost_update(RRDHOST *host host->registry_hostname = string_strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); if(strcmp(rrdhost_hostname(host), hostname) != 0) { - info("Host '%s' has been renamed to '%s'. If this is not intentional it may mean multiple hosts are using the same machine_guid.", rrdhost_hostname(host), hostname); + netdata_log_info("Host '%s' has been renamed to '%s'. If this is not intentional it may mean multiple hosts are using the same machine_guid.", rrdhost_hostname(host), hostname); rrdhost_init_hostname(host, hostname, true); } else { rrdhost_index_add_hostname(host); } if(strcmp(rrdhost_program_name(host), program_name) != 0) { - info("Host '%s' switched program name from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_name(host), program_name); + netdata_log_info("Host '%s' switched program name from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_name(host), program_name); STRING *t = host->program_name; host->program_name = string_strdupz(program_name); string_freez(t); } if(strcmp(rrdhost_program_version(host), program_version) != 0) { - info("Host '%s' switched program version from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_version(host), program_version); + netdata_log_info("Host '%s' switched program version from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_version(host), program_version); STRING *t = host->program_version; host->program_version = string_strdupz(program_version); string_freez(t); } if(host->rrd_update_every != update_every) - error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_update_every, update_every); + netdata_log_error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds. " + "Restart netdata here to apply the new settings.", + rrdhost_hostname(host), host->rrd_update_every, update_every); if(host->rrd_memory_mode != mode) - error("Host '%s' has memory mode '%s', but the wanted one is '%s'. Restart netdata here to apply the new settings.", rrdhost_hostname(host), rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + netdata_log_error("Host '%s' has memory mode '%s', but the wanted one is '%s'. " + "Restart netdata here to apply the new settings.", + rrdhost_hostname(host), + rrd_memory_mode_name(host->rrd_memory_mode), + rrd_memory_mode_name(mode)); else if(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && host->rrd_history_entries < history) - error("Host '%s' has history of %ld entries, but the wanted one is %ld entries. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_history_entries, history); + netdata_log_error("Host '%s' has history of %d entries, but the wanted one is %ld entries. " + "Restart netdata here to apply the new settings.", + rrdhost_hostname(host), + host->rrd_history_entries, + history); // update host tags rrdhost_init_tags(host, tags); @@ -678,10 +693,10 @@ static void rrdhost_update(RRDHOST *host ml_host_new(host); rrdhost_load_rrdcontext_data(host); - info("Host %s is not in archived mode anymore", rrdhost_hostname(host)); + netdata_log_info("Host %s is not in archived mode anymore", rrdhost_hostname(host)); } - netdata_spinlock_unlock(&host->rrdhost_update_lock); + spinlock_unlock(&host->rrdhost_update_lock); } RRDHOST *rrdhost_find_or_create( @@ -709,7 +724,7 @@ RRDHOST *rrdhost_find_or_create( , struct rrdhost_system_info *system_info , bool archived ) { - debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid); + netdata_log_debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid); RRDHOST *host = rrdhost_find_by_guid(guid); if (unlikely(host && host->rrd_memory_mode != mode && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) { @@ -718,8 +733,10 @@ RRDHOST *rrdhost_find_or_create( return host; /* If a legacy memory mode instantiates all dbengine state must be discarded to avoid inconsistencies */ - error("Archived host '%s' has memory mode '%s', but the wanted one is '%s'. Discarding archived state.", - rrdhost_hostname(host), rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + netdata_log_error("Archived host '%s' has memory mode '%s', but the wanted one is '%s'. Discarding archived state.", + rrdhost_hostname(host), + rrd_memory_mode_name(host->rrd_memory_mode), + rrd_memory_mode_name(mode)); rrd_wrlock(); rrdhost_free___while_having_rrd_wrlock(host, true); @@ -827,18 +844,18 @@ void dbengine_init(char *hostname) { if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) rrdeng_pages_per_extent = read_num; else { - error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); + netdata_log_error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent); } storage_tiers = config_get_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); if(storage_tiers < 1) { - error("At least 1 storage tier is required. Assuming 1."); + netdata_log_error("At least 1 storage tier is required. Assuming 1."); storage_tiers = 1; config_set_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); } if(storage_tiers > RRD_STORAGE_TIERS) { - error("Up to %d storage tier are supported. Assuming %d.", RRD_STORAGE_TIERS, RRD_STORAGE_TIERS); + netdata_log_error("Up to %d storage tier are supported. Assuming %d.", RRD_STORAGE_TIERS, RRD_STORAGE_TIERS); storage_tiers = RRD_STORAGE_TIERS; config_set_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); } @@ -860,7 +877,7 @@ void dbengine_init(char *hostname) { int ret = mkdir(dbenginepath, 0775); if (ret != 0 && errno != EEXIST) { - error("DBENGINE on '%s': cannot create directory '%s'", hostname, dbenginepath); + netdata_log_error("DBENGINE on '%s': cannot create directory '%s'", hostname, dbenginepath); break; } @@ -880,7 +897,9 @@ void dbengine_init(char *hostname) { if(grouping_iterations < 2) { grouping_iterations = 2; config_set_number(CONFIG_SECTION_DB, dbengineconfig, grouping_iterations); - error("DBENGINE on '%s': 'dbegnine tier %zu update every iterations' cannot be less than 2. Assuming 2.", hostname, tier); + netdata_log_error("DBENGINE on '%s': 'dbegnine tier %zu update every iterations' cannot be less than 2. Assuming 2.", + hostname, + tier); } snprintfz(dbengineconfig, 200, "dbengine tier %zu backfill", tier); @@ -889,7 +908,7 @@ void dbengine_init(char *hostname) { else if(strcmp(bf, "full") == 0) backfill = RRD_BACKFILL_FULL; else if(strcmp(bf, "none") == 0) backfill = RRD_BACKFILL_NONE; else { - error("DBENGINE: unknown backfill value '%s', assuming 'new'", bf); + netdata_log_error("DBENGINE: unknown backfill value '%s', assuming 'new'", bf); config_set(CONFIG_SECTION_DB, dbengineconfig, "new"); backfill = RRD_BACKFILL_NEW; } @@ -900,7 +919,10 @@ void dbengine_init(char *hostname) { if(tier > 0 && get_tier_grouping(tier) > 65535) { storage_tiers_grouping_iterations[tier] = 1; - error("DBENGINE on '%s': dbengine tier %zu gives aggregation of more than 65535 points of tier 0. Disabling tiers above %zu", hostname, tier, tier); + netdata_log_error("DBENGINE on '%s': dbengine tier %zu gives aggregation of more than 65535 points of tier 0. Disabling tiers above %zu", + hostname, + tier, + tier); break; } @@ -911,9 +933,12 @@ void dbengine_init(char *hostname) { strncpyz(tiers_init[tier].path, dbenginepath, FILENAME_MAX); tiers_init[tier].ret = 0; - if(parallel_initialization) - netdata_thread_create(&tiers_init[tier].thread, "DBENGINE_INIT", NETDATA_THREAD_OPTION_JOINABLE, + if(parallel_initialization) { + char tag[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "DBENGINIT[%zu]", tier); + netdata_thread_create(&tiers_init[tier].thread, tag, NETDATA_THREAD_OPTION_JOINABLE, dbengine_tier_init, &tiers_init[tier]); + } else dbengine_tier_init(&tiers_init[tier]); } @@ -925,16 +950,21 @@ void dbengine_init(char *hostname) { netdata_thread_join(tiers_init[tier].thread, &ptr); if(tiers_init[tier].ret != 0) { - error("DBENGINE on '%s': Failed to initialize multi-host database tier %zu on path '%s'", - hostname, tiers_init[tier].tier, tiers_init[tier].path); + netdata_log_error("DBENGINE on '%s': Failed to initialize multi-host database tier %zu on path '%s'", + hostname, + tiers_init[tier].tier, + tiers_init[tier].path); } else if(created_tiers == tier) created_tiers++; } if(created_tiers && created_tiers < storage_tiers) { - error("DBENGINE on '%s': Managed to create %zu tiers instead of %zu. Continuing with %zu available.", - hostname, created_tiers, storage_tiers, created_tiers); + netdata_log_error("DBENGINE on '%s': Managed to create %zu tiers instead of %zu. Continuing with %zu available.", + hostname, + created_tiers, + storage_tiers, + created_tiers); storage_tiers = created_tiers; } else if(!created_tiers) @@ -947,7 +977,7 @@ void dbengine_init(char *hostname) { #else storage_tiers = config_get_number(CONFIG_SECTION_DB, "storage tiers", 1); if(storage_tiers != 1) { - error("DBENGINE is not available on '%s', so only 1 database tier can be supported.", hostname); + netdata_log_error("DBENGINE is not available on '%s', so only 1 database tier can be supported.", hostname); storage_tiers = 1; config_set_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); } @@ -963,7 +993,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt set_late_global_environment(system_info); fatal("Failed to initialize SQLite"); } - info("Skipping SQLITE metadata initialization since memory mode is not dbengine"); + netdata_log_info("Skipping SQLITE metadata initialization since memory mode is not dbengine"); } if (unlikely(sql_init_context_database(system_info ? 0 : 1))) { @@ -978,23 +1008,23 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt rrdpush_init(); if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE || rrdpush_receiver_needs_dbengine()) { - info("DBENGINE: Initializing ..."); + netdata_log_info("DBENGINE: Initializing ..."); dbengine_init(hostname); } else { - info("DBENGINE: Not initializing ..."); + netdata_log_info("DBENGINE: Not initializing ..."); storage_tiers = 1; } if (!dbengine_enabled) { if (storage_tiers > 1) { - error("dbengine is not enabled, but %zu tiers have been requested. Resetting tiers to 1", - storage_tiers); + netdata_log_error("dbengine is not enabled, but %zu tiers have been requested. Resetting tiers to 1", + storage_tiers); storage_tiers = 1; } if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { - error("dbengine is not enabled, but it has been given as the default db mode. Resetting db mode to alloc"); + netdata_log_error("dbengine is not enabled, but it has been given as the default db mode. Resetting db mode to alloc"); default_rrd_memory_mode = RRD_MEMORY_MODE_ALLOC; } } @@ -1003,7 +1033,7 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt if(!unittest) metadata_sync_init(); - debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname); + netdata_log_debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname); localhost = rrdhost_create( hostname , registry_get_this_machine_hostname() @@ -1035,6 +1065,15 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info, bool unitt return 1; } +#ifdef NETDATA_DEV_MODE + // we register this only on localhost + // for the other nodes, the origin server should register it + rrd_collector_started(); // this creates a collector that runs for as long as netdata runs + rrd_collector_add_function(localhost, NULL, "streaming", 10, + RRDFUNCTIONS_STREAMING_HELP, true, + rrdhost_function_streaming, NULL); +#endif + if (likely(system_info)) { migrate_localhost(&localhost->host_uuid); sql_aclk_sync_init(); @@ -1094,22 +1133,20 @@ static void rrdhost_streaming_sender_structures_init(RRDHOST *host) host->sender->host = host; host->sender->buffer = cbuffer_new(CBUFFER_INITIAL_SIZE, 1024 * 1024, &netdata_buffers_statistics.cbuffers_streaming); - host->sender->capabilities = stream_our_capabilities(); + host->sender->capabilities = stream_our_capabilities(host, true); host->sender->rrdpush_sender_pipe[PIPE_READ] = -1; host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1; host->sender->rrdpush_sender_socket = -1; -#ifdef ENABLE_COMPRESSION - if(default_compression_enabled) { +#ifdef ENABLE_RRDPUSH_COMPRESSION + if(default_rrdpush_compression_enabled) host->sender->flags |= SENDER_FLAG_COMPRESSION; - host->sender->compressor = create_compressor(); - } else host->sender->flags &= ~SENDER_FLAG_COMPRESSION; #endif - netdata_mutex_init(&host->sender->mutex); + spinlock_init(&host->sender->spinlock); replication_init_sender(host->sender); } @@ -1120,11 +1157,10 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host) if (unlikely(!host->sender)) return; - rrdpush_sender_thread_stop(host, "HOST CLEANUP", true); // stop a possibly running thread + rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread cbuffer_free(host->sender->buffer); -#ifdef ENABLE_COMPRESSION - if (host->sender->compressor) - host->sender->compressor->destroy(&host->sender->compressor); +#ifdef ENABLE_RRDPUSH_COMPRESSION + rrdpush_compressor_destroy(&host->sender->compressor); #endif replication_cleanup_sender(host->sender); @@ -1139,7 +1175,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { if(!host) return; if (netdata_exit || force) { - info("RRD: 'host:%s' freeing memory...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' freeing memory...", rrdhost_hostname(host)); // ------------------------------------------------------------------------ // first remove it from the indexes, so that it will not be discoverable @@ -1157,7 +1193,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { rrdhost_streaming_sender_structures_free(host); if (netdata_exit || force) - stop_streaming_receiver(host, "HOST CLEANUP"); + stop_streaming_receiver(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP); // ------------------------------------------------------------------------ @@ -1199,7 +1235,7 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { #endif if (!netdata_exit && !force) { - info("RRD: 'host:%s' is now in archive mode...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' is now in archive mode...", rrdhost_hostname(host)); rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED | RRDHOST_FLAG_ORPHAN); return; } @@ -1226,7 +1262,6 @@ void rrdhost_free___while_having_rrd_wrlock(RRDHOST *host, bool force) { string_freez(host->health.health_default_recipient); string_freez(host->registry_hostname); simple_pattern_free(host->rrdpush_send_charts_matching); - netdata_rwlock_destroy(&host->health_log.alarm_log_rwlock); freez(host->node_id); rrdfamily_index_destroy(host); @@ -1269,7 +1304,7 @@ void rrd_finalize_collection_for_all_hosts(void) { void rrdhost_save_charts(RRDHOST *host) { if(!host) return; - info("RRD: 'host:%s' saving / closing database...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' saving / closing database...", rrdhost_hostname(host)); RRDSET *st; @@ -1378,15 +1413,16 @@ static void rrdhost_load_auto_labels(void) { rrdlabels_add(labels, "_streams_to", localhost->rrdpush_send_destination, RRDLABEL_SRC_AUTO); } -void rrdhost_set_is_parent_label(int count) { - DICTIONARY *labels = localhost->rrdlabels; +void rrdhost_set_is_parent_label(void) { + int count = __atomic_load_n(&localhost->connected_children_count, __ATOMIC_RELAXED); if (count == 0 || count == 1) { + DICTIONARY *labels = localhost->rrdlabels; rrdlabels_add(labels, "_is_parent", (count) ? "true" : "false", RRDLABEL_SRC_AUTO); //queue a node info #ifdef ENABLE_ACLK - if (netdata_cloud_setting) { + if (netdata_cloud_enabled) { aclk_queue_node_info(localhost, false); } #endif @@ -1397,7 +1433,7 @@ static void rrdhost_load_config_labels(void) { int status = config_load(NULL, 1, CONFIG_SECTION_HOST_LABEL); if(!status) { char *filename = CONFIG_DIR "/" CONFIG_FILENAME; - error("RRDLABEL: Cannot reload the configuration file '%s', using labels in memory", filename); + netdata_log_error("RRDLABEL: Cannot reload the configuration file '%s', using labels in memory", filename); } struct section *co = appconfig_get_section(&netdata_config, CONFIG_SECTION_HOST_LABEL); @@ -1417,11 +1453,11 @@ static void rrdhost_load_kubernetes_labels(void) { sprintf(label_script, "%s/%s", netdata_configured_primary_plugins_dir, "get-kubernetes-labels.sh"); if (unlikely(access(label_script, R_OK) != 0)) { - error("Kubernetes pod label fetching script %s not found.",label_script); + netdata_log_error("Kubernetes pod label fetching script %s not found.",label_script); return; } - debug(D_RRDHOST, "Attempting to fetch external labels via %s", label_script); + netdata_log_debug(D_RRDHOST, "Attempting to fetch external labels via %s", label_script); pid_t pid; FILE *fp_child_input; @@ -1435,7 +1471,8 @@ static void rrdhost_load_kubernetes_labels(void) { // Non-zero exit code means that all the script output is error messages. We've shown already any message that didn't include a ':' // Here we'll inform with an ERROR that the script failed, show whatever (if anything) was added to the list of labels, free the memory and set the return to null int rc = netdata_pclose(fp_child_input, fp_child_output, pid); - if(rc) error("%s exited abnormally. Failed to get kubernetes labels.", label_script); + if(rc) + netdata_log_error("%s exited abnormally. Failed to get kubernetes labels.", label_script); } void reload_host_labels(void) { @@ -1455,7 +1492,7 @@ void reload_host_labels(void) { } void rrdhost_finalize_collection(RRDHOST *host) { - info("RRD: 'host:%s' stopping data collection...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' stopping data collection...", rrdhost_hostname(host)); RRDSET *st; rrdset_foreach_read(st, host) @@ -1469,7 +1506,7 @@ void rrdhost_finalize_collection(RRDHOST *host) { void rrdhost_delete_charts(RRDHOST *host) { if(!host) return; - info("RRD: 'host:%s' deleting disk files...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' deleting disk files...", rrdhost_hostname(host)); RRDSET *st; @@ -1491,7 +1528,7 @@ void rrdhost_delete_charts(RRDHOST *host) { void rrdhost_cleanup_charts(RRDHOST *host) { if(!host) return; - info("RRD: 'host:%s' cleaning up disk files...", rrdhost_hostname(host)); + netdata_log_info("RRD: 'host:%s' cleaning up disk files...", rrdhost_hostname(host)); RRDSET *st; uint32_t rrdhost_delete_obsolete_charts = rrdhost_option_check(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS); @@ -1518,7 +1555,7 @@ void rrdhost_cleanup_charts(RRDHOST *host) { // RRDHOST - save all hosts to disk void rrdhost_save_all(void) { - info("RRD: saving databases [%zu hosts(s)]...", rrdhost_hosts_available()); + netdata_log_info("RRD: saving databases [%zu hosts(s)]...", rrdhost_hosts_available()); rrd_rdlock(); @@ -1533,7 +1570,7 @@ void rrdhost_save_all(void) { // RRDHOST - save or delete all hosts from disk void rrdhost_cleanup_all(void) { - info("RRD: cleaning up database [%zu hosts(s)]...", rrdhost_hosts_available()); + netdata_log_info("RRD: cleaning up database [%zu hosts(s)]...", rrdhost_hosts_available()); rrd_rdlock(); @@ -1687,3 +1724,236 @@ int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, ch return res; } + +static NETDATA_DOUBLE rrdhost_sender_replication_completion_unsafe(RRDHOST *host, time_t now, size_t *instances) { + size_t charts = rrdhost_sender_replicating_charts(host); + NETDATA_DOUBLE completion; + if(!charts || !host->sender || !host->sender->replication.oldest_request_after_t) + completion = 100.0; + else if(!host->sender->replication.latest_completed_before_t || host->sender->replication.latest_completed_before_t < host->sender->replication.oldest_request_after_t) + completion = 0.0; + else { + time_t total = now - host->sender->replication.oldest_request_after_t; + time_t current = host->sender->replication.latest_completed_before_t - host->sender->replication.oldest_request_after_t; + completion = (NETDATA_DOUBLE) current * 100.0 / (NETDATA_DOUBLE) total; + } + + *instances = charts; + + return completion; +} + +bool rrdhost_matches_window(RRDHOST *host, time_t after, time_t before, time_t now) { + time_t first_time_s, last_time_s; + rrdhost_retention(host, now, rrdhost_is_online(host), &first_time_s, &last_time_s); + return query_matches_retention(after, before, first_time_s, last_time_s, 0); +} + +bool rrdhost_state_cloud_emulation(RRDHOST *host) { + return rrdhost_is_online(host); +} + +void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) { + memset(s, 0, sizeof(*s)); + + s->host = host; + s->now = now; + + RRDHOST_FLAGS flags = __atomic_load_n(&host->flags, __ATOMIC_RELAXED); + + // --- db --- + + bool online = rrdhost_is_online(host); + + rrdhost_retention(host, now, online, &s->db.first_time_s, &s->db.last_time_s); + s->db.metrics = host->rrdctx.metrics; + s->db.instances = host->rrdctx.instances; + s->db.contexts = dictionary_entries(host->rrdctx.contexts); + if(!s->db.first_time_s || !s->db.last_time_s || !s->db.metrics || !s->db.instances || !s->db.contexts || + (flags & (RRDHOST_FLAG_PENDING_CONTEXT_LOAD|RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS))) + s->db.status = RRDHOST_DB_STATUS_INITIALIZING; + else + s->db.status = RRDHOST_DB_STATUS_QUERYABLE; + + s->db.mode = host->rrd_memory_mode; + + // --- ingest --- + + s->ingest.since = MAX(host->child_connect_time, host->child_disconnected_time); + s->ingest.reason = (online) ? STREAM_HANDSHAKE_NEVER : host->rrdpush_last_receiver_exit_reason; + + netdata_mutex_lock(&host->receiver_lock); + s->ingest.hops = (host->system_info ? host->system_info->hops : (host == localhost) ? 0 : 1); + bool has_receiver = false; + if (host->receiver) { + has_receiver = true; + s->ingest.replication.instances = rrdhost_receiver_replicating_charts(host); + s->ingest.replication.completion = host->rrdpush_receiver_replication_percent; + s->ingest.replication.in_progress = s->ingest.replication.instances > 0; + + s->ingest.capabilities = host->receiver->capabilities; + s->ingest.peers = socket_peers(host->receiver->fd); +#ifdef ENABLE_HTTPS + s->ingest.ssl = SSL_connection(&host->receiver->ssl); +#endif + } + netdata_mutex_unlock(&host->receiver_lock); + + if (online) { + if(s->db.status == RRDHOST_DB_STATUS_INITIALIZING) + s->ingest.status = RRDHOST_INGEST_STATUS_INITIALIZING; + + else if (host == localhost || rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST)) { + s->ingest.status = RRDHOST_INGEST_STATUS_ONLINE; + s->ingest.since = netdata_start_time; + } + + else if (s->ingest.replication.in_progress) + s->ingest.status = RRDHOST_INGEST_STATUS_REPLICATING; + + else + s->ingest.status = RRDHOST_INGEST_STATUS_ONLINE; + } + else { + if (!s->ingest.since) { + s->ingest.status = RRDHOST_INGEST_STATUS_ARCHIVED; + s->ingest.since = s->db.last_time_s; + } + + else + s->ingest.status = RRDHOST_INGEST_STATUS_OFFLINE; + } + + if(host == localhost) + s->ingest.type = RRDHOST_INGEST_TYPE_LOCALHOST; + else if(has_receiver || rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED)) + s->ingest.type = RRDHOST_INGEST_TYPE_CHILD; + else if(rrdhost_option_check(host, RRDHOST_OPTION_VIRTUAL_HOST)) + s->ingest.type = RRDHOST_INGEST_TYPE_VIRTUAL; + else + s->ingest.type = RRDHOST_INGEST_TYPE_ARCHIVED; + + s->ingest.id = host->rrdpush_receiver_connection_counter; + + if(!s->ingest.since) + s->ingest.since = netdata_start_time; + + if(s->ingest.status == RRDHOST_INGEST_STATUS_ONLINE) + s->db.liveness = RRDHOST_DB_LIVENESS_LIVE; + else + s->db.liveness = RRDHOST_DB_LIVENESS_STALE; + + // --- stream --- + + if (!host->sender) { + s->stream.status = RRDHOST_STREAM_STATUS_DISABLED; + s->stream.hops = s->ingest.hops + 1; + } + else { + sender_lock(host->sender); + + s->stream.since = host->sender->last_state_since_t; + s->stream.peers = socket_peers(host->sender->rrdpush_sender_socket); + s->stream.ssl = SSL_connection(&host->sender->ssl); + + memcpy(s->stream.sent_bytes_on_this_connection_per_type, + host->sender->sent_bytes_on_this_connection_per_type, + MIN(sizeof(s->stream.sent_bytes_on_this_connection_per_type), + sizeof(host->sender->sent_bytes_on_this_connection_per_type))); + + if (rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED)) { + s->stream.hops = host->sender->hops; + s->stream.reason = STREAM_HANDSHAKE_NEVER; + s->stream.capabilities = host->sender->capabilities; + + s->stream.replication.completion = rrdhost_sender_replication_completion_unsafe(host, now, &s->stream.replication.instances); + s->stream.replication.in_progress = s->stream.replication.instances > 0; + + if(s->stream.replication.in_progress) + s->stream.status = RRDHOST_STREAM_STATUS_REPLICATING; + else + s->stream.status = RRDHOST_STREAM_STATUS_ONLINE; + +#ifdef ENABLE_RRDPUSH_COMPRESSION + s->stream.compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor.initialized); +#endif + } + else { + s->stream.status = RRDHOST_STREAM_STATUS_OFFLINE; + s->stream.hops = s->ingest.hops + 1; + s->stream.reason = host->sender->exit.reason; + } + + sender_unlock(host->sender); + } + + s->stream.id = host->rrdpush_sender_connection_counter; + + if(!s->stream.since) + s->stream.since = netdata_start_time; + + // --- ml --- + + if(ml_host_get_host_status(host, &s->ml.metrics)) { + s->ml.type = RRDHOST_ML_TYPE_SELF; + + if(s->ingest.status == RRDHOST_INGEST_STATUS_OFFLINE || s->ingest.status == RRDHOST_INGEST_STATUS_ARCHIVED) + s->ml.status = RRDHOST_ML_STATUS_OFFLINE; + else + s->ml.status = RRDHOST_ML_STATUS_RUNNING; + } + else if(stream_has_capability(&s->ingest, STREAM_CAP_DATA_WITH_ML)) { + s->ml.type = RRDHOST_ML_TYPE_RECEIVED; + s->ml.status = RRDHOST_ML_STATUS_RUNNING; + } + else { + // does not receive ML, does not run ML + s->ml.type = RRDHOST_ML_TYPE_DISABLED; + s->ml.status = RRDHOST_ML_STATUS_DISABLED; + } + + // --- health --- + + if(host->health.health_enabled) { + if(flags & RRDHOST_FLAG_PENDING_HEALTH_INITIALIZATION) + s->health.status = RRDHOST_HEALTH_STATUS_INITIALIZING; + else { + s->health.status = RRDHOST_HEALTH_STATUS_RUNNING; + + RRDCALC *rc; + foreach_rrdcalc_in_rrdhost_read(host, rc) { + if (unlikely(!rc->rrdset || !rc->rrdset->last_collected_time.tv_sec)) + continue; + + switch (rc->status) { + default: + case RRDCALC_STATUS_REMOVED: + break; + + case RRDCALC_STATUS_CLEAR: + s->health.alerts.clear++; + break; + + case RRDCALC_STATUS_WARNING: + s->health.alerts.warning++; + break; + + case RRDCALC_STATUS_CRITICAL: + s->health.alerts.critical++; + break; + + case RRDCALC_STATUS_UNDEFINED: + s->health.alerts.undefined++; + break; + + case RRDCALC_STATUS_UNINITIALIZED: + s->health.alerts.uninitialized++; + break; + } + } + foreach_rrdcalc_in_rrdhost_done(rc); + } + } + else + s->health.status = RRDHOST_HEALTH_STATUS_DISABLED; +} |