diff options
Diffstat (limited to 'libnetdata')
34 files changed, 3724 insertions, 847 deletions
diff --git a/libnetdata/Makefile.am b/libnetdata/Makefile.am index e787801c2..167d05caa 100644 --- a/libnetdata/Makefile.am +++ b/libnetdata/Makefile.am @@ -17,6 +17,7 @@ SUBDIRS = \ health \ locks \ log \ + onewayalloc \ popen \ procfile \ simple_pattern \ @@ -25,6 +26,7 @@ SUBDIRS = \ storage_number \ threads \ url \ + worker_utilization \ tests \ $(NULL) diff --git a/libnetdata/avl/avl.c b/libnetdata/avl/avl.c index b05b97acb..5a4c1a983 100644 --- a/libnetdata/avl/avl.c +++ b/libnetdata/avl/avl.c @@ -372,9 +372,9 @@ void avl_destroy_lock(avl_tree_lock *tree) { int lock; #ifdef AVL_LOCK_WITH_MUTEX - lock = pthread_mutex_destroy(&tree->mutex); + lock = netdata_mutex_destroy(&tree->mutex); #else - lock = pthread_rwlock_destroy(&tree->rwlock); + lock = netdata_rwlock_destroy(&tree->rwlock); #endif if(lock != 0) diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c index 8ea90985c..880417551 100644 --- a/libnetdata/buffer/buffer.c +++ b/libnetdata/buffer/buffer.c @@ -136,6 +136,24 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) wb->len += wstr - str; } +void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { + if(unlikely(!txt || !*txt)) return; + + buffer_need_bytes(wb, len + 1); + + char *s = &wb->buffer[wb->len]; + const char *end = &txt[len + 1]; + + while(txt != end) + *s++ = *txt++; + + wb->len += len; + + // keep it NULL terminating + // not counting it at wb->len + wb->buffer[wb->len] = '\0'; +} + void buffer_strcat(BUFFER *wb, const char *txt) { // buffer_sprintf(wb, "%s", txt); @@ -159,8 +177,7 @@ void buffer_strcat(BUFFER *wb, const char *txt) if(*txt) { debug(D_WEB_BUFFER, "strcat(): increasing web_buffer at position %zu, size = %zu\n", wb->len, wb->size); len = strlen(txt); - buffer_increase(wb, len); - buffer_strcat(wb, txt); + buffer_fast_strcat(wb, txt, len); } else { // terminate the string @@ -236,15 +253,23 @@ void buffer_vsprintf(BUFFER *wb, const char *fmt, va_list args) { if(unlikely(!fmt || !*fmt)) 
return; - buffer_need_bytes(wb, 2); + size_t wrote = 0, need = 2, space_remaining = 0; - size_t len = wb->size - wb->len - 1; + do { + need += space_remaining * 2; - wb->len += vsnprintfz(&wb->buffer[wb->len], len, fmt, args); + debug(D_WEB_BUFFER, "web_buffer_sprintf(): increasing web_buffer at position %zu, size = %zu, by %zu bytes (wrote = %zu)\n", wb->len, wb->size, need, wrote); + buffer_need_bytes(wb, need); - buffer_overflow_check(wb); + space_remaining = wb->size - wb->len - 1; - // the buffer is \0 terminated by vsnprintfz + wrote = (size_t) vsnprintfz(&wb->buffer[wb->len], space_remaining, fmt, args); + + } while(wrote >= space_remaining); + + wb->len += wrote; + + // the buffer is \0 terminated by vsnprintf } void buffer_sprintf(BUFFER *wb, const char *fmt, ...) @@ -252,22 +277,21 @@ void buffer_sprintf(BUFFER *wb, const char *fmt, ...) if(unlikely(!fmt || !*fmt)) return; va_list args; - size_t wrote = 0, need = 2, multiplier = 0, len; + size_t wrote = 0, need = 2, space_remaining = 0; do { - need += wrote + multiplier * WEB_DATA_LENGTH_INCREASE_STEP; - multiplier++; + need += space_remaining * 2; debug(D_WEB_BUFFER, "web_buffer_sprintf(): increasing web_buffer at position %zu, size = %zu, by %zu bytes (wrote = %zu)\n", wb->len, wb->size, need, wrote); buffer_need_bytes(wb, need); - len = wb->size - wb->len - 1; + space_remaining = wb->size - wb->len - 1; va_start(args, fmt); - wrote = (size_t) vsnprintfz(&wb->buffer[wb->len], len, fmt, args); + wrote = (size_t) vsnprintfz(&wb->buffer[wb->len], space_remaining, fmt, args); va_end(args); - } while(wrote >= len); + } while(wrote >= space_remaining); wb->len += wrote; @@ -420,16 +444,21 @@ void buffer_increase(BUFFER *b, size_t free_size_required) { buffer_overflow_check(b); size_t left = b->size - b->len; - if(left >= free_size_required) return; - size_t increase = free_size_required - left; - if(increase < WEB_DATA_LENGTH_INCREASE_STEP) increase = WEB_DATA_LENGTH_INCREASE_STEP; + size_t wanted = 
free_size_required - left; + size_t minimum = WEB_DATA_LENGTH_INCREASE_STEP; + if(minimum > wanted) wanted = minimum; + + size_t optimal = b->size; + if(b->size > 5*1024*1024) optimal = b->size / 2; + + if(optimal > wanted) wanted = optimal; - debug(D_WEB_BUFFER, "Increasing data buffer from size %zu to %zu.", b->size, b->size + increase); + debug(D_WEB_BUFFER, "Increasing data buffer from size %zu to %zu.", b->size, b->size + wanted); - b->buffer = reallocz(b->buffer, b->size + increase + sizeof(BUFFER_OVERFLOW_EOF) + 2); - b->size += increase; + b->buffer = reallocz(b->buffer, b->size + wanted + sizeof(BUFFER_OVERFLOW_EOF) + 2); + b->size += wanted; buffer_overflow_init(b); buffer_overflow_check(b); diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index d50910c51..ceaeadd9b 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -55,6 +55,7 @@ extern const char *buffer_tostring(BUFFER *wb); extern void buffer_reset(BUFFER *wb); extern void buffer_strcat(BUFFER *wb, const char *txt); +extern void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len); extern void buffer_rrd_value(BUFFER *wb, calculated_number value); extern void buffer_date(BUFFER *wb, int year, int month, int day, int hours, int minutes, int seconds); diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c index 4ec5fa98b..f0e17b232 100644 --- a/libnetdata/clocks/clocks.c +++ b/libnetdata/clocks/clocks.c @@ -2,8 +2,13 @@ #include "../libnetdata.h" -static int clock_boottime_valid = 1; -static int clock_monotonic_coarse_valid = 1; +// defaults are for compatibility +// call clocks_init() once, to optimize these default settings +static clockid_t clock_boottime_to_use = CLOCK_MONOTONIC; +static clockid_t clock_monotonic_to_use = CLOCK_MONOTONIC; + +usec_t clock_monotonic_resolution = 1000; +usec_t clock_realtime_resolution = 1000; #ifndef HAVE_CLOCK_GETTIME inline int clock_gettime(clockid_t clk_id, struct timespec *ts) { @@ -18,19 +23,60 @@ 
inline int clock_gettime(clockid_t clk_id, struct timespec *ts) { } #endif -void test_clock_boottime(void) { +// Similar to CLOCK_MONOTONIC, but provides access to a raw hardware-based time that is not subject to NTP adjustments +// or the incremental adjustments performed by adjtime(3). This clock does not count time that the system is suspended + +static void test_clock_monotonic_raw(void) { +#ifdef CLOCK_MONOTONIC_RAW + struct timespec ts; + if(clock_gettime(CLOCK_MONOTONIC_RAW, &ts) == -1 && errno == EINVAL) + clock_monotonic_to_use = CLOCK_MONOTONIC; + else + clock_monotonic_to_use = CLOCK_MONOTONIC_RAW; +#else + clock_monotonic_to_use = CLOCK_MONOTONIC; +#endif +} + +// When running a binary with CLOCK_BOOTTIME defined on a system with a linux kernel older than Linux 2.6.39 the +// clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC. + +static void test_clock_boottime(void) { struct timespec ts; if(clock_gettime(CLOCK_BOOTTIME, &ts) == -1 && errno == EINVAL) - clock_boottime_valid = 0; + clock_boottime_to_use = clock_monotonic_to_use; + else + clock_boottime_to_use = CLOCK_BOOTTIME; } -void test_clock_monotonic_coarse(void) { +static usec_t get_clock_resolution(clockid_t clock) { struct timespec ts; - if(clock_gettime(CLOCK_MONOTONIC_COARSE, &ts) == -1 && errno == EINVAL) - clock_monotonic_coarse_valid = 0; + clock_getres(clock, &ts); + return ts.tv_sec * USEC_PER_SEC + ts.tv_nsec * NSEC_PER_USEC; +} + +// perform any initializations required for clocks + +void clocks_init(void) { + // monotonic raw has to be tested before boottime + test_clock_monotonic_raw(); + + // boottime has to be tested after monotonic coarse + test_clock_boottime(); + + clock_monotonic_resolution = get_clock_resolution(clock_monotonic_to_use); + clock_realtime_resolution = get_clock_resolution(CLOCK_REALTIME); + + // if for any reason these are zero, netdata will crash + // since we use them as modulo to calculations + 
if(!clock_realtime_resolution) + clock_realtime_resolution = 1000; + + if(!clock_monotonic_resolution) + clock_monotonic_resolution = 1000; } -static inline time_t now_sec(clockid_t clk_id) { +inline time_t now_sec(clockid_t clk_id) { struct timespec ts; if(unlikely(clock_gettime(clk_id, &ts) == -1)) { error("clock_gettime(%d, ×pec) failed.", clk_id); @@ -39,7 +85,7 @@ static inline time_t now_sec(clockid_t clk_id) { return ts.tv_sec; } -static inline usec_t now_usec(clockid_t clk_id) { +inline usec_t now_usec(clockid_t clk_id) { struct timespec ts; if(unlikely(clock_gettime(clk_id, &ts) == -1)) { error("clock_gettime(%d, ×pec) failed.", clk_id); @@ -48,7 +94,7 @@ static inline usec_t now_usec(clockid_t clk_id) { return (usec_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC; } -static inline int now_timeval(clockid_t clk_id, struct timeval *tv) { +inline int now_timeval(clockid_t clk_id, struct timeval *tv) { struct timespec ts; if(unlikely(clock_gettime(clk_id, &ts) == -1)) { @@ -76,15 +122,15 @@ inline int now_realtime_timeval(struct timeval *tv) { } inline time_t now_monotonic_sec(void) { - return now_sec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); + return now_sec(clock_monotonic_to_use); } inline usec_t now_monotonic_usec(void) { - return now_usec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); + return now_usec(clock_monotonic_to_use); } inline int now_monotonic_timeval(struct timeval *tv) { - return now_timeval(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, tv); + return now_timeval(clock_monotonic_to_use, tv); } inline time_t now_monotonic_high_precision_sec(void) { @@ -100,19 +146,15 @@ inline int now_monotonic_high_precision_timeval(struct timeval *tv) { } inline time_t now_boottime_sec(void) { - return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : - likely(clock_monotonic_coarse_valid) ? 
CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); + return now_sec(clock_boottime_to_use); } inline usec_t now_boottime_usec(void) { - return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : - likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); + return now_usec(clock_boottime_to_use); } inline int now_boottime_timeval(struct timeval *tv) { - return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : - likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, - tv); + return now_timeval(clock_boottime_to_use, tv); } inline usec_t timeval_usec(struct timeval *tv) { @@ -137,9 +179,113 @@ inline usec_t dt_usec(struct timeval *now, struct timeval *old) { return (ts1 > ts2) ? (ts1 - ts2) : (ts2 - ts1); } -inline void heartbeat_init(heartbeat_t *hb) -{ - hb->monotonic = hb->realtime = 0ULL; +#ifdef __linux__ +void sleep_to_absolute_time(usec_t usec) { + static int einval_printed = 0, enotsup_printed = 0, eunknown_printed = 0; + clockid_t clock = CLOCK_REALTIME; + + struct timespec req = { + .tv_sec = (time_t)(usec / USEC_PER_SEC), + .tv_nsec = (suseconds_t)((usec % USEC_PER_SEC) * NSEC_PER_USEC) + }; + + int ret = 0; + while( (ret = clock_nanosleep(clock, TIMER_ABSTIME, &req, NULL)) != 0 ) { + if(ret == EINTR) continue; + else { + if (ret == EINVAL) { + if (!einval_printed) { + einval_printed++; + error( + "Invalid time given to clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld", + clock, + req.tv_sec, + req.tv_nsec); + } + } else if (ret == ENOTSUP) { + if (!enotsup_printed) { + enotsup_printed++; + error( + "Invalid clock id given to clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld", + clock, + req.tv_sec, + req.tv_nsec); + } + } else { + if (!eunknown_printed) { + eunknown_printed++; + error( + "Unknown return value %d from clock_nanosleep(): clockid = %d, tv_sec = %ld, tv_nsec = %ld", + ret, + clock, + req.tv_sec, + req.tv_nsec); + } + } + sleep_usec(usec); + } + } +}; +#endif + 
+#define HEARTBEAT_ALIGNMENT_STATISTICS_SIZE 10 +netdata_mutex_t heartbeat_alignment_mutex = NETDATA_MUTEX_INITIALIZER; +static size_t heartbeat_alignment_id = 0; + +struct heartbeat_thread_statistics { + size_t sequence; + usec_t dt; +}; +static struct heartbeat_thread_statistics heartbeat_alignment_values[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE] = { 0 }; + +void heartbeat_statistics(usec_t *min_ptr, usec_t *max_ptr, usec_t *average_ptr, size_t *count_ptr) { + struct heartbeat_thread_statistics current[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE]; + static struct heartbeat_thread_statistics old[HEARTBEAT_ALIGNMENT_STATISTICS_SIZE] = { 0 }; + + memcpy(current, heartbeat_alignment_values, sizeof(struct heartbeat_thread_statistics) * HEARTBEAT_ALIGNMENT_STATISTICS_SIZE); + + usec_t min = 0, max = 0, total = 0, average = 0; + size_t i, count = 0; + for(i = 0; i < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE ;i++) { + if(current[i].sequence == old[i].sequence) continue; + usec_t value = current[i].dt - old[i].dt; + + if(!count) { + min = max = total = value; + count = 1; + } + else { + total += value; + if(value < min) min = value; + if(value > max) max = value; + count++; + } + } + if(count) + average = total / count; + + if(min_ptr) *min_ptr = min; + if(max_ptr) *max_ptr = max; + if(average_ptr) *average_ptr = average; + if(count_ptr) *count_ptr = count; + + memcpy(old, current, sizeof(struct heartbeat_thread_statistics) * HEARTBEAT_ALIGNMENT_STATISTICS_SIZE); +} + +inline void heartbeat_init(heartbeat_t *hb) { + hb->realtime = 0ULL; + hb->randomness = 250 * USEC_PER_MS + ((now_realtime_usec() * clock_realtime_resolution) % (250 * USEC_PER_MS)); + hb->randomness -= (hb->randomness % clock_realtime_resolution); + + netdata_mutex_lock(&heartbeat_alignment_mutex); + hb->statistics_id = heartbeat_alignment_id; + heartbeat_alignment_id++; + netdata_mutex_unlock(&heartbeat_alignment_mutex); + + if(hb->statistics_id < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE) { + 
heartbeat_alignment_values[hb->statistics_id].dt = 0; + heartbeat_alignment_values[hb->statistics_id].sequence = 0; + } } // waits for the next heartbeat @@ -147,96 +293,81 @@ inline void heartbeat_init(heartbeat_t *hb) // it returns the dt using the realtime clock usec_t heartbeat_next(heartbeat_t *hb, usec_t tick) { - heartbeat_t now; - now.monotonic = now_monotonic_usec(); - now.realtime = now_realtime_usec(); - - usec_t next_monotonic = now.monotonic - (now.monotonic % tick) + tick; - - while(now.monotonic < next_monotonic) { - sleep_usec(next_monotonic - now.monotonic); - now.monotonic = now_monotonic_usec(); - now.realtime = now_realtime_usec(); + if(unlikely(hb->randomness > tick / 2)) { + // TODO: The heartbeat tick should be specified at the heartbeat_init() function + usec_t tmp = (now_realtime_usec() * clock_realtime_resolution) % (tick / 2); + info("heartbeat randomness of %llu is too big for a tick of %llu - setting it to %llu", hb->randomness, tick, tmp); + hb->randomness = tmp; } - if(likely(hb->realtime != 0ULL)) { - usec_t dt_monotonic = now.monotonic - hb->monotonic; - usec_t dt_realtime = now.realtime - hb->realtime; + usec_t dt; + usec_t now = now_realtime_usec(); + usec_t next = now - (now % tick) + tick + hb->randomness; - hb->monotonic = now.monotonic; - hb->realtime = now.realtime; + // align the next time we want to the clock resolution + if(next % clock_realtime_resolution) + next = next - (next % clock_realtime_resolution) + clock_realtime_resolution; - if(unlikely(dt_monotonic >= tick + tick / 2)) { - errno = 0; - error("heartbeat missed %llu monotonic microseconds", dt_monotonic - tick); - } + // sleep_usec() has a loop to guarantee we will sleep for at least the requested time. + // According the specs, when we sleep for a relative time, clock adjustments should not affect the duration + // we sleep. 
+ sleep_usec(next - now); + now = now_realtime_usec(); + dt = now - hb->realtime; - return dt_realtime; + if(hb->statistics_id < HEARTBEAT_ALIGNMENT_STATISTICS_SIZE) { + heartbeat_alignment_values[hb->statistics_id].dt += now - next; + heartbeat_alignment_values[hb->statistics_id].sequence++; } - else { - hb->monotonic = now.monotonic; - hb->realtime = now.realtime; - return 0ULL; + + if(unlikely(now < next)) { + errno = 0; + error("heartbeat clock: woke up %llu microseconds earlier than expected (can be due to the CLOCK_REALTIME set to the past).", next - now); + } + else if(unlikely(now - next > tick / 2)) { + errno = 0; + error("heartbeat clock: woke up %llu microseconds later than expected (can be due to system load or the CLOCK_REALTIME set to the future).", now - next); } -} -// returned the elapsed time, since the last heartbeat -// using the monotonic clock + if(unlikely(!hb->realtime)) { + // the first time return zero + dt = 0; + } -inline usec_t heartbeat_monotonic_dt_to_now_usec(heartbeat_t *hb) { - if(!hb || !hb->monotonic) return 0ULL; - return now_monotonic_usec() - hb->monotonic; + hb->realtime = now; + return dt; } -int sleep_usec(usec_t usec) { - -#ifndef NETDATA_WITH_USLEEP +void sleep_usec(usec_t usec) { // we expect microseconds (1.000.000 per second) // but timespec is nanoseconds (1.000.000.000 per second) struct timespec rem, req = { - .tv_sec = (time_t) (usec / 1000000), - .tv_nsec = (suseconds_t) ((usec % 1000000) * 1000) + .tv_sec = (time_t) (usec / USEC_PER_SEC), + .tv_nsec = (suseconds_t) ((usec % USEC_PER_SEC) * NSEC_PER_USEC) }; - while (nanosleep(&req, &rem) == -1) { +#ifdef __linux__ + while ((errno = clock_nanosleep(CLOCK_REALTIME, 0, &req, &rem)) != 0) { +#else + while ((errno = nanosleep(&req, &rem)) != 0) { +#endif if (likely(errno == EINTR)) { - debug(D_SYSTEM, "nanosleep() interrupted (while sleeping for %llu microseconds).", usec); req.tv_sec = rem.tv_sec; req.tv_nsec = rem.tv_nsec; } else { +#ifdef __linux__ + error("Cannot 
clock_nanosleep(CLOCK_REALTIME) for %llu microseconds.", usec); +#else error("Cannot nanosleep() for %llu microseconds.", usec); +#endif break; } } - - return 0; -#else - int ret = usleep(usec); - if(unlikely(ret == -1 && errno == EINVAL)) { - // on certain systems, usec has to be up to 999999 - if(usec > 999999) { - int counter = usec / 999999; - while(counter--) - usleep(999999); - - usleep(usec % 999999); - } - else { - error("Cannot usleep() for %llu microseconds.", usec); - return ret; - } - } - - if(ret != 0) - error("usleep() failed for %llu microseconds.", usec); - - return ret; -#endif } static inline collected_number uptime_from_boottime(void) { #ifdef CLOCK_BOOTTIME_IS_AVAILABLE - return now_boottime_usec() / 1000; + return (collected_number)(now_boottime_usec() / USEC_PER_MS); #else error("uptime cannot be read from CLOCK_BOOTTIME on this system."); return 0; diff --git a/libnetdata/clocks/clocks.h b/libnetdata/clocks/clocks.h index d3475df6a..53c036ece 100644 --- a/libnetdata/clocks/clocks.h +++ b/libnetdata/clocks/clocks.h @@ -22,8 +22,9 @@ typedef unsigned long long usec_t; typedef long long susec_t; typedef struct heartbeat { - usec_t monotonic; usec_t realtime; + usec_t randomness; + size_t statistics_id; } heartbeat_t; /* Linux value is as good as any other */ @@ -36,20 +37,14 @@ typedef struct heartbeat { #define CLOCK_MONOTONIC CLOCK_REALTIME #endif -/* Prefer CLOCK_MONOTONIC_COARSE where available to reduce overhead. 
It has the same semantics as CLOCK_MONOTONIC */ -#ifndef CLOCK_MONOTONIC_COARSE -/* fallback to CLOCK_MONOTONIC if not available */ -#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC -#endif - #ifndef CLOCK_BOOTTIME #ifdef CLOCK_UPTIME /* CLOCK_BOOTTIME falls back to CLOCK_UPTIME on FreeBSD */ #define CLOCK_BOOTTIME CLOCK_UPTIME #else // CLOCK_UPTIME -/* CLOCK_BOOTTIME falls back to CLOCK_MONOTONIC */ -#define CLOCK_BOOTTIME CLOCK_MONOTONIC_COARSE +/* CLOCK_BOOTTIME falls back to CLOCK_REALTIME */ +#define CLOCK_BOOTTIME CLOCK_REALTIME #endif // CLOCK_UPTIME #else // CLOCK_BOOTTIME @@ -115,8 +110,6 @@ extern int clock_gettime(clockid_t clk_id, struct timespec *ts); * All now_*_sec() functions return the time in seconds from the appropriate clock, or 0 on error. * All now_*_usec() functions return the time in microseconds from the appropriate clock, or 0 on error. * - * Most functions will attempt to use CLOCK_MONOTONIC_COARSE if available to reduce contention overhead and improve - * performance scaling. If high precision is required please use one of the available now_*_high_precision_* functions. 
*/ extern int now_realtime_timeval(struct timeval *tv); extern time_t now_realtime_sec(void); @@ -133,7 +126,6 @@ extern int now_boottime_timeval(struct timeval *tv); extern time_t now_boottime_sec(void); extern usec_t now_boottime_usec(void); - extern usec_t timeval_usec(struct timeval *tv); extern msec_t timeval_msec(struct timeval *tv); @@ -147,23 +139,22 @@ extern void heartbeat_init(heartbeat_t *hb); */ extern usec_t heartbeat_next(heartbeat_t *hb, usec_t tick); -/* Returns elapsed time in microseconds since last heartbeat */ -extern usec_t heartbeat_monotonic_dt_to_now_usec(heartbeat_t *hb); +extern void heartbeat_statistics(usec_t *min_ptr, usec_t *max_ptr, usec_t *average_ptr, size_t *count_ptr); -extern int sleep_usec(usec_t usec); +extern void sleep_usec(usec_t usec); -/* - * When running a binary with CLOCK_BOOTTIME defined on a system with a linux kernel older than Linux 2.6.39 the - * clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC. - */ -void test_clock_boottime(void); +extern void clocks_init(void); -/* - * When running a binary with CLOCK_MONOTONIC_COARSE defined on a system with a linux kernel older than Linux 2.6.32 the - * clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC. 
- */ -void test_clock_monotonic_coarse(void); +// lower level functions - avoid using directly +extern time_t now_sec(clockid_t clk_id); +extern usec_t now_usec(clockid_t clk_id); +extern int now_timeval(clockid_t clk_id, struct timeval *tv); extern collected_number uptime_msec(char *filename); +extern usec_t clock_monotonic_resolution; +extern usec_t clock_realtime_resolution; + +extern void sleep_to_absolute_time(usec_t usec); + #endif /* NETDATA_CLOCKS_H */ diff --git a/libnetdata/completion/completion.c b/libnetdata/completion/completion.c index 77818f40d..b5ac86e4f 100644 --- a/libnetdata/completion/completion.c +++ b/libnetdata/completion/completion.c @@ -29,6 +29,6 @@ void completion_mark_complete(struct completion *p) { uv_mutex_lock(&p->mutex); p->completed = 1; - uv_mutex_unlock(&p->mutex); uv_cond_broadcast(&p->cond); + uv_mutex_unlock(&p->mutex); } diff --git a/libnetdata/config/appconfig.c b/libnetdata/config/appconfig.c index 0daa6e5e4..0272877bf 100644 --- a/libnetdata/config/appconfig.c +++ b/libnetdata/config/appconfig.c @@ -257,6 +257,54 @@ void appconfig_section_destroy_non_loaded(struct config *root, const char *secti freez(co); } +void appconfig_section_option_destroy_non_loaded(struct config *root, const char *section, const char *name) +{ + debug(D_CONFIG, "Destroying section option '%s -> %s'.", section, name); + + struct section *co; + co = appconfig_section_find(root, section); + if (!co) { + error("Could not destroy section option '%s -> %s'. The section not found.", section, name); + return; + } + + config_section_wrlock(co); + + struct config_option *cv; + + cv = appconfig_option_index_find(co, name, simple_hash(name)); + + if (cv && cv->flags & CONFIG_VALUE_LOADED) { + config_section_unlock(co); + return; + } + + if (unlikely(!(cv && appconfig_option_index_del(co, cv)))) { + config_section_unlock(co); + error("Could not destroy section option '%s -> %s'. 
The option not found.", section, name); + return; + } + + if (co->values == cv) { + co->values = co->values->next; + } else { + struct config_option *cv_cur = co->values, *cv_prev = NULL; + while (cv_cur && cv_cur != cv) { + cv_prev = cv_cur; + cv_cur = cv_cur->next; + } + if (cv_cur) { + cv_prev->next = cv_cur->next; + } + } + + freez(cv->value); + freez(cv->name); + freez(cv); + + config_section_unlock(co); + return; +} // ---------------------------------------------------------------------------- // config name-value methods @@ -757,49 +805,40 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed) struct section *co; struct config_option *cv; - for(i = 0; i < 3 ;i++) { - switch(i) { - case 0: - buffer_strcat(wb, - "# netdata configuration\n" - "#\n" - "# You can download the latest version of this file, using:\n" - "#\n" - "# wget -O /etc/netdata/netdata.conf http://localhost:19999/netdata.conf\n" - "# or\n" - "# curl -o /etc/netdata/netdata.conf http://localhost:19999/netdata.conf\n" - "#\n" - "# You can uncomment and change any of the options below.\n" - "# The value shown in the commented settings, is the default value.\n" - "#\n" - "\n# global netdata configuration\n"); - break; - - case 1: - buffer_strcat(wb, "\n\n# per plugin configuration\n"); - break; - - case 2: - buffer_strcat(wb, "\n\n# per chart configuration\n"); - break; - } - + buffer_strcat(wb, + "# netdata configuration\n" + "#\n" + "# You can download the latest version of this file, using:\n" + "#\n" + "# wget -O /etc/netdata/netdata.conf http://localhost:19999/netdata.conf\n" + "# or\n" + "# curl -o /etc/netdata/netdata.conf http://localhost:19999/netdata.conf\n" + "#\n" + "# You can uncomment and change any of the options below.\n" + "# The value shown in the commented settings, is the default value.\n" + "#\n" + "\n# global netdata configuration\n"); + + for(i = 0; i <= 15 ;i++) { appconfig_wrlock(root); for(co = root->first_section; co ; co = co->next) { - 
if(!strcmp(co->name, CONFIG_SECTION_GLOBAL) - || !strcmp(co->name, CONFIG_SECTION_WEB) - || !strcmp(co->name, CONFIG_SECTION_STATSD) - || !strcmp(co->name, CONFIG_SECTION_PLUGINS) - || !strcmp(co->name, CONFIG_SECTION_CLOUD) - || !strcmp(co->name, CONFIG_SECTION_REGISTRY) - || !strcmp(co->name, CONFIG_SECTION_HEALTH) - || !strcmp(co->name, CONFIG_SECTION_STREAM) - || !strcmp(co->name, CONFIG_SECTION_HOST_LABEL) - || !strcmp(co->name, CONFIG_SECTION_ML) - ) - pri = 0; - else if(!strncmp(co->name, "plugin:", 7)) pri = 1; - else pri = 2; + if(!strcmp(co->name, CONFIG_SECTION_GLOBAL)) pri = 0; + else if(!strcmp(co->name, CONFIG_SECTION_DIRECTORIES)) pri = 1; + else if(!strcmp(co->name, CONFIG_SECTION_LOGS)) pri = 2; + else if(!strcmp(co->name, CONFIG_SECTION_ENV_VARS)) pri = 3; + else if(!strcmp(co->name, CONFIG_SECTION_HOST_LABEL)) pri = 4; + else if(!strcmp(co->name, CONFIG_SECTION_SQLITE)) pri = 5; + else if(!strcmp(co->name, CONFIG_SECTION_CLOUD)) pri = 6; + else if(!strcmp(co->name, CONFIG_SECTION_ML)) pri = 7; + else if(!strcmp(co->name, CONFIG_SECTION_HEALTH)) pri = 8; + else if(!strcmp(co->name, CONFIG_SECTION_WEB)) pri = 9; + // by default, new sections will get pri = 10 (set at the end, below) + else if(!strcmp(co->name, CONFIG_SECTION_REGISTRY)) pri = 11; + else if(!strcmp(co->name, CONFIG_SECTION_GLOBAL_STATISTICS)) pri = 12; + else if(!strcmp(co->name, CONFIG_SECTION_PLUGINS)) pri = 13; + else if(!strcmp(co->name, CONFIG_SECTION_STATSD)) pri = 14; + else if(!strncmp(co->name, "plugin:", 7)) pri = 15; // << change the loop too if you change this + else pri = 10; // this is used for any new (currently unknown) sections if(i == pri) { int loaded = 0; diff --git a/libnetdata/config/appconfig.h b/libnetdata/config/appconfig.h index b5cf77419..f1f61e31d 100644 --- a/libnetdata/config/appconfig.h +++ b/libnetdata/config/appconfig.h @@ -82,19 +82,24 @@ #define CONFIG_FILENAME "netdata.conf" -#define CONFIG_SECTION_GLOBAL "global" -#define CONFIG_SECTION_WEB "web" 
-#define CONFIG_SECTION_STATSD "statsd" -#define CONFIG_SECTION_PLUGINS "plugins" -#define CONFIG_SECTION_CLOUD "cloud" -#define CONFIG_SECTION_REGISTRY "registry" -#define CONFIG_SECTION_HEALTH "health" -#define CONFIG_SECTION_STREAM "stream" -#define CONFIG_SECTION_ML "ml" -#define CONFIG_SECTION_EXPORTING "exporting:global" -#define CONFIG_SECTION_PROMETHEUS "prometheus:exporter" -#define CONFIG_SECTION_HOST_LABEL "host labels" -#define EXPORTING_CONF "exporting.conf" +#define CONFIG_SECTION_GLOBAL "global" +#define CONFIG_SECTION_DIRECTORIES "directories" +#define CONFIG_SECTION_LOGS "logs" +#define CONFIG_SECTION_ENV_VARS "environment variables" +#define CONFIG_SECTION_SQLITE "sqlite" +#define CONFIG_SECTION_WEB "web" +#define CONFIG_SECTION_STATSD "statsd" +#define CONFIG_SECTION_PLUGINS "plugins" +#define CONFIG_SECTION_CLOUD "cloud" +#define CONFIG_SECTION_REGISTRY "registry" +#define CONFIG_SECTION_HEALTH "health" +#define CONFIG_SECTION_STREAM "stream" +#define CONFIG_SECTION_ML "ml" +#define CONFIG_SECTION_EXPORTING "exporting:global" +#define CONFIG_SECTION_PROMETHEUS "prometheus:exporter" +#define CONFIG_SECTION_HOST_LABEL "host labels" +#define EXPORTING_CONF "exporting.conf" +#define CONFIG_SECTION_GLOBAL_STATISTICS "global statistics" // these are used to limit the configuration names and values lengths // they are not enforced by config.c functions (they will strdup() all strings, no matter of their length) @@ -183,6 +188,7 @@ extern void appconfig_generate(struct config *root, BUFFER *wb, int only_changed extern int appconfig_section_compare(void *a, void *b); extern void appconfig_section_destroy_non_loaded(struct config *root, const char *section); +extern void appconfig_section_option_destroy_non_loaded(struct config *root, const char *section, const char *name); extern int config_parse_duration(const char* string, int* result); diff --git a/libnetdata/dictionary/README.md b/libnetdata/dictionary/README.md index 6049c7f66..28d0cfbbd 100644 --- 
a/libnetdata/dictionary/README.md +++ b/libnetdata/dictionary/README.md @@ -2,4 +2,206 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/dictionary/README.md --> +# Dictionaries +Netdata dictionaries associate a `name` with a `value`: + +- A `name` can be any string. +- A `value` can be anything. + +Such a pair of a `name` and a `value` consists of an `item` or an `entry` in the dictionary. + +Dictionaries provide an interface to: + +- **Add** an item to the dictionary +- **Get** an item from the dictionary (provided its `name`) +- **Delete** an item from the dictionary (provided its `name`) +- **Traverse** the list of items in the dictionary + +Dictionaries are **ordered**, meaning that the order they have been added is preserved while traversing them. The caller may reverse this order by passing the flag `DICTIONARY_FLAG_ADD_IN_FRONT` when creating the dictionary. + +Dictionaries guarantee **uniqueness** of all items added to them, meaning that only one item with a given name can exist in the dictionary at any given time. + +Dictionaries are extremely fast in all operations. They are indexing the keys with `JudyHS` and they utilize a double-linked-list for the traversal operations. Deletion is the most expensive operation, usually somewhat slower than insertion. + +## Memory management + +Dictionaries come with 2 memory management options: + +- **Clone** (copy) the name and/or the value to memory allocated by the dictionary. +- **Link** the name and/or the value, without allocating any memory about them. + +In **clone** mode, the dictionary guarantees that all operations on the dictionary items will automatically take care of the memory used by the name and/or the value. 
In case the value is an object needs to have user allocated memory, the following callback functions can be registered: + + 1.`dictionary_register_insert_callback()` that will be called just after the insertion of an item to the dictionary, or after the replacement of the value of a dictionary item (but while the dictionary is write-locked - if locking is enabled). + 2. `dictionary_register_delete_callback()` that will be called just prior to the deletion of an item from the dictionary, or prior to the replacement of the value of a dictionary item (but while the dictionary is write-locked - if locking is enabled). + 3. `dictionary_register_conflict_callback()` that will be called when `DICTIONARY_FLAG_DONT_OVERWRITE_VALUE` is set and another value is attempted to be inserted for the same key. + +In **link** mode, the name and/or the value are just linked to the dictionary item, and it is the user's responsibility to free the memory used after an item is deleted from the dictionary. + +By default, **clone** mode is used for both the name and the value. + +To use **link** mode for names, add `DICTIONARY_FLAG_NAME_LINK_DONT_CLONE` to the flags when creating the dictionary. + +To use **link** mode for values, add `DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE` to the flags when creating the dictionary. + +## Locks + +The dictionary allows both **single-threaded** operation (no locks - faster) and **multi-threaded** operation utilizing a read-write lock. + +The default is **multi-threaded**. To enable **single-threaded** add `DICTIONARY_FLAG_SINGLE_THREADED` to the flags when creating the dictionary. + +## Hash table operations + +The dictionary supports the following operations supported by the hash table: + +- `dictionary_set()` to add an item to the dictionary, or change its value. +- `dictionary_get()` to get an item from the dictionary. +- `dictionary_del()` to delete an item from the dictionary. 
+ +## Creation and destruction + +Use `dictionary_create()` to create a dictionary. + +Use `dictionary_destroy()` to destroy a dictionary. When destroyed, a dictionary frees all the memory it has allocated on its own. The exception is the registration of a deletion callback function that can be called on deletion of an item, which may free additional resources. + +### dictionary_set() + +This call is used to: + +- **add** an item to the dictionary. +- **reset** the value of an existing item in the dictionary. + +If **resetting** is not desired, add `DICTIONARY_FLAG_DONT_OVERWRITE_VALUE` to the flags when creating the dictionary. In this case, `dictionary_set()` will return the value of the original item found in the dictionary instead of resetting it and the value passed to the call will be ignored. + +For **multi-threaded** operation, the `dictionary_set()` calls get an exclusive write lock on the dictionary. + +The format is: + +```c +value = dictionary_set(dict, name, value, value_len); +``` + +Where: + +* `dict` is a pointer to the dictionary previously created. +* `name` is a pointer to a string to be used as the key of this item. The name must not be `NULL` and must not be an empty string `""`. +* `value` is a pointer to the value associated with this item. In **clone** mode, if `value` is `NULL`, a new memory allocation will be made of `value_len` size and will be initialized to zero. +* `value_len` is the size of the `value` data. If `value_len` is zero, no allocation will be done and the dictionary item will permanently have the `NULL` value. + +> **IMPORTANT**<br/>There is also an **unsafe** version (without locks) of this call. This is to be used when traversing the dictionary. It should never be called without an active lock on the dictionary, which can only be acquired while traversing. + +### dictionary_get() + +This call is used to get the value of an item, given its name. It utilizes the JudyHS hash table for making the lookup. 
+ +For **multi-threaded** operation, the `dictionary_get()` call gets a shared read lock on the dictionary. + +The format is: + +```c +value = dictionary_get(dict, name); +``` + +Where: + +* `dict` is a pointer to the dictionary previously created. +* `name` is a pointer to a string to be used as the key of this item. The name must not be `NULL` and must not be an empty string `""`. + +> **IMPORTANT**<br/>There is also an **unsafe** version (without locks) of this call. This is to be used when traversing the dictionary. It should never be called without an active lock on the dictionary, which can only be acquired while traversing. + +### dictionary_del() + +This call is used to delete an item from the dictionary, given its name. + +If there is a delete callback registered to the dictionary (`dictionary_register_delete_callback()`), it is called prior to the actual deletion of the item. + +For **multi-threaded** operation, the `dictionary_del()` calls get an exclusive write lock on the dictionary. + +The format is: + +```c +value = dictionary_del(dict, name); +``` + +Where: + +* `dict` is a pointer to the dictionary previously created. +* `name` is a pointer to a string to be used as the key of this item. The name must not be `NULL` and must not be an empty string `""`. + +> **IMPORTANT**<br/>There is also an **unsafe** version (without locks) of this call. This is to be used when traversing the dictionary, to delete the current item. It should never be called without an active lock on the dictionary, which can only be acquired while traversing. + +## Traversal + +Dictionaries offer 2 ways to traverse the entire dictionary: + +- **walkthrough**, implemented by setting a callback function to be called for every item. +- **foreach**, a way to traverse the dictionary with a for-next loop. + +Both of these methods are available in **read** or **write** mode. In **read** mode only lookups are allowed to the dictionary. 
In **write** both lookups but also deletion of the currently working item is also allowed. + +While traversing the dictionary with any of these methods, all calls to the dictionary have to use the `_unsafe` versions of the function calls, otherwise deadlock may arise. + +> **IMPORTANT**<br/>The dictionary itself does not check to ensure that a user is actually using the right lock mode (read or write) while traversing the dictionary for each of the unsafe calls. + +### walkthrough (callback) + +There are 2 calls: + +- `dictionary_walkthrough_read()` that acquires a shared read lock, and it calls a callback function for every item of the dictionary. The callback function may use the unsafe versions of the `dictionary_get()` calls to lookup other items in the dictionary, but it should not add or remove item from the dictionary. +- `dictionary_walkthrough_write()` that acquires an exclusive write lock, and it calls a callback function for every item of the dictionary. This is to be used when items need to be added to the dictionary, or when the current item may need to be deleted. If the callback function deletes any other items, the behavior may be undefined (actually, the item next to the one currently working should not be deleted - a pointer to it is held by the traversal function to move on traversing the dictionary). + +The items are traversed in the same order they have been added to the dictionary (or the reverse order if the flag `DICTIONARY_FLAG_ADD_IN_FRONT` is set during dictionary creation). + +The callback function returns an `int`. If this value is negative, traversal of the dictionary is stopped immediately and the negative value is returned to the caller. If the returned value of all callbacks is zero or positive, the walkthrough functions return the sum of the return values of all callbacks. 
So, if you are just interested to know how many items fall into some condition, write a callback function that returns 1 when the item satisfies that condition and 0 when it does not and the walkthrough function will return how many tested positive. + +### foreach (for-next loop) + +The following is a snippet of such a loop: + +```c +MY_ITEM *item; +dfe_start_read(dict, item) { + printf("hey, I got an item named '%s' with value ptr %08X", item_name, item); +} +dfe_done(item); +``` + +The `item` parameter gives the name of the pointer to be used while iterating the items. Any name is accepted. + +The `item_name` is a variable that is automatically created, by concatenating whatever is given as `item` and `_name`. So, if you call `dfe_start_read(dict, myvar)`, the name will be `myvar_name`. + +Both `dfe_start_read(dict, item)` and `dfe_done(item)` are together inside a `do { ... } while(0)` loop, so that the following will work: + +```c +MY_ITEM *item; + +if(x = 1) + // do { + dfe_start_read(dict, item) + printf("hey, I got an item named '%s' with value ptr %08X", item_name, item); + dfe_done(item); + // } while(0); +else + something else; +``` + +In the above, the `if(x)` condition will work as expected. It will do the foreach loop when x is 1, otherwise it will run `something else`. + +There are 2 versions of `dfe_start`: + +- `dfe_start_read()` that acquires a shared read lock to the dictionary. +- `dfe_start_write()` that acquires an exclusive write lock to the dictionary. + +While in the loop, depending on the read or write versions of `dfe_start`, the caller may lookup or manipulate the dictionary. The rules are the same with the walkthrough callback functions. + +PS: DFE is Dictionary For Each. 
+ +## special multi-threaded lockless case + +Since the dictionary uses a hash table and a double linked list, if the contract between 2 threads is for one to use the hash table functions only (`set`, `get` - but no `del`) and the other to use the traversal ones only, the dictionary allows concurrent use without locks. + +This is currently used in statsd: + +- the data collection thread uses only `get` and `set`. It never uses `del`. New items are added at the front of the linked list (`DICTIONARY_FLAG_ADD_IN_FRONT`). +- the flushing thread is only traversing the dictionary up to the point it last traversed it (it uses a flag for that to know where it stopped last time). It never uses `get`, `set` or `del`. diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index b3dc3f371..42285037d 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -1,225 +1,772 @@ // SPDX-License-Identifier: GPL-3.0-or-later +// NOT TO BE USED BY USERS YET +#define DICTIONARY_FLAG_REFERENCE_COUNTERS (1 << 6) // maintain reference counter in walkthrough and foreach + +typedef struct dictionary DICTIONARY; +#define DICTIONARY_INTERNALS + #include "../libnetdata.h" +#ifndef ENABLE_DBENGINE +#define DICTIONARY_WITH_AVL +#warning Compiling DICTIONARY with an AVL index +#else +#define DICTIONARY_WITH_JUDYHS +#endif + +#ifdef DICTIONARY_WITH_JUDYHS +#include <Judy.h> +#endif + +/* + * This version uses JudyHS arrays to index the dictionary + * + * The following output is from the unit test, at the end of this file: + * + * This is the JudyHS version: + * + * 1000000 x dictionary_set() (dictionary size 0 entries, 0 KB)... + * 1000000 x dictionary_get(existing) (dictionary size 1000000 entries, 74001 KB)... + * 1000000 x dictionary_get(non-existing) (dictionary size 1000000 entries, 74001 KB)... + * Walking through the dictionary (dictionary size 1000000 entries, 74001 KB)... 
+ * 1000000 x dictionary_del(existing) (dictionary size 1000000 entries, 74001 KB)... + * 1000000 x dictionary_set() (dictionary size 0 entries, 0 KB)... + * Destroying dictionary (dictionary size 1000000 entries, 74001 KB)... + * + * TIMINGS: + * adding 316027 usec, positive search 156740 usec, negative search 84524, walk through 15036 usec, deleting 361444, destroy 107394 usec + * + * This is from the JudySL version: + * + * Creating dictionary of 1000000 entries... + * Checking index of 1000000 entries... + * Walking 1000000 entries and checking name-value pairs... + * Created and checked 1000000 entries, found 0 errors - used 58376 KB of memory + * Destroying dictionary of 1000000 entries... + * Deleted 1000000 entries + * create 338975 usec, check 156080 usec, walk 80764 usec, destroy 444569 usec + * + * This is the AVL version: + * + * Creating dictionary of 1000000 entries... + * Checking index of 1000000 entries... + * Walking 1000000 entries and checking name-value pairs... + * Created and checked 1000000 entries, found 0 errors - used 89626 KB of memory + * Destroying dictionary of 1000000 entries... + * create 413892 usec, check 220006 usec, walk 34247 usec, destroy 98062 usec + * + * So, the JudySL is a lot slower to WALK and DESTROY (DESTROY does a WALK) + * It is slower, because for every item, JudySL copies the KEY/NAME to a + * caller supplied buffer (Index). So, by just walking over 1 million items, + * JudySL does 1 million strcpy() !!! + * + * It also seems that somehow JudySLDel() is unbelievably slow too! + * + */ + + +/* + * Every item in the dictionary has the following structure. 
+ */ +typedef struct name_value { +#ifdef DICTIONARY_WITH_AVL + avl_t avl_node; +#endif + + struct name_value *next; // a double linked list to allow fast insertions and deletions + struct name_value *prev; + + char *name; // the name of the dictionary item + void *value; // the value of the dictionary item +} NAME_VALUE; + +/* + * When DICTIONARY_FLAG_WITH_STATISTICS is set, we need to keep track of all the memory + * we allocate and free. So, we need to keep track of the sizes of all names and values. + * We do this by overloading NAME_VALUE with the following additional fields. + */ + +typedef enum name_value_flags { + NAME_VALUE_FLAG_NONE = 0, + NAME_VALUE_FLAG_DELETED = (1 << 0), // this item is deleted +} NAME_VALUE_FLAGS; + +typedef struct name_value_with_stats { + NAME_VALUE name_value_data_here; // never used - just to put the lengths at the right position + + size_t name_len; // the size of the name, including the terminating zero + size_t value_len; // the size of the value (assumed binary) + + size_t refcount; // the reference counter + NAME_VALUE_FLAGS flags; // the flags for this item +} NAME_VALUE_WITH_STATS; + +struct dictionary_stats { + size_t inserts; + size_t deletes; + size_t searches; + size_t resets; + size_t entries; + size_t memory; +}; + +struct dictionary { + DICTIONARY_FLAGS flags; // the flags of the dictionary + + NAME_VALUE *first_item; // the double linked list base pointers + NAME_VALUE *last_item; + +#ifdef DICTIONARY_WITH_AVL + avl_tree_type values_index; + NAME_VALUE *hash_base; +#endif + +#ifdef DICTIONARY_WITH_JUDYHS + Pvoid_t JudyHSArray; // the hash table +#endif + + netdata_rwlock_t *rwlock; // the r/w lock when DICTIONARY_FLAG_SINGLE_THREADED is not set + + void (*ins_callback)(const char *name, void *value, void *data); + void *ins_callback_data; + + void (*del_callback)(const char *name, void *value, void *data); + void *del_callback_data; + + void (*conflict_callback)(const char *name, void *old_value, void *new_value, 
void *data); + void *conflict_callback_data; + + struct dictionary_stats *stats; // the statistics when DICTIONARY_FLAG_WITH_STATISTICS is set +}; + +void dictionary_register_insert_callback(DICTIONARY *dict, void (*ins_callback)(const char *name, void *value, void *data), void *data) { + dict->ins_callback = ins_callback; + dict->ins_callback_data = data; +} + +void dictionary_register_delete_callback(DICTIONARY *dict, void (*del_callback)(const char *name, void *value, void *data), void *data) { + dict->del_callback = del_callback; + dict->del_callback_data = data; +} + +void dictionary_register_conflict_callback(DICTIONARY *dict, void (*conflict_callback)(const char *name, void *old_value, void *new_value, void *data), void *data) { + dict->conflict_callback = conflict_callback; + dict->conflict_callback_data = data; +} + // ---------------------------------------------------------------------------- -// dictionary statistics +// dictionary statistics maintenance -static inline void NETDATA_DICTIONARY_STATS_INSERTS_PLUS1(DICTIONARY *dict) { - if(likely(dict->stats)) - dict->stats->inserts++; +size_t dictionary_stats_allocated_memory(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->memory; + return 0; } -static inline void NETDATA_DICTIONARY_STATS_DELETES_PLUS1(DICTIONARY *dict) { - if(likely(dict->stats)) - dict->stats->deletes++; +size_t dictionary_stats_entries(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->entries; + return 0; } -static inline void NETDATA_DICTIONARY_STATS_SEARCHES_PLUS1(DICTIONARY *dict) { - if(likely(dict->stats)) +size_t dictionary_stats_searches(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->searches; + return 0; +} +size_t dictionary_stats_inserts(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->inserts; + return 0; 
+} +size_t dictionary_stats_deletes(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->deletes; + return 0; +} +size_t dictionary_stats_resets(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) + return dict->stats->resets; + return 0; +} + +static inline void DICTIONARY_STATS_SEARCHES_PLUS1(DICTIONARY *dict) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) dict->stats->searches++; } -static inline void NETDATA_DICTIONARY_STATS_ENTRIES_PLUS1(DICTIONARY *dict) { - if(likely(dict->stats)) +static inline void DICTIONARY_STATS_ENTRIES_PLUS1(DICTIONARY *dict, size_t size) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + dict->stats->inserts++; dict->stats->entries++; + dict->stats->memory += size; + } } -static inline void NETDATA_DICTIONARY_STATS_ENTRIES_MINUS1(DICTIONARY *dict) { - if(likely(dict->stats)) +static inline void DICTIONARY_STATS_ENTRIES_MINUS1(DICTIONARY *dict, size_t size) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + dict->stats->deletes++; dict->stats->entries--; + dict->stats->memory -= size; + } +} +static inline void DICTIONARY_STATS_VALUE_RESETS_PLUS1(DICTIONARY *dict, size_t oldsize, size_t newsize) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + dict->stats->resets++; + dict->stats->memory += newsize; + dict->stats->memory -= oldsize; + } } - // ---------------------------------------------------------------------------- // dictionary locks -static inline void dictionary_read_lock(DICTIONARY *dict) { - if(likely(dict->rwlock)) { +static inline size_t dictionary_lock_init(DICTIONARY *dict) { + if(likely(!(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))) { + dict->rwlock = mallocz(sizeof(netdata_rwlock_t)); + netdata_rwlock_init(dict->rwlock); + return sizeof(netdata_rwlock_t); + } + dict->rwlock = NULL; + return 0; +} + +static inline size_t dictionary_lock_free(DICTIONARY *dict) { + 
if(likely(!(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))) { + netdata_rwlock_destroy(dict->rwlock); + freez(dict->rwlock); + return sizeof(netdata_rwlock_t); + } + return 0; +} + +static inline void dictionary_lock_rlock(DICTIONARY *dict) { + if(likely(!(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))) { // debug(D_DICTIONARY, "Dictionary READ lock"); netdata_rwlock_rdlock(dict->rwlock); } } -static inline void dictionary_write_lock(DICTIONARY *dict) { - if(likely(dict->rwlock)) { +static inline void dictionary_lock_wrlock(DICTIONARY *dict) { + if(likely(!(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))) { // debug(D_DICTIONARY, "Dictionary WRITE lock"); netdata_rwlock_wrlock(dict->rwlock); } } static inline void dictionary_unlock(DICTIONARY *dict) { - if(likely(dict->rwlock)) { + if(likely(!(dict->flags & DICTIONARY_FLAG_SINGLE_THREADED))) { // debug(D_DICTIONARY, "Dictionary UNLOCK lock"); netdata_rwlock_unlock(dict->rwlock); } } +// ---------------------------------------------------------------------------- +// reference counters + +static inline size_t reference_counter_init(DICTIONARY *dict) { + (void)dict; + + // allocate memory required for reference counters + // return number of bytes + return 0; +} + +static inline size_t reference_counter_free(DICTIONARY *dict) { + (void)dict; + + // free memory required for reference counters + // return number of bytes + return 0; +} + +static void reference_counter_acquire(DICTIONARY *dict, NAME_VALUE *nv) { + if(unlikely(dict->flags & DICTIONARY_FLAG_REFERENCE_COUNTERS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + __atomic_fetch_add(&nvs->refcount, 1, __ATOMIC_SEQ_CST); + } +} + +static void reference_counter_release(DICTIONARY *dict, NAME_VALUE *nv) { + if(unlikely(dict->flags & DICTIONARY_FLAG_REFERENCE_COUNTERS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + __atomic_fetch_sub(&nvs->refcount, 1, __ATOMIC_SEQ_CST); + } +} + +static int 
reference_counter_mark_deleted(DICTIONARY *dict, NAME_VALUE *nv) { + if(unlikely(dict->flags & DICTIONARY_FLAG_REFERENCE_COUNTERS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + nvs->flags |= NAME_VALUE_FLAG_DELETED; + return 1; + } + return 0; +} // ---------------------------------------------------------------------------- -// avl index +// hash table +#ifdef DICTIONARY_WITH_AVL static int name_value_compare(void* a, void* b) { - if(((NAME_VALUE *)a)->hash < ((NAME_VALUE *)b)->hash) return -1; - else if(((NAME_VALUE *)a)->hash > ((NAME_VALUE *)b)->hash) return 1; - else return strcmp(((NAME_VALUE *)a)->name, ((NAME_VALUE *)b)->name); + return strcmp(((NAME_VALUE *)a)->name, ((NAME_VALUE *)b)->name); } -static inline NAME_VALUE *dictionary_name_value_index_find_nolock(DICTIONARY *dict, const char *name, uint32_t hash) { +static void hashtable_init_unsafe(DICTIONARY *dict) { + avl_init(&dict->values_index, name_value_compare); +} + +static size_t hashtable_destroy_unsafe(DICTIONARY *dict) { + (void)dict; + return 0; +} + +static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, NAME_VALUE *nv) { + (void)name; + (void)name_len; + + if(unlikely(avl_remove(&(dict->values_index), (avl_t *)(nv)) != (avl_t *)nv)) + return 0; + + return 1; +} + +static inline NAME_VALUE *hashtable_get_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { + (void)name_len; + NAME_VALUE tmp; - tmp.hash = (hash)?hash:simple_hash(name); tmp.name = (char *)name; - - NETDATA_DICTIONARY_STATS_SEARCHES_PLUS1(dict); return (NAME_VALUE *)avl_search(&(dict->values_index), (avl_t *) &tmp); } +static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { + // AVL needs a NAME_VALUE to insert into the dictionary but we don't have it yet. + // So, the only thing we can do, is return an existing one if it is already there. 
+ // Returning NULL will make the caller thing we added it, will allocate one + // and will call hashtable_inserted_name_value_unsafe(), at which we will do + // the actual indexing. + + dict->hash_base = hashtable_get_unsafe(dict, name, name_len); + return &dict->hash_base; +} + +static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, const char *name, size_t name_len, NAME_VALUE *nv) { + // we have our new NAME_VALUE object. + // Let's index it. + + (void)name; + (void)name_len; + + if(unlikely(avl_insert(&((dict)->values_index), (avl_t *)(nv)) != (avl_t *)nv)) + error("dictionary: INTERNAL ERROR: duplicate insertion to dictionary."); +} +#endif + +#ifdef DICTIONARY_WITH_JUDYHS +static void hashtable_init_unsafe(DICTIONARY *dict) { + dict->JudyHSArray = NULL; +} + +static size_t hashtable_destroy_unsafe(DICTIONARY *dict) { + if(unlikely(!dict->JudyHSArray)) return 0; + + JError_t J_Error; + Word_t ret = JudyHSFreeArray(&dict->JudyHSArray, &J_Error); + if(unlikely(ret == (Word_t) JERR)) { + error("DICTIONARY: Cannot destroy JudyHS, JU_ERRNO_* == %u, ID == %d", + JU_ERRNO(&J_Error), JU_ERRID(&J_Error)); + } + + debug(D_DICTIONARY, "Dictionary: hash table freed %lu bytes", ret); + + dict->JudyHSArray = NULL; + return (size_t)ret; +} + +static inline NAME_VALUE **hashtable_insert_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { + JError_t J_Error; + Pvoid_t *Rc = JudyHSIns(&dict->JudyHSArray, (void *)name, name_len, &J_Error); + if (unlikely(Rc == PJERR)) { + fatal("DICTIONARY: Cannot insert entry with name '%s' to JudyHS, JU_ERRNO_* == %u, ID == %d", + name, JU_ERRNO(&J_Error), JU_ERRID(&J_Error)); + } + + // if *Rc == 0, new item added to the array + // otherwise the existing item value is returned in *Rc + + // we return a pointer to a pointer, so that the caller can + // put anything needed at the value of the index. 
+ // The pointer to pointer we return has to be used before + // any other operation that may change the index (insert/delete). + return (NAME_VALUE **)Rc; +} + +static inline int hashtable_delete_unsafe(DICTIONARY *dict, const char *name, size_t name_len, NAME_VALUE *nv) { + (void)nv; + + if(unlikely(!dict->JudyHSArray)) return 0; + + JError_t J_Error; + int ret = JudyHSDel(&dict->JudyHSArray, (void *)name, name_len, &J_Error); + if(unlikely(ret == JERR)) { + error("DICTIONARY: Cannot delete entry with name '%s' from JudyHS, JU_ERRNO_* == %u, ID == %d", name, + JU_ERRNO(&J_Error), JU_ERRID(&J_Error)); + return 0; + } + + // Hey, this is problematic! We need the value back, not just an int with a status! + // https://sourceforge.net/p/judy/feature-requests/23/ + + if(unlikely(ret == 0)) { + // not found in the dictionary + return 0; + } + else { + // found and deleted from the dictionary + return 1; + } +} + +static inline NAME_VALUE *hashtable_get_unsafe(DICTIONARY *dict, const char *name, size_t name_len) { + if(unlikely(!dict->JudyHSArray)) return NULL; + + DICTIONARY_STATS_SEARCHES_PLUS1(dict); + + Pvoid_t *Rc; + Rc = JudyHSGet(dict->JudyHSArray, (void *)name, name_len); + if(likely(Rc)) { + // found in the hash table + return (NAME_VALUE *)*Rc; + } + else { + // not found in the hash table + return NULL; + } +} + +static inline void hashtable_inserted_name_value_unsafe(DICTIONARY *dict, const char *name, size_t name_len, NAME_VALUE *nv) { + (void)dict; + (void)name; + (void)name_len; + (void)nv; + ; +} + +#endif // DICTIONARY_WITH_JUDYHS + // ---------------------------------------------------------------------------- -// internal methods +// linked list management + +static inline void linkedlist_namevalue_link_unsafe(DICTIONARY *dict, NAME_VALUE *nv) { + if (unlikely(!dict->first_item)) { + // we are the only ones here + nv->next = NULL; + nv->prev = NULL; + dict->first_item = dict->last_item = nv; + return; + } + + if(dict->flags & 
DICTIONARY_FLAG_ADD_IN_FRONT) { + // add it at the beginning + nv->prev = NULL; + nv->next = dict->first_item; -static NAME_VALUE *dictionary_name_value_create_nolock(DICTIONARY *dict, const char *name, void *value, size_t value_len, uint32_t hash) { + if (likely(nv->next)) nv->next->prev = nv; + dict->first_item = nv; + } + else { + // add it at the end + nv->next = NULL; + nv->prev = dict->last_item; + + if (likely(nv->prev)) nv->prev->next = nv; + dict->last_item = nv; + } +} + +static inline void linkedlist_namevalue_unlink_unsafe(DICTIONARY *dict, NAME_VALUE *nv) { + if(nv->next) nv->next->prev = nv->prev; + if(nv->prev) nv->prev->next = nv->next; + if(dict->first_item == nv) dict->first_item = nv->next; + if(dict->last_item == nv) dict->last_item = nv->prev; +} + +// ---------------------------------------------------------------------------- +// NAME_VALUE methods + +static inline size_t namevalue_alloc_size(DICTIONARY *dict) { + return (dict->flags & DICTIONARY_FLAG_WITH_STATISTICS) ? 
sizeof(NAME_VALUE_WITH_STATS) : sizeof(NAME_VALUE); +} + +static inline size_t namevalue_get_namelen(DICTIONARY *dict, NAME_VALUE *nv) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + return nvs->name_len; + } + return 0; +} +static inline size_t namevalue_get_valuelen(DICTIONARY *dict, NAME_VALUE *nv) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + return nvs->value_len; + } + return 0; +} +static inline void namevalue_set_valuelen(DICTIONARY *dict, NAME_VALUE *nv, size_t value_len) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + nvs->value_len = value_len; + } +} +static inline void namevalue_set_namevaluelen(DICTIONARY *dict, NAME_VALUE *nv, size_t name_len, size_t value_len) { + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { + NAME_VALUE_WITH_STATS *nvs = (NAME_VALUE_WITH_STATS *)nv; + nvs->name_len = name_len; + nvs->value_len = value_len; + } +} + +static NAME_VALUE *namevalue_create_unsafe(DICTIONARY *dict, const char *name, size_t name_len, void *value, size_t value_len) { debug(D_DICTIONARY, "Creating name value entry for name '%s'.", name); - NAME_VALUE *nv = callocz(1, sizeof(NAME_VALUE)); + size_t size = namevalue_alloc_size(dict); + NAME_VALUE *nv = mallocz(size); + size_t allocated = size; - if(dict->flags & DICTIONARY_FLAG_NAME_LINK_DONT_CLONE) + namevalue_set_namevaluelen(dict, nv, name_len, value_len); + + if(likely(dict->flags & DICTIONARY_FLAG_NAME_LINK_DONT_CLONE)) nv->name = (char *)name; else { - nv->name = strdupz(name); + nv->name = mallocz(name_len); + memcpy(nv->name, name, name_len); + allocated += name_len; } - nv->hash = (hash)?hash:simple_hash(nv->name); - - if(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE) + if(likely(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE)) 
nv->value = value; else { - nv->value = mallocz(value_len); - memcpy(nv->value, value, value_len); - } + if(likely(value_len)) { + if (value) { + // a value has been supplied + // copy it + nv->value = mallocz(value_len); + memcpy(nv->value, value, value_len); + } + else { + // no value has been supplied + // allocate a clear memory block + nv->value = callocz(1, value_len); + } + } + else { + // the caller want an item without any value + nv->value = NULL; + } - // index it - NETDATA_DICTIONARY_STATS_INSERTS_PLUS1(dict); - if(unlikely(avl_insert(&((dict)->values_index), (avl_t *)(nv)) != (avl_t *)nv)) - error("dictionary: INTERNAL ERROR: duplicate insertion to dictionary."); + allocated += value_len; + } - NETDATA_DICTIONARY_STATS_ENTRIES_PLUS1(dict); + DICTIONARY_STATS_ENTRIES_PLUS1(dict, allocated); return nv; } -static void dictionary_name_value_destroy_nolock(DICTIONARY *dict, NAME_VALUE *nv) { - debug(D_DICTIONARY, "Destroying name value entry for name '%s'.", nv->name); +static void namevalue_reset_unsafe(DICTIONARY *dict, NAME_VALUE *nv, void *value, size_t value_len) { + debug(D_DICTIONARY, "Dictionary entry with name '%s' found. 
Changing its value.", nv->name); - NETDATA_DICTIONARY_STATS_DELETES_PLUS1(dict); - if(unlikely(avl_remove(&(dict->values_index), (avl_t *)(nv)) != (avl_t *)nv)) - error("dictionary: INTERNAL ERROR: dictionary invalid removal of node."); + if(likely(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE)) { + debug(D_DICTIONARY, "Dictionary: linking value to '%s'", nv->name); + nv->value = value; + namevalue_set_valuelen(dict, nv, value_len); + } + else { + debug(D_DICTIONARY, "Dictionary: cloning value to '%s'", nv->name); + DICTIONARY_STATS_VALUE_RESETS_PLUS1(dict, namevalue_get_valuelen(dict, nv), value_len); + + void *old = nv->value; + void *new = mallocz(value_len); + memcpy(new, value, value_len); + nv->value = new; + namevalue_set_valuelen(dict, nv, value_len); + + debug(D_DICTIONARY, "Dictionary: freeing old value of '%s'", nv->name); + freez(old); + } +} - NETDATA_DICTIONARY_STATS_ENTRIES_MINUS1(dict); +static size_t namevalue_destroy_unsafe(DICTIONARY *dict, NAME_VALUE *nv) { + debug(D_DICTIONARY, "Destroying name value entry for name '%s'.", nv->name); + + size_t freed = 0; - if(!(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE)) { - debug(D_REGISTRY, "Dictionary freeing value of '%s'", nv->name); + if(unlikely(!(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE))) { + debug(D_DICTIONARY, "Dictionary freeing value of '%s'", nv->name); freez(nv->value); + freed += namevalue_get_valuelen(dict, nv); } - if(!(dict->flags & DICTIONARY_FLAG_NAME_LINK_DONT_CLONE)) { - debug(D_REGISTRY, "Dictionary freeing name '%s'", nv->name); + if(unlikely(!(dict->flags & DICTIONARY_FLAG_NAME_LINK_DONT_CLONE))) { + debug(D_DICTIONARY, "Dictionary freeing name '%s'", nv->name); freez(nv->name); + freed += namevalue_get_namelen(dict, nv); } freez(nv); + freed += namevalue_alloc_size(dict); + + DICTIONARY_STATS_ENTRIES_MINUS1(dict, freed); + + return freed; } // ---------------------------------------------------------------------------- -// API - basic methods +// API - 
dictionary management -DICTIONARY *dictionary_create(uint8_t flags) { +DICTIONARY *dictionary_create(DICTIONARY_FLAGS flags) { debug(D_DICTIONARY, "Creating dictionary."); - DICTIONARY *dict = callocz(1, sizeof(DICTIONARY)); - - if(flags & DICTIONARY_FLAG_WITH_STATISTICS) - dict->stats = callocz(1, sizeof(struct dictionary_stats)); + if((flags & DICTIONARY_FLAG_REFERENCE_COUNTERS) && (flags & DICTIONARY_FLAG_SINGLE_THREADED)) { + error("DICTIONARY: requested reference counters on single threaded dictionary. Not adding reference counters."); + flags &= ~DICTIONARY_FLAG_REFERENCE_COUNTERS; + } - if(!(flags & DICTIONARY_FLAG_SINGLE_THREADED)) { - dict->rwlock = callocz(1, sizeof(netdata_rwlock_t)); - netdata_rwlock_init(dict->rwlock); + if(flags & DICTIONARY_FLAG_REFERENCE_COUNTERS) { + // we need statistics to allocate the extra NAME_VALUE attributes + flags |= DICTIONARY_FLAG_WITH_STATISTICS; } - avl_init(&dict->values_index, name_value_compare); + DICTIONARY *dict = callocz(1, sizeof(DICTIONARY)); + size_t allocated = sizeof(DICTIONARY); + dict->flags = flags; + dict->first_item = dict->last_item = NULL; + + allocated += dictionary_lock_init(dict); + allocated += reference_counter_init(dict); + + if(flags & DICTIONARY_FLAG_WITH_STATISTICS) { + dict->stats = callocz(1, sizeof(struct dictionary_stats)); + allocated += sizeof(struct dictionary_stats); + dict->stats->memory = allocated; + } + else + dict->stats = NULL; - return dict; + hashtable_init_unsafe(dict); + return (DICTIONARY *)dict; } -void dictionary_destroy(DICTIONARY *dict) { +size_t dictionary_destroy(DICTIONARY *dict) { debug(D_DICTIONARY, "Destroying dictionary."); - dictionary_write_lock(dict); + dictionary_lock_wrlock(dict); + + size_t freed = 0; + NAME_VALUE *nv = dict->first_item; + while (nv) { + // cache nv->next + // because we are going to free nv + NAME_VALUE *nvnext = nv->next; + freed += namevalue_destroy_unsafe(dict, nv); + nv = nvnext; + // to speed up destruction, we don't + // unlink nv 
from the linked-list here + } + + dict->first_item = NULL; + dict->last_item = NULL; - while(dict->values_index.root) - dictionary_name_value_destroy_nolock(dict, (NAME_VALUE *)dict->values_index.root); + // destroy the dictionary + freed += hashtable_destroy_unsafe(dict); dictionary_unlock(dict); + freed += dictionary_lock_free(dict); + freed += reference_counter_free(dict); - if(dict->stats) + if(unlikely(dict->flags & DICTIONARY_FLAG_WITH_STATISTICS)) { freez(dict->stats); - - if(dict->rwlock) { - netdata_rwlock_destroy(dict->rwlock); - freez(dict->rwlock); + dict->stats = NULL; + freed += sizeof(struct dictionary_stats); } freez(dict); + freed += sizeof(DICTIONARY); + + return freed; } // ---------------------------------------------------------------------------- +// API - items management -void *dictionary_set(DICTIONARY *dict, const char *name, void *value, size_t value_len) { - debug(D_DICTIONARY, "SET dictionary entry with name '%s'.", name); - - uint32_t hash = simple_hash(name); +void *dictionary_set_unsafe(DICTIONARY *dict, const char *name, void *value, size_t value_len) { + if(unlikely(!name || !*name)) { + error("Attempted to dictionary_set() a dictionary item without a name"); + return NULL; + } - dictionary_write_lock(dict); + size_t name_len = strlen(name) + 1; // we need the terminating null too - NAME_VALUE *nv = dictionary_name_value_index_find_nolock(dict, name, hash); - if(unlikely(!nv)) { - debug(D_DICTIONARY, "Dictionary entry with name '%s' not found. Creating a new one.", name); + debug(D_DICTIONARY, "SET dictionary entry with name '%s'.", name); - nv = dictionary_name_value_create_nolock(dict, name, value, value_len, hash); - if(unlikely(!nv)) - fatal("Cannot create name_value."); + // DISCUSSION: + // Is it better to gain a read-lock and do a hashtable_get_unsafe() + // before we write lock to do hashtable_insert_unsafe()? + // + // Probably this depends on the use case. 
+ // For statsd for example that does dictionary_set() to update received values, + // it could be beneficial to do a get() before we insert(). + // + // But the caller has the option to do this on his/her own. + // So, let's do the fastest here and let the caller decide the flow of calls. + + NAME_VALUE *nv, **pnv = hashtable_insert_unsafe(dict, name, name_len); + if(likely(*pnv == 0)) { + // a new item added to the index + nv = *pnv = namevalue_create_unsafe(dict, name, name_len, value, value_len); + hashtable_inserted_name_value_unsafe(dict, name, name_len, nv); + linkedlist_namevalue_link_unsafe(dict, nv); + + if(dict->ins_callback) + dict->ins_callback(nv->name, nv->value, dict->ins_callback_data); } else { - debug(D_DICTIONARY, "Dictionary entry with name '%s' found. Changing its value.", name); + // the item is already in the index + // so, either we will return the old one + // or overwrite the value, depending on dictionary flags - if(dict->flags & DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE) { - debug(D_REGISTRY, "Dictionary: linking value to '%s'", name); - nv->value = value; - } - else { - debug(D_REGISTRY, "Dictionary: cloning value to '%s'", name); + nv = *pnv; + if(!(dict->flags & DICTIONARY_FLAG_DONT_OVERWRITE_VALUE)) { - // copy the new value without breaking - // any other thread accessing the same entry - void *new = mallocz(value_len), - *old = nv->value; + if(dict->del_callback) + dict->del_callback(nv->name, nv->value, dict->del_callback_data); - memcpy(new, value, value_len); - nv->value = new; + namevalue_reset_unsafe(dict, nv, value, value_len); - debug(D_REGISTRY, "Dictionary: freeing old value of '%s'", name); - freez(old); + if(dict->ins_callback) + dict->ins_callback(nv->name, nv->value, dict->ins_callback_data); } + else if(dict->conflict_callback) + dict->conflict_callback(nv->name, nv->value, value, dict->conflict_callback_data); } - dictionary_unlock(dict); - return nv->value; } -void *dictionary_get(DICTIONARY *dict, const char *name) { 
- debug(D_DICTIONARY, "GET dictionary entry with name '%s'.", name); - - dictionary_read_lock(dict); - NAME_VALUE *nv = dictionary_name_value_index_find_nolock(dict, name, 0); +void *dictionary_set(DICTIONARY *dict, const char *name, void *value, size_t value_len) { + dictionary_lock_wrlock(dict); + void *ret = dictionary_set_unsafe(dict, name, value, value_len); dictionary_unlock(dict); + return ret; +} +void *dictionary_get_unsafe(DICTIONARY *dict, const char *name) { + if(unlikely(!name || !*name)) { + error("Attempted to dictionary_get() without a name"); + return NULL; + } + + size_t name_len = strlen(name) + 1; // we need the terminating null too + + debug(D_DICTIONARY, "GET dictionary entry with name '%s'.", name); + + NAME_VALUE *nv = hashtable_get_unsafe(dict, name, name_len); if(unlikely(!nv)) { debug(D_DICTIONARY, "Not found dictionary entry with name '%s'.", name); return NULL; @@ -229,101 +776,508 @@ void *dictionary_get(DICTIONARY *dict, const char *name) { return nv->value; } -int dictionary_del(DICTIONARY *dict, const char *name) { - int ret; +void *dictionary_get(DICTIONARY *dict, const char *name) { + dictionary_lock_rlock(dict); + void *ret = dictionary_get_unsafe(dict, name); + dictionary_unlock(dict); + return ret; +} + +int dictionary_del_unsafe(DICTIONARY *dict, const char *name) { + if(unlikely(!name || !*name)) { + error("Attempted to dictionary_del() without a name"); + return -1; + } + + size_t name_len = strlen(name) + 1; // we need the terminating null too debug(D_DICTIONARY, "DEL dictionary entry with name '%s'.", name); - dictionary_write_lock(dict); + // Unfortunately, the JudyHSDel() does not return the value of the + // item that was deleted, so we have to find it before we delete it, + // since we need to release our structures too. 
- NAME_VALUE *nv = dictionary_name_value_index_find_nolock(dict, name, 0); + int ret; + NAME_VALUE *nv = hashtable_get_unsafe(dict, name, name_len); if(unlikely(!nv)) { debug(D_DICTIONARY, "Not found dictionary entry with name '%s'.", name); ret = -1; } else { debug(D_DICTIONARY, "Found dictionary entry with name '%s'.", name); - dictionary_name_value_destroy_nolock(dict, nv); + + if(hashtable_delete_unsafe(dict, name, name_len, nv) == 0) + error("DICTIONARY: INTERNAL ERROR: tried to delete item with name '%s' that is not in the index", name); + + if(!reference_counter_mark_deleted(dict, nv)) { + linkedlist_namevalue_unlink_unsafe(dict, nv); + + if(dict->del_callback) + dict->del_callback(nv->name, nv->value, dict->del_callback_data); + + namevalue_destroy_unsafe(dict, nv); + } ret = 0; } + return ret; +} +int dictionary_del(DICTIONARY *dict, const char *name) { + dictionary_lock_wrlock(dict); + int ret = dictionary_del_unsafe(dict, name); dictionary_unlock(dict); - return ret; } - // ---------------------------------------------------------------------------- -// API - walk through the dictionary -// the dictionary is locked for reading while this happens -// do not user other dictionary calls while walking the dictionary - deadlock! 
+// traversal with loop -static int dictionary_walker(avl_t *a, int (*callback)(void *entry, void *data), void *data) { - int total = 0, ret = 0; +void *dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw) { + if(unlikely(!dfe || !dict)) return NULL; - if(a->avl_link[0]) { - ret = dictionary_walker(a->avl_link[0], callback, data); - if(ret < 0) return ret; - total += ret; + dfe->dict = dict; + dfe->started_ut = now_realtime_usec(); + + if(rw == 'r' || rw == 'R') + dictionary_lock_rlock(dict); + else + dictionary_lock_wrlock(dict); + + NAME_VALUE *nv = dict->first_item; + dfe->last_position_index = (void *)nv; + + if(likely(nv)) { + dfe->next_position_index = (void *)nv->next; + dfe->name = nv->name; + dfe->value = (void *)nv->value; + reference_counter_acquire(dict, nv); + } + else { + dfe->next_position_index = NULL; + dfe->name = NULL; + dfe->value = NULL; } - ret = callback(((NAME_VALUE *)a)->value, data); - if(ret < 0) return ret; - total += ret; + return dfe->value; +} + +void *dictionary_foreach_next(DICTFE *dfe) { + if(unlikely(!dfe || !dfe->dict)) return NULL; + + NAME_VALUE *nv = (NAME_VALUE *)dfe->last_position_index; + if(likely(nv)) + reference_counter_release(dfe->dict, nv); + + nv = dfe->last_position_index = dfe->next_position_index; + + if(likely(nv)) { + dfe->next_position_index = (void *)nv->next; + dfe->name = nv->name; + dfe->value = (void *)nv->value; - if(a->avl_link[1]) { - ret = dictionary_walker(a->avl_link[1], callback, data); - if (ret < 0) return ret; - total += ret; + reference_counter_acquire(dfe->dict, nv); + } + else { + dfe->next_position_index = NULL; + dfe->name = NULL; + dfe->value = NULL; } - return total; + return dfe->value; } -int dictionary_get_all(DICTIONARY *dict, int (*callback)(void *entry, void *data), void *data) { +usec_t dictionary_foreach_done(DICTFE *dfe) { + if(unlikely(!dfe || !dfe->dict)) return 0; + + NAME_VALUE *nv = (NAME_VALUE *)dfe->last_position_index; + if(nv) + 
reference_counter_release(dfe->dict, nv); + + dictionary_unlock((DICTIONARY *)dfe->dict); + dfe->dict = NULL; + dfe->last_position_index = NULL; + dfe->next_position_index = NULL; + dfe->name = NULL; + dfe->value = NULL; + + usec_t usec = now_realtime_usec() - dfe->started_ut; + dfe->started_ut = 0; + + return usec; +} + +// ---------------------------------------------------------------------------- +// API - walk through the dictionary +// the dictionary is locked for reading while this happens +// do not use other dictionary calls while walking the dictionary - deadlock! + +int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const char *name, void *entry, void *data), void *data) { + if(rw == 'r' || rw == 'R') + dictionary_lock_rlock(dict); + else + dictionary_lock_wrlock(dict); + + // written in such a way, that the callback can delete the active element + int ret = 0; + NAME_VALUE *nv = dict->first_item, *nv_next; + while(nv) { + nv_next = nv->next; + + reference_counter_acquire(dict, nv); + int r = callback(nv->name, nv->value, data); + reference_counter_release(dict, nv); + if(unlikely(r < 0)) { + ret = r; + break; + } - dictionary_read_lock(dict); + ret += r; - if(likely(dict->values_index.root)) - ret = dictionary_walker(dict->values_index.root, callback, data); + nv = nv_next; + } dictionary_unlock(dict); return ret; } -static int dictionary_walker_name_value(avl_t *a, int (*callback)(char *name, void *entry, void *data), void *data) { - int total = 0, ret = 0; +// ---------------------------------------------------------------------------- +// unit test - if(a->avl_link[0]) { - ret = dictionary_walker_name_value(a->avl_link[0], callback, data); - if(ret < 0) return ret; - total += ret; +static void dictionary_unittest_free_char_pp(char **pp, size_t entries) { + for(size_t i = 0; i < entries ;i++) + freez(pp[i]); + + freez(pp); +} + +static char **dictionary_unittest_generate_names(size_t entries) { + char **names = 
mallocz(sizeof(char *) * entries); + for(size_t i = 0; i < entries ;i++) { + char buf[25 + 1] = ""; + snprintfz(buf, 25, "name.%zu.0123456789.%zu \t !@#$%%^&*(),./[]{}\\|~`", i, entries / 2 + i); + names[i] = strdupz(buf); } + return names; +} - ret = callback(((NAME_VALUE *)a)->name, ((NAME_VALUE *)a)->value, data); - if(ret < 0) return ret; - total += ret; +static char **dictionary_unittest_generate_values(size_t entries) { + char **values = mallocz(sizeof(char *) * entries); + for(size_t i = 0; i < entries ;i++) { + char buf[25 + 1] = ""; + snprintfz(buf, 25, "value-%zu-0987654321.%zu%%^&*(),. \t !@#$/[]{}\\|~`", i, entries / 2 + i); + values[i] = strdupz(buf); + } + return values; +} - if(a->avl_link[1]) { - ret = dictionary_walker_name_value(a->avl_link[1], callback, data); - if (ret < 0) return ret; - total += ret; +static size_t dictionary_unittest_set_clone(DICTIONARY *dict, char **names, char **values, size_t entries) { + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(values[i]) + 1; + char *val = (char *)dictionary_set(dict, names[i], values[i], vallen); + if(val == values[i]) { fprintf(stderr, ">>> %s() returns reference to value\n", __FUNCTION__); errors++; } + if(!val || memcmp(val, values[i], vallen) != 0) { fprintf(stderr, ">>> %s() returns invalid value\n", __FUNCTION__); errors++; } } + return errors; +} - return total; +static size_t dictionary_unittest_set_nonclone(DICTIONARY *dict, char **names, char **values, size_t entries) { + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(values[i]) + 1; + char *val = (char *)dictionary_set(dict, names[i], values[i], vallen); + if(val != values[i]) { fprintf(stderr, ">>> %s() returns invalid pointer to value\n", __FUNCTION__); errors++; } + } + return errors; } -int dictionary_get_all_name_value(DICTIONARY *dict, int (*callback)(char *name, void *entry, void *data), void *data) { - int ret = 0; +static size_t 
dictionary_unittest_get_clone(DICTIONARY *dict, char **names, char **values, size_t entries) { + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(values[i]) + 1; + char *val = (char *)dictionary_get(dict, names[i]); + if(val == values[i]) { fprintf(stderr, ">>> %s() returns reference to value\n", __FUNCTION__); errors++; } + if(!val || memcmp(val, values[i], vallen) != 0) { fprintf(stderr, ">>> %s() returns invalid value\n", __FUNCTION__); errors++; } + } + return errors; +} - dictionary_read_lock(dict); +static size_t dictionary_unittest_get_nonclone(DICTIONARY *dict, char **names, char **values, size_t entries) { + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + char *val = (char *)dictionary_get(dict, names[i]); + if(val != values[i]) { fprintf(stderr, ">>> %s() returns invalid pointer to value\n", __FUNCTION__); errors++; } + } + return errors; +} - if(likely(dict->values_index.root)) - ret = dictionary_walker_name_value(dict->values_index.root, callback, data); +static size_t dictionary_unittest_get_nonexisting(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + char *val = (char *)dictionary_get(dict, values[i]); + if(val) { fprintf(stderr, ">>> %s() returns non-existing item\n", __FUNCTION__); errors++; } + } + return errors; +} - dictionary_unlock(dict); +static size_t dictionary_unittest_del_nonexisting(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + int ret = dictionary_del(dict, values[i]); + if(ret != -1) { fprintf(stderr, ">>> %s() deleted non-existing item\n", __FUNCTION__); errors++; } + } + return errors; +} - return ret; +static size_t dictionary_unittest_del_existing(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)values; + size_t errors = 0; + + size_t forward_from = 0, forward_to = entries / 
3; + size_t middle_from = forward_to, middle_to = entries * 2 / 3; + size_t backward_from = middle_to, backward_to = entries; + + for(size_t i = forward_from; i < forward_to ;i++) { + int ret = dictionary_del(dict, names[i]); + if(ret == -1) { fprintf(stderr, ">>> %s() didn't delete (forward) existing item\n", __FUNCTION__); errors++; } + } + + for(size_t i = middle_to - 1; i >= middle_from ;i--) { + int ret = dictionary_del(dict, names[i]); + if(ret == -1) { fprintf(stderr, ">>> %s() didn't delete (middle) existing item\n", __FUNCTION__); errors++; } + } + + for(size_t i = backward_to - 1; i >= backward_from ;i--) { + int ret = dictionary_del(dict, names[i]); + if(ret == -1) { fprintf(stderr, ">>> %s() didn't delete (backward) existing item\n", __FUNCTION__); errors++; } + } + + return errors; +} + +static size_t dictionary_unittest_reset_clone(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)values; + // set the name as value too + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(names[i]) + 1; + char *val = (char *)dictionary_set(dict, names[i], names[i], vallen); + if(val == names[i]) { fprintf(stderr, ">>> %s() returns reference to value\n", __FUNCTION__); errors++; } + if(!val || memcmp(val, names[i], vallen) != 0) { fprintf(stderr, ">>> %s() returns invalid value\n", __FUNCTION__); errors++; } + } + return errors; +} + +static size_t dictionary_unittest_reset_nonclone(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)values; + // set the name as value too + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(names[i]) + 1; + char *val = (char *)dictionary_set(dict, names[i], names[i], vallen); + if(val != names[i]) { fprintf(stderr, ">>> %s() returns invalid pointer to value\n", __FUNCTION__); errors++; } + if(!val) { fprintf(stderr, ">>> %s() returns invalid value\n", __FUNCTION__); errors++; } + } + return errors; +} + +static size_t 
dictionary_unittest_reset_dont_overwrite_nonclone(DICTIONARY *dict, char **names, char **values, size_t entries) { + // set the name as value too + size_t errors = 0; + for(size_t i = 0; i < entries ;i++) { + size_t vallen = strlen(names[i]) + 1; + char *val = (char *)dictionary_set(dict, names[i], names[i], vallen); + if(val != values[i]) { fprintf(stderr, ">>> %s() returns invalid pointer to value\n", __FUNCTION__); errors++; } + } + return errors; +} + +static int dictionary_unittest_walkthrough_callback(const char *name, void *value, void *data) { + (void)name; + (void)value; + (void)data; + return 1; +} + +static size_t dictionary_unittest_walkthrough(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + int sum = dictionary_walkthrough_read(dict, dictionary_unittest_walkthrough_callback, NULL); + if(sum < (int)entries) return entries - sum; + else return sum - entries; +} + +static int dictionary_unittest_walkthrough_delete_this_callback(const char *name, void *value, void *data) { + (void)value; + + if(dictionary_del_having_write_lock((DICTIONARY *)data, name) == -1) + return 0; + + return 1; +} + +static size_t dictionary_unittest_walkthrough_delete_this(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + int sum = dictionary_walkthrough_write(dict, dictionary_unittest_walkthrough_delete_this_callback, dict); + if(sum < (int)entries) return entries - sum; + else return sum - entries; +} + +static int dictionary_unittest_walkthrough_stop_callback(const char *name, void *value, void *data) { + (void)name; + (void)value; + (void)data; + return -1; +} + +static size_t dictionary_unittest_walkthrough_stop(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + (void)entries; + int sum = dictionary_walkthrough_read(dict, dictionary_unittest_walkthrough_stop_callback, NULL); + if(sum != -1) return 1; + return 0; +} + +static size_t 
dictionary_unittest_foreach(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + (void)entries; + size_t count = 0; + char *item; + dfe_start_read(dict, item) + count++; + dfe_done(item); + + if(count > entries) return count - entries; + return entries - count; +} + +static size_t dictionary_unittest_foreach_delete_this(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + (void)entries; + size_t count = 0; + char *item; + dfe_start_write(dict, item) + if(dictionary_del_having_write_lock(dict, item_name) != -1) count++; + dfe_done(item); + + if(count > entries) return count - entries; + return entries - count; +} + +static size_t dictionary_unittest_destroy(DICTIONARY *dict, char **names, char **values, size_t entries) { + (void)names; + (void)values; + (void)entries; + size_t bytes = dictionary_destroy(dict); + fprintf(stderr, " %s() freed %zu bytes,", __FUNCTION__, bytes); + return 0; +} + +static usec_t dictionary_unittest_run_and_measure_time(DICTIONARY *dict, char *message, char **names, char **values, size_t entries, size_t *errors, size_t (*callback)(DICTIONARY *dict, char **names, char **values, size_t entries)) { + fprintf(stderr, "%-40s... ", message); + + usec_t started = now_realtime_usec(); + size_t errs = callback(dict, names, values, entries); + usec_t ended = now_realtime_usec(); + usec_t dt = ended - started; + + if(callback == dictionary_unittest_destroy) dict = NULL; + + fprintf(stderr, " %zu errors, %zu items in dictionary, %llu usec \n", errs, dict? 
dictionary_stats_entries(dict):0, dt); + *errors += errs; + return dt; +} + +void dictionary_unittest_clone(DICTIONARY *dict, char **names, char **values, size_t entries, size_t *errors) { + dictionary_unittest_run_and_measure_time(dict, "adding entries", names, values, entries, errors, dictionary_unittest_set_clone); + dictionary_unittest_run_and_measure_time(dict, "getting entries", names, values, entries, errors, dictionary_unittest_get_clone); + dictionary_unittest_run_and_measure_time(dict, "getting non-existing entries", names, values, entries, errors, dictionary_unittest_get_nonexisting); + dictionary_unittest_run_and_measure_time(dict, "resetting entries", names, values, entries, errors, dictionary_unittest_reset_clone); + dictionary_unittest_run_and_measure_time(dict, "deleting non-existing entries", names, values, entries, errors, dictionary_unittest_del_nonexisting); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach read loop", names, values, entries, errors, dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback", names, values, entries, errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback stop", names, values, entries, errors, dictionary_unittest_walkthrough_stop); + dictionary_unittest_run_and_measure_time(dict, "deleting existing entries", names, values, entries, errors, dictionary_unittest_del_existing); + dictionary_unittest_run_and_measure_time(dict, "walking through empty", names, values, 0, errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach empty", names, values, 0, errors, dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "destroying empty dictionary", names, values, entries, errors, dictionary_unittest_destroy); +} + +void dictionary_unittest_nonclone(DICTIONARY *dict, char **names, char **values, size_t entries, size_t 
*errors) { + dictionary_unittest_run_and_measure_time(dict, "adding entries", names, values, entries, errors, dictionary_unittest_set_nonclone); + dictionary_unittest_run_and_measure_time(dict, "getting entries", names, values, entries, errors, dictionary_unittest_get_nonclone); + dictionary_unittest_run_and_measure_time(dict, "getting non-existing entries", names, values, entries, errors, dictionary_unittest_get_nonexisting); + dictionary_unittest_run_and_measure_time(dict, "resetting entries", names, values, entries, errors, dictionary_unittest_reset_nonclone); + dictionary_unittest_run_and_measure_time(dict, "deleting non-existing entries", names, values, entries, errors, dictionary_unittest_del_nonexisting); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach read loop", names, values, entries, errors, dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback", names, values, entries, errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback stop", names, values, entries, errors, dictionary_unittest_walkthrough_stop); + dictionary_unittest_run_and_measure_time(dict, "deleting existing entries", names, values, entries, errors, dictionary_unittest_del_existing); + dictionary_unittest_run_and_measure_time(dict, "walking through empty", names, values, 0, errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach empty", names, values, 0, errors, dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "destroying empty dictionary", names, values, entries, errors, dictionary_unittest_destroy); +} + +int dictionary_unittest(size_t entries) { + if(entries < 10) entries = 10; + + DICTIONARY *dict; + size_t errors = 0; + + fprintf(stderr, "Generating %zu names and values...\n", entries); + char **names = dictionary_unittest_generate_names(entries); + char **values = 
dictionary_unittest_generate_values(entries); + + fprintf(stderr, "\nCreating dictionary single threaded, clone, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_SINGLE_THREADED|DICTIONARY_FLAG_WITH_STATISTICS); + dictionary_unittest_clone(dict, names, values, entries, &errors); + + fprintf(stderr, "\nCreating dictionary multi threaded, clone, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_WITH_STATISTICS); + dictionary_unittest_clone(dict, names, values, entries, &errors); + + fprintf(stderr, "\nCreating dictionary single threaded, non-clone, add-in-front options, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_SINGLE_THREADED|DICTIONARY_FLAG_WITH_STATISTICS|DICTIONARY_FLAG_NAME_LINK_DONT_CLONE|DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE|DICTIONARY_FLAG_ADD_IN_FRONT); + dictionary_unittest_nonclone(dict, names, values, entries, &errors); + + fprintf(stderr, "\nCreating dictionary multi threaded, non-clone, add-in-front options, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_WITH_STATISTICS|DICTIONARY_FLAG_NAME_LINK_DONT_CLONE|DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE|DICTIONARY_FLAG_ADD_IN_FRONT); + dictionary_unittest_nonclone(dict, names, values, entries, &errors); + + fprintf(stderr, "\nCreating dictionary single-threaded, non-clone, don't overwrite options, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_SINGLE_THREADED|DICTIONARY_FLAG_WITH_STATISTICS|DICTIONARY_FLAG_NAME_LINK_DONT_CLONE|DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE|DICTIONARY_FLAG_DONT_OVERWRITE_VALUE); + dictionary_unittest_run_and_measure_time(dict, "adding entries", names, values, entries, &errors, dictionary_unittest_set_nonclone); + dictionary_unittest_run_and_measure_time(dict, "resetting non-overwrite entries", names, values, entries, &errors, dictionary_unittest_reset_dont_overwrite_nonclone); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach read loop", names, values, entries, &errors, 
dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback", names, values, entries, &errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read callback stop", names, values, entries, &errors, dictionary_unittest_walkthrough_stop); + dictionary_unittest_run_and_measure_time(dict, "destroying full dictionary", names, values, entries, &errors, dictionary_unittest_destroy); + + fprintf(stderr, "\nCreating dictionary multi-threaded, non-clone, don't overwrite options, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_WITH_STATISTICS|DICTIONARY_FLAG_NAME_LINK_DONT_CLONE|DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE|DICTIONARY_FLAG_DONT_OVERWRITE_VALUE); + dictionary_unittest_run_and_measure_time(dict, "adding entries", names, values, entries, &errors, dictionary_unittest_set_nonclone); + dictionary_unittest_run_and_measure_time(dict, "walkthrough write delete this", names, values, entries, &errors, dictionary_unittest_walkthrough_delete_this); + dictionary_unittest_run_and_measure_time(dict, "destroying empty dictionary", names, values, entries, &errors, dictionary_unittest_destroy); + + fprintf(stderr, "\nCreating dictionary multi-threaded, non-clone, don't overwrite options, %zu items\n", entries); + dict = dictionary_create(DICTIONARY_FLAG_WITH_STATISTICS|DICTIONARY_FLAG_NAME_LINK_DONT_CLONE|DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE|DICTIONARY_FLAG_DONT_OVERWRITE_VALUE); + dictionary_unittest_run_and_measure_time(dict, "adding entries", names, values, entries, &errors, dictionary_unittest_set_nonclone); + dictionary_unittest_run_and_measure_time(dict, "foreach write delete this", names, values, entries, &errors, dictionary_unittest_foreach_delete_this); + dictionary_unittest_run_and_measure_time(dict, "traverse foreach read loop empty", names, values, 0, &errors, dictionary_unittest_foreach); + dictionary_unittest_run_and_measure_time(dict, "walkthrough read 
callback empty", names, values, 0, &errors, dictionary_unittest_walkthrough); + dictionary_unittest_run_and_measure_time(dict, "destroying empty dictionary", names, values, entries, &errors, dictionary_unittest_destroy); + + dictionary_unittest_free_char_pp(names, entries); + dictionary_unittest_free_char_pp(values, entries); + + fprintf(stderr, "\n%zu errors found\n", errors); + return (int)errors; } diff --git a/libnetdata/dictionary/dictionary.h b/libnetdata/dictionary/dictionary.h index 76213887e..356bf1895 100644 --- a/libnetdata/dictionary/dictionary.h +++ b/libnetdata/dictionary/dictionary.h @@ -5,45 +5,186 @@ #include "../libnetdata.h" -struct dictionary_stats { - unsigned long long inserts; - unsigned long long deletes; - unsigned long long searches; - unsigned long long entries; -}; -typedef struct name_value { - avl_t avl_node; // the index - this has to be first! +/* + * Netdata DICTIONARY features: + * + * CLONE or LINK + * Names and Values in the dictionary can be cloned or linked. + * In clone mode, the dictionary does all the memory management. + * The default is clone for both names and values. + * Set DICTIONARY_FLAG_NAME_LINK_DONT_CLONE to link names. + * Set DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE to link values. + * + * ORDERED + * Items are ordered in the order they are added (new items are appended at the end). + * You may reverse the order by setting the flag DICTIONARY_FLAG_ADD_IN_FRONT. + * + * LOOKUP + * The dictionary uses JudyHS to maintain a very fast randomly accessible hash table. + * + * MULTI-THREADED and SINGLE-THREADED + * Each dictionary may be single threaded (no locks), or multi-threaded (multiple readers or one writer). + * The default is multi-threaded. Add the flag DICTIONARY_FLAG_SINGLE_THREADED for single-threaded. + * + * WALK-THROUGH and FOREACH traversal + * The dictionary can be traversed on read or write mode, either with a callback (walkthrough) or with + * a loop (foreach). 
+ * + * In write mode traversal, the caller may delete only the current item, but may add as many items as needed. + * + */ - uint32_t hash; // a simple hash to speed up searching - // we first compare hashes, and only if the hashes are equal we do string comparisons +#ifndef DICTIONARY_INTERNALS +typedef void DICTIONARY; +#endif - char *name; - void *value; -} NAME_VALUE; +typedef enum dictionary_flags { + DICTIONARY_FLAG_NONE = 0, // the default is the opposite of all below + DICTIONARY_FLAG_SINGLE_THREADED = (1 << 0), // don't use any locks (default: use locks) + DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE = (1 << 1), // don't copy the value, just point to the one provided (default: copy) + DICTIONARY_FLAG_NAME_LINK_DONT_CLONE = (1 << 2), // don't copy the name, just point to the one provided (default: copy) + DICTIONARY_FLAG_WITH_STATISTICS = (1 << 3), // maintain statistics about dictionary operations (default: disabled) + DICTIONARY_FLAG_DONT_OVERWRITE_VALUE = (1 << 4), // don't overwrite values of dictionary items (default: overwrite) + DICTIONARY_FLAG_ADD_IN_FRONT = (1 << 5), // add dictionary items at the front of the linked list (default: at the end) + DICTIONARY_FLAG_RESERVED1 = (1 << 6), // this is reserved for DICTIONARY_FLAG_REFERENCE_COUNTERS +} DICTIONARY_FLAGS; -typedef struct dictionary { - avl_tree_type values_index; +// Create a dictionary +extern DICTIONARY *dictionary_create(DICTIONARY_FLAGS flags); - uint8_t flags; +// an insert callback to be called just after an item is added to the dictionary +// this callback is called while the dictionary is write locked! +extern void dictionary_register_insert_callback(DICTIONARY *dict, void (*ins_callback)(const char *name, void *value, void *data), void *data); - struct dictionary_stats *stats; - netdata_rwlock_t *rwlock; -} DICTIONARY; +// a delete callback to be called just before an item is deleted forever +// this callback is called while the dictionary is write locked! 
+extern void dictionary_register_delete_callback(DICTIONARY *dict, void (*del_callback)(const char *name, void *value, void *data), void *data); -#define DICTIONARY_FLAG_DEFAULT 0x00000000 -#define DICTIONARY_FLAG_SINGLE_THREADED 0x00000001 -#define DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE 0x00000002 -#define DICTIONARY_FLAG_NAME_LINK_DONT_CLONE 0x00000004 -#define DICTIONARY_FLAG_WITH_STATISTICS 0x00000008 +// a merge callback to be called when DICTIONARY_FLAG_DONT_OVERWRITE_VALUE +// and an item is already found in the dictionary - the dictionary does nothing else in this case +// the old_value will remain in the dictionary - the new_value is ignored +extern void dictionary_register_conflict_callback(DICTIONARY *dict, void (*conflict_callback)(const char *name, void *old_value, void *new_value, void *data), void *data); -extern DICTIONARY *dictionary_create(uint8_t flags); -extern void dictionary_destroy(DICTIONARY *dict); +// Destroy a dictionary +// returns the number of bytes freed +// the returned value will not include name and value sizes if DICTIONARY_FLAG_WITH_STATISTICS is not set +extern size_t dictionary_destroy(DICTIONARY *dict); + +// Set an item in the dictionary +// - if an item with the same name does not exist, create one +// - if an item with the same name exists, then: +// a) if DICTIONARY_FLAG_DONT_OVERWRITE_VALUE is set, just return the existing value (ignore the new value) +// else b) reset the value to the new value passed at the call +// +// When DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE is set, the value is linked, otherwise it is copied +// When DICTIONARY_FLAG_NAME_LINK_DONT_CLONE is set, the name is linked, otherwise it is copied +// +// When neither DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE nor DICTIONARY_FLAG_NAME_LINK_DONT_CLONE are set, all the +// memory management for names and values is done by the dictionary. +// +// Passing NULL as value, the dictionary will callocz() the newly allocated value, otherwise it will copy it. 
+// Passing 0 as value_len, the dictionary will set the value to NULL (no allocations for value will be made). extern void *dictionary_set(DICTIONARY *dict, const char *name, void *value, size_t value_len) NEVERNULL; + +// Get an item from the dictionary +// If it returns NULL, the item is not found extern void *dictionary_get(DICTIONARY *dict, const char *name); + +// Delete an item from the dictionary +// returns 0 if the item was found and has been deleted +// returns -1 if the item was not found in the index extern int dictionary_del(DICTIONARY *dict, const char *name); -extern int dictionary_get_all(DICTIONARY *dict, int (*callback)(void *entry, void *d), void *data); -extern int dictionary_get_all_name_value(DICTIONARY *dict, int (*callback)(char *name, void *entry, void *d), void *data); +// UNSAFE functions, without locks +// to be used when the user is traversing with the right lock type +// Read lock is acquired by dictionary_walkthrough_read() and dfe_start_read() +// Write lock is acquired by dictionary_walkthrough_write() and dfe_start_write() +// For code readability, please use these macros: +#define dictionary_get_having_read_lock(dict, name) dictionary_get_unsafe(dict, name) +#define dictionary_get_having_write_lock(dict, name) dictionary_get_unsafe(dict, name) +#define dictionary_set_having_write_lock(dict, name, value, value_len) dictionary_set_unsafe(dict, name, value, value_len) +#define dictionary_del_having_write_lock(dict, name) dictionary_del_unsafe(dict, name) + +extern void *dictionary_get_unsafe(DICTIONARY *dict, const char *name); +extern void *dictionary_set_unsafe(DICTIONARY *dict, const char *name, void *value, size_t value_len); +extern int dictionary_del_unsafe(DICTIONARY *dict, const char *name); + +// Traverse (walk through) the items of the dictionary. +// The order of traversal is currently the order of insertion. 
+// +// The callback function may return a negative number to stop the traversal, +// in which case that negative value is returned to the caller. +// +// If all callback calls return zero or positive numbers, the sum of all of +// them is returned to the caller. +// +// You cannot alter the dictionary from inside a dictionary_walkthrough_read() - deadlock! +// You can only delete the current item from inside a dictionary_walkthrough_write() - you can add as many as you want. +// +#define dictionary_walkthrough_read(dict, callback, data) dictionary_walkthrough_rw(dict, 'r', callback, data) +#define dictionary_walkthrough_write(dict, callback, data) dictionary_walkthrough_rw(dict, 'w', callback, data) +extern int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const char *name, void *value, void *data), void *data); + +// Traverse with foreach +// +// Use like this: +// +// DICTFE dfe = {}; +// for(MY_ITEM *item = dfe_start_read(&dfe, dict); item ; item = dfe_next(&dfe)) { +// // do things with the item and its dfe.name +// } +// dfe_done(&dfe); +// +// You cannot alter the dictionary from within a dfe_start_read() - deadlock! +// You can only delete the current item from inside a dfe_start_write() - you can add as many as you want. 
+// + +#ifdef DICTIONARY_INTERNALS +#define DICTFE_CONST +#else +#define DICTFE_CONST const +#endif + +typedef DICTFE_CONST struct dictionary_foreach { + DICTFE_CONST char *name; // the dictionary name of the last item used + void *value; // the dictionary value of the last item used + // same as the return value of dictfe_start() and dictfe_next() + + // the following are for internal use only - to keep track of the point we are + usec_t started_ut; // the time the caller started iterating (now_realtime_usec()) + DICTIONARY *dict; // the dictionary upon we work + void *last_position_index; // the internal position index, to remember the position we are at + void *next_position_index; // the internal position index, of the next item +} DICTFE; + +#define dfe_start_read(dict, value) dfe_start_rw(dict, value, 'r') +#define dfe_start_write(dict, value) dfe_start_rw(dict, value, 'w') +#define dfe_start_rw(dict, value, mode) \ + do { \ + DICTFE value ## _dfe = {}; \ + const char *value ## _name; (void)(value ## _name); \ + for((value) = dictionary_foreach_start_rw(&value ## _dfe, (dict), (mode)), ( value ## _name ) = value ## _dfe.name; \ + (value) ;\ + (value) = dictionary_foreach_next(&value ## _dfe), ( value ## _name ) = value ## _dfe.name) + +#define dfe_done(value) \ + dictionary_foreach_done(&value ## _dfe); \ + } while(0) + +extern void * dictionary_foreach_start_rw(DICTFE *dfe, DICTIONARY *dict, char rw); +extern void * dictionary_foreach_next(DICTFE *dfe); +extern usec_t dictionary_foreach_done(DICTFE *dfe); + +// Get statistics about the dictionary +// If DICTIONARY_FLAG_WITH_STATISTICS is not set, these return zero +extern size_t dictionary_stats_allocated_memory(DICTIONARY *dict); +extern size_t dictionary_stats_entries(DICTIONARY *dict); +extern size_t dictionary_stats_inserts(DICTIONARY *dict); +extern size_t dictionary_stats_searches(DICTIONARY *dict); +extern size_t dictionary_stats_deletes(DICTIONARY *dict); +extern size_t 
dictionary_stats_resets(DICTIONARY *dict); + +extern int dictionary_unittest(size_t entries); #endif /* NETDATA_DICTIONARY_H */ diff --git a/libnetdata/ebpf/ebpf.c b/libnetdata/ebpf/ebpf.c index dde6d57fc..ffb602307 100644 --- a/libnetdata/ebpf/ebpf.c +++ b/libnetdata/ebpf/ebpf.c @@ -843,9 +843,13 @@ void ebpf_adjust_thread_load(ebpf_module_t *mod, struct btf *file) } /** + * Parse BTF file * - * @param filename - * @return + * Parse a specific BTF file present on filesystem + * + * @param filename the file that will be parsed. + * + * @return It returns a pointer for the file on success and NULL otherwise. */ struct btf *ebpf_parse_btf_file(const char *filename) { @@ -858,6 +862,71 @@ struct btf *ebpf_parse_btf_file(const char *filename) return bf; } + +/** + * Load default btf file + * + * Load the default BTF file on environment. + * + * @param path is the fullpath + * @param filename is the file inside BTF path. + */ +struct btf *ebpf_load_btf_file(char *path, char *filename) +{ + char fullpath[PATH_MAX + 1]; + snprintfz(fullpath, PATH_MAX, "%s/%s", path, filename); + struct btf *ret = ebpf_parse_btf_file(fullpath); + if (!ret) + info("Your environment does not have BTF file %s/%s. The plugin will work with 'legacy' code.", + path, filename); + + return ret; +} + +/** + * Find BTF attach type + * + * Search type for the current btf file. + * + * @param file is the structure for the btf file already parsed. + */ +static inline const struct btf_type *ebpf_find_btf_attach_type(struct btf *file) +{ + int id = btf__find_by_name_kind(file, "bpf_attach_type", BTF_KIND_ENUM); + if (id < 0) { + fprintf(stderr, "Cannot find 'bpf_attach_type'"); + + return NULL; + } + + return btf__type_by_id(file, id); +} + +/** + * Is function inside BTF + * + * Look for a specific function inside the given BTF file. + * + * @param file is the structure for the btf file already parsed. + * @param function is the function that we want to find. 
+ */ +int ebpf_is_function_inside_btf(struct btf *file, char *function) +{ + const struct btf_type *type = ebpf_find_btf_attach_type(file); + if (!type) + return -1; + + const struct btf_enum *e = btf_enum(type); + int i, id; + for (id = -1, i = 0; i < btf_vlen(type); i++, e++) { + if (!strcmp(btf__name_by_offset(file, e->name_off), "BPF_TRACE_FENTRY")) { + id = btf__find_by_name_kind(file, function, BTF_KIND_FUNC); + break; + } + } + + return (id > 0) ? 1 : 0; +} #endif /** @@ -902,6 +971,9 @@ void ebpf_update_module_using_config(ebpf_module_t *modules) modules->apps_charts = appconfig_get_boolean(modules->cfg, EBPF_GLOBAL_SECTION, EBPF_CFG_APPLICATION, modules->apps_charts); + modules->cgroup_charts = appconfig_get_boolean(modules->cfg, EBPF_GLOBAL_SECTION, EBPF_CFG_CGROUP, + modules->cgroup_charts); + modules->pid_map_size = (uint32_t)appconfig_get_number(modules->cfg, EBPF_GLOBAL_SECTION, EBPF_CFG_PID_SIZE, modules->pid_map_size); diff --git a/libnetdata/ebpf/ebpf.h b/libnetdata/ebpf/ebpf.h index f701e1d4d..ec486b59a 100644 --- a/libnetdata/ebpf/ebpf.h +++ b/libnetdata/ebpf/ebpf.h @@ -275,7 +275,8 @@ extern int ebpf_enable_tracing_values(char *subsys, char *eventname); extern int ebpf_disable_tracing_values(char *subsys, char *eventname); // BTF Section -#define EBPF_DEFAULT_BTF_FILE "/sys/kernel/btf" +#define EBPF_DEFAULT_BTF_FILE "vmlinux" +#define EBPF_DEFAULT_BTF_PATH "/sys/kernel/btf" #define EBPF_DEFAULT_ERROR_MSG "Cannot open or load BPF file for thread" // BTF helpers @@ -287,6 +288,8 @@ extern void ebpf_select_host_prefix(char *output, size_t length, char *syscall, #ifdef LIBBPF_MAJOR_VERSION extern void ebpf_adjust_thread_load(ebpf_module_t *mod, struct btf *file); extern struct btf *ebpf_parse_btf_file(const char *filename); +extern struct btf *ebpf_load_btf_file(char *path, char *filename); +extern int ebpf_is_function_inside_btf(struct btf *file, char *function); #endif #endif /* NETDATA_EBPF_H */ diff --git a/libnetdata/inlined.h 
b/libnetdata/inlined.h index 6f236b8f4..7e7d8ebed 100644 --- a/libnetdata/inlined.h +++ b/libnetdata/inlined.h @@ -222,19 +222,6 @@ static inline long double str2ld(const char *s, char **endptr) { } } -#ifdef NETDATA_STRCMP_OVERRIDE -#ifdef strcmp -#undef strcmp -#endif -#define strcmp(a, b) strsame(a, b) -#endif // NETDATA_STRCMP_OVERRIDE - -static inline int strsame(const char *a, const char *b) { - if(unlikely(a == b)) return 0; - while(*a && *a == *b) { a++; b++; } - return *a - *b; -} - static inline char *strncpyz(char *dst, const char *src, size_t n) { char *p = dst; diff --git a/libnetdata/libnetdata.c b/libnetdata/libnetdata.c index 18d022407..2997ce19e 100644 --- a/libnetdata/libnetdata.c +++ b/libnetdata/libnetdata.c @@ -11,7 +11,12 @@ #endif /* __FreeBSD__ || __APPLE__*/ struct rlimit rlimit_nofile = { .rlim_cur = 1024, .rlim_max = 1024 }; + +#ifdef MADV_MERGEABLE int enable_ksm = 1; +#else +int enable_ksm = 0; +#endif volatile sig_atomic_t netdata_exit = 0; const char *program_version = VERSION; @@ -26,6 +31,8 @@ const char *program_version = VERSION; // routines. 
#ifdef NETDATA_LOG_ALLOCATIONS +#warning NETDATA_LOG_ALLOCATIONS ENABLED - set log_thread_memory_allocations=1 on any thread to log all its allocations - or use log_allocations() to log them on demand + static __thread struct memory_statistics { volatile ssize_t malloc_calls_made; volatile ssize_t calloc_calls_made; @@ -39,23 +46,14 @@ static __thread struct memory_statistics { __thread size_t log_thread_memory_allocations = 0; -static inline void print_allocations(const char *file, const char *function, const unsigned long line, const char *type, size_t size) { +inline void log_allocations_int(const char *file, const char *function, const unsigned long line) { static __thread struct memory_statistics old = { 0, 0, 0, 0, 0, 0, 0, 0 }; - fprintf(stderr, "%s iteration %zu MEMORY TRACE: %lu@%s : %s : %s : %zu\n", - netdata_thread_tag(), - log_thread_memory_allocations, - line, file, function, - type, size - ); - - fprintf(stderr, "%s iteration %zu MEMORY ALLOCATIONS: (%04lu@%-40.40s:%-40.40s): Allocated %zd KiB (%+zd B), mmapped %zd KiB (%+zd B): %s : malloc %zd (%+zd), calloc %zd (%+zd), realloc %zd (%+zd), strdup %zd (%+zd), free %zd (%+zd)\n", + fprintf(stderr, "%s MEMORY ALLOCATIONS: (%04lu@%s:%s): Allocated %zd KiB (%+zd B), mmapped %zd KiB (%+zd B): : malloc %zd (%+zd), calloc %zd (%+zd), realloc %zd (%+zd), strdup %zd (%+zd), free %zd (%+zd)\n", netdata_thread_tag(), - log_thread_memory_allocations, line, file, function, (memory_statistics.allocated_memory + 512) / 1024, memory_statistics.allocated_memory - old.allocated_memory, (memory_statistics.mmapped_memory + 512) / 1024, memory_statistics.mmapped_memory - old.mmapped_memory, - type, memory_statistics.malloc_calls_made, memory_statistics.malloc_calls_made - old.malloc_calls_made, memory_statistics.calloc_calls_made, memory_statistics.calloc_calls_made - old.calloc_calls_made, memory_statistics.realloc_calls_made, memory_statistics.realloc_calls_made - old.realloc_calls_made, @@ -74,12 +72,12 @@ static 
inline void mmap_accounting(size_t size) { } void *mallocz_int(const char *file, const char *function, const unsigned long line, size_t size) { - if(log_thread_memory_allocations) { - memory_statistics.memory_calls_made++; - memory_statistics.malloc_calls_made++; - memory_statistics.allocated_memory += size; - print_allocations(file, function, line, "malloc()", size); - } + memory_statistics.memory_calls_made++; + memory_statistics.malloc_calls_made++; + memory_statistics.allocated_memory += size; + + if(log_thread_memory_allocations) + log_allocations_int(file, function, line); size_t *n = (size_t *)malloc(sizeof(size_t) + size); if (unlikely(!n)) fatal("mallocz() cannot allocate %zu bytes of memory.", size); @@ -90,12 +88,11 @@ void *mallocz_int(const char *file, const char *function, const unsigned long li void *callocz_int(const char *file, const char *function, const unsigned long line, size_t nmemb, size_t size) { size = nmemb * size; - if(log_thread_memory_allocations) { - memory_statistics.memory_calls_made++; - memory_statistics.calloc_calls_made++; - memory_statistics.allocated_memory += size; - print_allocations(file, function, line, "calloc()", size); - } + memory_statistics.memory_calls_made++; + memory_statistics.calloc_calls_made++; + memory_statistics.allocated_memory += size; + if(log_thread_memory_allocations) + log_allocations_int(file, function, line); size_t *n = (size_t *)calloc(1, sizeof(size_t) + size); if (unlikely(!n)) fatal("callocz() cannot allocate %zu bytes of memory.", size); @@ -113,12 +110,11 @@ void *reallocz_int(const char *file, const char *function, const unsigned long l n = realloc(n, sizeof(size_t) + size); if (unlikely(!n)) fatal("reallocz() cannot allocate %zu bytes of memory (from %zu bytes).", size, old_size); - if(log_thread_memory_allocations) { - memory_statistics.memory_calls_made++; - memory_statistics.realloc_calls_made++; - memory_statistics.allocated_memory += (size - old_size); - print_allocations(file, function, 
line, "realloc()", size - old_size); - } + memory_statistics.memory_calls_made++; + memory_statistics.realloc_calls_made++; + memory_statistics.allocated_memory += (size - old_size); + if(log_thread_memory_allocations) + log_allocations_int(file, function, line); *n = size; return (void *)&n[1]; @@ -127,12 +123,11 @@ void *reallocz_int(const char *file, const char *function, const unsigned long l char *strdupz_int(const char *file, const char *function, const unsigned long line, const char *s) { size_t size = strlen(s) + 1; - if(log_thread_memory_allocations) { - memory_statistics.memory_calls_made++; - memory_statistics.strdup_calls_made++; - memory_statistics.allocated_memory += size; - print_allocations(file, function, line, "strdup()", size); - } + memory_statistics.memory_calls_made++; + memory_statistics.strdup_calls_made++; + memory_statistics.allocated_memory += size; + if(log_thread_memory_allocations) + log_allocations_int(file, function, line); size_t *n = (size_t *)malloc(sizeof(size_t) + size); if (unlikely(!n)) fatal("strdupz() cannot allocate %zu bytes of memory.", size); @@ -150,12 +145,11 @@ void freez_int(const char *file, const char *function, const unsigned long line, n--; size_t size = *n; - if(log_thread_memory_allocations) { - memory_statistics.memory_calls_made++; - memory_statistics.free_calls_made++; - memory_statistics.allocated_memory -= size; - print_allocations(file, function, line, "free()", size); - } + memory_statistics.memory_calls_made++; + memory_statistics.free_calls_made++; + memory_statistics.allocated_memory -= size; + if(log_thread_memory_allocations) + log_allocations_int(file, function, line); free(n); } @@ -583,7 +577,7 @@ unsigned char netdata_map_chart_ids[256] = { [89] = 'y', // Y [90] = 'z', // Z [91] = '_', // [ - [92] = '/', // backslash + [92] = '_', // backslash [93] = '_', // ] [94] = '_', // ^ [95] = '_', // _ @@ -939,108 +933,128 @@ static int memory_file_open(const char *filename, size_t size) { return fd; } 
-// mmap_shared is used for memory mode = map -static void *memory_file_mmap(const char *filename, size_t size, int flags) { - // info("memory_file_mmap('%s', %zu", filename, size); - static int log_madvise = 1; +static inline int madvise_sequential(void *mem, size_t len) { + static int logger = 1; + int ret = madvise(mem, len, MADV_SEQUENTIAL); - int fd = -1; - if(filename) { - fd = memory_file_open(filename, size); - if(fd == -1) return MAP_FAILED; - } + if (ret != 0 && logger-- > 0) error("madvise(MADV_SEQUENTIAL) failed."); + return ret; +} - void *mem = mmap(NULL, size, PROT_READ | PROT_WRITE, flags, fd, 0); - if (mem != MAP_FAILED) { -#ifdef NETDATA_LOG_ALLOCATIONS - mmap_accounting(size); -#endif - int advise = MADV_SEQUENTIAL | MADV_DONTFORK; - if (flags & MAP_SHARED) advise |= MADV_WILLNEED; +static inline int madvise_dontfork(void *mem, size_t len) { + static int logger = 1; + int ret = madvise(mem, len, MADV_DONTFORK); - if (madvise(mem, size, advise) != 0 && log_madvise) { - error("Cannot advise the kernel about shared memory usage."); - log_madvise--; - } - } + if (ret != 0 && logger-- > 0) error("madvise(MADV_DONTFORK) failed."); + return ret; +} - if(fd != -1) - close(fd); +static inline int madvise_willneed(void *mem, size_t len) { + static int logger = 1; + int ret = madvise(mem, len, MADV_WILLNEED); - return mem; + if (ret != 0 && logger-- > 0) error("madvise(MADV_WILLNEED) failed."); + return ret; } +#if __linux__ +static inline int madvise_dontdump(void *mem, size_t len) { + static int logger = 1; + int ret = madvise(mem, len, MADV_DONTDUMP); + + if (ret != 0 && logger-- > 0) error("madvise(MADV_DONTDUMP) failed."); + return ret; +} +#else +static inline int madvise_dontdump(void *mem, size_t len) { + UNUSED(mem); + UNUSED(len); + + return 0; +} +#endif + +static inline int madvise_mergeable(void *mem, size_t len) { #ifdef MADV_MERGEABLE -static void *memory_file_mmap_ksm(const char *filename, size_t size, int flags) { - // 
info("memory_file_mmap_ksm('%s', %zu", filename, size); - static int log_madvise_2 = 1, log_madvise_3 = 1; + static int logger = 1; + int ret = madvise(mem, len, MADV_MERGEABLE); + + if (ret != 0 && logger-- > 0) error("madvise(MADV_MERGEABLE) failed."); + return ret; +#else + UNUSED(mem); + UNUSED(len); + + return 0; +#endif +} + +void *netdata_mmap(const char *filename, size_t size, int flags, int ksm) { + // info("netdata_mmap('%s', %zu", filename, size); + + // MAP_SHARED is used in memory mode map + // MAP_PRIVATE is used in memory mode ram and save + + if(unlikely(!(flags & MAP_SHARED) && !(flags & MAP_PRIVATE))) + fatal("Neither MAP_SHARED or MAP_PRIVATE were given to netdata_mmap()"); + + if(unlikely((flags & MAP_SHARED) && (flags & MAP_PRIVATE))) + fatal("Both MAP_SHARED and MAP_PRIVATE were given to netdata_mmap()"); + + if(unlikely((flags & MAP_SHARED) && (!filename || !*filename))) + fatal("MAP_SHARED requested, without a filename to netdata_mmap()"); + + // don't enable ksm is the global setting is disabled + if(unlikely(!enable_ksm)) ksm = 0; + + // KSM only merges anonymous (private) pages, never pagecache (file) pages + // but MAP_PRIVATE without MAP_ANONYMOUS it fails too, so we need it always + if((flags & MAP_PRIVATE)) flags |= MAP_ANONYMOUS; int fd = -1; - if(filename) { + void *mem = MAP_FAILED; + + if(filename && *filename) { + // open/create the file to be used fd = memory_file_open(filename, size); - if(fd == -1) return MAP_FAILED; + if(fd == -1) goto cleanup; + } + + int fd_for_mmap = fd; + if(fd != -1 && (flags & MAP_PRIVATE)) { + // this is MAP_PRIVATE allocation + // no need for mmap() to use our fd + // we will copy the file into the memory allocated + fd_for_mmap = -1; } - void *mem = mmap(NULL, size, PROT_READ | PROT_WRITE, flags | MAP_ANONYMOUS, -1, 0); + mem = mmap(NULL, size, PROT_READ | PROT_WRITE, flags, fd_for_mmap, 0); if (mem != MAP_FAILED) { + #ifdef NETDATA_LOG_ALLOCATIONS mmap_accounting(size); #endif - if(fd != -1) { + + 
// if we have a file open, but we didn't give it to mmap(), + // we have to read the file into the memory block we allocated + if(fd != -1 && fd_for_mmap == -1) { if (lseek(fd, 0, SEEK_SET) == 0) { if (read(fd, mem, size) != (ssize_t) size) - error("Cannot read from file '%s'", filename); + info("Cannot read from file '%s'", filename); } - else error("Cannot seek to beginning of file '%s'.", filename); - } - - // don't use MADV_SEQUENTIAL|MADV_DONTFORK, they disable MADV_MERGEABLE - if (madvise(mem, size, MADV_SEQUENTIAL | MADV_DONTFORK) != 0 && log_madvise_2) { - error("Cannot advise the kernel about the memory usage (MADV_SEQUENTIAL|MADV_DONTFORK) of file '%s'.", filename); - log_madvise_2--; + else info("Cannot seek to beginning of file '%s'.", filename); } - if (madvise(mem, size, MADV_MERGEABLE) != 0 && log_madvise_3) { - error("Cannot advise the kernel about the memory usage (MADV_MERGEABLE) of file '%s'.", filename); - log_madvise_3--; - } + madvise_sequential(mem, size); + madvise_dontfork(mem, size); + madvise_dontdump(mem, size); + if(flags & MAP_SHARED) madvise_willneed(mem, size); + if(ksm) madvise_mergeable(mem, size); } - if(fd != -1) - close(fd); - - return mem; -} -#else -static void *memory_file_mmap_ksm(const char *filename, size_t size, int flags) { - // info("memory_file_mmap_ksm FALLBACK ('%s', %zu", filename, size); - - if(filename) - return memory_file_mmap(filename, size, flags); - - // when KSM is not available and no filename is given (memory mode = ram), - // we just report failure - return MAP_FAILED; -} -#endif - -void *mymmap(const char *filename, size_t size, int flags, int ksm) { - void *mem = NULL; - - if (filename && (flags & MAP_SHARED || !enable_ksm || !ksm)) - // memory mode = map | save - // when KSM is not enabled - // MAP_SHARED is used for memory mode = map (no KSM possible) - mem = memory_file_mmap(filename, size, flags); - - else - // memory mode = save | ram - // when KSM is enabled - // for memory mode = ram, the 
filename is NULL - mem = memory_file_mmap_ksm(filename, size, flags); - +cleanup: + if(fd != -1) close(fd); if(mem == MAP_FAILED) return NULL; - errno = 0; return mem; } @@ -1097,14 +1111,13 @@ char *fgets_trim_len(char *buf, size_t buf_size, FILE *fp, size_t *len) { } int vsnprintfz(char *dst, size_t n, const char *fmt, va_list args) { + if(unlikely(!n)) return 0; + int size = vsnprintf(dst, n, fmt, args); + dst[n - 1] = '\0'; - if (unlikely((size_t) size > n)) { - // truncated - size = (int)n; - } + if (unlikely((size_t) size > n)) size = (int)n; - dst[size] = '\0'; return size; } diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index 7f62c882e..34062f2a6 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -233,12 +233,15 @@ extern __thread size_t log_thread_memory_allocations; #define mallocz(size) mallocz_int(__FILE__, __FUNCTION__, __LINE__, size) #define reallocz(ptr, size) reallocz_int(__FILE__, __FUNCTION__, __LINE__, ptr, size) #define freez(ptr) freez_int(__FILE__, __FUNCTION__, __LINE__, ptr) +#define log_allocations() log_allocations_int(__FILE__, __FUNCTION__, __LINE__) extern char *strdupz_int(const char *file, const char *function, const unsigned long line, const char *s); extern void *callocz_int(const char *file, const char *function, const unsigned long line, size_t nmemb, size_t size); extern void *mallocz_int(const char *file, const char *function, const unsigned long line, size_t size); extern void *reallocz_int(const char *file, const char *function, const unsigned long line, void *ptr, size_t size); extern void freez_int(const char *file, const char *function, const unsigned long line, void *ptr); +extern void log_allocations_int(const char *file, const char *function, const unsigned long line); + #else // NETDATA_LOG_ALLOCATIONS extern char *strdupz(const char *s) MALLOCLIKE NEVERNULL; extern void *callocz(size_t nmemb, size_t size) MALLOCLIKE NEVERNULL; @@ -250,7 +253,7 @@ extern void freez(void *ptr); extern 
void json_escape_string(char *dst, const char *src, size_t size); extern void json_fix_string(char *s); -extern void *mymmap(const char *filename, size_t size, int flags, int ksm); +extern void *netdata_mmap(const char *filename, size_t size, int flags, int ksm); extern int memory_file_save(const char *filename, void *mem, size_t size); extern int fd_is_valid(int fd); @@ -342,6 +345,8 @@ extern char *netdata_configured_host_prefix; #include "json/json.h" #include "health/health.h" #include "string/utf8.h" +#include "onewayalloc/onewayalloc.h" +#include "worker_utilization/worker_utilization.h" // BEWARE: Outside of the C code this also exists in alarm-notify.sh #define DEFAULT_CLOUD_BASE_URL "https://app.netdata.cloud" diff --git a/libnetdata/locks/README.md b/libnetdata/locks/README.md index a83f1b647..9ac96a8f6 100644 --- a/libnetdata/locks/README.md +++ b/libnetdata/locks/README.md @@ -2,4 +2,99 @@ custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/locks/README.md --> +## How to trace netdata locks + +To enable tracing rwlocks in netdata, compile netdata by setting `CFLAGS="-DNETDATA_TRACE_RWLOCKS=1"`, like this: + +``` +CFLAGS="-O1 -ggdb -DNETDATA_TRACE_RWLOCKS=1" ./netdata-installer.sh +``` + +During compilation, the compiler will log: + +``` +libnetdata/locks/locks.c:105:2: warning: #warning NETDATA_TRACE_RWLOCKS ENABLED - EXPECT A LOT OF OUTPUT [-Wcpp] + 105 | #warning NETDATA_TRACE_RWLOCKS ENABLED - EXPECT A LOT OF OUTPUT + | ^~~~~~~ +``` + +Once compiled, netdata will do the following: + +Every call to `netdata_rwlock_*()` is now measured in time. + +### logging of slow locks/unlocks + +If any call takes more than 10 usec, it will be logged like this: + +``` +RW_LOCK ON LOCK 0x0x7fbe1f2e5190: 4157038, 'ACLK_Query_2' (function build_context_param_list() 99@web/api/formatters/rrd2json.c) WAITED to UNLOCK for 29 usec. 
+``` + +The time can be changed by setting this `-DNETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC=20` (or whatever number) to the CFLAGS. + +### logging of long hold times + +If any lock is held for more than 10000 usec, it will be logged like this: + +``` +RW_LOCK ON LOCK 0x0x55a20afc1b20: 4187198, 'ANALYTICS' (function analytics_gather_mutable_meta_data() 532@daemon/analytics.c) holded a 'R' for 13232 usec. +``` + +The time can be changed by setting this `-DNETDATA_TRACE_RWLOCKS_HOLD_TIME_TO_IGNORE_USEC=20000` (or whatever number) to the CFLAGS. + +### logging for probable pauses (predictive) + +The library maintains a linked-list of all the lock holders (one entry per thread). For this linked-list a mutex is used. So every call to the r/w locks now also has a mutex lock. + +If any call is expected to pause the caller (ie the caller is attempting a read lock while there is a write lock in place and vice versa), the library will log something like this: + +``` +RW_LOCK ON LOCK 0x0x5651c9fcce20: 4190039 'HEALTH' (function init_pending_foreach_alarms() 661@health/health.c) WANTS a 'W' lock (while holding 1 rwlocks and 1 mutexes). +There are 7 readers and 0 writers are holding the lock: + => 1: RW_LOCK: process 4190091 'WEB_SERVER[static14]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 709847 usec. + => 2: RW_LOCK: process 4190079 'WEB_SERVER[static6]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 709869 usec. + => 3: RW_LOCK: process 4190084 'WEB_SERVER[static10]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 709948 usec. + => 4: RW_LOCK: process 4190076 'WEB_SERVER[static3]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 710190 usec. 
+ => 5: RW_LOCK: process 4190092 'WEB_SERVER[static15]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 710195 usec. + => 6: RW_LOCK: process 4190077 'WEB_SERVER[static4]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 710208 usec. + => 7: RW_LOCK: process 4190044 'WEB_SERVER[static1]' (function web_client_api_request_v1_data() 526@web/api/web_api_v1.c) is having 1 'R' lock for 710221 usec. +``` + +And each of the above is paired with a `GOT` log, like this: + +``` +RW_LOCK ON LOCK 0x0x5651c9fcce20: 4190039 'HEALTH' (function init_pending_foreach_alarms() 661@health/health.c) GOT a 'W' lock (while holding 2 rwlocks and 1 mutexes). +There are 0 readers and 1 writers are holding the lock: + => 1: RW_LOCK: process 4190039 'HEALTH' (function init_pending_foreach_alarms() 661@health/health.c) is having 1 'W' lock for 36 usec. +``` + +Keep in mind that the lock and log are not atomic. The list of callers is indicative (and sometimes just empty because the original holders of the lock, unlocked it until we had the chance to print their names). + +### POSIX compliance check + +The library may also log messages about POSIX unsupported cases, like this: + +``` +RW_LOCK FATAL ON LOCK 0x0x622000109290: 3609368 'PLUGIN[proc]' (function __rrdset_check_rdlock() 10@database/rrdset.c) attempts to acquire a 'W' lock. +But it is not supported by POSIX because: ALREADY HAS THIS LOCK +At this attempt, the task is holding 1 rwlocks and 1 mutexes. +There are 1 readers and 0 writers are holding the lock requested now: + => 1: RW_LOCK: process 3609368 'PLUGIN[proc]' (function rrdset_done() 1398@database/rrdset.c) is having 1 'R' lock for 0 usec. 
+``` + +### nested read locks + +When compiled with `-DNETDATA_TRACE_RWLOCKS_LOG_NESTED=1` the library will also detect nested read locks and print them like this: + +``` +RW_LOCK ON LOCK 0x0x7ff6ea46d190: 4140225 'WEB_SERVER[static14]' (function rrdr_json_wrapper_begin() 34@web/api/formatters/json_wrapper.c) NESTED READ LOCK REQUEST a 'R' lock (while holding 1 rwlocks and 1 mutexes). +There are 5 readers and 0 writers are holding the lock: + => 1: RW_LOCK: process 4140225 'WEB_SERVER[static14]' (function rrdr_lock_rrdset() 70@web/api/queries/rrdr.c) is having 1 'R' lock for 216667 usec. + => 2: RW_LOCK: process 4140211 'WEB_SERVER[static6]' (function rrdr_lock_rrdset() 70@web/api/queries/rrdr.c) is having 1 'R' lock for 220001 usec. + => 3: RW_LOCK: process 4140218 'WEB_SERVER[static8]' (function rrdr_lock_rrdset() 70@web/api/queries/rrdr.c) is having 1 'R' lock for 220001 usec. + => 4: RW_LOCK: process 4140224 'WEB_SERVER[static13]' (function rrdr_lock_rrdset() 70@web/api/queries/rrdr.c) is having 1 'R' lock for 220001 usec. + => 5: RW_LOCK: process 4140227 'WEB_SERVER[static16]' (function rrdr_lock_rrdset() 70@web/api/queries/rrdr.c) is having 1 'R' lock for 220001 usec. 
+``` + + diff --git a/libnetdata/locks/locks.c b/libnetdata/locks/locks.c index ca9a5aee9..8b5348678 100644 --- a/libnetdata/locks/locks.c +++ b/libnetdata/locks/locks.c @@ -2,11 +2,32 @@ #include "../libnetdata.h" +#ifdef NETDATA_TRACE_RWLOCKS + +#ifndef NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC +#define NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC 10 +#endif + +#ifndef NETDATA_TRACE_RWLOCKS_HOLD_TIME_TO_IGNORE_USEC +#define NETDATA_TRACE_RWLOCKS_HOLD_TIME_TO_IGNORE_USEC 10000 +#endif + +#ifndef NETDATA_THREAD_LOCKS_ARRAY_SIZE +#define NETDATA_THREAD_LOCKS_ARRAY_SIZE 10 +#endif +static __thread netdata_rwlock_t *netdata_thread_locks[NETDATA_THREAD_LOCKS_ARRAY_SIZE]; + + +#endif // NETDATA_TRACE_RWLOCKS + // ---------------------------------------------------------------------------- // automatic thread cancelability management, based on locks static __thread int netdata_thread_first_cancelability = 0; -static __thread int netdata_thread_lock_cancelability = 0; +static __thread int netdata_thread_nested_disables = 0; + +static __thread size_t netdata_locks_acquired_rwlocks = 0; +static __thread size_t netdata_locks_acquired_mutexes = 0; inline void netdata_thread_disable_cancelability(void) { int old; @@ -14,18 +35,19 @@ inline void netdata_thread_disable_cancelability(void) { if(ret != 0) error("THREAD_CANCELABILITY: pthread_setcancelstate() on thread %s returned error %d", netdata_thread_tag(), ret); else { - if(!netdata_thread_lock_cancelability) + if(!netdata_thread_nested_disables) netdata_thread_first_cancelability = old; - netdata_thread_lock_cancelability++; + netdata_thread_nested_disables++; } } inline void netdata_thread_enable_cancelability(void) { - if(netdata_thread_lock_cancelability < 1) { - error("THREAD_CANCELABILITY: netdata_thread_enable_cancelability(): invalid thread cancelability count %d on thread %s - results will be undefined - please report this!", netdata_thread_lock_cancelability, netdata_thread_tag()); + 
if(netdata_thread_nested_disables < 1) { + error("THREAD_CANCELABILITY: netdata_thread_enable_cancelability(): invalid thread cancelability count %d on thread %s - results will be undefined - please report this!", + netdata_thread_nested_disables, netdata_thread_tag()); } - else if(netdata_thread_lock_cancelability == 1) { + else if(netdata_thread_nested_disables == 1) { int old = 1; int ret = pthread_setcancelstate(netdata_thread_first_cancelability, &old); if(ret != 0) @@ -35,10 +57,10 @@ inline void netdata_thread_enable_cancelability(void) { error("THREAD_CANCELABILITY: netdata_thread_enable_cancelability(): old thread cancelability on thread %s was changed, expected DISABLED (%d), found %s (%d) - please report this!", netdata_thread_tag(), PTHREAD_CANCEL_DISABLE, (old == PTHREAD_CANCEL_ENABLE)?"ENABLED":"UNKNOWN", old); } - netdata_thread_lock_cancelability = 0; + netdata_thread_nested_disables = 0; } else - netdata_thread_lock_cancelability--; + netdata_thread_nested_disables--; } // ---------------------------------------------------------------------------- @@ -51,6 +73,13 @@ int __netdata_mutex_init(netdata_mutex_t *mutex) { return ret; } +int __netdata_mutex_destroy(netdata_mutex_t *mutex) { + int ret = pthread_mutex_destroy(mutex); + if(unlikely(ret != 0)) + error("MUTEX_LOCK: failed to destroy (code %d).", ret); + return ret; +} + int __netdata_mutex_lock(netdata_mutex_t *mutex) { netdata_thread_disable_cancelability(); @@ -59,6 +88,9 @@ int __netdata_mutex_lock(netdata_mutex_t *mutex) { netdata_thread_enable_cancelability(); error("MUTEX_LOCK: failed to get lock (code %d)", ret); } + else + netdata_locks_acquired_mutexes++; + return ret; } @@ -68,6 +100,8 @@ int __netdata_mutex_trylock(netdata_mutex_t *mutex) { int ret = pthread_mutex_trylock(mutex); if(ret != 0) netdata_thread_enable_cancelability(); + else + netdata_locks_acquired_mutexes++; return ret; } @@ -76,93 +110,105 @@ int __netdata_mutex_unlock(netdata_mutex_t *mutex) { int ret = 
pthread_mutex_unlock(mutex); if(unlikely(ret != 0)) error("MUTEX_LOCK: failed to unlock (code %d).", ret); - else + else { + netdata_locks_acquired_mutexes--; netdata_thread_enable_cancelability(); + } return ret; } +#ifdef NETDATA_TRACE_RWLOCKS + +#warning NETDATA_TRACE_RWLOCKS ENABLED - EXPECT A LOT OF OUTPUT + int netdata_mutex_init_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_mutex_t *mutex) { - usec_t start = 0; - (void)start; - - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_init(0x%p) from %lu@%s, %s()", mutex, line, file, function); - } + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_init(%p) from %lu@%s, %s()", mutex, line, file, function); int ret = __netdata_mutex_init(mutex); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_init(0x%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_init(%p) = %d, from %lu@%s, %s()", mutex, ret, line, file, function); return ret; } -int netdata_mutex_lock_debug(const char *file __maybe_unused, const char *function __maybe_unused, +int netdata_mutex_destroy_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_mutex_t *mutex) { - usec_t start = 0; - (void)start; + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_destroy(%p) from %lu@%s, %s()", mutex, line, file, function); - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_lock(0x%p) from %lu@%s, %s()", mutex, line, file, function); - } + int ret = __netdata_mutex_destroy(mutex); + + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_destroy(%p) = %d, from %lu@%s, %s()", mutex, ret, line, file, function); + + return ret; +} + +int netdata_mutex_lock_debug(const char *file __maybe_unused, const char *function 
__maybe_unused, + const unsigned long line __maybe_unused, netdata_mutex_t *mutex) { + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_lock(%p) from %lu@%s, %s()", mutex, line, file, function); + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_mutex_lock(mutex); + usec_t end_s = now_monotonic_high_precision_usec(); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_lock(0x%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, now_boottime_usec() - start, line, file, function); + // remove compiler unused variables warning + (void)start_s; + (void)end_s; + + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_lock(%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, end_s - start_s, line, file, function); return ret; } int netdata_mutex_trylock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_mutex_t *mutex) { - usec_t start = 0; - (void)start; - - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_trylock(0x%p) from %lu@%s, %s()", mutex, line, file, function); - } + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_trylock(%p) from %lu@%s, %s()", mutex, line, file, function); + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_mutex_trylock(mutex); + usec_t end_s = now_monotonic_high_precision_usec(); + + // remove compiler unused variables warning + (void)start_s; + (void)end_s; - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_trylock(0x%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_trylock(%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, end_s - start_s, line, file, function); return ret; } int netdata_mutex_unlock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_mutex_t *mutex) { - usec_t start = 0; - (void)start; - - 
if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_unlock(0x%p) from %lu@%s, %s()", mutex, line, file, function); - } + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_unlock(%p) from %lu@%s, %s()", mutex, line, file, function); + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_mutex_unlock(mutex); + usec_t end_s = now_monotonic_high_precision_usec(); + + // remove compiler unused variables warning + (void)start_s; + (void)end_s; - debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_unlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "MUTEX_LOCK: netdata_mutex_unlock(%p) = %d in %llu usec, from %lu@%s, %s()", mutex, ret, end_s - start_s, line, file, function); return ret; } +#endif // NETDATA_TRACE_RWLOCKS // ---------------------------------------------------------------------------- -// r/w lock +// rwlock int __netdata_rwlock_destroy(netdata_rwlock_t *rwlock) { - int ret = pthread_rwlock_destroy(rwlock); + int ret = pthread_rwlock_destroy(&rwlock->rwlock_t); if(unlikely(ret != 0)) error("RW_LOCK: failed to destroy lock (code %d)", ret); return ret; } int __netdata_rwlock_init(netdata_rwlock_t *rwlock) { - int ret = pthread_rwlock_init(rwlock, NULL); + int ret = pthread_rwlock_init(&rwlock->rwlock_t, NULL); if(unlikely(ret != 0)) error("RW_LOCK: failed to initialize lock (code %d)", ret); return ret; @@ -171,11 +217,13 @@ int __netdata_rwlock_init(netdata_rwlock_t *rwlock) { int __netdata_rwlock_rdlock(netdata_rwlock_t *rwlock) { netdata_thread_disable_cancelability(); - int ret = pthread_rwlock_rdlock(rwlock); + int ret = pthread_rwlock_rdlock(&rwlock->rwlock_t); if(unlikely(ret != 0)) { netdata_thread_enable_cancelability(); error("RW_LOCK: failed to obtain read lock (code %d)", ret); } + else + netdata_locks_acquired_rwlocks++; return ret; } @@ -183,21 +231,25 @@ int __netdata_rwlock_rdlock(netdata_rwlock_t 
*rwlock) { int __netdata_rwlock_wrlock(netdata_rwlock_t *rwlock) { netdata_thread_disable_cancelability(); - int ret = pthread_rwlock_wrlock(rwlock); + int ret = pthread_rwlock_wrlock(&rwlock->rwlock_t); if(unlikely(ret != 0)) { error("RW_LOCK: failed to obtain write lock (code %d)", ret); netdata_thread_enable_cancelability(); } + else + netdata_locks_acquired_rwlocks++; return ret; } int __netdata_rwlock_unlock(netdata_rwlock_t *rwlock) { - int ret = pthread_rwlock_unlock(rwlock); + int ret = pthread_rwlock_unlock(&rwlock->rwlock_t); if(unlikely(ret != 0)) error("RW_LOCK: failed to release lock (code %d)", ret); - else + else { netdata_thread_enable_cancelability(); + netdata_locks_acquired_rwlocks--; + } return ret; } @@ -205,9 +257,11 @@ int __netdata_rwlock_unlock(netdata_rwlock_t *rwlock) { int __netdata_rwlock_tryrdlock(netdata_rwlock_t *rwlock) { netdata_thread_disable_cancelability(); - int ret = pthread_rwlock_tryrdlock(rwlock); + int ret = pthread_rwlock_tryrdlock(&rwlock->rwlock_t); if(ret != 0) netdata_thread_enable_cancelability(); + else + netdata_locks_acquired_rwlocks++; return ret; } @@ -215,129 +269,458 @@ int __netdata_rwlock_tryrdlock(netdata_rwlock_t *rwlock) { int __netdata_rwlock_trywrlock(netdata_rwlock_t *rwlock) { netdata_thread_disable_cancelability(); - int ret = pthread_rwlock_trywrlock(rwlock); + int ret = pthread_rwlock_trywrlock(&rwlock->rwlock_t); if(ret != 0) netdata_thread_enable_cancelability(); + else + netdata_locks_acquired_rwlocks++; return ret; } +#ifdef NETDATA_TRACE_RWLOCKS + +// ---------------------------------------------------------------------------- +// lockers list + +void not_supported_by_posix_rwlocks(const char *file, const char *function, const unsigned long line, netdata_rwlock_t *rwlock, char locktype, const char *reason) { + __netdata_mutex_lock(&rwlock->lockers_mutex); + fprintf(stderr, + "RW_LOCK FATAL ON LOCK %p: %d '%s' (function %s() %lu@%s) attempts to acquire a '%c' lock, but it is not supported by 
POSIX because: %s. At this attempt, the task is holding %zu rwlocks and %zu mutexes. There are %zu readers and %zu writers holding this lock:\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + locktype, + reason, + netdata_locks_acquired_rwlocks, netdata_locks_acquired_mutexes, + rwlock->readers, rwlock->writers); + + int i; + usec_t now = now_monotonic_high_precision_usec(); + netdata_rwlock_locker *p; + for(i = 1, p = rwlock->lockers; p ;p = p->next, i++) { + fprintf(stderr, + " => %i: RW_LOCK %p: process %d '%s' (function %s() %lu@%s) is having %zu '%c' lock for %llu usec.\n", + i, rwlock, + p->pid, p->tag, + p->function, p->line, p->file, + p->callers, p->lock, + (now - p->start_s)); + } + __netdata_mutex_unlock(&rwlock->lockers_mutex); +} + +static void log_rwlock_lockers(const char *file, const char *function, const unsigned long line, netdata_rwlock_t *rwlock, const char *reason, char locktype) { + + // this function can only be used by one thread at a time + // because otherwise, the threads may deadlock waiting for each other + static netdata_mutex_t log_lockers_mutex = NETDATA_MUTEX_INITIALIZER; + __netdata_mutex_lock(&log_lockers_mutex); + + // now work on this locker + __netdata_mutex_lock(&rwlock->lockers_mutex); + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d '%s' (function %s() %lu@%s) %s a '%c' lock (while holding %zu rwlocks and %zu mutexes). 
There are %zu readers and %zu writers holding this lock:\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + reason, locktype, + netdata_locks_acquired_rwlocks, netdata_locks_acquired_mutexes, + rwlock->readers, rwlock->writers); + + int i; + usec_t now = now_monotonic_high_precision_usec(); + netdata_rwlock_locker *p; + for(i = 1, p = rwlock->lockers; p ;p = p->next, i++) { + fprintf(stderr, + " => %i: RW_LOCK %p: process %d '%s' (function %s() %lu@%s) is having %zu '%c' lock for %llu usec.\n", + i, rwlock, + p->pid, p->tag, + p->function, p->line, p->file, + p->callers, p->lock, + (now - p->start_s)); + + if(p->all_caller_locks) { + // find the lock in the netdata_thread_locks[] + // and remove it + int k; + for(k = 0; k < NETDATA_THREAD_LOCKS_ARRAY_SIZE ;k++) { + if (p->all_caller_locks[k] && p->all_caller_locks[k] != rwlock) { + + // lock the other lock lockers list + __netdata_mutex_lock(&p->all_caller_locks[k]->lockers_mutex); + + // print the list of lockers of the other lock + netdata_rwlock_locker *r; + int j; + for(j = 1, r = p->all_caller_locks[k]->lockers; r ;r = r->next, j++) { + fprintf( + stderr, + " ~~~> %i: RW_LOCK %p: process %d '%s' (function %s() %lu@%s) is having %zu '%c' lock for %llu usec.\n", + j, + p->all_caller_locks[k], + r->pid, + r->tag, + r->function, + r->line, + r->file, + r->callers, + r->lock, + (now - r->start_s)); + } + + // unlock the other lock lockers list + __netdata_mutex_unlock(&p->all_caller_locks[k]->lockers_mutex); + } + } + } + + } + __netdata_mutex_unlock(&rwlock->lockers_mutex); + + // unlock this function for other threads + __netdata_mutex_unlock(&log_lockers_mutex); +} + +static netdata_rwlock_locker *add_rwlock_locker(const char *file, const char *function, const unsigned long line, netdata_rwlock_t *rwlock, char lock_type) { + netdata_rwlock_locker *p = mallocz(sizeof(netdata_rwlock_locker)); + p->pid = gettid(); + p->tag = netdata_thread_tag(); + p->lock = lock_type; + p->file = file; + 
p->function = function; + p->line = line; + p->callers = 1; + p->all_caller_locks = netdata_thread_locks; + p->start_s = now_monotonic_high_precision_usec(); + + // find a slot in the netdata_thread_locks[] + int i; + for(i = 0; i < NETDATA_THREAD_LOCKS_ARRAY_SIZE ;i++) { + if (!netdata_thread_locks[i]) { + netdata_thread_locks[i] = rwlock; + break; + } + } + + __netdata_mutex_lock(&rwlock->lockers_mutex); + p->next = rwlock->lockers; + rwlock->lockers = p; + if(lock_type == 'R') rwlock->readers++; + if(lock_type == 'W') rwlock->writers++; + __netdata_mutex_unlock(&rwlock->lockers_mutex); + + return p; +} + +static void remove_rwlock_locker(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock, netdata_rwlock_locker *locker) { + usec_t end_s = now_monotonic_high_precision_usec(); + + if(locker->callers == 0) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) callers should be positive but it is zero\n", + rwlock, + locker->pid, locker->tag, + locker->function, locker->line, locker->file); + + if(locker->callers > 1 && locker->lock != 'R') + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) only 'R' locks support multiple holders, but here we have %zu callers holding a '%c' lock.\n", + rwlock, + locker->pid, locker->tag, + locker->function, locker->line, locker->file, + locker->callers, locker->lock); + + __netdata_mutex_lock(&rwlock->lockers_mutex); + locker->callers--; + + if(!locker->callers) { + int doit = 0; + + if (rwlock->lockers == locker) { + rwlock->lockers = locker->next; + doit = 1; + } else { + netdata_rwlock_locker *p; + for (p = rwlock->lockers; p && p->next != locker; p = p->next) + ; + if (p && p->next == locker) { + p->next = locker->next; + doit = 1; + } + } + if(doit) { + if(locker->lock == 'R') rwlock->readers--; + if(locker->lock == 'W') rwlock->writers--; + } + + if(!doit) { + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, 
'%s' (function %s() %lu@%s) with %zu x '%c' lock is not found.\n", + rwlock, + locker->pid, locker->tag, + locker->function, locker->line, locker->file, + locker->callers, locker->lock); + } + else { + // find the lock in the netdata_thread_locks[] + // and remove it + int i; + for(i = 0; i < NETDATA_THREAD_LOCKS_ARRAY_SIZE ;i++) { + if (netdata_thread_locks[i] == rwlock) + netdata_thread_locks[i] = NULL; + } + + if(end_s - locker->start_s >= NETDATA_TRACE_RWLOCKS_HOLD_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) holded a '%c' for %llu usec.\n", + rwlock, + locker->pid, locker->tag, + locker->function, locker->line, locker->file, + locker->lock, end_s - locker->start_s); + + freez(locker); + } + } + + __netdata_mutex_unlock(&rwlock->lockers_mutex); +} + +static netdata_rwlock_locker *find_rwlock_locker(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { + pid_t pid = gettid(); + netdata_rwlock_locker *p; + + __netdata_mutex_lock(&rwlock->lockers_mutex); + for(p = rwlock->lockers; p ;p = p->next) { + if(p->pid == pid) break; + } + __netdata_mutex_unlock(&rwlock->lockers_mutex); + + return p; +} + +static netdata_rwlock_locker *update_or_add_rwlock_locker(const char *file, const char *function, const unsigned long line, netdata_rwlock_t *rwlock, netdata_rwlock_locker *locker, char locktype) { + if(!locker) { + return add_rwlock_locker(file, function, line, rwlock, locktype); + } + else if(locker->lock == 'R' && locktype == 'R') { + __netdata_mutex_lock(&rwlock->lockers_mutex); + locker->callers++; + __netdata_mutex_unlock(&rwlock->lockers_mutex); + return locker; + } + else { + not_supported_by_posix_rwlocks(file, function, line, rwlock, locktype, "DEADLOCK - WANTS TO CHANGE LOCK TYPE BUT ALREADY HAS THIS LOCKED"); + return locker; + } +} + +// ---------------------------------------------------------------------------- +// debug 
versions of rwlock int netdata_rwlock_destroy_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_destroy(%p) from %lu@%s, %s()", rwlock, line, file, function); - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_destroy(0x%p) from %lu@%s, %s()", rwlock, line, file, function); - } + if(rwlock->readers) + error("RW_LOCK: destroying a rwlock with %zu readers in it", rwlock->readers); + if(rwlock->writers) + error("RW_LOCK: destroying a rwlock with %zu writers in it", rwlock->writers); int ret = __netdata_rwlock_destroy(rwlock); + if(!ret) { + while (rwlock->lockers) + remove_rwlock_locker(file, function, line, rwlock, rwlock->lockers); + + if (rwlock->readers) + error("RW_LOCK: internal error - empty rwlock with %zu readers in it", rwlock->readers); + if (rwlock->writers) + error("RW_LOCK: internal error - empty rwlock with %zu writers in it", rwlock->writers); + } - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_destroy(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_destroy(%p) = %d, from %lu@%s, %s()", rwlock, ret, line, file, function); return ret; } int netdata_rwlock_init_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; - - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_init(0x%p) from %lu@%s, %s()", rwlock, line, file, function); - } + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_init(%p) from %lu@%s, %s()", rwlock, line, file, function); int ret = __netdata_rwlock_init(rwlock); + if(!ret) { + __netdata_mutex_init(&rwlock->lockers_mutex); + 
rwlock->lockers = NULL; + rwlock->readers = 0; + rwlock->writers = 0; + } - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_init(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_init(%p) = %d, from %lu@%s, %s()", rwlock, ret, line, file, function); return ret; } int netdata_rwlock_rdlock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_rdlock(0x%p) from %lu@%s, %s()", rwlock, line, file, function); + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_rdlock(%p) from %lu@%s, %s()", rwlock, line, file, function); + + netdata_rwlock_locker *locker = find_rwlock_locker(file, function, line, rwlock); + +#ifdef NETDATA_TRACE_RWLOCKS_LOG_NESTED + if(locker && locker->lock == 'R') { + log_rwlock_lockers(file, function, line, rwlock, "NESTED READ LOCK REQUEST", 'R'); + } +#endif // NETDATA_TRACE_RWLOCKS_LOG_NESTED + + int log = 0; + if(rwlock->writers) { + log_rwlock_lockers(file, function, line, rwlock, "WANTS", 'R'); + log = 1; } + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_rwlock_rdlock(rwlock); + usec_t end_s = now_monotonic_high_precision_usec(); + + if(!ret) { + locker = update_or_add_rwlock_locker(file, function, line, rwlock, locker, 'R'); + if(log) log_rwlock_lockers(file, function, line, rwlock, "GOT", 'R'); + + } - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_rdlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + if(end_s - start_s >= NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) WAITED for a READ lock for %llu usec.\n", + rwlock, + gettid(), netdata_thread_tag(), + function, 
line, file, + end_s - start_s); + + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_rdlock(%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, end_s - start_s, line, file, function); return ret; } int netdata_rwlock_wrlock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_wrlock(0x%p) from %lu@%s, %s()", rwlock, line, file, function); + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_wrlock(%p) from %lu@%s, %s()", rwlock, line, file, function); + + netdata_rwlock_locker *locker = find_rwlock_locker(file, function, line, rwlock); + if(locker) + not_supported_by_posix_rwlocks(file, function, line, rwlock, 'W', "DEADLOCK - WANTS A WRITE LOCK BUT ALREADY HAVE THIS LOCKED"); + + int log = 0; + if(rwlock->readers) { + log_rwlock_lockers(file, function, line, rwlock, "WANTS", 'W'); + log = 1; } + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_rwlock_wrlock(rwlock); + usec_t end_s = now_monotonic_high_precision_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_wrlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + if(!ret){ + locker = update_or_add_rwlock_locker(file, function, line, rwlock, locker, 'W'); + if(log) log_rwlock_lockers(file, function, line, rwlock, "GOT", 'W'); + } + + if(end_s - start_s >= NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) WAITED for a WRITE lock for %llu usec.\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + end_s - start_s); + + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_wrlock(%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, end_s - start_s, line, file, function); return ret; } int netdata_rwlock_unlock_debug(const char 
*file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_unlock(0x%p) from %lu@%s, %s()", rwlock, line, file, function); - } + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_unlock(%p) from %lu@%s, %s()", rwlock, line, file, function); + netdata_rwlock_locker *locker = find_rwlock_locker(file, function, line, rwlock); + if(unlikely(!locker)) + not_supported_by_posix_rwlocks(file, function, line, rwlock, 'U', "UNLOCK WITHOUT LOCK"); + + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_rwlock_unlock(rwlock); + usec_t end_s = now_monotonic_high_precision_usec(); + + if(end_s - start_s >= NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) WAITED to UNLOCK for %llu usec.\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + end_s - start_s); + + if(likely(!ret && locker)) remove_rwlock_locker(file, function, line, rwlock, locker); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_unlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_unlock(%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, end_s - start_s, line, file, function); return ret; } int netdata_rwlock_tryrdlock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_tryrdlock(%p) from %lu@%s, %s()", rwlock, line, file, function); - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_tryrdlock(0x%p) from %lu@%s, %s()", rwlock, line, file, function); - } + 
netdata_rwlock_locker *locker = find_rwlock_locker(file, function, line, rwlock); + if(locker && locker->lock == 'W') + not_supported_by_posix_rwlocks(file, function, line, rwlock, 'R', "DEADLOCK - WANTS A READ LOCK BUT IT HAS A WRITE LOCK ALREADY"); + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_rwlock_tryrdlock(rwlock); + usec_t end_s = now_monotonic_high_precision_usec(); + + if(!ret) + locker = update_or_add_rwlock_locker(file, function, line, rwlock, locker, 'R'); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_tryrdlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + if(end_s - start_s >= NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) WAITED to TRYREAD for %llu usec.\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + end_s - start_s); + + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_tryrdlock(%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, end_s - start_s, line, file, function); return ret; } int netdata_rwlock_trywrlock_debug(const char *file __maybe_unused, const char *function __maybe_unused, const unsigned long line __maybe_unused, netdata_rwlock_t *rwlock) { - usec_t start = 0; - (void)start; + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_trywrlock(%p) from %lu@%s, %s()", rwlock, line, file, function); - if(unlikely(debug_flags & D_LOCKS)) { - start = now_boottime_usec(); - debug(D_LOCKS, "RW_LOCK: netdata_rwlock_trywrlock(0x%p) from %lu@%s, %s()", rwlock, line, file, function); - } + netdata_rwlock_locker *locker = find_rwlock_locker(file, function, line, rwlock); + if(locker) + not_supported_by_posix_rwlocks(file, function, line, rwlock, 'W', "ALREADY HAS THIS LOCK"); + usec_t start_s = now_monotonic_high_precision_usec(); int ret = __netdata_rwlock_trywrlock(rwlock); + usec_t end_s = now_monotonic_high_precision_usec(); - debug(D_LOCKS, "RW_LOCK: 
netdata_rwlock_trywrlock(0x%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, now_boottime_usec() - start, line, file, function); + if(!ret) + locker = update_or_add_rwlock_locker(file, function, line, rwlock, locker, 'W'); + + if(end_s - start_s >= NETDATA_TRACE_RWLOCKS_WAIT_TIME_TO_IGNORE_USEC) + fprintf(stderr, + "RW_LOCK ON LOCK %p: %d, '%s' (function %s() %lu@%s) WAITED to TRYWRITE for %llu usec.\n", + rwlock, + gettid(), netdata_thread_tag(), + function, line, file, + end_s - start_s); + + debug(D_LOCKS, "RW_LOCK: netdata_rwlock_trywrlock(%p) = %d in %llu usec, from %lu@%s, %s()", rwlock, ret, end_s - start_s, line, file, function); return ret; } + +#endif // NETDATA_TRACE_RWLOCKS diff --git a/libnetdata/locks/locks.h b/libnetdata/locks/locks.h index 850dd7ebc..796b53c6d 100644 --- a/libnetdata/locks/locks.h +++ b/libnetdata/locks/locks.h @@ -4,14 +4,55 @@ #define NETDATA_LOCKS_H 1 #include "../libnetdata.h" +#include "../clocks/clocks.h" typedef pthread_mutex_t netdata_mutex_t; #define NETDATA_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER -typedef pthread_rwlock_t netdata_rwlock_t; -#define NETDATA_RWLOCK_INITIALIZER PTHREAD_RWLOCK_INITIALIZER +#ifdef NETDATA_TRACE_RWLOCKS +typedef struct netdata_rwlock_locker { + pid_t pid; + const char *tag; + char lock; // 'R', 'W' + const char *file; + const char *function; + unsigned long line; + size_t callers; + usec_t start_s; + struct netdata_rwlock_t **all_caller_locks; + struct netdata_rwlock_locker *next; +} netdata_rwlock_locker; + +typedef struct netdata_rwlock_t { + pthread_rwlock_t rwlock_t; // the lock + size_t readers; // the number of reader on the lock + size_t writers; // the number of writers on the lock + netdata_mutex_t lockers_mutex; // a mutex to protect the linked list of the lock holding threads + netdata_rwlock_locker *lockers; // the linked list of the lock holding threads +} netdata_rwlock_t; + +#define NETDATA_RWLOCK_INITIALIZER { \ + .rwlock_t = PTHREAD_RWLOCK_INITIALIZER, \ + .readers = 
0, \ + .writers = 0, \ + .lockers_mutex = NETDATA_MUTEX_INITIALIZER, \ + .lockers = NULL \ + } + +#else // NETDATA_TRACE_RWLOCKS + +typedef struct netdata_rwlock_t { + pthread_rwlock_t rwlock_t; +} netdata_rwlock_t; + +#define NETDATA_RWLOCK_INITIALIZER { \ + .rwlock_t = PTHREAD_RWLOCK_INITIALIZER \ + } + +#endif // NETDATA_TRACE_RWLOCKS extern int __netdata_mutex_init(netdata_mutex_t *mutex); +extern int __netdata_mutex_destroy(netdata_mutex_t *mutex); extern int __netdata_mutex_lock(netdata_mutex_t *mutex); extern int __netdata_mutex_trylock(netdata_mutex_t *mutex); extern int __netdata_mutex_unlock(netdata_mutex_t *mutex); @@ -24,7 +65,13 @@ extern int __netdata_rwlock_unlock(netdata_rwlock_t *rwlock); extern int __netdata_rwlock_tryrdlock(netdata_rwlock_t *rwlock); extern int __netdata_rwlock_trywrlock(netdata_rwlock_t *rwlock); +extern void netdata_thread_disable_cancelability(void); +extern void netdata_thread_enable_cancelability(void); + +#ifdef NETDATA_TRACE_RWLOCKS + extern int netdata_mutex_init_debug( const char *file, const char *function, const unsigned long line, netdata_mutex_t *mutex); +extern int netdata_mutex_destroy_debug( const char *file, const char *function, const unsigned long line, netdata_mutex_t *mutex); extern int netdata_mutex_lock_debug( const char *file, const char *function, const unsigned long line, netdata_mutex_t *mutex); extern int netdata_mutex_trylock_debug( const char *file, const char *function, const unsigned long line, netdata_mutex_t *mutex); extern int netdata_mutex_unlock_debug( const char *file, const char *function, const unsigned long line, netdata_mutex_t *mutex); @@ -37,12 +84,8 @@ extern int netdata_rwlock_unlock_debug( const char *file, const char *function, extern int netdata_rwlock_tryrdlock_debug( const char *file, const char *function, const unsigned long line, netdata_rwlock_t *rwlock); extern int netdata_rwlock_trywrlock_debug( const char *file, const char *function, const unsigned long line, 
netdata_rwlock_t *rwlock); -extern void netdata_thread_disable_cancelability(void); -extern void netdata_thread_enable_cancelability(void); - -#ifdef NETDATA_INTERNAL_CHECKS - #define netdata_mutex_init(mutex) netdata_mutex_init_debug(__FILE__, __FUNCTION__, __LINE__, mutex) +#define netdata_mutex_destroy(mutex) netdata_mutex_init_debug(__FILE__, __FUNCTION__, __LINE__, mutex) #define netdata_mutex_lock(mutex) netdata_mutex_lock_debug(__FILE__, __FUNCTION__, __LINE__, mutex) #define netdata_mutex_trylock(mutex) netdata_mutex_trylock_debug(__FILE__, __FUNCTION__, __LINE__, mutex) #define netdata_mutex_unlock(mutex) netdata_mutex_unlock_debug(__FILE__, __FUNCTION__, __LINE__, mutex) @@ -55,9 +98,10 @@ extern void netdata_thread_enable_cancelability(void); #define netdata_rwlock_tryrdlock(rwlock) netdata_rwlock_tryrdlock_debug(__FILE__, __FUNCTION__, __LINE__, rwlock) #define netdata_rwlock_trywrlock(rwlock) netdata_rwlock_trywrlock_debug(__FILE__, __FUNCTION__, __LINE__, rwlock) -#else // !NETDATA_INTERNAL_CHECKS +#else // !NETDATA_TRACE_RWLOCKS #define netdata_mutex_init(mutex) __netdata_mutex_init(mutex) +#define netdata_mutex_destroy(mutex) __netdata_mutex_destroy(mutex) #define netdata_mutex_lock(mutex) __netdata_mutex_lock(mutex) #define netdata_mutex_trylock(mutex) __netdata_mutex_trylock(mutex) #define netdata_mutex_unlock(mutex) __netdata_mutex_unlock(mutex) @@ -70,6 +114,6 @@ extern void netdata_thread_enable_cancelability(void); #define netdata_rwlock_tryrdlock(rwlock) __netdata_rwlock_tryrdlock(rwlock) #define netdata_rwlock_trywrlock(rwlock) __netdata_rwlock_trywrlock(rwlock) -#endif // NETDATA_INTERNAL_CHECKS +#endif // NETDATA_TRACE_RWLOCKS #endif //NETDATA_LOCKS_H diff --git a/libnetdata/onewayalloc/Makefile.am b/libnetdata/onewayalloc/Makefile.am new file mode 100644 index 000000000..161784b8f --- /dev/null +++ b/libnetdata/onewayalloc/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects 
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/libnetdata/onewayalloc/README.md b/libnetdata/onewayalloc/README.md new file mode 100644 index 000000000..1f459c263 --- /dev/null +++ b/libnetdata/onewayalloc/README.md @@ -0,0 +1,71 @@ +<!-- +title: "One Way Allocator" +custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/onewayallocator/README.md +--> + +# One Way Allocator + +This is a very fast single-threaded-only memory allocator, that minimized system calls +when a lot of memory allocations needs to be made to perform a task, which all of them +can be freed together when the task finishes. + +It has been designed to be used for netdata context queries. + +For netdata to perform a context query, it builds a virtual chart, a chart that contains +all the dimensions of the charts having the same context. This process requires allocating +several structures for each of the dimensions to attach them to the virtual chart. All +these data can be freed immediately after the query finishes. + +## How it works + +1. The caller calls `ONEWAYALLOC *owa = onewayalloc_create(sizehint)` to create an OWA. + Internally this allocates the first memory buffer with size >= `sizehint`. + If `sizehint` is zero, it will allocate 1 hardware page (usually 4kb). + No need to check for success or failure. As with `mallocz()` in netdata, a `fatal()` + will be called if the allocation fails - although this will never fail, since Linux + does not really check if there is memory available for `mmap()` calls. + +2. The caller can then perform any number of the following calls to acquire memory: + - `onewayalloc_mallocz(owa, size)`, similar to `mallocz()` + - `onewayalloc_callocz(owa, nmemb, size)`, similar to `callocz()` + - `onewayalloc_strdupz(owa, string)`, similar to `strdupz()` + - `onewayalloc_memdupz(owa, ptr, size)`, similar to `mallocz()` and then `memcpy()` + +3. 
Once the caller has done all the work with the allocated buffers, all memory allocated + can be freed with `onewayalloc_destroy(owa)`. + +## How faster it is? + +On modern hardware, for any single query the performance improvement is marginal and not +noticeable at all. + +We performed the following tests using the same huge context query (1000 charts, +100 dimensions each = 100k dimensions) + +1. using `mallocz()`, 1 caller, 256 queries (sequential) +2. using `mallocz()`, 256 callers, 1 query each (parallel) +3. using `OWA`, 1 caller, 256 queries (sequential) +4. using `OWA`, 256 callers, 1 query each (parallel) + +Netdata was configured to use 24 web threads on the 24 core server we used. + +The results are as follows: + +### sequential test + +branch|transactions|time to complete|transaction rate|average response time|min response time|max response time +:---:|:---:|:---:|:---:|:---:|:---:|:---:| +`malloc()`|256|322.35s|0.79/sec|1.26s|1.01s|1.87s +`OWA`|256|310.19s|0.83/sec|1.21s|1.04s|1.63s + +For a single query, the improvement is just marginal and not noticeable at all. + +### parallel test + +branch|transactions|time to complete|transaction rate|average response time|min response time|max response time +:---:|:---:|:---:|:---:|:---:|:---:|:---:| +`malloc()`|256|84.72s|3.02/sec|68.43s|50.20s|84.71s +`OWA`|256|39.35s|6.51/sec|34.48s|20.55s|39.34s + +For parallel workload, like the one executed by netdata.cloud, `OWA` provides a 54% overall speed improvement (more than double the overall +user-experienced speed, including the data query itself). 
diff --git a/libnetdata/onewayalloc/onewayalloc.c b/libnetdata/onewayalloc/onewayalloc.c new file mode 100644 index 000000000..a048aebf6 --- /dev/null +++ b/libnetdata/onewayalloc/onewayalloc.c @@ -0,0 +1,178 @@ +#include "onewayalloc.h" + +static size_t OWA_NATURAL_PAGE_SIZE = 0; +static size_t OWA_NATURAL_ALIGNMENT = sizeof(int*); + +typedef struct owa_page { + size_t stats_pages; + size_t stats_pages_size; + size_t stats_mallocs_made; + size_t stats_mallocs_size; + size_t size; // the total size of the page + size_t offset; // the first free byte of the page + struct owa_page *next; // the next page on the list + struct owa_page *last; // the last page on the list - we currently allocate on this +} OWA_PAGE; + +// allocations need to be aligned to CPU register width +// https://en.wikipedia.org/wiki/Data_structure_alignment +static inline size_t natural_alignment(size_t size) { + if(unlikely(size % OWA_NATURAL_ALIGNMENT)) + size = size + OWA_NATURAL_ALIGNMENT - (size % OWA_NATURAL_ALIGNMENT); + + return size; +} + +// Create an OWA +// Once it is created, the called may call the onewayalloc_mallocz() +// any number of times, for any amount of memory. 
+ +static OWA_PAGE *onewayalloc_create_internal(OWA_PAGE *head, size_t size_hint) { + if(unlikely(!OWA_NATURAL_PAGE_SIZE)) + OWA_NATURAL_PAGE_SIZE = sysconf(_SC_PAGE_SIZE); + + // our default page size + size_t size = OWA_NATURAL_PAGE_SIZE; + + // make sure the new page will fit both the requested size + // and the OWA_PAGE structure at its beginning + size_hint += sizeof(OWA_PAGE); + + // prefer the user size if it is bigger than our size + if(size_hint > size) size = size_hint; + + // try to allocate half of the total we have allocated already + if(likely(head)) { + size_t optimal_size = head->stats_pages_size / 2; + if(optimal_size > size) size = optimal_size; + } + + // Make sure our allocations are always a multiple of the hardware page size + if(size % OWA_NATURAL_PAGE_SIZE) size = size + OWA_NATURAL_PAGE_SIZE - (size % OWA_NATURAL_PAGE_SIZE); + + // OWA_PAGE *page = (OWA_PAGE *)netdata_mmap(NULL, size, MAP_ANONYMOUS|MAP_PRIVATE, 0); + // if(unlikely(!page)) fatal("Cannot allocate onewayalloc buffer of size %zu", size); + OWA_PAGE *page = (OWA_PAGE *)mallocz(size); + + page->size = size; + page->offset = natural_alignment(sizeof(OWA_PAGE)); + page->next = page->last = NULL; + + if(unlikely(!head)) { + // this is the first time we are called + head = page; + head->stats_pages = 0; + head->stats_pages_size = 0; + head->stats_mallocs_made = 0; + head->stats_mallocs_size = 0; + } + else { + // link this page into our existing linked list + head->last->next = page; + } + + head->last = page; + head->stats_pages++; + head->stats_pages_size += size; + + return (ONEWAYALLOC *)page; +} + +ONEWAYALLOC *onewayalloc_create(size_t size_hint) { + return onewayalloc_create_internal(NULL, size_hint); +} + +void *onewayalloc_mallocz(ONEWAYALLOC *owa, size_t size) { + OWA_PAGE *head = (OWA_PAGE *)owa; + OWA_PAGE *page = head->last; + + // update stats + head->stats_mallocs_made++; + head->stats_mallocs_size += size; + + // make sure the size is aligned + size = 
natural_alignment(size); + + if(unlikely(page->size - page->offset < size)) { + // we don't have enough space to fit the data + // let's get another page + page = onewayalloc_create_internal(head, (size > page->size)?size:page->size); + } + + char *mem = (char *)page; + mem = &mem[page->offset]; + page->offset += size; + + return (void *)mem; +} + +void *onewayalloc_callocz(ONEWAYALLOC *owa, size_t nmemb, size_t size) { + size_t total = nmemb * size; + void *mem = onewayalloc_mallocz(owa, total); + memset(mem, 0, total); + return mem; +} + +char *onewayalloc_strdupz(ONEWAYALLOC *owa, const char *s) { + size_t size = strlen(s) + 1; + char *d = onewayalloc_mallocz((OWA_PAGE *)owa, size); + memcpy(d, s, size); + return d; +} + +void *onewayalloc_memdupz(ONEWAYALLOC *owa, const void *src, size_t size) { + void *mem = onewayalloc_mallocz((OWA_PAGE *)owa, size); + // memcpy() is way faster than strcpy() since it does not check for '\0' + memcpy(mem, src, size); + return mem; +} + +void onewayalloc_freez(ONEWAYALLOC *owa __maybe_unused, const void *ptr __maybe_unused) { +#ifdef NETDATA_INTERNAL_CHECKS + // allow the caller to call us for a mallocz() allocation + // so try to find it in our memory and if it is not there + // log an error + + if (unlikely(!ptr)) + return; + + OWA_PAGE *head = (OWA_PAGE *)owa; + OWA_PAGE *page; + size_t seeking = (size_t)ptr; + + for(page = head; page ;page = page->next) { + size_t start = (size_t)page; + size_t end = start + page->size; + + if(seeking >= start && seeking <= end) { + // found it - it is ours + // just return to let the caller think we actually did something + return; + } + } + + // not found - it is not ours + // let's free it with the system allocator + error("ONEWAYALLOC: request to free address 0x%p that is not allocated by this OWA", ptr); +#endif + + return; +} + +void onewayalloc_destroy(ONEWAYALLOC *owa) { + if(!owa) return; + + OWA_PAGE *head = (OWA_PAGE *)owa; + + //info("OWA: %zu allocations of %zu total bytes, in 
%zu pages of %zu total bytes", + // head->stats_mallocs_made, head->stats_mallocs_size, + // head->stats_pages, head->stats_pages_size); + + OWA_PAGE *page = head; + while(page) { + OWA_PAGE *p = page; + page = page->next; + // munmap(p, p->size); + freez(p); + } +} diff --git a/libnetdata/onewayalloc/onewayalloc.h b/libnetdata/onewayalloc/onewayalloc.h new file mode 100644 index 000000000..ed8f12f39 --- /dev/null +++ b/libnetdata/onewayalloc/onewayalloc.h @@ -0,0 +1,17 @@ +#ifndef ONEWAYALLOC_H +#define ONEWAYALLOC_H 1 + +#include "../libnetdata.h" + +typedef void ONEWAYALLOC; + +extern ONEWAYALLOC *onewayalloc_create(size_t size_hint); +extern void onewayalloc_destroy(ONEWAYALLOC *owa); + +extern void *onewayalloc_mallocz(ONEWAYALLOC *owa, size_t size); +extern void *onewayalloc_callocz(ONEWAYALLOC *owa, size_t nmemb, size_t size); +extern char *onewayalloc_strdupz(ONEWAYALLOC *owa, const char *s); +extern void *onewayalloc_memdupz(ONEWAYALLOC *owa, const void *src, size_t size); +extern void onewayalloc_freez(ONEWAYALLOC *owa, const void *ptr); + +#endif // ONEWAYALLOC_H diff --git a/libnetdata/popen/popen.c b/libnetdata/popen/popen.c index 33f4bd950..eaeffd32d 100644 --- a/libnetdata/popen/popen.c +++ b/libnetdata/popen/popen.c @@ -78,28 +78,34 @@ static void myp_del(pid_t pid) { #define PIPE_READ 0 #define PIPE_WRITE 1 -/* custom_popene flag definitions */ -#define FLAG_CREATE_PIPE 1 // Create a pipe like popen() when set, otherwise set stdout to /dev/null -#define FLAG_CLOSE_FD 2 // Close all file descriptors other than STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO +static inline void convert_argv_to_string(char *dst, size_t size, const char *spawn_argv[]) { + int i; + for(i = 0; spawn_argv[i] ;i++) { + if(i == 0) snprintfz(dst, size, "%s", spawn_argv[i]); + else { + size_t len = strlen(dst); + snprintfz(&dst[len], size - len, " '%s'", spawn_argv[i]); + } + } +} /* - * Returns -1 on failure, 0 on success. 
When FLAG_CREATE_PIPE is set, on success set the FILE *fp pointer. + * Returns -1 on failure, 0 on success. When POPEN_FLAG_CREATE_PIPE is set, on success set the FILE *fp pointer. */ -static inline int custom_popene(const char *command, volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp) { +static int custom_popene(volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp, const char *command, const char *spawn_argv[]) { + // create a string to be logged about the command we are running + char command_to_be_logged[2048]; + convert_argv_to_string(command_to_be_logged, sizeof(command_to_be_logged), spawn_argv); + // info("custom_popene() running command: %s", command_to_be_logged); + FILE *fp = NULL; int ret = 0; // success by default int pipefd[2], error; pid_t pid; - char *const spawn_argv[] = { - "sh", - "-c", - (char *)command, - NULL - }; posix_spawnattr_t attr; posix_spawn_file_actions_t fa; - if (flags & FLAG_CREATE_PIPE) { + if (flags & POPEN_FLAG_CREATE_PIPE) { if (pipe(pipefd) == -1) return -1; if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) { @@ -107,7 +113,7 @@ static inline int custom_popene(const char *command, volatile pid_t *pidptr, cha } } - if (flags & FLAG_CLOSE_FD) { + if (flags & POPEN_FLAG_CLOSE_FD) { // Mark all files to be closed by the exec() stage of posix_spawn() int i; for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--) { @@ -117,7 +123,7 @@ static inline int custom_popene(const char *command, volatile pid_t *pidptr, cha } if (!posix_spawn_file_actions_init(&fa)) { - if (flags & FLAG_CREATE_PIPE) { + if (flags & POPEN_FLAG_CREATE_PIPE) { // move the pipe to stdout in the child if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) { error("posix_spawn_file_actions_adddup2() failed"); @@ -150,22 +156,22 @@ static inline int custom_popene(const char *command, volatile pid_t *pidptr, cha // Take the lock while we fork to ensure we don't race with SIGCHLD // delivery on a process which exits 
quickly. myp_add_lock(); - if (!posix_spawn(&pid, "/bin/sh", &fa, &attr, spawn_argv, env)) { + if (!posix_spawn(&pid, command, &fa, &attr, (char * const*)spawn_argv, env)) { *pidptr = pid; myp_add_locked(pid); - debug(D_CHILDS, "Spawned command: '%s' on pid %d from parent pid %d.", command, pid, getpid()); + debug(D_CHILDS, "Spawned command: \"%s\" on pid %d from parent pid %d.", command_to_be_logged, pid, getpid()); } else { myp_add_unlock(); - error("Failed to spawn command: '%s' from parent pid %d.", command, getpid()); - if (flags & FLAG_CREATE_PIPE) { + error("Failed to spawn command: \"%s\" from parent pid %d.", command_to_be_logged, getpid()); + if (flags & POPEN_FLAG_CREATE_PIPE) { fclose(fp); } ret = -1; } - if (flags & FLAG_CREATE_PIPE) { + if (flags & POPEN_FLAG_CREATE_PIPE) { close(pipefd[PIPE_WRITE]); if (0 == ret) // on success set FILE * pointer - *fpp = fp; + if(fpp) *fpp = fp; } if (!error) { @@ -181,8 +187,9 @@ static inline int custom_popene(const char *command, volatile pid_t *pidptr, cha error_after_posix_spawn_file_actions_init: if (posix_spawn_file_actions_destroy(&fa)) error("posix_spawn_file_actions_destroy"); + error_after_pipe: - if (flags & FLAG_CREATE_PIPE) { + if (flags & POPEN_FLAG_CREATE_PIPE) { if (fp) fclose(fp); else @@ -193,6 +200,41 @@ error_after_pipe: return -1; } +int custom_popene_variadic_internal_dont_use_directly(volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp, const char *command, ...) 
{ + // convert the variable list arguments into what posix_spawn() needs + // all arguments are expected strings + va_list args; + int args_count; + + // count the number variable parameters + // the variable parameters are expected NULL terminated + { + const char *s; + + va_start(args, command); + args_count = 0; + while ((s = va_arg(args, const char *))) args_count++; + va_end(args); + } + + // create a string pointer array as needed by posix_spawn() + // variable array in the stack + const char *spawn_argv[args_count + 1]; + { + const char *s; + va_start(args, command); + int i; + for (i = 0; i < args_count; i++) { + s = va_arg(args, const char *); + spawn_argv[i] = s; + } + spawn_argv[args_count] = NULL; + va_end(args); + } + + return custom_popene(pidptr, env, flags, fpp, command, spawn_argv); +} + // See man environ extern char **environ; @@ -252,19 +294,37 @@ int myp_reap(pid_t pid) { FILE *mypopen(const char *command, volatile pid_t *pidptr) { FILE *fp = NULL; - (void)custom_popene(command, pidptr, environ, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp); + const char *spawn_argv[] = { + "sh", + "-c", + command, + NULL + }; + (void)custom_popene(pidptr, environ, POPEN_FLAG_CREATE_PIPE|POPEN_FLAG_CLOSE_FD, &fp, "/bin/sh", spawn_argv); return fp; } FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env) { FILE *fp = NULL; - (void)custom_popene(command, pidptr, env, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp); + const char *spawn_argv[] = { + "sh", + "-c", + command, + NULL + }; + (void)custom_popene( pidptr, env, POPEN_FLAG_CREATE_PIPE|POPEN_FLAG_CLOSE_FD, &fp, "/bin/sh", spawn_argv); return fp; } // returns 0 on success, -1 on failure int netdata_spawn(const char *command, volatile pid_t *pidptr) { - return custom_popene(command, pidptr, environ, 0, NULL); + const char *spawn_argv[] = { + "sh", + "-c", + command, + NULL + }; + return custom_popene( pidptr, environ, POPEN_FLAG_NONE, NULL, "/bin/sh", spawn_argv); } int custom_pclose(FILE *fp, pid_t pid) { 
@@ -340,4 +400,4 @@ int mypclose(FILE *fp, pid_t pid) int netdata_spawn_waitpid(pid_t pid) { return custom_pclose(NULL, pid); -}
\ No newline at end of file +} diff --git a/libnetdata/popen/popen.h b/libnetdata/popen/popen.h index f387cff0a..57eb9131e 100644 --- a/libnetdata/popen/popen.h +++ b/libnetdata/popen/popen.h @@ -8,6 +8,23 @@ #define PIPE_READ 0 #define PIPE_WRITE 1 +/* custom_popene_variadic_internal_dont_use_directly flag definitions */ +#define POPEN_FLAG_NONE 0 +#define POPEN_FLAG_CREATE_PIPE 1 // Create a pipe like popen() when set, otherwise set stdout to /dev/null +#define POPEN_FLAG_CLOSE_FD 2 // Close all file descriptors other than STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO + +// the flags to be used by default +#define POPEN_FLAGS_DEFAULT (POPEN_FLAG_CREATE_PIPE|POPEN_FLAG_CLOSE_FD) + +// mypopen_raw is the interface to use instead of custom_popene_variadic_internal_dont_use_directly() +// mypopen_raw will add the terminating NULL at the arguments list +// we append the parameter 'command' twice - this is because the underlying call needs the command to execute and the argv[0] to pass to it +#define mypopen_raw_default_flags_and_environment(pidptr, fpp, command, args...) custom_popene_variadic_internal_dont_use_directly(pidptr, environ, POPEN_FLAGS_DEFAULT, fpp, command, command, ##args, NULL) +#define mypopen_raw_default_flags(pidptr, env, fpp, command, args...) custom_popene_variadic_internal_dont_use_directly(pidptr, env, POPEN_FLAGS_DEFAULT, fpp, command, command, ##args, NULL) +#define mypopen_raw(pidptr, env, flags, fpp, command, args...) 
custom_popene_variadic_internal_dont_use_directly(pidptr, env, flags, fpp, command, command, ##args, NULL) + +extern int custom_popene_variadic_internal_dont_use_directly(volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp, const char *command, ...); + extern FILE *mypopen(const char *command, volatile pid_t *pidptr); extern FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env); extern int mypclose(FILE *fp, pid_t pid); diff --git a/libnetdata/procfile/procfile.c b/libnetdata/procfile/procfile.c index ce412f4b0..19964da17 100644 --- a/libnetdata/procfile/procfile.c +++ b/libnetdata/procfile/procfile.c @@ -4,9 +4,9 @@ #define PF_PREFIX "PROCFILE" -#define PFWORDS_INCREASE_STEP 200 -#define PFLINES_INCREASE_STEP 10 -#define PROCFILE_INCREMENT_BUFFER 512 +#define PFWORDS_INCREASE_STEP 2000 +#define PFLINES_INCREASE_STEP 200 +#define PROCFILE_INCREMENT_BUFFER 4096 int procfile_open_flags = O_RDONLY; @@ -48,9 +48,12 @@ static inline void pfwords_add(procfile *ff, char *str) { pfwords *fw = ff->words; if(unlikely(fw->len == fw->size)) { // debug(D_PROCFILE, PF_PREFIX ": expanding words"); + size_t minimum = PFWORDS_INCREASE_STEP; + size_t optimal = fw->size / 2; + size_t wanted = (optimal > minimum)?optimal:minimum; - ff->words = fw = reallocz(fw, sizeof(pfwords) + (fw->size + PFWORDS_INCREASE_STEP) * sizeof(char *)); - fw->size += PFWORDS_INCREASE_STEP; + ff->words = fw = reallocz(fw, sizeof(pfwords) + (fw->size + wanted) * sizeof(char *)); + fw->size += wanted; } fw->words[fw->len++] = str; @@ -90,9 +93,12 @@ static inline size_t *pflines_add(procfile *ff) { pflines *fl = ff->lines; if(unlikely(fl->len == fl->size)) { // debug(D_PROCFILE, PF_PREFIX ": expanding lines"); + size_t minimum = PFLINES_INCREASE_STEP; + size_t optimal = fl->size / 2; + size_t wanted = (optimal > minimum)?optimal:minimum; - ff->lines = fl = reallocz(fl, sizeof(pflines) + (fl->size + PFLINES_INCREASE_STEP) * sizeof(ffline)); - fl->size += PFLINES_INCREASE_STEP; + 
ff->lines = fl = reallocz(fl, sizeof(pflines) + (fl->size + wanted) * sizeof(ffline)); + fl->size += wanted; } ffline *ffl = &fl->lines[fl->len++]; @@ -272,9 +278,13 @@ procfile *procfile_readall(procfile *ff) { ssize_t x = ff->size - s; if(unlikely(!x)) { - debug(D_PROCFILE, PF_PREFIX ": Expanding data buffer for file '%s'.", procfile_filename(ff)); - ff = reallocz(ff, sizeof(procfile) + ff->size + PROCFILE_INCREMENT_BUFFER); - ff->size += PROCFILE_INCREMENT_BUFFER; + size_t minimum = PROCFILE_INCREMENT_BUFFER; + size_t optimal = ff->size / 2; + size_t wanted = (optimal > minimum)?optimal:minimum; + + debug(D_PROCFILE, PF_PREFIX ": Expanding data buffer for file '%s' by %zu bytes.", procfile_filename(ff), wanted); + ff = reallocz(ff, sizeof(procfile) + ff->size + wanted); + ff->size += wanted; } debug(D_PROCFILE, "Reading file '%s', from position %zd with length %zd", procfile_filename(ff), s, (ssize_t)(ff->size - s)); diff --git a/libnetdata/socket/socket.c b/libnetdata/socket/socket.c index 73eb8e662..df6d3148b 100644 --- a/libnetdata/socket/socket.c +++ b/libnetdata/socket/socket.c @@ -1386,175 +1386,142 @@ static void poll_events_cleanup(void *data) { freez(p->inf); } -static void poll_events_process(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, short int revents, time_t now) { - short int events = pf->events; - int fd = pf->fd; - pf->revents = 0; - size_t i = pi->slot; +static int poll_process_error(POLLINFO *pi, struct pollfd *pf, short int revents) { + error("POLLFD: LISTENER: received %s %s %s on socket at slot %zu (fd %d) client '%s' port '%s' expecting %s %s %s, having %s %s %s" + , revents & POLLERR ? "POLLERR" : "" + , revents & POLLHUP ? "POLLHUP" : "" + , revents & POLLNVAL ? "POLLNVAL" : "" + , pi->slot + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pf->events & POLLIN ? "POLLIN" : "", pf->events & POLLOUT ? "POLLOUT" : "", pf->events & POLLPRI ? 
"POLLPRI" : "" + , revents & POLLIN ? "POLLIN" : "", revents & POLLOUT ? "POLLOUT" : "", revents & POLLPRI ? "POLLPRI" : "" + ); - if(unlikely(fd == -1)) { - debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i); - return; - } + pf->events = 0; + poll_close_fd(pi); + return 1; +} - debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, events, revents); +static inline int poll_process_send(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_sent_t = now; + pi->send_count++; - if(revents & POLLIN || revents & POLLPRI) { - // receiving data + debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", pi->slot, pf->fd); - pi->last_received_t = now; - pi->recv_count++; + pf->events = 0; - if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) { - // read data from client TCP socket - debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd); + // remember the slot, in case we need to close it later + // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call + size_t slot = pi->slot; - pf->events = 0; - if (pi->rcv_callback(pi, &pf->events) == -1) { - poll_close_fd(&p->inf[i]); - return; - } - pf = &p->fds[i]; - pi = &p->inf[i]; - -#ifdef NETDATA_INTERNAL_CHECKS - // this is common - it is used for web server file copies - if(unlikely(!(pf->events & (POLLIN|POLLOUT)))) { - error("POLLFD: LISTENER: after reading, client slot %zu (fd %d) from %s port %s was left without expecting input or output. 
", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>"); - //poll_close_fd(pi); - //return; - } -#endif - } - else if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) { - // new connection - // debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd); - - switch(pi->socktype) { - case SOCK_STREAM: { - // a TCP socket - // we accept the connection - - int nfd; - do { - char client_ip[INET6_ADDRSTRLEN]; - char client_port[NI_MAXSERV]; - char client_host[NI_MAXHOST]; - client_host[0] = 0; - client_ip[0] = 0; - client_port[0] = 0; - - debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd); - nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, INET6_ADDRSTRLEN, client_port, NI_MAXSERV, - client_host, NI_MAXHOST, p->access_list, p->allow_dns); - if (unlikely(nfd < 0)) { - // accept failed - - debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd); - - if(unlikely(errno == EMFILE)) { - error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit); - usleep(1000); // 10ms - } - else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN)) - error("POLLFD: LISTENER: accept() failed."); - - break; - } - else { - // accept ok - // info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s' on fd %d", client_ip, client_port, sockets->fds_names[i], nfd); - poll_add_fd(p - , nfd - , SOCK_STREAM - , pi->port_acl - , POLLINFO_FLAG_CLIENT_SOCKET - , client_ip - , client_port - , client_host - , p->add_callback - , p->del_callback - , p->rcv_callback - , p->snd_callback - , NULL - ); + if (unlikely(pi->snd_callback(pi, &pf->events) == -1)) + poll_close_fd(&p->inf[slot]); - // it may have reallocated them, so refresh our pointers - pf = &p->fds[i]; - pi = &p->inf[i]; - } - } while (nfd >= 0 && (!p->limit || p->used < p->limit)); - break; - } + // IMPORTANT: + // pf and pi may be invalid 
below this point, they may have been reallocated. - case SOCK_DGRAM: { - // a UDP socket - // we read data from the server socket + return 1; +} - debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd); +static inline int poll_process_tcp_read(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_received_t = now; + pi->recv_count++; - // TODO: access_list is not applied to UDP - // but checking the access list on every UDP packet will destroy - // performance, especially for statsd. + debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", pi->slot, pf->fd); - pf->events = 0; - pi->rcv_callback(pi, &pf->events); - break; - } + pf->events = 0; - default: { - error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot); - break; - } - } - } - } + // remember the slot, in case we need to close it later + // the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call + size_t slot = pi->slot; - if(unlikely(revents & POLLOUT)) { - // sending data - debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd); + if (pi->rcv_callback(pi, &pf->events) == -1) + poll_close_fd(&p->inf[slot]); - pi->last_sent_t = now; - pi->send_count++; + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. - pf->events = 0; - if (pi->snd_callback(pi, &pf->events) == -1) { - poll_close_fd(&p->inf[i]); - return; - } - pf = &p->fds[i]; - pi = &p->inf[i]; - -#ifdef NETDATA_INTERNAL_CHECKS - // this is common - it is used for streaming - if(unlikely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET && !(pf->events & (POLLIN|POLLOUT)))) { - error("POLLFD: LISTENER: after sending, client slot %zu (fd %d) from %s port %s was left without expecting input or output. 
", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>"); - //poll_close_fd(pi); - //return; + return 1; +} + +static inline int poll_process_udp_read(POLLINFO *pi, struct pollfd *pf, time_t now __maybe_unused) { + pi->last_received_t = now; + pi->recv_count++; + + debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", pi->slot, pf->fd); + + // TODO: access_list is not applied to UDP + // but checking the access list on every UDP packet will destroy + // performance, especially for statsd. + + pf->events = 0; + if(pi->rcv_callback(pi, &pf->events) == -1) + return 0; + + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. + + return 1; +} + +static int poll_process_new_tcp_connection(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) { + pi->last_received_t = now; + pi->recv_count++; + + debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", pi->slot, pf->fd); + + char client_ip[INET6_ADDRSTRLEN] = ""; + char client_port[NI_MAXSERV] = ""; + char client_host[NI_MAXHOST] = ""; + + debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", pi->slot, pf->fd); + + int nfd = accept_socket( + pf->fd,SOCK_NONBLOCK, + client_ip, INET6_ADDRSTRLEN, client_port,NI_MAXSERV, client_host, NI_MAXHOST, + p->access_list, p->allow_dns + ); + + if (unlikely(nfd < 0)) { + // accept failed + + debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", pi->slot, pf->fd); + + if(unlikely(errno == EMFILE)) { + error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit); + usleep(1000); // 1ms } -#endif - } + else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN)) + error("POLLFD: LISTENER: accept() failed."); - if(unlikely(revents & POLLERR)) { - error("POLLFD: LISTENER: processing POLLERR events for slot %zu fd %d (events = %d, revents = %d)", 
i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; } + else { + // accept ok + + poll_add_fd(p + , nfd + , SOCK_STREAM + , pi->port_acl + , POLLINFO_FLAG_CLIENT_SOCKET + , client_ip + , client_port + , client_host + , p->add_callback + , p->del_callback + , p->rcv_callback + , p->snd_callback + , NULL + ); - if(unlikely(revents & POLLHUP)) { - error("POLLFD: LISTENER: processing POLLHUP events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; - } + // IMPORTANT: + // pf and pi may be invalid below this point, they may have been reallocated. - if(unlikely(revents & POLLNVAL)) { - error("POLLFD: LISTENER: processing POLLNVAL events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd); - pf->events = 0; - poll_close_fd(pi); - return; + return 1; } + + return 0; } void poll_events(LISTEN_SOCKETS *sockets @@ -1687,18 +1654,129 @@ void poll_events(LISTEN_SOCKETS *sockets debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout."); } else { + POLLINFO *pi; + struct pollfd *pf; + size_t idx, processed = 0; + short int revents; + + // keep fast lookup arrays per function + // to avoid looping through the entire list every time + size_t sends[p.max + 1], sends_max = 0; + size_t reads[p.max + 1], reads_max = 0; + size_t conns[p.max + 1], conns_max = 0; + size_t udprd[p.max + 1], udprd_max = 0; + for (i = 0; i <= p.max; i++) { - struct pollfd *pf = &p.fds[i]; - short int revents = pf->revents; - if (unlikely(revents)) - poll_events_process(&p, &p.inf[i], pf, revents, now); + pi = &p.inf[i]; + pf = &p.fds[i]; + revents = pf->revents; + + if(unlikely(revents == 0 || pf->fd == -1)) + continue; + + if (unlikely(revents & (POLLERR|POLLHUP|POLLNVAL))) { + // something is wrong to one of our sockets + + pf->revents = 0; + processed += poll_process_error(pi, pf, revents); + } + else if (likely(revents & POLLOUT)) { + // a client is ready to receive data + + sends[sends_max++] = 
i; + } + else if (likely(revents & (POLLIN|POLLPRI))) { + if (pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) { + // a client sent data to us + + reads[reads_max++] = i; + } + else if (pi->flags & POLLINFO_FLAG_SERVER_SOCKET) { + // something is coming to our server sockets + + if(pi->socktype == SOCK_DGRAM) { + // UDP receive, directly on our listening socket + + udprd[udprd_max++] = i; + } + else if(pi->socktype == SOCK_STREAM) { + // new TCP connection + + conns[conns_max++] = i; + } + else + error("POLLFD: LISTENER: server slot %zu (fd %d) connection from %s port %s using unhandled socket type %d." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pi->socktype + ); + } + else + error("POLLFD: LISTENER: client slot %zu (fd %d) data from %s port %s using flags %08X is neither client nor server." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? pi->client_port : "<undefined-port>" + , pi->flags + ); + } + else + error("POLLFD: LISTENER: socket slot %zu (fd %d) client %s port %s unhandled event id %d." + , i + , pi->fd + , pi->client_ip ? pi->client_ip : "<undefined-ip>" + , pi->client_port ? 
pi->client_port : "<undefined-port>" + , revents + ); + } + + // process sends + for (idx = 0; idx < sends_max; idx++) { + i = sends[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_send(&p, pi, pf, now); + } + + // process UDP reads + for (idx = 0; idx < udprd_max; idx++) { + i = udprd[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_udp_read(pi, pf, now); + } + + // process TCP reads + for (idx = 0; idx < reads_max; idx++) { + i = reads[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + processed += poll_process_tcp_read(&p, pi, pf, now); + } + + if(!processed && (!p.limit || p.used < p.limit)) { + // nothing processed above (rcv, snd) and we have room for another TCP connection + // so, accept one TCP connection + for (idx = 0; idx < conns_max; idx++) { + i = conns[idx]; + pi = &p.inf[i]; + pf = &p.fds[i]; + pf->revents = 0; + if (poll_process_new_tcp_connection(&p, pi, pf, now)) + break; + } } } if(unlikely(p.checks_every > 0 && now - last_check > p.checks_every)) { last_check = now; - // security checks + // cleanup old sockets for(i = 0; i <= p.max; i++) { POLLINFO *pi = &p.inf[i]; diff --git a/libnetdata/storage_number/storage_number.c b/libnetdata/storage_number/storage_number.c index 3e6a9f45c..ba7a66874 100644 --- a/libnetdata/storage_number/storage_number.c +++ b/libnetdata/storage_number/storage_number.c @@ -91,48 +91,21 @@ RET_SN: return r; } -calculated_number unpack_storage_number(storage_number value) { - if(!value) return 0; - - int sign = 0, exp = 0; - int factor = 10; - - // bit 32 = 0:positive, 1:negative - if(unlikely(value & (1 << 31))) - sign = 1; - - // bit 31 = 0:divide, 1:multiply - if(unlikely(value & (1 << 30))) - exp = 1; - - // bit 27 SN_EXISTS_100 - if(unlikely(value & (1 << 26))) - factor = 100; - - // bit 26 SN_EXISTS_RESET - // bit 25 SN_ANOMALY_BIT - - // bit 30, 29, 28 = (multiplier or divider) 0-7 (8 total) - int mul = (value & 
((1<<29)|(1<<28)|(1<<27))) >> 27; - - // bit 24 to bit 1 = the value, so remove all other bits - value ^= value & ((1<<31)|(1<<30)|(1<<29)|(1<<28)|(1<<27)|(1<<26)|(1<<25)|(1<<24)); - - calculated_number n = value; - - // fprintf(stderr, "UNPACK: %08X, sign = %d, exp = %d, mul = %d, factor = %d, n = " CALCULATED_NUMBER_FORMAT "\n", value, sign, exp, mul, factor, n); - - if(exp) { - for(; mul; mul--) - n *= factor; +// Lookup table to make storage number unpacking efficient. +calculated_number unpack_storage_number_lut10x[4 * 8]; + +__attribute__((constructor)) void initialize_lut(void) { + // The lookup table is partitioned in 4 subtables based on the + // values of the factor and exp bits. + for (int i = 0; i < 8; i++) { + // factor = 0 + unpack_storage_number_lut10x[0 * 8 + i] = 1 / pow(10, i); // exp = 0 + unpack_storage_number_lut10x[1 * 8 + i] = pow(10, i); // exp = 1 + + // factor = 1 + unpack_storage_number_lut10x[2 * 8 + i] = 1 / pow(100, i); // exp = 0 + unpack_storage_number_lut10x[3 * 8 + i] = pow(100, i); // exp = 1 } - else { - for( ; mul ; mul--) - n /= 10; - } - - if(sign) n = -n; - return n; } /* diff --git a/libnetdata/storage_number/storage_number.h b/libnetdata/storage_number/storage_number.h index 4101f69e0..7e7b511b0 100644 --- a/libnetdata/storage_number/storage_number.h +++ b/libnetdata/storage_number/storage_number.h @@ -80,7 +80,7 @@ typedef uint32_t storage_number; #define did_storage_number_reset(value) ((((storage_number) (value)) & SN_EXISTS_RESET) != 0) storage_number pack_storage_number(calculated_number value, uint32_t flags); -calculated_number unpack_storage_number(storage_number value); +static inline calculated_number unpack_storage_number(storage_number value) __attribute__((const)); int print_calculated_number(char *str, calculated_number value); @@ -98,4 +98,41 @@ int print_calculated_number(char *str, calculated_number value); // period of at least every other 10 samples. 
#define MAX_INCREMENTAL_PERCENT_RATE 10 + +static inline calculated_number unpack_storage_number(storage_number value) { + extern calculated_number unpack_storage_number_lut10x[4 * 8]; + + if(!value) return 0; + + int sign = 1, exp = 0; + int factor = 0; + + // bit 32 = 0:positive, 1:negative + if(unlikely(value & (1 << 31))) + sign = -1; + + // bit 31 = 0:divide, 1:multiply + if(unlikely(value & (1 << 30))) + exp = 1; + + // bit 27 SN_EXISTS_100 + if(unlikely(value & (1 << 26))) + factor = 1; + + // bit 26 SN_EXISTS_RESET + // bit 25 SN_ANOMALY_BIT + + // bit 30, 29, 28 = (multiplier or divider) 0-7 (8 total) + int mul = (value & ((1<<29)|(1<<28)|(1<<27))) >> 27; + + // bit 24 to bit 1 = the value, so remove all other bits + value ^= value & ((1<<31)|(1<<30)|(1<<29)|(1<<28)|(1<<27)|(1<<26)|(1<<25)|(1<<24)); + + calculated_number n = value; + + // fprintf(stderr, "UNPACK: %08X, sign = %d, exp = %d, mul = %d, factor = %d, n = " CALCULATED_NUMBER_FORMAT "\n", value, sign, exp, mul, factor, n); + + return sign * unpack_storage_number_lut10x[(factor * 16) + (exp * 8) + mul] * n; +} + #endif /* NETDATA_STORAGE_NUMBER_H */ diff --git a/libnetdata/worker_utilization/Makefile.am b/libnetdata/worker_utilization/Makefile.am new file mode 100644 index 000000000..161784b8f --- /dev/null +++ b/libnetdata/worker_utilization/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/libnetdata/worker_utilization/README.md b/libnetdata/worker_utilization/README.md new file mode 100644 index 000000000..35f30b40b --- /dev/null +++ b/libnetdata/worker_utilization/README.md @@ -0,0 +1,90 @@ +<!-- +title: "Worker Utilization" +custom_edit_url: https://github.com/netdata/netdata/edit/master/libnetdata/onewayallocator/README.md +--> + +# Worker Utilization + +This library is to be used when there are 1 or more worker threads 
accepting requests +of some kind and servicing them. The goal is to provide a very simple way to monitor +worker threads utilization, as a percentage of the time they are busy and the amount +of requests served. + +## Design goals + +1. Minimal, if any, impact on the performance of the workers +2. Easy to be integrated into any kind of worker +3. No state of any kind at the worker side + +## How to use + +When a working thread starts, call: + +```c +void worker_register(const char *name); +``` + +This will create the necessary structures for the library to work. +No need to keep a pointer to them. They are allocated as `__thread` variables. + +Then job types need to be defined. Job types are anything a worker does that can be +counted and their execution time needs to be reported. The library is fast enough to +be integrated even on workers that perform hundreds of thousands of actions per second. + +Job types are defined like this: + +```c +void worker_register_job_type(size_t id, const char *name); +``` + +`id` is a number starting from zero. The library is compiled with a fixed size of 50 +ids (0 to 49). More can be allocated by setting `WORKER_UTILIZATION_MAX_JOB_TYPES` in +`worker_utilization.h`. `name` can be any string up to 22 characters. This can be +changed by setting `WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH` in `worker_utilization.h`. + +Each thread that calls `worker_register(name)` will allocate about 3kB for maintaining +the information required. + +When the thread stops, call: + +```c +void worker_unregister(void); +``` + +Again, no parameters, or return values. + +> IMPORTANT: cancellable threads need to add a call to `worker_unregister()` to the +> `pop` function that cleans up the thread. Failure to do so, will result in about +> 3kB of memory leak for every thread that is stopped. 
+ +When you are about to do some work in the working thread, call: + +```c +void worker_is_busy(size_t id); +``` + +When you finish doing the job, call: + +```c +void worker_is_idle(void); +``` + +Calls to `worker_is_busy(id)` can be made one after another (without calling +`worker_is_idle()` between them) to switch jobs without losing any time between +them and eliminating one of the 2 clock calls involved. + +## Implementation details + +Totally lockless, extremely fast, it should not introduce any kind of problems to the +workers. Every time `worker_is_busy(id)` or `worker_is_idle()` are called, a call to +`now_realtime_usec()` is done and a couple of variables are updated. That's it! + +The worker does not need to update the variables regularly. Based on the last status +of the worker, the statistics collector of netdata will calculate if the thread is +busy or idle all the time or part of the time. Works well for both thousands of jobs +per second and unlimited working time (being totally busy with a single request for +ages). + +The statistics collector is called by the global statistics thread of netdata. So, +even if the workers are extremely busy with their jobs, netdata will be able to know +how busy they are. 
diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c new file mode 100644 index 000000000..bd3ad60e0 --- /dev/null +++ b/libnetdata/worker_utilization/worker_utilization.c @@ -0,0 +1,210 @@ +#include "worker_utilization.h" + +#define WORKER_IDLE 'I' +#define WORKER_BUSY 'B' + +struct worker_job_type { + char name[WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH + 1]; + + // statistics controlled variables + size_t statistics_last_jobs_started; + usec_t statistics_last_busy_time; + + // worker controlled variables + volatile size_t worker_jobs_started; + volatile usec_t worker_busy_time; +}; + +struct worker { + pid_t pid; + const char *tag; + const char *workname; + uint32_t workname_hash; + + // statistics controlled variables + volatile usec_t statistics_last_checkpoint; + size_t statistics_last_jobs_started; + usec_t statistics_last_busy_time; + + // the worker controlled variables + volatile size_t job_id; + volatile size_t jobs_started; + volatile usec_t busy_time; + volatile usec_t last_action_timestamp; + volatile char last_action; + + struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES]; + + struct worker *next; +}; + +static netdata_mutex_t base_lock = NETDATA_MUTEX_INITIALIZER; +static struct worker *base = NULL; +static __thread struct worker *worker = NULL; + +void worker_register(const char *workname) { + if(unlikely(worker)) return; + + worker = callocz(1, sizeof(struct worker)); + worker->pid = gettid(); + worker->tag = strdupz(netdata_thread_tag()); + worker->workname = strdupz(workname); + worker->workname_hash = simple_hash(worker->workname); + + usec_t now = now_realtime_usec(); + worker->statistics_last_checkpoint = now; + worker->last_action_timestamp = now; + worker->last_action = WORKER_IDLE; + + netdata_mutex_lock(&base_lock); + worker->next = base; + base = worker; + netdata_mutex_unlock(&base_lock); +} + +void worker_register_job_name(size_t job_id, const char *name) { 
+ if(unlikely(!worker)) return; + + if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) { + error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1)); + return; + } + if (*worker->per_job_type[job_id].name) { + error("WORKER_UTILIZATION: duplicate job registration: worker '%s' job id %zu is '%s', ignoring '%s'", worker->workname, job_id, worker->per_job_type[job_id].name, name); + return; + } + + strncpy(worker->per_job_type[job_id].name, name, WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH); +} + +void worker_unregister(void) { + if(unlikely(!worker)) return; + + netdata_mutex_lock(&base_lock); + if(base == worker) + base = worker->next; + else { + struct worker *p; + for(p = base; p && p->next && p->next != worker ;p = p->next); + if(p && p->next == worker) + p->next = worker->next; + } + netdata_mutex_unlock(&base_lock); + + freez((void *)worker->tag); + freez((void *)worker->workname); + freez(worker); + + worker = NULL; +} + +static inline void worker_is_idle_with_time(usec_t now) { + usec_t delta = now - worker->last_action_timestamp; + worker->busy_time += delta; + worker->per_job_type[worker->job_id].worker_busy_time += delta; + + // the worker was busy + // set it to idle before we set the timestamp + + worker->last_action = WORKER_IDLE; + if(likely(worker->last_action_timestamp < now)) + worker->last_action_timestamp = now; +} + +void worker_is_idle(void) { + if(unlikely(!worker)) return; + if(unlikely(worker->last_action != WORKER_BUSY)) return; + + worker_is_idle_with_time(now_realtime_usec()); +} + +void worker_is_busy(size_t job_id) { + if(unlikely(!worker)) return; + if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) + job_id = 0; + + usec_t now = now_realtime_usec(); + + if(worker->last_action == WORKER_BUSY) + worker_is_idle_with_time(now); + + // the worker was idle + // set the timestamp and then set it to busy + + worker->job_id = job_id; + 
worker->per_job_type[job_id].worker_jobs_started++; + worker->jobs_started++; + worker->last_action_timestamp = now; + worker->last_action = WORKER_BUSY; +} + + +// statistics interface + +void workers_foreach(const char *workname, void (*callback)(void *data, pid_t pid, const char *thread_tag, size_t utilization_usec, size_t duration_usec, size_t jobs_started, size_t is_running, const char **job_types_names, size_t *job_types_jobs_started, usec_t *job_types_busy_time), void *data) { + netdata_mutex_lock(&base_lock); + uint32_t hash = simple_hash(workname); + usec_t busy_time, delta; + size_t i, jobs_started, jobs_running; + + struct worker *p; + for(p = base; p ; p = p->next) { + if(hash != p->workname_hash || strcmp(workname, p->workname)) continue; + + usec_t now = now_realtime_usec(); + + // find per job type statistics + const char *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES]; + size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES]; + usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES]; + for(i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) { + per_job_type_name[i] = p->per_job_type[i].name; + + size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started; + per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_last_jobs_started; + p->per_job_type[i].statistics_last_jobs_started = tmp_jobs_started; + + usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time; + per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_last_busy_time; + p->per_job_type[i].statistics_last_busy_time = tmp_busy_time; + } + + // get a copy of the worker variables + size_t worker_job_id = p->job_id; + usec_t worker_busy_time = p->busy_time; + size_t worker_jobs_started = p->jobs_started; + char worker_last_action = p->last_action; + usec_t worker_last_action_timestamp = p->last_action_timestamp; + + delta = now - p->statistics_last_checkpoint; + p->statistics_last_checkpoint = now; + + // this is the 
only variable both the worker thread and the statistics thread are writing + // we set this only when the worker is busy, so that the worker will not + // accumulate all the busy time, but only the time after the point we collected statistics + if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY) + p->last_action_timestamp = now; + + // calculate delta busy time + busy_time = worker_busy_time - p->statistics_last_busy_time; + p->statistics_last_busy_time = worker_busy_time; + + // calculate delta jobs done + jobs_started = worker_jobs_started - p->statistics_last_jobs_started; + p->statistics_last_jobs_started = worker_jobs_started; + + jobs_running = 0; + if(worker_last_action == WORKER_BUSY) { + // the worker is still busy with something + // let's add that busy time to the reported one + usec_t dt = now - worker_last_action_timestamp; + busy_time += dt; + per_job_type_busy_time[worker_job_id] += dt; + jobs_running = 1; + } + + callback(data, p->pid, p->tag, busy_time, delta, jobs_started, jobs_running, per_job_type_name, per_job_type_jobs_started, per_job_type_busy_time); + } + + netdata_mutex_unlock(&base_lock); +} diff --git a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h new file mode 100644 index 000000000..8f16fe054 --- /dev/null +++ b/libnetdata/worker_utilization/worker_utilization.h @@ -0,0 +1,22 @@ +#ifndef WORKER_UTILIZATION_H +#define WORKER_UTILIZATION_H 1 + +#include "../libnetdata.h" + +// workers interfaces + +#define WORKER_UTILIZATION_MAX_JOB_TYPES 50 +#define WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH 25 + +extern void worker_register(const char *workname); +extern void worker_register_job_name(size_t job_id, const char *name); +extern void worker_unregister(void); + +extern void worker_is_idle(void); +extern void worker_is_busy(size_t job_id); + +// statistics interface + +extern void workers_foreach(const char 
*workname, void (*callback)(void *data, pid_t pid, const char *thread_tag, size_t utilization_usec, size_t duration_usec, size_t jobs_started, size_t is_running, const char **job_types_names, size_t *job_types_jobs_started, usec_t *job_types_busy_time), void *data); + +#endif // WORKER_UTILIZATION_H |