diff options
Diffstat (limited to 'database/rrdhost.c')
-rw-r--r-- | database/rrdhost.c | 1159 |
1 files changed, 491 insertions, 668 deletions
diff --git a/database/rrdhost.c b/database/rrdhost.c index 7f4bd95ba..5ba13d47b 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -3,21 +3,21 @@ #define NETDATA_RRD_INTERNALS #include "rrd.h" -int storage_tiers = 1; -int storage_tiers_grouping_iterations[RRD_STORAGE_TIERS] = { 1, 60, 60, 60, 60 }; +bool dbengine_enabled = false; // will become true if and when dbengine is initialized +size_t storage_tiers = 3; +size_t storage_tiers_grouping_iterations[RRD_STORAGE_TIERS] = { 1, 60, 60, 60, 60 }; RRD_BACKFILL storage_tiers_backfill[RRD_STORAGE_TIERS] = { RRD_BACKFILL_NEW, RRD_BACKFILL_NEW, RRD_BACKFILL_NEW, RRD_BACKFILL_NEW, RRD_BACKFILL_NEW }; #if RRD_STORAGE_TIERS != 5 #error RRD_STORAGE_TIERS is not 5 - you need to update the grouping iterations per tier #endif -int get_tier_grouping(int tier) { +size_t get_tier_grouping(size_t tier) { if(unlikely(tier >= storage_tiers)) tier = storage_tiers - 1; - if(unlikely(tier < 0)) tier = 0; - int grouping = 1; + size_t grouping = 1; // first tier is always 1 iteration of whatever update every the chart has - for(int i = 1; i <= tier ;i++) + for(size_t i = 1; i <= tier ;i++) grouping *= storage_tiers_grouping_iterations[i]; return grouping; @@ -32,7 +32,7 @@ time_t rrdhost_free_orphan_time = 3600; bool is_storage_engine_shared(STORAGE_INSTANCE *engine) { #ifdef ENABLE_DBENGINE - for(int tier = 0; tier < storage_tiers ;tier++) { + for(size_t tier = 0; tier < storage_tiers ;tier++) { if (engine == (STORAGE_INSTANCE *)multidb_ctx[tier]) return true; } @@ -43,107 +43,144 @@ bool is_storage_engine_shared(STORAGE_INSTANCE *engine) { // ---------------------------------------------------------------------------- -// RRDHOST index +// RRDHOST indexes management -int rrdhost_compare(void* a, void* b) { - if(((RRDHOST *)a)->hash_machine_guid < ((RRDHOST *)b)->hash_machine_guid) return -1; - else if(((RRDHOST *)a)->hash_machine_guid > ((RRDHOST *)b)->hash_machine_guid) return 1; - else return strcmp(((RRDHOST *)a)->machine_guid, ((RRDHOST *)b)->machine_guid); +DICTIONARY *rrdhost_root_index = NULL; +static DICTIONARY *rrdhost_root_index_hostname = NULL; + +static inline void rrdhost_init() { + if(unlikely(!rrdhost_root_index)) { + rrdhost_root_index = dictionary_create( + DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE); + } + + if(unlikely(!rrdhost_root_index_hostname)) { + rrdhost_root_index_hostname = dictionary_create( + DICT_OPTION_NAME_LINK_DONT_CLONE | DICT_OPTION_VALUE_LINK_DONT_CLONE | DICT_OPTION_DONT_OVERWRITE_VALUE); + } } -avl_tree_lock rrdhost_root_index = { - .avl_tree = { NULL, rrdhost_compare }, - .rwlock = AVL_LOCK_INITIALIZER -}; +// ---------------------------------------------------------------------------- +// RRDHOST index by UUID + +inline long rrdhost_hosts_available(void) { + return dictionary_entries(rrdhost_root_index); +} -RRDHOST *rrdhost_find_by_guid(const char *guid, uint32_t hash) { - debug(D_RRDHOST, "Searching in index for host with guid '%s'", guid); +inline RRDHOST *rrdhost_find_by_guid(const char *guid) { + return dictionary_get(rrdhost_root_index, guid); +} - RRDHOST tmp; - strncpyz(tmp.machine_guid, guid, GUID_LEN); - tmp.hash_machine_guid = (hash)?hash:simple_hash(tmp.machine_guid); +static inline RRDHOST *rrdhost_index_add_by_guid(RRDHOST *host) { + RRDHOST *ret_machine_guid = dictionary_set(rrdhost_root_index, host->machine_guid, host, sizeof(RRDHOST)); + if(ret_machine_guid == host) + rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); + else { + rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); + error("RRDHOST: %s() host with machine guid '%s' is already indexed", __FUNCTION__, host->machine_guid); + } - return (RRDHOST *)avl_search_lock(&(rrdhost_root_index), (avl_t *) &tmp); + return host; +} + +static void rrdhost_index_del_by_guid(RRDHOST *host) { + if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID)) { + if(!dictionary_del(rrdhost_root_index, host->machine_guid)) + error("RRDHOST: %s() failed to delete machine guid '%s' from index", __FUNCTION__, host->machine_guid); + + rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_MACHINE_GUID); + } } -RRDHOST *rrdhost_find_by_hostname(const char *hostname, uint32_t hash) { +// ---------------------------------------------------------------------------- +// RRDHOST index by hostname + +inline RRDHOST *rrdhost_find_by_hostname(const char *hostname) { if(unlikely(!strcmp(hostname, "localhost"))) return localhost; - if(unlikely(!hash)) hash = simple_hash(hostname); + return dictionary_get(rrdhost_root_index_hostname, hostname); +} - rrd_rdlock(); - RRDHOST *host; - rrdhost_foreach_read(host) { - if(unlikely((hash == host->hash_hostname && !strcmp(hostname, host->hostname)))) { - rrd_unlock(); - return host; - } +static inline RRDHOST *rrdhost_index_add_hostname(RRDHOST *host) { + if(!host->hostname) return host; + + RRDHOST *ret_hostname = dictionary_set(rrdhost_root_index_hostname, rrdhost_hostname(host), host, sizeof(RRDHOST)); + if(ret_hostname == host) + rrdhost_option_set(host, RRDHOST_OPTION_INDEXED_HOSTNAME); + else { + rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME); + error("RRDHOST: %s() host with hostname '%s' is already indexed", __FUNCTION__, rrdhost_hostname(host)); } - rrd_unlock(); - return NULL; + return host; } -#define rrdhost_index_add(rrdhost) (RRDHOST *)avl_insert_lock(&(rrdhost_root_index), (avl_t *)(rrdhost)) -#define rrdhost_index_del(rrdhost) (RRDHOST *)avl_remove_lock(&(rrdhost_root_index), (avl_t *)(rrdhost)) +static inline void rrdhost_index_del_hostname(RRDHOST *host) { + if(unlikely(!host->hostname)) return; + + if(rrdhost_option_check(host, RRDHOST_OPTION_INDEXED_HOSTNAME)) { + if(!dictionary_del(rrdhost_root_index_hostname, rrdhost_hostname(host))) + error("RRDHOST: %s() failed to delete hostname '%s' from index", __FUNCTION__, rrdhost_hostname(host)); + rrdhost_option_clear(host, RRDHOST_OPTION_INDEXED_HOSTNAME); + } +} // ---------------------------------------------------------------------------- // RRDHOST - internal helpers static inline void rrdhost_init_tags(RRDHOST *host, const char *tags) { - if(host->tags && tags && !strcmp(host->tags, tags)) + if(host->tags && tags && !strcmp(rrdhost_tags(host), tags)) return; - void *old = (void *)host->tags; - host->tags = (tags && *tags)?strdupz(tags):NULL; - freez(old); + STRING *old = host->tags; + host->tags = string_strdupz((tags && *tags)?tags:NULL); + string_freez(old); } static inline void rrdhost_init_hostname(RRDHOST *host, const char *hostname) { - if(host->hostname && hostname && !strcmp(host->hostname, hostname)) + if(unlikely(hostname && !*hostname)) hostname = NULL; + + if(host->hostname && hostname && !strcmp(rrdhost_hostname(host), hostname)) return; - void *old = host->hostname; - host->hostname = strdupz(hostname?hostname:"localhost"); - host->hash_hostname = simple_hash(host->hostname); - freez(old); + rrdhost_index_del_hostname(host); + + STRING *old = host->hostname; + host->hostname = string_strdupz(hostname?hostname:"localhost"); + string_freez(old); + + rrdhost_index_add_hostname(host); } static inline void rrdhost_init_os(RRDHOST *host, const char *os) { - if(host->os && os && !strcmp(host->os, os)) + if(host->os && os && !strcmp(rrdhost_os(host), os)) return; - void *old = (void *)host->os; - host->os = strdupz(os?os:"unknown"); - freez(old); + STRING *old = host->os; + host->os = string_strdupz(os?os:"unknown"); + string_freez(old); } static inline void rrdhost_init_timezone(RRDHOST *host, const char *timezone, const char *abbrev_timezone, int32_t utc_offset) { - if (host->timezone && timezone && !strcmp(host->timezone, timezone) && host->abbrev_timezone && abbrev_timezone && - !strcmp(host->abbrev_timezone, abbrev_timezone) && host->utc_offset == utc_offset) + if (host->timezone && timezone && !strcmp(rrdhost_timezone(host), timezone) && host->abbrev_timezone && abbrev_timezone && + !strcmp(rrdhost_abbrev_timezone(host), abbrev_timezone) && host->utc_offset == utc_offset) return; - void *old = (void *)host->timezone; - host->timezone = strdupz((timezone && *timezone)?timezone:"unknown"); - freez(old); + STRING *old = host->timezone; + host->timezone = string_strdupz((timezone && *timezone)?timezone:"unknown"); + string_freez(old); old = (void *)host->abbrev_timezone; - host->abbrev_timezone = strdupz((abbrev_timezone && *abbrev_timezone) ? abbrev_timezone : "UTC"); - freez(old); + host->abbrev_timezone = string_strdupz((abbrev_timezone && *abbrev_timezone) ? abbrev_timezone : "UTC"); + string_freez(old); host->utc_offset = utc_offset; } -static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_guid) { - strncpy(host->machine_guid, machine_guid, GUID_LEN); - host->machine_guid[GUID_LEN] = '\0'; - host->hash_machine_guid = simple_hash(host->machine_guid); -} - -void set_host_properties(RRDHOST *host, int update_every, RRD_MEMORY_MODE memory_mode, const char *hostname, - const char *registry_hostname, const char *guid, const char *os, const char *tags, +void set_host_properties(RRDHOST *host, int update_every, RRD_MEMORY_MODE memory_mode, + const char *registry_hostname, const char *os, const char *tags, const char *tzone, const char *abbrev_tzone, int32_t utc_offset, const char *program_name, const char *program_version) { @@ -151,23 +188,48 @@ void set_host_properties(RRDHOST *host, int update_every, RRD_MEMORY_MODE memory host->rrd_update_every = update_every; host->rrd_memory_mode = memory_mode; - rrdhost_init_hostname(host, hostname); - - rrdhost_init_machine_guid(host, guid); - rrdhost_init_os(host, os); rrdhost_init_timezone(host, tzone, abbrev_tzone, utc_offset); rrdhost_init_tags(host, tags); - host->program_name = strdupz((program_name && *program_name) ? program_name : "unknown"); - host->program_version = strdupz((program_version && *program_version) ? program_version : "unknown"); - - host->registry_hostname = strdupz((registry_hostname && *registry_hostname) ? registry_hostname : host->hostname); + host->program_name = string_strdupz((program_name && *program_name) ? program_name : "unknown"); + host->program_version = string_strdupz((program_version && *program_version) ? program_version : "unknown"); + host->registry_hostname = string_strdupz((registry_hostname && *registry_hostname) ? registry_hostname : rrdhost_hostname(host)); } // ---------------------------------------------------------------------------- // RRDHOST - add a host +static void rrdhost_initialize_rrdpush_sender(RRDHOST *host, + unsigned int rrdpush_enabled, + char *rrdpush_destination, + char *rrdpush_api_key, + char *rrdpush_send_charts_matching +) { + if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED)) return; + + if(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) { + rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED); + + sender_init(host); + +#ifdef ENABLE_HTTPS + host->sender->ssl.conn = NULL; + host->sender->ssl.flags = NETDATA_SSL_START; +#endif + + host->rrdpush_send_destination = strdupz(rrdpush_destination); + rrdpush_destinations_init(host); + + host->rrdpush_send_api_key = strdupz(rrdpush_api_key); + host->rrdpush_send_charts_matching = simple_pattern_create(rrdpush_send_charts_matching, NULL, SIMPLE_PATTERN_EXACT); + + rrdhost_option_set(host, RRDHOST_OPTION_SENDER_ENABLED); + } + else + rrdhost_option_clear(host, RRDHOST_OPTION_SENDER_ENABLED); +} + RRDHOST *rrdhost_create(const char *hostname, const char *registry_hostname, const char *guid, @@ -186,188 +248,136 @@ RRDHOST *rrdhost_create(const char *hostname, char *rrdpush_destination, char *rrdpush_api_key, char *rrdpush_send_charts_matching, + bool rrdpush_enable_replication, + time_t rrdpush_seconds_to_replicate, + time_t rrdpush_replication_step, struct rrdhost_system_info *system_info, int is_localhost, bool archived ) { debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid); + rrd_check_wrlock(); + + if(memory_mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled) { + error("memory mode 'dbengine' is not enabled, but host '%s' is configured for it. Falling back to 'alloc'", hostname); + memory_mode = RRD_MEMORY_MODE_ALLOC; + } + #ifdef ENABLE_DBENGINE int is_legacy = (memory_mode == RRD_MEMORY_MODE_DBENGINE) && is_legacy_child(guid); #else - int is_legacy = 1; +int is_legacy = 1; #endif - rrd_check_wrlock(); int is_in_multihost = (memory_mode == RRD_MEMORY_MODE_DBENGINE && !is_legacy); RRDHOST *host = callocz(1, sizeof(RRDHOST)); - set_host_properties(host, (update_every > 0)?update_every:1, memory_mode, hostname, registry_hostname, guid, os, + strncpyz(host->machine_guid, guid, GUID_LEN + 1); + + set_host_properties(host, (update_every > 0)?update_every:1, memory_mode, registry_hostname, os, tags, timezone, abbrev_timezone, utc_offset, program_name, program_version); + rrdhost_init_hostname(host, hostname); + host->rrd_history_entries = align_entries_to_pagesize(memory_mode, entries); host->health_enabled = ((memory_mode == RRD_MEMORY_MODE_NONE)) ? 0 : health_enabled; - sender_init(host); - netdata_mutex_init(&host->receiver_lock); - - host->rrdpush_send_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) ? 1 : 0; - host->rrdpush_send_destination = (host->rrdpush_send_enabled)?strdupz(rrdpush_destination):NULL; - if (host->rrdpush_send_destination) - host->destinations = destinations_init(host->rrdpush_send_destination); - host->rrdpush_send_api_key = (host->rrdpush_send_enabled)?strdupz(rrdpush_api_key):NULL; - host->rrdpush_send_charts_matching = simple_pattern_create(rrdpush_send_charts_matching, NULL, SIMPLE_PATTERN_EXACT); + if (likely(!archived)) { + rrdfunctions_init(host); + host->rrdlabels = rrdlabels_create(); + rrdhost_initialize_rrdpush_sender( + host, rrdpush_enabled, rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching); + } - host->rrdpush_sender_pipe[0] = -1; - host->rrdpush_sender_pipe[1] = -1; - host->rrdpush_sender_socket = -1; + if(rrdpush_enable_replication) + rrdhost_option_set(host, RRDHOST_OPTION_REPLICATION); + else + rrdhost_option_clear(host, RRDHOST_OPTION_REPLICATION); + + host->rrdpush_seconds_to_replicate = rrdpush_seconds_to_replicate; + host->rrdpush_replication_step = rrdpush_replication_step; + + switch(memory_mode) { + default: + case RRD_MEMORY_MODE_ALLOC: + case RRD_MEMORY_MODE_MAP: + case RRD_MEMORY_MODE_SAVE: + case RRD_MEMORY_MODE_RAM: + if(host->rrdpush_seconds_to_replicate > host->rrd_history_entries * host->rrd_update_every) + host->rrdpush_seconds_to_replicate = host->rrd_history_entries * host->rrd_update_every; + break; - //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused? -#ifdef ENABLE_HTTPS - host->ssl.conn = NULL; - host->ssl.flags = NETDATA_SSL_START; - host->stream_ssl.conn = NULL; - host->stream_ssl.flags = NETDATA_SSL_START; -#endif + case RRD_MEMORY_MODE_DBENGINE: + break; + } netdata_rwlock_init(&host->rrdhost_rwlock); - host->host_labels = rrdlabels_create(); - netdata_mutex_init(&host->aclk_state_lock); + netdata_mutex_init(&host->receiver_lock); host->system_info = system_info; - avl_init_lock(&(host->rrdset_root_index), rrdset_compare); - avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name); - avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare); - avl_init_lock(&(host->rrdvar_root_index), rrdvar_compare); + rrdset_index_init(host); if(config_get_boolean(CONFIG_SECTION_DB, "delete obsolete charts files", 1)) - rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS); + rrdhost_option_set(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS); if(config_get_boolean(CONFIG_SECTION_DB, "delete orphan hosts files", 1) && !is_localhost) - rrdhost_flag_set(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST); - - host->health_default_warn_repeat_every = config_get_duration(CONFIG_SECTION_HEALTH, "default repeat warning", "never"); - host->health_default_crit_repeat_every = config_get_duration(CONFIG_SECTION_HEALTH, "default repeat critical", "never"); - avl_init_lock(&(host->alarms_idx_health_log), alarm_compare_id); - avl_init_lock(&(host->alarms_idx_name), alarm_compare_name); - - // ------------------------------------------------------------------------ - // initialize health variables - - host->health_log.next_log_id = 1; - host->health_log.next_alarm_id = 1; - host->health_log.max = 1000; - host->health_log.next_log_id = (uint32_t)now_realtime_sec(); - host->health_log.next_alarm_id = 0; - - long n = config_get_number(CONFIG_SECTION_HEALTH, "in memory max health log entries", host->health_log.max); - if(n < 10) { - error("Host '%s': health configuration has invalid max log entries %ld. Using default %u", host->hostname, n, host->health_log.max); - config_set_number(CONFIG_SECTION_HEALTH, "in memory max health log entries", (long)host->health_log.max); - } - else - host->health_log.max = (unsigned int)n; - - netdata_rwlock_init(&host->health_log.alarm_log_rwlock); + rrdhost_option_set(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST); char filename[FILENAME_MAX + 1]; - if(is_localhost) { - host->cache_dir = strdupz(netdata_configured_cache_dir); host->varlib_dir = strdupz(netdata_configured_varlib_dir); - } else { // this is not localhost - append our GUID to localhost path if (is_in_multihost) { // don't append to cache dir in multihost host->cache_dir = strdupz(netdata_configured_cache_dir); - } else { + } + else { snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_cache_dir, host->machine_guid); host->cache_dir = strdupz(filename); } - if((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || ( - host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_legacy))) { + if((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || + (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_legacy))) { int r = mkdir(host->cache_dir, 0775); if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, host->cache_dir); + error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), host->cache_dir); } snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_varlib_dir, host->machine_guid); host->varlib_dir = strdupz(filename); - - if(host->health_enabled) { - int r = mkdir(host->varlib_dir, 0775); - if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir); - } - - } - - if(host->health_enabled) { - snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir); - int r = mkdir(filename, 0775); - if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, filename); - } - - snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir); - host->health_log_filename = strdupz(filename); - - snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_primary_plugins_dir); - host->health_default_exec = strdupz(config_get(CONFIG_SECTION_HEALTH, "script to execute on alarm", filename)); - host->health_default_recipient = strdupz("root"); - - - // ------------------------------------------------------------------------ - // load health configuration - - if(host->health_enabled) { - rrdhost_wrlock(host); - health_readdir(host, health_user_config_dir(), health_stock_config_dir(), NULL); - rrdhost_unlock(host); } - RRDHOST *t = rrdhost_index_add(host); + // this is also needed for custom host variables - not only health + if(!host->rrdvars) + host->rrdvars = rrdvariables_create(); + RRDHOST *t = rrdhost_index_add_by_guid(host); if(t != host) { - error("Host '%s': cannot add host with machine guid '%s' to index. It already exists as host '%s' with machine guid '%s'.", host->hostname, host->machine_guid, t->hostname, t->machine_guid); + error("Host '%s': cannot add host with machine guid '%s' to index. It already exists as host '%s' with machine guid '%s'.", rrdhost_hostname(host), host->machine_guid, rrdhost_hostname(t), t->machine_guid); rrdhost_free(host, 1); return NULL; } if (likely(!uuid_parse(host->machine_guid, host->host_uuid))) { - int rc; - if (!archived) { - rc = sql_store_host_info(host); - if (unlikely(rc)) - error_report("Failed to store machine GUID to the database"); - } + if(!archived) + metaqueue_host_update_info(host->machine_guid); sql_load_node_id(host); - if (host->health_enabled) { - if (!file_is_migrated(host->health_log_filename)) { - rc = sql_create_health_log_table(host); - if (unlikely(rc)) { - error_report("Failed to create health log table in the database"); - health_alarm_log_load(host); - health_alarm_log_open(host); - } - else { - health_alarm_log_load(host); - add_migrated_file(host->health_log_filename, 0); - } - } else { - sql_create_health_log_table(host); - sql_health_alarm_log_load(host); - } - } } else error_report("Host machine GUID %s is not valid", host->machine_guid); + rrdfamily_index_init(host); + rrdcalctemplate_index_init(host); + rrdcalc_rrdhost_index_init(host); + + if (health_enabled) + health_thread_spawn(host); + if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { #ifdef ENABLE_DBENGINE char dbenginepath[FILENAME_MAX + 1]; @@ -376,14 +386,18 @@ RRDHOST *rrdhost_create(const char *hostname, snprintfz(dbenginepath, FILENAME_MAX, "%s/dbengine", host->cache_dir); ret = mkdir(dbenginepath, 0775); if (ret != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, dbenginepath); + error("Host '%s': cannot create directory '%s'", rrdhost_hostname(host), dbenginepath); else ret = 0; // succeed if (is_legacy) { // initialize legacy dbengine instance as needed + host->db[0].mode = RRD_MEMORY_MODE_DBENGINE; + host->db[0].eng = storage_engine_get(host->db[0].mode); + host->db[0].tier_grouping = get_tier_grouping(0); + ret = rrdeng_init( host, - (struct rrdengine_instance **)&host->storage_instance[0], + (struct rrdengine_instance **)&host->db[0].instance, dbenginepath, default_rrdeng_page_cache_mb, default_rrdeng_disk_quota_mb, @@ -392,18 +406,26 @@ RRDHOST *rrdhost_create(const char *hostname, if(ret == 0) { // assign the rest of the shared storage instances to it // to allow them collect its metrics too - for(int tier = 1; tier < storage_tiers ; tier++) - host->storage_instance[tier] = (STORAGE_INSTANCE *)multidb_ctx[tier]; + for(size_t tier = 1; tier < storage_tiers ; tier++) { + host->db[tier].mode = RRD_MEMORY_MODE_DBENGINE; + host->db[tier].eng = storage_engine_get(host->db[tier].mode); + host->db[tier].instance = (STORAGE_INSTANCE *) multidb_ctx[tier]; + host->db[tier].tier_grouping = get_tier_grouping(tier); + } } } else { - for(int tier = 0; tier < storage_tiers ; tier++) - host->storage_instance[tier] = (STORAGE_INSTANCE *)multidb_ctx[tier]; + for(size_t tier = 0; tier < storage_tiers ; tier++) { + host->db[tier].mode = RRD_MEMORY_MODE_DBENGINE; + host->db[tier].eng = storage_engine_get(host->db[tier].mode); + host->db[tier].instance = (STORAGE_INSTANCE *)multidb_ctx[tier]; + host->db[tier].tier_grouping = get_tier_grouping(tier); + } } if (ret) { // check legacy or multihost initialization success error( "Host '%s': cannot initialize host with machine guid '%s'. Failed to initialize DB engine at '%s'.", - host->hostname, host->machine_guid, host->cache_dir); + rrdhost_hostname(host), host->machine_guid, host->cache_dir); rrdhost_free(host, 1); host = NULL; //rrd_hosts_available++; //TODO: maybe we want this? @@ -416,27 +438,29 @@ RRDHOST *rrdhost_create(const char *hostname, #endif } else { + host->db[0].mode = host->rrd_memory_mode; + host->db[0].eng = storage_engine_get(host->db[0].mode); + host->db[0].instance = NULL; + host->db[0].tier_grouping = get_tier_grouping(0); + #ifdef ENABLE_DBENGINE // the first tier is reserved for the non-dbengine modes - for(int tier = 1; tier < storage_tiers ; tier++) - host->storage_instance[tier] = (STORAGE_INSTANCE *)multidb_ctx[tier]; + for(size_t tier = 1; tier < storage_tiers ; tier++) { + host->db[tier].mode = RRD_MEMORY_MODE_DBENGINE; + host->db[tier].eng = storage_engine_get(host->db[tier].mode); + host->db[tier].instance = (STORAGE_INSTANCE *) multidb_ctx[tier]; + host->db[tier].tier_grouping = get_tier_grouping(tier); + } #endif } // ------------------------------------------------------------------------ // link it and add it to the index - if(is_localhost) { - host->next = localhost; - localhost = host; - } - else { - if(localhost) { - host->next = localhost->next; - localhost->next = host; - } - else localhost = host; - } + if(is_localhost) + DOUBLE_LINKED_LIST_PREPEND_UNSAFE(localhost, host, prev, next); + else + DOUBLE_LINKED_LIST_APPEND_UNSAFE(localhost, host, prev, next); // ------------------------------------------------------------------------ // init new ML host and update system_info to let upstreams know @@ -466,28 +490,29 @@ RRDHOST *rrdhost_create(const char *hostname, ", health_log '%s'" ", alarms default handler '%s'" ", alarms default recipient '%s'" - , host->hostname - , host->registry_hostname + , rrdhost_hostname(host) + , rrdhost_registry_hostname(host) , host->machine_guid - , host->os - , host->timezone - , (host->tags)?host->tags:"" - , host->program_name - , host->program_version + , rrdhost_os(host) + , rrdhost_timezone(host) + , rrdhost_tags(host) + , rrdhost_program_name(host) + , rrdhost_program_version(host) , host->rrd_update_every , rrd_memory_mode_name(host->rrd_memory_mode) , host->rrd_history_entries - , host->rrdpush_send_enabled?"enabled":"disabled" + , rrdhost_has_rrdpush_sender_enabled(host)?"enabled":"disabled" , host->rrdpush_send_destination?host->rrdpush_send_destination:"" , host->rrdpush_send_api_key?host->rrdpush_send_api_key:"" , host->health_enabled?"enabled":"disabled" , host->cache_dir , host->varlib_dir , host->health_log_filename - , host->health_default_exec - , host->health_default_recipient + , string2str(host->health_default_exec) + , string2str(host->health_default_recipient) ); - sql_store_host_system_info(&host->host_uuid, system_info); + if(!archived) + metaqueue_host_update_system_info(host); rrd_hosts_available++; @@ -496,6 +521,8 @@ RRDHOST *rrdhost_create(const char *hostname, ml_new_host(host); else rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED); + + return host; } @@ -518,113 +545,97 @@ void rrdhost_update(RRDHOST *host , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct rrdhost_system_info *system_info ) { UNUSED(guid); - UNUSED(rrdpush_enabled); - UNUSED(rrdpush_destination); - UNUSED(rrdpush_api_key); - UNUSED(rrdpush_send_charts_matching); host->health_enabled = (mode == RRD_MEMORY_MODE_NONE) ? 0 : health_enabled; - //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused? rrdhost_system_info_free(host->system_info); host->system_info = system_info; - sql_store_host_system_info(&host->host_uuid, system_info); + metaqueue_host_update_system_info(host); rrdhost_init_os(host, os); rrdhost_init_timezone(host, timezone, abbrev_timezone, utc_offset); - freez(host->registry_hostname); - host->registry_hostname = strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); + string_freez(host->registry_hostname); + host->registry_hostname = string_strdupz((registry_hostname && *registry_hostname)?registry_hostname:hostname); - if(strcmp(host->hostname, hostname) != 0) { - info("Host '%s' has been renamed to '%s'. If this is not intentional it may mean multiple hosts are using the same machine_guid.", host->hostname, hostname); - char *t = host->hostname; - host->hostname = strdupz(hostname); - host->hash_hostname = simple_hash(host->hostname); - freez(t); + if(strcmp(rrdhost_hostname(host), hostname) != 0) { + info("Host '%s' has been renamed to '%s'. If this is not intentional it may mean multiple hosts are using the same machine_guid.", rrdhost_hostname(host), hostname); + rrdhost_init_hostname(host, hostname); } - if(strcmp(host->program_name, program_name) != 0) { - info("Host '%s' switched program name from '%s' to '%s'", host->hostname, host->program_name, program_name); - char *t = host->program_name; - host->program_name = strdupz(program_name); - freez(t); + if(strcmp(rrdhost_program_name(host), program_name) != 0) { + info("Host '%s' switched program name from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_name(host), program_name); + STRING *t = host->program_name; + host->program_name = string_strdupz(program_name); + string_freez(t); } - if(strcmp(host->program_version, program_version) != 0) { - info("Host '%s' switched program version from '%s' to '%s'", host->hostname, host->program_version, program_version); - char *t = host->program_version; - host->program_version = strdupz(program_version); - freez(t); + if(strcmp(rrdhost_program_version(host), program_version) != 0) { + info("Host '%s' switched program version from '%s' to '%s'", rrdhost_hostname(host), rrdhost_program_version(host), program_version); + STRING *t = host->program_version; + host->program_version = string_strdupz(program_version); + string_freez(t); } if(host->rrd_update_every != update_every) - error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds. Restart netdata here to apply the new settings.", host->hostname, host->rrd_update_every, update_every); - - if(host->rrd_history_entries < history) - error("Host '%s' has history of %ld entries, but the wanted one is %ld entries. Restart netdata here to apply the new settings.", host->hostname, host->rrd_history_entries, history); + error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_update_every, update_every); if(host->rrd_memory_mode != mode) - error("Host '%s' has memory mode '%s', but the wanted one is '%s'. Restart netdata here to apply the new settings.", host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + error("Host '%s' has memory mode '%s', but the wanted one is '%s'. Restart netdata here to apply the new settings.", rrdhost_hostname(host), rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + + else if(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && host->rrd_history_entries < history) + error("Host '%s' has history of %ld entries, but the wanted one is %ld entries. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_history_entries, history); // update host tags rrdhost_init_tags(host, tags); + if(!host->rrdvars) + host->rrdvars = rrdvariables_create(); + if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { rrdhost_flag_clear(host, RRDHOST_FLAG_ARCHIVED); - host->rrdpush_send_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) ? 1 : 0; - host->rrdpush_send_destination = (host->rrdpush_send_enabled)?strdupz(rrdpush_destination):NULL; - if (host->rrdpush_send_destination) - host->destinations = destinations_init(host->rrdpush_send_destination); - host->rrdpush_send_api_key = (host->rrdpush_send_enabled)?strdupz(rrdpush_api_key):NULL; - host->rrdpush_send_charts_matching = simple_pattern_create(rrdpush_send_charts_matching, NULL, SIMPLE_PATTERN_EXACT); + rrdfunctions_init(host); - if(host->health_enabled) { - int r; - char filename[FILENAME_MAX + 1]; + if(!host->rrdlabels) + host->rrdlabels = rrdlabels_create(); + + if (!host->rrdset_root_index) + rrdset_index_init(host); + + rrdhost_initialize_rrdpush_sender(host, + rrdpush_enabled, + rrdpush_destination, + rrdpush_api_key, + rrdpush_send_charts_matching); + + rrdfamily_index_init(host); + rrdcalctemplate_index_init(host); + rrdcalc_rrdhost_index_init(host); + + if(rrdpush_enable_replication) + rrdhost_option_set(host, RRDHOST_OPTION_REPLICATION); + else + rrdhost_option_clear(host, RRDHOST_OPTION_REPLICATION); + + host->rrdpush_seconds_to_replicate = rrdpush_seconds_to_replicate; + host->rrdpush_replication_step = rrdpush_replication_step; - if (host != localhost) { - r = mkdir(host->varlib_dir, 0775); - if (r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir); - } - snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir); - r = mkdir(filename, 0775); - if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, filename); - - rrdhost_wrlock(host); - health_readdir(host, health_user_config_dir(), health_stock_config_dir(), NULL); - rrdhost_unlock(host); - - if (!file_is_migrated(host->health_log_filename)) { - int rc = sql_create_health_log_table(host); - if (unlikely(rc)) { - error_report("Failed to create health log table in the database"); - - health_alarm_log_load(host); - health_alarm_log_open(host); - } else { - health_alarm_log_load(host); - add_migrated_file(host->health_log_filename, 0); - } - } else { - sql_create_health_log_table(host); - sql_health_alarm_log_load(host); - } - } rrd_hosts_available++; ml_new_host(host); rrdhost_load_rrdcontext_data(host); - info("Host %s is not in archived mode anymore", host->hostname); + info("Host %s is not in archived mode anymore", rrdhost_hostname(host)); } - return; + if (health_enabled) + health_thread_spawn(host); } RRDHOST *rrdhost_find_or_create( @@ -646,17 +657,20 @@ RRDHOST *rrdhost_find_or_create( , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct rrdhost_system_info *system_info , bool archived ) { debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid); rrd_wrlock(); - RRDHOST *host = rrdhost_find_by_guid(guid, 0); - if (unlikely(host && RRD_MEMORY_MODE_DBENGINE != mode && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) { + RRDHOST *host = rrdhost_find_by_guid(guid); + if (unlikely(host && host->rrd_memory_mode != mode && rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))) { /* If a legacy memory mode instantiates all dbengine state must be discarded to avoid inconsistencies */ error("Archived host '%s' has memory mode '%s', but the wanted one is '%s'. Discarding archived state.", - host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + rrdhost_hostname(host), rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); rrdhost_free(host, 1); host = NULL; } @@ -680,6 +694,9 @@ RRDHOST *rrdhost_find_or_create( , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , system_info , 0 , archived @@ -705,6 +722,9 @@ RRDHOST *rrdhost_find_or_create( , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , system_info); } if (host) { @@ -721,6 +741,8 @@ RRDHOST *rrdhost_find_or_create( inline int rrdhost_should_be_removed(RRDHOST *host, RRDHOST *protected_host, time_t now) { if(host != protected_host && host != localhost + && rrdhost_receiver_replicating_charts(host) == 0 + && rrdhost_sender_replicating_charts(host) == 0 && rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN) && !rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) && !host->receiver @@ -731,50 +753,10 @@ inline int rrdhost_should_be_removed(RRDHOST *host, RRDHOST *protected_host, tim return 0; } -void rrdhost_cleanup_orphan_hosts_nolock(RRDHOST *protected_host) { - time_t now = now_realtime_sec(); - - RRDHOST *host; - -restart_after_removal: - rrdhost_foreach_write(host) { - if(rrdhost_should_be_removed(host, protected_host, now)) { - info("Host '%s' with machine guid '%s' is obsolete - cleaning up.", host->hostname, host->machine_guid); - - if (rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST) -#ifdef ENABLE_DBENGINE - /* don't delete multi-host DB host files */ - && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->storage_instance[0])) -#endif - ) - rrdhost_delete_charts(host); - else - rrdhost_save_charts(host); - - rrdhost_free(host, 0); - goto restart_after_removal; - } - } -} - // ---------------------------------------------------------------------------- // RRDHOST global / startup initialization -int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { - - if (unlikely(sql_init_database(DB_CHECK_NONE, system_info ? 0 : 1))) { - if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) - fatal("Failed to initialize SQLite"); - info("Skipping SQLITE metadata initialization since memory mode is not dbengine"); - } - - if (unlikely(sql_init_context_database(system_info ? 0 : 1))) { - error_report("Failed to initialize context metadata database"); - } - - if (unlikely(!system_info)) - goto unittest; - +void dbengine_init(char *hostname) { #ifdef ENABLE_DBENGINE storage_tiers = config_get_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); if(storage_tiers < 1) { @@ -807,14 +789,15 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { else rrdeng_page_descr_use_malloc(); - int created_tiers = 0; + size_t created_tiers = 0; char dbenginepath[FILENAME_MAX + 1]; char dbengineconfig[200 + 1]; - for(int tier = 0; tier < storage_tiers ;tier++) { + int divisor = 1; + for(size_t tier = 0; tier < storage_tiers ;tier++) { if(tier == 0) snprintfz(dbenginepath, FILENAME_MAX, "%s/dbengine", netdata_configured_cache_dir); else - snprintfz(dbenginepath, FILENAME_MAX, "%s/dbengine-tier%d", netdata_configured_cache_dir, tier); + snprintfz(dbenginepath, FILENAME_MAX, "%s/dbengine-tier%zu", netdata_configured_cache_dir, tier); int ret = mkdir(dbenginepath, 0775); if (ret != 0 && errno != EEXIST) { @@ -822,27 +805,30 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { break; } - int page_cache_mb = default_rrdeng_page_cache_mb; - int disk_space_mb = default_multidb_disk_quota_mb; - int grouping_iterations = storage_tiers_grouping_iterations[tier]; + if(tier > 0) + divisor *= 2; + + int page_cache_mb = default_rrdeng_page_cache_mb / divisor; + int disk_space_mb = default_multidb_disk_quota_mb / divisor; + size_t grouping_iterations = storage_tiers_grouping_iterations[tier]; RRD_BACKFILL backfill = storage_tiers_backfill[tier]; if(tier > 0) { - snprintfz(dbengineconfig, 200, "dbengine tier %d page cache size MB", tier); + snprintfz(dbengineconfig, 200, "dbengine tier %zu page cache size MB", tier); page_cache_mb = config_get_number(CONFIG_SECTION_DB, dbengineconfig, page_cache_mb); - snprintfz(dbengineconfig, 200, "dbengine tier %d multihost disk space MB", tier); + snprintfz(dbengineconfig, 200, "dbengine tier %zu multihost disk space MB", tier); disk_space_mb = config_get_number(CONFIG_SECTION_DB, dbengineconfig, disk_space_mb); - snprintfz(dbengineconfig, 200, "dbengine tier %d update every iterations", tier); + snprintfz(dbengineconfig, 200, "dbengine tier %zu update every iterations", tier); grouping_iterations = config_get_number(CONFIG_SECTION_DB, dbengineconfig, grouping_iterations); if(grouping_iterations < 2) { grouping_iterations = 2; config_set_number(CONFIG_SECTION_DB, dbengineconfig, grouping_iterations); - error("DBENGINE on '%s': 'dbegnine tier %d update every iterations' cannot be less than 2. Assuming 2.", hostname, tier); + error("DBENGINE on '%s': 'dbegnine tier %zu update every iterations' cannot be less than 2. Assuming 2.", hostname, tier); } - snprintfz(dbengineconfig, 200, "dbengine tier %d backfill", tier); + snprintfz(dbengineconfig, 200, "dbengine tier %zu backfill", tier); const char *bf = config_get(CONFIG_SECTION_DB, dbengineconfig, backfill == RRD_BACKFILL_NEW ? "new" : backfill == RRD_BACKFILL_FULL ? "full" : "none"); if(strcmp(bf, "new") == 0) backfill = RRD_BACKFILL_NEW; else if(strcmp(bf, "full") == 0) backfill = RRD_BACKFILL_FULL; @@ -859,14 +845,14 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { if(tier > 0 && get_tier_grouping(tier) > 65535) { storage_tiers_grouping_iterations[tier] = 1; - error("DBENGINE on '%s': dbengine tier %d gives aggregation of more than 65535 points of tier 0. Disabling tiers above %d", hostname, tier, tier); + error("DBENGINE on '%s': dbengine tier %zu gives aggregation of more than 65535 points of tier 0. Disabling tiers above %zu", hostname, tier, tier); break; } - - internal_error(true, "DBENGINE tier %d grouping iterations is set to %d", tier, storage_tiers_grouping_iterations[tier]); + + internal_error(true, "DBENGINE tier %zu grouping iterations is set to %zu", tier, storage_tiers_grouping_iterations[tier]); ret = rrdeng_init(NULL, NULL, dbenginepath, page_cache_mb, disk_space_mb, tier); if(ret != 0) { - error("DBENGINE on '%s': Failed to initialize multi-host database tier %d on path '%s'", + error("DBENGINE on '%s': Failed to initialize multi-host database tier %zu on path '%s'", hostname, tier, dbenginepath); break; } @@ -875,13 +861,14 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { } if(created_tiers && created_tiers < storage_tiers) { - error("DBENGINE on '%s': Managed to create %d tiers instead of %d. Continuing with %d available.", + error("DBENGINE on '%s': Managed to create %zu tiers instead of %zu. Continuing with %zu available.", hostname, created_tiers, storage_tiers, created_tiers); storage_tiers = created_tiers; } else if(!created_tiers) fatal("DBENGINE on '%s', failed to initialize databases at '%s'.", hostname, netdata_configured_cache_dir); + dbengine_enabled = true; #else storage_tiers = config_get_number(CONFIG_SECTION_DB, "storage tiers", 1); if(storage_tiers != 1) { @@ -889,12 +876,54 @@ int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { storage_tiers = 1; config_set_number(CONFIG_SECTION_DB, "storage tiers", storage_tiers); } + dbengine_enabled = false; #endif +} - health_init(); - rrdpush_init(); +int rrd_init(char *hostname, struct rrdhost_system_info *system_info) { + rrdhost_init(); -unittest: + if (unlikely(sql_init_database(DB_CHECK_NONE, system_info ? 0 : 1))) { + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) + fatal("Failed to initialize SQLite"); + info("Skipping SQLITE metadata initialization since memory mode is not dbengine"); + } + + if (unlikely(sql_init_context_database(system_info ? 0 : 1))) { + error_report("Failed to initialize context metadata database"); + } + + if (unlikely(strcmp(hostname, "unittest") == 0)) { + dbengine_enabled = true; + } + else { + health_init(); + rrdpush_init(); + + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE || rrdpush_receiver_needs_dbengine()) { + info("Initializing dbengine..."); + dbengine_init(hostname); + } + else { + info("Not initializing dbengine..."); + storage_tiers = 1; + } + + if (!dbengine_enabled) { + if (storage_tiers > 1) { + error("dbengine is not enabled, but %zu tiers have been requested. Resetting tiers to 1", + storage_tiers); + storage_tiers = 1; + } + + if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { + error("dbengine is not enabled, but it has been given as the default db mode. Resetting db mode to alloc"); + default_rrd_memory_mode = RRD_MEMORY_MODE_ALLOC; + } + } + } + + metadata_sync_init(); debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname); rrd_wrlock(); localhost = rrdhost_create( @@ -916,6 +945,9 @@ unittest: , default_rrdpush_destination , default_rrdpush_api_key , default_rrdpush_send_charts_matching + , default_rrdpush_enable_replication + , default_rrdpush_seconds_to_replicate + , default_rrdpush_replication_step , system_info , 1 , 0 @@ -940,19 +972,19 @@ unittest: // there are only used when NETDATA_INTERNAL_CHECKS is set void __rrdhost_check_rdlock(RRDHOST *host, const char *file, const char *function, const unsigned long line) { - debug(D_RRDHOST, "Checking read lock on host '%s'", host->hostname); + debug(D_RRDHOST, "Checking read lock on host '%s'", rrdhost_hostname(host)); int ret = netdata_rwlock_trywrlock(&host->rrdhost_rwlock); if(ret == 0) - fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file); + fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", rrdhost_hostname(host), function, line, file); } void __rrdhost_check_wrlock(RRDHOST *host, const char *file, const char *function, const unsigned long line) { - debug(D_RRDHOST, "Checking write lock on host '%s'", host->hostname); + debug(D_RRDHOST, "Checking write lock on host '%s'", rrdhost_hostname(host)); int ret = netdata_rwlock_tryrdlock(&host->rrdhost_rwlock); if(ret == 0) - fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file); + fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", rrdhost_hostname(host), function, line, file); } void __rrd_check_rdlock(const char *file, const char *function, const unsigned long line) { @@ -975,8 +1007,6 @@ void __rrd_check_wrlock(const char *file, const char *function, const unsigned l // RRDHOST - free void rrdhost_system_info_free(struct rrdhost_system_info *system_info) { - info("SYSTEM_INFO: free %p", system_info); - if(likely(system_info)) { freez(system_info->cloud_provider_type); freez(system_info->cloud_instance_type); @@ -1016,18 +1046,21 @@ void destroy_receiver_state(struct receiver_state *rpt); void stop_streaming_sender(RRDHOST *host) { + rrdhost_option_clear(host, RRDHOST_OPTION_SENDER_ENABLED); + if (unlikely(!host->sender)) return; rrdpush_sender_thread_stop(host); // stop a possibly running thread cbuffer_free(host->sender->buffer); - buffer_free(host->sender->build); #ifdef ENABLE_COMPRESSION if (host->sender->compressor) host->sender->compressor->destroy(&host->sender->compressor); #endif + replication_cleanup_sender(host->sender); freez(host->sender); host->sender = NULL; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED); } void stop_streaming_receiver(RRDHOST *host) @@ -1051,7 +1084,7 @@ void rrdhost_free(RRDHOST *host, bool force) { if(!host) return; if (netdata_exit || force) - info("Freeing all memory for host '%s'...", host->hostname); + info("Freeing all memory for host '%s'...", rrdhost_hostname(host)); rrd_check_wrlock(); // make sure the RRDs are write locked @@ -1061,65 +1094,53 @@ void rrdhost_free(RRDHOST *host, bool force) { // ------------------------------------------------------------------------ // clean up streaming + stop_streaming_sender(host); if (netdata_exit || force) stop_streaming_receiver(host); + + // ------------------------------------------------------------------------ + // clean up alarms + + rrdcalc_delete_all(host); + + rrdhost_wrlock(host); // lock this RRDHOST + // ------------------------------------------------------------------------ // release its children resources #ifdef ENABLE_DBENGINE - for(int tier = 0; tier < storage_tiers ;tier++) { - if(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && - host->storage_instance[tier] && - !is_storage_engine_shared(host->storage_instance[tier])) - rrdeng_prepare_exit((struct rrdengine_instance *)host->storage_instance[tier]); + for(size_t tier = 0; tier < storage_tiers ;tier++) { + if(host->db[tier].mode == RRD_MEMORY_MODE_DBENGINE + && host->db[tier].instance + && !is_storage_engine_shared(host->db[tier].instance)) + rrdeng_prepare_exit((struct rrdengine_instance *)host->db[tier].instance); } #endif - while(host->rrdset_root) - rrdset_free(host->rrdset_root); + // delete all the RRDSETs of the host + rrdset_index_destroy(host); + rrdcalc_rrdhost_index_destroy(host); + rrdcalctemplate_index_destroy(host); freez(host->exporting_flags); - while(host->alarms) - rrdcalc_unlink_and_free(host, host->alarms); - - RRDCALC *rc,*nc; - for(rc = host->alarms_with_foreach; rc ; rc = nc) { - nc = rc->next; - rrdcalc_free(rc); - } - host->alarms_with_foreach = NULL; - - while(host->templates) - rrdcalctemplate_unlink_and_free(host, host->templates); - - RRDCALCTEMPLATE *rt,*next; - for(rt = host->alarms_template_with_foreach; rt ; rt = next) { - next = rt->next; - rrdcalctemplate_free(rt); - } - host->alarms_template_with_foreach = NULL; - - debug(D_RRD_CALLS, "RRDHOST: Cleaning up remaining host variables for host '%s'", host->hostname); - rrdvar_free_remaining_variables(host, &host->rrdvar_root_index); - health_alarm_log_free(host); #ifdef ENABLE_DBENGINE - for(int tier = 0; tier < storage_tiers ;tier++) { - if(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && - host->storage_instance[tier] && - !is_storage_engine_shared(host->storage_instance[tier])) - rrdeng_exit((struct rrdengine_instance *)host->storage_instance[tier]); + for(size_t tier = 0; tier < storage_tiers ;tier++) { + if(host->db[tier].mode == RRD_MEMORY_MODE_DBENGINE + && host->db[tier].instance + && !is_storage_engine_shared(host->db[tier].instance)) + rrdeng_exit((struct rrdengine_instance *)host->db[tier].instance); } #endif if (!netdata_exit && !force) { - info("Setting archive mode for host '%s'...", host->hostname); + info("Setting archive mode for host '%s'...", rrdhost_hostname(host)); rrdhost_flag_set(host, RRDHOST_FLAG_ARCHIVED); rrdhost_unlock(host); return; @@ -1143,24 +1164,13 @@ void rrdhost_free(RRDHOST *host, bool force) { // ------------------------------------------------------------------------ // remove it from the indexes - if(rrdhost_index_del(host) != host) - error("RRDHOST '%s' removed from index, deleted the wrong entry.", host->hostname); + rrdhost_index_del_hostname(host); + rrdhost_index_del_by_guid(host); // ------------------------------------------------------------------------ // unlink it from the host - if(host == localhost) { - localhost = host->next; - } - else { - // find the previous one - RRDHOST *h; - for(h = localhost; h && h->next != host ; h = h->next) ; - - // bypass it - if(h) h->next = host->next; - else error("Request to free RRDHOST '%s': cannot find it", host->hostname); - } + DOUBLE_LINKED_LIST_REMOVE_UNSAFE(localhost, host, prev, next); // ------------------------------------------------------------------------ // free it @@ -1168,37 +1178,36 @@ void rrdhost_free(RRDHOST *host, bool force) { pthread_mutex_destroy(&host->aclk_state_lock); freez(host->aclk_state.claimed_id); freez(host->aclk_state.prev_claimed_id); - freez((void *)host->tags); - rrdlabels_destroy(host->host_labels); - freez((void *)host->os); - freez((void *)host->timezone); - freez((void *)host->abbrev_timezone); - freez(host->program_version); - freez(host->program_name); + string_freez(host->tags); + rrdlabels_destroy(host->rrdlabels); + string_freez(host->os); + string_freez(host->timezone); + string_freez(host->abbrev_timezone); + string_freez(host->program_name); + string_freez(host->program_version); rrdhost_system_info_free(host->system_info); freez(host->cache_dir); freez(host->varlib_dir); freez(host->rrdpush_send_api_key); freez(host->rrdpush_send_destination); - struct rrdpush_destinations *tmp_destination; - while (host->destinations) { - tmp_destination = host->destinations->next; - freez(host->destinations); - host->destinations = tmp_destination; - } - freez(host->health_default_exec); - freez(host->health_default_recipient); + rrdpush_destinations_free(host); + string_freez(host->health_default_exec); + string_freez(host->health_default_recipient); freez(host->health_log_filename); - freez(host->hostname); - freez(host->registry_hostname); + string_freez(host->registry_hostname); simple_pattern_free(host->rrdpush_send_charts_matching); rrdhost_unlock(host); netdata_rwlock_destroy(&host->health_log.alarm_log_rwlock); netdata_rwlock_destroy(&host->rrdhost_rwlock); freez(host->node_id); + rrdfamily_index_destroy(host); + rrdfunctions_destroy(host); + rrdvariables_destroy(host->rrdvars); + rrdhost_destroy_rrdcontexts(host); + string_freez(host->hostname); freez(host); #ifdef ENABLE_ACLK if (wc) @@ -1209,9 +1218,14 @@ void rrdhost_free(RRDHOST *host, bool force) { void rrdhost_free_all(void) { rrd_wrlock(); + /* Make sure child-hosts are released before the localhost. */ - while(localhost->next) rrdhost_free(localhost->next, 1); - rrdhost_free(localhost, 1); + while(localhost && localhost->next) + rrdhost_free(localhost->next, 1); + + if(localhost) + rrdhost_free(localhost, 1); + rrd_unlock(); } @@ -1221,25 +1235,20 @@ void rrdhost_free_all(void) { void rrdhost_save_charts(RRDHOST *host) { if(!host) return; - info("Saving/Closing database of host '%s'...", host->hostname); + info("Saving/Closing database of host '%s'...", rrdhost_hostname(host)); RRDSET *st; // we get a write lock // to ensure only one thread is saving the database - rrdhost_wrlock(host); - rrdset_foreach_write(st, host) { - rrdset_rdlock(st); rrdset_save(st); - rrdset_unlock(st); } - - rrdhost_unlock(host); + rrdset_foreach_done(st); } static void rrdhost_load_auto_labels(void) { - DICTIONARY *labels = localhost->host_labels; + DICTIONARY *labels = localhost->rrdlabels; if (localhost->system_info->cloud_provider_type) rrdlabels_add(labels, "_cloud_provider_type", localhost->system_info->cloud_provider_type, RRDLABEL_SRC_AUTO); @@ -1301,13 +1310,31 @@ static void rrdhost_load_auto_labels(void) { add_aclk_host_labels(); + health_add_host_labels(); + rrdlabels_add( - labels, "_is_parent", (localhost->next || configured_as_parent()) ? "true" : "false", RRDLABEL_SRC_AUTO); + labels, "_is_parent", (localhost->senders_count > 0) ? "true" : "false", RRDLABEL_SRC_AUTO); if (localhost->rrdpush_send_destination) rrdlabels_add(labels, "_streams_to", localhost->rrdpush_send_destination, RRDLABEL_SRC_AUTO); } +void rrdhost_set_is_parent_label(int count) { + DICTIONARY *labels = localhost->rrdlabels; + + if (count == 0 || count == 1) { + rrdlabels_add( + labels, "_is_parent", (count) ? "true" : "false", RRDLABEL_SRC_AUTO); + + //queue a node info +#ifdef ENABLE_ACLK + if (netdata_cloud_setting) { + aclk_queue_node_info(localhost); + } +#endif + } +} + static void rrdhost_load_config_labels(void) { int status = config_load(NULL, 1, CONFIG_SECTION_HOST_LABEL); if(!status) { @@ -1320,7 +1347,7 @@ static void rrdhost_load_config_labels(void) { config_section_wrlock(co); struct config_option *cv; for(cv = co->values; cv ; cv = cv->next) { - rrdlabels_add(localhost->host_labels, cv->name, cv->value, RRDLABEL_SRC_CONFIG); + rrdlabels_add(localhost->rrdlabels, cv->name, cv->value, RRDLABEL_SRC_CONFIG); cv->flags |= CONFIG_VALUE_USED; } config_section_unlock(co); @@ -1339,41 +1366,37 @@ static void rrdhost_load_kubernetes_labels(void) { debug(D_RRDHOST, "Attempting to fetch external labels via %s", label_script); pid_t pid; - FILE *fp = mypopen(label_script, &pid); - if(!fp) return; + FILE *fp_child_input; + FILE *fp_child_output = netdata_popen(label_script, &pid, &fp_child_input); + if(!fp_child_output) return; char buffer[1000 + 1]; - while (fgets(buffer, 1000, fp) != NULL) - rrdlabels_add_pair(localhost->host_labels, buffer, RRDLABEL_SRC_AUTO|RRDLABEL_SRC_K8S); + while (fgets(buffer, 1000, fp_child_output) != NULL) + rrdlabels_add_pair(localhost->rrdlabels, buffer, RRDLABEL_SRC_AUTO|RRDLABEL_SRC_K8S); // Non-zero exit code means that all the script output is error messages. We've shown already any message that didn't include a ':' // Here we'll inform with an ERROR that the script failed, show whatever (if anything) was added to the list of labels, free the memory and set the return to null - int rc = mypclose(fp, pid); + int rc = netdata_pclose(fp_child_input, fp_child_output, pid); if(rc) error("%s exited abnormally. Failed to get kubernetes labels.", label_script); } void reload_host_labels(void) { - if(!localhost->host_labels) - localhost->host_labels = rrdlabels_create(); + if(!localhost->rrdlabels) + localhost->rrdlabels = rrdlabels_create(); - rrdlabels_unmark_all(localhost->host_labels); + rrdlabels_unmark_all(localhost->rrdlabels); // priority is important here rrdhost_load_config_labels(); rrdhost_load_kubernetes_labels(); rrdhost_load_auto_labels(); - rrdlabels_remove_all_unmarked(localhost->host_labels); - sql_store_host_labels(localhost); + rrdlabels_remove_all_unmarked(localhost->rrdlabels); + metaqueue_store_host_labels(localhost->machine_guid); health_label_log_save(localhost); -/* TODO-GAPS - fix this so that it looks properly at the state and version of the sender - if(localhost->rrdpush_send_enabled && localhost->rrdpush_sender_buffer){ - localhost->labels.labels_flag |= RRDHOST_FLAG_STREAM_LABELS_UPDATE; - rrdpush_send_labels(localhost); - } -*/ + rrdpush_send_host_labels(localhost); health_reload(); } @@ -1383,23 +1406,18 @@ void reload_host_labels(void) { void rrdhost_delete_charts(RRDHOST *host) { if(!host) return; - info("Deleting database of host '%s'...", host->hostname); + info("Deleting database of host '%s'...", rrdhost_hostname(host)); RRDSET *st; // we get a write lock // to ensure only one thread is saving the database - rrdhost_wrlock(host); - rrdset_foreach_write(st, host) { - rrdset_rdlock(st); - rrdset_delete_files(st); - rrdset_unlock(st); + rrdset_delete_files(st); } + rrdset_foreach_done(st); recursively_delete_dir(host->cache_dir, "left over host"); - - rrdhost_unlock(host); } // ---------------------------------------------------------------------------- @@ -1408,29 +1426,26 @@ void rrdhost_delete_charts(RRDHOST *host) { void rrdhost_cleanup_charts(RRDHOST *host) { if(!host) return; - info("Cleaning up database of host '%s'...", host->hostname); + info("Cleaning up database of host '%s'...", rrdhost_hostname(host)); RRDSET *st; - uint32_t rrdhost_delete_obsolete_charts = rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS); + uint32_t rrdhost_delete_obsolete_charts = rrdhost_option_check(host, RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS); // we get a write lock // to ensure only one thread is saving the database - rrdhost_wrlock(host); - rrdset_foreach_write(st, host) { - rrdset_rdlock(st); if(rrdhost_delete_obsolete_charts && rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)) rrdset_delete_files(st); + else if(rrdhost_delete_obsolete_charts && rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS)) rrdset_delete_obsolete_dimensions(st); + else rrdset_save(st); - rrdset_unlock(st); } - - rrdhost_unlock(host); + rrdset_foreach_done(st); } @@ -1459,11 +1474,9 @@ void rrdhost_cleanup_all(void) { RRDHOST *host; rrdhost_foreach_read(host) { - if (host != localhost && rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_ORPHAN_HOST) && !host->receiver -#ifdef ENABLE_DBENGINE + if (host != localhost && rrdhost_option_check(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST) && !host->receiver /* don't delete multi-host DB host files */ - && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->storage_instance[0])) -#endif + && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->db[0].instance)) ) rrdhost_delete_charts(host); else @@ -1475,157 +1488,6 @@ void rrdhost_cleanup_all(void) { // ---------------------------------------------------------------------------- -// RRDHOST - save or delete all the host charts from disk - -void rrdhost_cleanup_obsolete_charts(RRDHOST *host) { - time_t now = now_realtime_sec(); - - RRDSET *st; - - uint32_t rrdhost_delete_obsolete_charts = rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS); - -restart_after_removal: - rrdset_foreach_write(st, host) { - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) - && st->last_accessed_time + rrdset_free_obsolete_time < now - && st->last_updated.tv_sec + rrdset_free_obsolete_time < now - && st->last_collected_time.tv_sec + rrdset_free_obsolete_time < now - )) { - st->rrdhost->obsolete_charts_count--; -#ifdef ENABLE_DBENGINE - if(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) { - RRDDIM *rd, *last; - - rrdset_flag_set(st, RRDSET_FLAG_ARCHIVED); - while (st->variables) rrdsetvar_free(st->variables); - while (st->alarms) rrdsetcalc_unlink(st->alarms); - rrdset_wrlock(st); - for (rd = st->dimensions, last = NULL ; likely(rd) ; ) { - if (rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) { - last = rd; - rd = rd->next; - continue; - } - - if (rrddim_flag_check(rd, RRDDIM_FLAG_ACLK)) { - last = rd; - rd = rd->next; - continue; - } - rrddim_flag_set(rd, RRDDIM_FLAG_ARCHIVED); - while (rd->variables) - rrddimvar_free(rd->variables); - - if (rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE); - - /* only a collector can mark a chart as obsolete, so we must remove the reference */ - - size_t tiers_available = 0, tiers_said_yes = 0; - for(int tier = 0; tier < storage_tiers ;tier++) { - if(rd->tiers[tier]) { - tiers_available++; - - if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle)) - tiers_said_yes++; - - rd->tiers[tier]->db_collection_handle = NULL; - } - } - - if (tiers_available == tiers_said_yes && tiers_said_yes) { - /* This metric has no data and no references */ - delete_dimension_uuid(&rd->metric_uuid); - rrddim_free(st, rd); - if (unlikely(!last)) { - rd = st->dimensions; - } - else { - rd = last->next; - } - continue; - } -#ifdef ENABLE_ACLK - else - queue_dimension_to_aclk(rd, rd->last_collected_time.tv_sec); -#endif - } - last = rd; - rd = rd->next; - } - rrdset_unlock(st); - - debug(D_RRD_CALLS, "RRDSET: Cleaning up remaining chart variables for host '%s', chart '%s'", host->hostname, st->id); - rrdvar_free_remaining_variables(host, &st->rrdvar_root_index); - - rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); - - if (st->dimensions) { - /* If the chart still has dimensions don't delete it from the metadata log */ - continue; - } - } -#endif - rrdset_rdlock(st); - - if(rrdhost_delete_obsolete_charts) - rrdset_delete_files(st); - else - rrdset_save(st); - - rrdset_unlock(st); - - rrdset_free(st); - goto restart_after_removal; - } -#ifdef ENABLE_ACLK - else - sql_check_chart_liveness(st); -#endif - } -} - -void rrdset_check_obsoletion(RRDHOST *host) -{ - RRDSET *st; - time_t last_entry_t; - rrdset_foreach_read(st, host) { - last_entry_t = rrdset_last_entry_t(st); - if (last_entry_t && last_entry_t < host->senders_connect_time) { - rrdset_is_obsolete(st); - } - } -} - -void rrd_cleanup_obsolete_charts() -{ - rrd_rdlock(); - - RRDHOST *host; - rrdhost_foreach_read(host) - { - if (host->obsolete_charts_count) { - rrdhost_wrlock(host); - rrdhost_cleanup_obsolete_charts(host); - rrdhost_unlock(host); - } - - if ( host != localhost && - host->trigger_chart_obsoletion_check && - ((host->senders_last_chart_command && - host->senders_last_chart_command + host->health_delay_up_to < now_realtime_sec()) - || (host->senders_connect_time + 300 < now_realtime_sec())) ) { - rrdhost_rdlock(host); - rrdset_check_obsoletion(host); - rrdhost_unlock(host); - host->trigger_chart_obsoletion_check = 0; - } - } - - rrd_unlock(); -} - -// ---------------------------------------------------------------------------- // RRDHOST - set system info from environment variables // system_info fields must be heap allocated or NULL int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, char *name, char *value) { @@ -1761,57 +1623,18 @@ int rrdhost_set_system_info_variable(struct rrdhost_system_info *system_info, ch return res; } -/** - * Alarm Compare ID - * - * Callback function used with the binary trees to compare the id of RRDCALC - * - * @param a a pointer to the RRDCAL item to insert,compare or update the binary tree - * @param b the pointer to the binary tree. - * - * @return It returns 0 case the values are equal, 1 case a is bigger than b and -1 case a is smaller than b. - */ -int alarm_compare_id(void *a, void *b) { - register uint32_t hash1 = ((RRDCALC *)a)->id; - register uint32_t hash2 = ((RRDCALC *)b)->id; - - if(hash1 < hash2) return -1; - else if(hash1 > hash2) return 1; - - return 0; -} - -/** - * Alarm Compare NAME - * - * Callback function used with the binary trees to compare the name of RRDCALC - * - * @param a a pointer to the RRDCAL item to insert,compare or update the binary tree - * @param b the pointer to the binary tree. - * - * @return It returns 0 case the values are equal, 1 case a is bigger than b and -1 case a is smaller than b. - */ -int alarm_compare_name(void *a, void *b) { - RRDCALC *in1 = (RRDCALC *)a; - RRDCALC *in2 = (RRDCALC *)b; - - if(in1->hash < in2->hash) return -1; - else if(in1->hash > in2->hash) return 1; - - return strcmp(in1->name,in2->name); -} - // Added for gap-filling, if this proves to be a bottleneck in large-scale systems then we will need to cache // the last entry times as the metric updates, but let's see if it is a problem first. time_t rrdhost_last_entry_t(RRDHOST *h) { - rrdhost_rdlock(h); RRDSET *st; time_t result = 0; + rrdset_foreach_read(st, h) { time_t st_last = rrdset_last_entry_t(st); + if (st_last > result) result = st_last; } - rrdhost_unlock(h); + rrdset_foreach_done(st); return result; } |