diff options
author | Federico Ceratto <federico.ceratto@gmail.com> | 2016-11-23 15:49:10 +0000 |
---|---|---|
committer | Federico Ceratto <federico.ceratto@gmail.com> | 2016-11-23 15:49:10 +0000 |
commit | 87649cf32bd0e14d5a903fb85b01e9f41a253540 (patch) | |
tree | bbefda6dac074aeb87529592e8e5064f69cbe024 /src/rrd.c | |
parent | Imported Upstream version 1.3.0+dfsg (diff) | |
download | netdata-87649cf32bd0e14d5a903fb85b01e9f41a253540.tar.xz netdata-87649cf32bd0e14d5a903fb85b01e9f41a253540.zip |
New upstream version 1.4.0+dfsgupstream/1.4.0+dfsg
Diffstat (limited to 'src/rrd.c')
-rw-r--r-- | src/rrd.c | 230 |
1 files changed, 150 insertions, 80 deletions
@@ -5,10 +5,12 @@ // ---------------------------------------------------------------------------- // globals +/* // if not zero it gives the time (in seconds) to remove un-updated dimensions // DO NOT ENABLE // if dimensions are removed, the chart generation will have to run again int rrd_delete_unupdated_dimensions = 0; +*/ int rrd_update_every = UPDATE_EVERY; int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES; @@ -42,7 +44,8 @@ RRDHOST localhost = { AVL_LOCK_INITIALIZER }, .health_log = { - .nextid = 1, + .next_log_id = 1, + .next_alarm_id = 1, .count = 0, .max = 1000, .alarms = NULL, @@ -50,6 +53,12 @@ RRDHOST localhost = { } }; +void rrdhost_init(char *hostname) { + localhost.hostname = hostname; + localhost.health_log.next_log_id = + localhost.health_log.next_alarm_id = time(NULL); +} + void rrdhost_rwlock(RRDHOST *host) { pthread_rwlock_wrlock(&host->rrdset_root_rwlock); } @@ -413,8 +422,29 @@ void rrdset_reset(RRDSET *st) rd->last_collected_time.tv_sec = 0; rd->last_collected_time.tv_usec = 0; rd->counter = 0; - bzero(rd->values, rd->entries * sizeof(storage_number)); + memset(rd->values, 0, rd->entries * sizeof(storage_number)); + } +} +static long align_entries_to_pagesize(long entries) { + if(entries < 5) entries = 5; + if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX; + +#ifdef NETDATA_LOG_ALLOCATIONS + long page = (size_t)sysconf(_SC_PAGESIZE); + + long size = sizeof(RRDDIM) + entries * sizeof(storage_number); + if(size % page) { + size -= (size % page); + size += page; + + long n = (size - sizeof(RRDDIM)) / sizeof(storage_number); + return n; } + + return entries; +#else + return entries; +#endif } RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type) @@ -441,9 +471,9 @@ RRDSET *rrdset_create(const char *type, const char *id, const char *name, const return st; } - long entries = config_get_number(fullid, "history", rrd_default_history_entries); - if(entries < 5) entries = config_set_number(fullid, "history", 5); - if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX); + long rentries = config_get_number(fullid, "history", rrd_default_history_entries); + long entries = align_entries_to_pagesize(rentries); + if(entries != rentries) entries = config_set_number(fullid, "history", entries); int enabled = config_get_boolean(fullid, "enabled", 1); if(!enabled) entries = 5; @@ -459,29 +489,29 @@ RRDSET *rrdset_create(const char *type, const char *id, const char *name, const if(strcmp(st->magic, RRDSET_MAGIC) != 0) { errno = 0; info("Initializing file %s.", fullfilename); - bzero(st, size); + memset(st, 0, size); } else if(strcmp(st->id, fullid) != 0) { errno = 0; error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid); // munmap(st, size); // st = NULL; - bzero(st, size); + memset(st, 0, size); } else if(st->memsize != size || st->entries != entries) { errno = 0; error("File %s does not have the desired size. Clearing it.", fullfilename); - bzero(st, size); + memset(st, 0, size); } else if(st->update_every != update_every) { errno = 0; error("File %s does not have the desired update frequency. Clearing it.", fullfilename); - bzero(st, size); + memset(st, 0, size); } else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) { errno = 0; error("File %s is too old. Clearing it.", fullfilename); - bzero(st, size); + memset(st, 0, size); } } @@ -496,6 +526,7 @@ RRDSET *rrdset_create(const char *type, const char *id, const char *name, const st->next = NULL; st->mapped = rrd_memory_mode; st->variables = NULL; + st->alarms = NULL; } else { st = callocz(1, size); @@ -606,44 +637,44 @@ RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) { errno = 0; info("Initializing file %s.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(rd->memsize != size) { errno = 0; error("File %s does not have the desired size. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(rd->multiplier != multiplier) { errno = 0; error("File %s does not have the same multiplier. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(rd->divisor != divisor) { errno = 0; error("File %s does not have the same divisor. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(rd->algorithm != algorithm) { errno = 0; error("File %s does not have the same algorithm. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(rd->update_every != st->update_every) { errno = 0; error("File %s does not have the same refresh frequency. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(usec_dt(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) { errno = 0; error("File %s is too old. Clearing it.", fullfilename); - bzero(rd, size); + memset(rd, 0, size); } else if(strcmp(rd->id, id) != 0) { errno = 0; error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id); // munmap(rd, size); // rd = NULL; - bzero(rd, size); + memset(rd, 0, size); } } @@ -927,6 +958,8 @@ collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number rd->updated = 1; rd->counter++; + // fprintf(stderr, "%s.%s %llu " COLLECTED_NUMBER_FORMAT " dt %0.6f" " rate " CALCULATED_NUMBER_FORMAT "\n", st->name, rd->name, st->usec_since_last_update, value, (float)((double)st->usec_since_last_update / (double)1000000), (calculated_number)((value - rd->last_collected_value) * (calculated_number)rd->multiplier / (calculated_number)rd->divisor * 1000000.0 / (calculated_number)st->usec_since_last_update)); + return rd->last_collected_value; } @@ -978,25 +1011,42 @@ unsigned long long rrdset_done(RRDSET *st) debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name); - RRDDIM *rd, *last; - int oldstate, store_this_entry = 1, first_entry = 0; - unsigned long long last_ut, now_ut, next_ut, stored_entries = 0; + RRDDIM *rd; + + int + pthreadoldcancelstate; // store the old cancelable pthread state, to restore it at the end + + char + store_this_entry = 1, // boolean: 1 = store this entry, 0 = don't store this entry + first_entry = 0; // boolean: 1 = this is the first entry seen for this chart, 0 = all other entries + + unsigned int + stored_entries = 0; // the number of entries we have stored in the db, during this call to rrdset_done() + + unsigned long long + last_collect_ut, // the timestamp in microseconds, of the last collected value + now_collect_ut, // the timestamp in microseconds, of this collected value (this is NOW) + last_stored_ut, // the timestamp in microseconds, of the last stored entry in the db + next_store_ut, // the timestamp in microseconds, of the next entry to store in the db + update_every_ut = st->update_every * 1000000ULL; // st->update_every in microseconds - if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0)) + if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0)) error("Cannot set pthread cancel state to DISABLE."); // a read lock is OK here pthread_rwlock_rdlock(&st->rwlock); +/* // enable the chart, if it was disabled if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled) st->enabled = 1; +*/ // check if the chart has a long time to be updated - if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) { + if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) { info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0)); rrdset_reset(st); - st->usec_since_last_update = st->update_every * 1000000ULL; + st->usec_since_last_update = update_every_ut; first_entry = 1; } if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update); @@ -1006,6 +1056,7 @@ unsigned long long rrdset_done(RRDSET *st) // it is the first entry // set the last_collected_time to now gettimeofday(&st->last_collected_time, NULL); + last_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - update_every_ut; // the first entry should not be stored store_this_entry = 0; @@ -1016,7 +1067,8 @@ unsigned long long rrdset_done(RRDSET *st) else { // it is not the first entry // calculate the proper last_collected_time, using usec_since_last_update - unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update; + last_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec; + unsigned long long ut = last_collect_ut + st->usec_since_last_update; st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL); st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL); } @@ -1038,11 +1090,11 @@ unsigned long long rrdset_done(RRDSET *st) } // check if we will re-write the entire data set - if(unlikely(usec_dt(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) { + if(unlikely(usec_dt(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) { info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec); rrdset_reset(st); - st->usec_since_last_update = st->update_every * 1000000ULL; + st->usec_since_last_update = update_every_ut; gettimeofday(&st->last_collected_time, NULL); @@ -1056,21 +1108,18 @@ unsigned long long rrdset_done(RRDSET *st) } // these are the 3 variables that will help us in interpolation - // last_ut = the last time we added a value to the storage - // now_ut = the time the current value is taken at - // next_ut = the time of the next interpolation point - last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec; - now_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec; - next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL; - - if(unlikely(!first_entry && now_ut < next_ut)) { - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name); - } + // last_stored_ut = the last time we added a value to the storage + // now_collect_ut = the time the current value has been collected + // next_store_ut = the time of the next interpolation point + last_stored_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec; + now_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec; + next_store_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL; if(unlikely(st->debug)) { - debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0); - debug(D_RRD_STATS, "%s: now ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0); - debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0); + debug(D_RRD_STATS, "%s: last_collect_ut = %0.3Lf (last collection time)", st->name, (long double)last_collect_ut/1000000.0); + debug(D_RRD_STATS, "%s: now_collect_ut = %0.3Lf (current collection time)", st->name, (long double)now_collect_ut/1000000.0); + debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0); + debug(D_RRD_STATS, "%s: next_store_ut = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0); } if(unlikely(!st->counter_done)) { @@ -1082,7 +1131,7 @@ unsigned long long rrdset_done(RRDSET *st) // calculate totals and count the dimensions int dimensions; st->collected_total = 0; - for( rd = st->dimensions, dimensions = 0 ; likely(rd) ; rd = rd->next, dimensions++ ) + for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ ) if(likely(rd->updated)) st->collected_total += rd->collected_value; uint32_t storage_flags = SN_EXISTS; @@ -1090,7 +1139,7 @@ unsigned long long rrdset_done(RRDSET *st) // process all dimensions to calculate their values // based on the collected figures only // at this stage we do not interpolate anything - for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) { + for( rd = st->dimensions ; rd ; rd = rd->next ) { if(unlikely(!rd->updated)) { rd->calculated_value = 0; @@ -1169,7 +1218,7 @@ unsigned long long rrdset_done(RRDSET *st) rd->last_collected_value = rd->collected_value; } - rd->calculated_value = + rd->calculated_value += (calculated_number)(rd->collected_value - rd->last_collected_value) * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; @@ -1259,21 +1308,26 @@ unsigned long long rrdset_done(RRDSET *st) // at this point we have all the calculated values ready // it is now time to interpolate values on a second boundary - unsigned long long first_ut = last_ut; - long long iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL); - if((now_ut % (st->update_every * 1000000ULL)) == 0) iterations++; + if(unlikely(now_collect_ut < next_store_ut)) { + // this is collected in the same interpolation point + if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name); + } - for( ; likely(next_ut <= now_ut) ; next_ut += st->update_every * 1000000ULL, iterations-- ) { + unsigned long long first_ut = last_stored_ut; + long long iterations = (now_collect_ut - last_stored_ut) / (update_every_ut); + if((now_collect_ut % (update_every_ut)) == 0) iterations++; + + for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { #ifdef NETDATA_INTERNAL_CHECKS - if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_ut = %llu, next_ut = %llu, now_ut = %llu", st->name, first_ut, last_ut, next_ut, now_ut); } + if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); } #endif if(unlikely(st->debug)) { - debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0); - debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0); + debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0); + debug(D_RRD_STATS, "%s: next_store_ut = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0); } - st->last_updated.tv_sec = (time_t) (next_ut / 1000000ULL); + st->last_updated.tv_sec = (time_t) (next_store_ut / 1000000ULL); st->last_updated.tv_usec = 0; for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) { @@ -1283,8 +1337,8 @@ unsigned long long rrdset_done(RRDSET *st) case RRDDIM_INCREMENTAL: new_value = (calculated_number) ( rd->calculated_value - * (calculated_number)(next_ut - last_ut) - / (calculated_number)(now_ut - last_ut) + * (calculated_number)(next_store_ut - last_collect_ut) + / (calculated_number)(now_collect_ut - last_collect_ut) ); if(unlikely(st->debug)) @@ -1296,14 +1350,23 @@ unsigned long long rrdset_done(RRDSET *st) , st->id, rd->name , new_value , rd->calculated_value - , (next_ut - last_ut) - , (now_ut - last_ut) + , (next_store_ut - last_stored_ut) + , (now_collect_ut - last_stored_ut) ); rd->calculated_value -= new_value; new_value += rd->last_calculated_value; rd->last_calculated_value = 0; new_value /= (calculated_number)st->update_every; + + if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) { + if(unlikely(st->debug)) + debug(D_RRD_STATS, "%s/%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING", + st->id, rd->name + , (calculated_number)(next_store_ut - last_stored_ut) + ); + new_value = new_value * (calculated_number)(st->update_every * 1000000) / (calculated_number)(next_store_ut - last_stored_ut); + } break; case RRDDIM_ABSOLUTE: @@ -1323,8 +1386,8 @@ unsigned long long rrdset_done(RRDSET *st) new_value = (calculated_number) ( ( (rd->calculated_value - rd->last_calculated_value) - * (calculated_number)(next_ut - first_ut) - / (calculated_number)(now_ut - first_ut) + * (calculated_number)(next_store_ut - last_collect_ut) + / (calculated_number)(now_collect_ut - last_collect_ut) ) + rd->last_calculated_value ); @@ -1338,20 +1401,15 @@ unsigned long long rrdset_done(RRDSET *st) , st->id, rd->name , new_value , rd->calculated_value, rd->last_calculated_value - , (next_ut - first_ut) - , (now_ut - first_ut), rd->last_calculated_value + , (next_store_ut - first_ut) + , (now_collect_ut - first_ut), rd->last_calculated_value ); - - // this is wrong - // it fades the value towards the target - // while we know the calculated value is different - // if(likely(next_ut + st->update_every * 1000000ULL > now_ut)) rd->calculated_value = new_value; } break; } if(unlikely(!store_this_entry)) { - // store_this_entry = 1; + rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS); continue; } @@ -1411,25 +1469,34 @@ unsigned long long rrdset_done(RRDSET *st) st->counter++; st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1; - last_ut = next_ut; + last_stored_ut = next_store_ut; } - // align next interpolation to last collection point - if(likely(stored_entries || !store_this_entry)) { - st->last_updated.tv_sec = st->last_collected_time.tv_sec; - st->last_updated.tv_usec = st->last_collected_time.tv_usec; - st->last_collected_total = st->collected_total; - } + st->last_collected_total = st->collected_total; - for( rd = st->dimensions; likely(rd) ; rd = rd->next ) { + for( rd = st->dimensions; rd ; rd = rd->next ) { if(unlikely(!rd->updated)) continue; - if(likely(stored_entries || !store_this_entry)) { - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value); - rd->last_collected_value = rd->collected_value; + if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value); + rd->last_collected_value = rd->collected_value; + + switch(rd->algorithm) { + case RRDDIM_INCREMENTAL: + if(unlikely(!first_entry)) { + if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value); + rd->last_calculated_value += rd->calculated_value; + } + else { + if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name); + } + break; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value); - rd->last_calculated_value = rd->calculated_value; + case RRDDIM_ABSOLUTE: + case RRDDIM_PCENT_OVER_ROW_TOTAL: + case RRDDIM_PCENT_OVER_DIFF_TOTAL: + if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value); + rd->last_calculated_value = rd->calculated_value; + break; } rd->calculated_value = 0; @@ -1452,6 +1519,7 @@ unsigned long long rrdset_done(RRDSET *st) // ALL DONE ABOUT THE DATA UPDATE // -------------------------------------------------------------------- +/* // find if there are any obsolete dimensions (not updated recently) if(unlikely(rrd_delete_unupdated_dimensions)) { @@ -1460,6 +1528,7 @@ unsigned long long rrdset_done(RRDSET *st) break; if(unlikely(rd)) { + RRDDIM *last; // there is dimension to free // upgrade our read lock to a write lock pthread_rwlock_unlock(&st->rwlock); @@ -1497,11 +1566,12 @@ unsigned long long rrdset_done(RRDSET *st) } } } +*/ pthread_rwlock_unlock(&st->rwlock); - if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0)) - error("Cannot set pthread cancel state to RESTORE (%d).", oldstate); + if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) + error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); return(st->usec_since_last_update); } |