diff options
Diffstat (limited to 'src/libnetdata/worker_utilization')
-rw-r--r-- | src/libnetdata/worker_utilization/README.md | 94 | ||||
-rw-r--r-- | src/libnetdata/worker_utilization/worker_utilization.c | 398 | ||||
-rw-r--r-- | src/libnetdata/worker_utilization/worker_utilization.h | 49 |
3 files changed, 541 insertions, 0 deletions
diff --git a/src/libnetdata/worker_utilization/README.md b/src/libnetdata/worker_utilization/README.md new file mode 100644 index 000000000..1a354376c --- /dev/null +++ b/src/libnetdata/worker_utilization/README.md @@ -0,0 +1,94 @@ +<!-- +title: "Worker Utilization" +custom_edit_url: https://github.com/netdata/netdata/edit/master/src/libnetdata/worker_utilization/README.md +sidebar_label: "Worker Utilization" +learn_status: "Published" +learn_topic_type: "References" +learn_rel_path: "Developers/libnetdata" +--> + +# Worker Utilization + +This library is to be used when there are 1 or more worker threads accepting requests +of some kind and servicing them. The goal is to provide a very simple way to monitor +worker threads utilization, as a percentage of the time they are busy and the amount +of requests served. + +## Design goals + +1. Minimal, if any, impact on the performance of the workers +2. Easy to be integrated into any kind of worker +3. No state of any kind at the worker side + +## How to use + +When a working thread starts, call: + +```c +void worker_register(const char *name); +``` + +This will create the necessary structures for the library to work. +No need to keep a pointer to them. They are allocated as `__thread` variables. + +Then job types need to be defined. Job types are anything a worker does that can be +counted and their execution time needs to be reported. The library is fast enough to +be integrated even on workers that perform hundreds of thousands of actions per second. + +Job types are defined like this: + +```c +void worker_register_job_type(size_t id, const char *name); +``` + +`id` is a number starting from zero. The library is compiled with a fixed size of 50 +ids (0 to 49). More can be allocated by setting `WORKER_UTILIZATION_MAX_JOB_TYPES` in +`worker_utilization.h`. `name` can be any string up to 22 characters. This can be +changed by setting `WORKER_UTILIZATION_MAX_JOB_NAME_LENGTH` in `worker_utilization.h`. + +Each thread that calls `worker_register(name)` will allocate about 3kB for maintaining +the information required. + +When the thread stops, call: + +```c +void worker_unregister(void); +``` + +Again, no parameters, or return values. + +> IMPORTANT: cancellable threads need to add a call to `worker_unregister()` to the +> `pop` function that cleans up the thread. Failure to do so, will result in about +> 3kB of memory leak for every thread that is stopped. + +When you are about to do some work in the working thread, call: + +```c +void worker_is_busy(size_t id); +``` + +When you finish doing the job, call: + +```c +void worker_is_idle(void); +``` + +Calls to `worker_is_busy(id)` can be made one after another (without calling +`worker_is_idle()` between them) to switch jobs without losing any time between +them and eliminating one of the 2 clock calls involved. + +## Implementation details + +Totally lockless, extremely fast, it should not introduce any kind of problems to the +workers. Every time `worker_is_busy(id)` or `worker_is_idle()` are called, a call to +`now_realtime_usec()` is done and a couple of variables are updated. That's it! + +The worker does not need to update the variables regularly. Based on the last status +of the worker, the statistics collector of netdata will calculate if the thread is +busy or idle all the time or part of the time. Works well for both thousands of jobs +per second and unlimited working time (being totally busy with a single request for +ages). + +The statistics collector is called by the global statistics thread of netdata. So, +even if the workers are extremely busy with their jobs, netdata will be able to know +how busy they are. diff --git a/src/libnetdata/worker_utilization/worker_utilization.c b/src/libnetdata/worker_utilization/worker_utilization.c new file mode 100644 index 000000000..4c61ea921 --- /dev/null +++ b/src/libnetdata/worker_utilization/worker_utilization.c @@ -0,0 +1,398 @@ +#include "worker_utilization.h" + +#define WORKER_IDLE 'I' +#define WORKER_BUSY 'B' + +struct worker_job_type { + STRING *name; + STRING *units; + + // statistics controlled variables + size_t statistics_last_jobs_started; + usec_t statistics_last_busy_time; + NETDATA_DOUBLE statistics_last_custom_value; + + // worker controlled variables + volatile size_t worker_jobs_started; + volatile usec_t worker_busy_time; + + WORKER_METRIC_TYPE type; + NETDATA_DOUBLE custom_value; +}; + +struct worker { + pid_t pid; + const char *tag; + const char *workname; + + // statistics controlled variables + volatile usec_t statistics_last_checkpoint; + size_t statistics_last_jobs_started; + usec_t statistics_last_busy_time; + + // the worker controlled variables + size_t worker_max_job_id; + volatile size_t job_id; + volatile size_t jobs_started; + volatile usec_t busy_time; + volatile usec_t last_action_timestamp; + volatile char last_action; + + struct worker_job_type per_job_type[WORKER_UTILIZATION_MAX_JOB_TYPES]; + + struct worker *next; + struct worker *prev; +}; + +struct workers_workname { // this is what we add to JudyHS + SPINLOCK spinlock; + struct worker *base; +}; + +static struct workers_globals { + bool enabled; + + SPINLOCK spinlock; + Pvoid_t worknames_JudyHS; + size_t memory; + +} workers_globals = { // workers globals, the base of all worknames + .enabled = false, + .spinlock = NETDATA_SPINLOCK_INITIALIZER, // a lock for the worknames index + .worknames_JudyHS = NULL, // the worknames index +}; + +static __thread struct worker *worker = NULL; // the current thread worker + +static inline usec_t worker_now_monotonic_usec(void) { +#ifdef NETDATA_WITHOUT_WORKERS_LATENCY + return 0; +#else + return now_monotonic_usec(); +#endif +} + +void workers_utilization_enable(void) { + workers_globals.enabled = true; +} + +size_t workers_allocated_memory(void) { + if(!workers_globals.enabled) + return 0; + + spinlock_lock(&workers_globals.spinlock); + size_t memory = workers_globals.memory; + spinlock_unlock(&workers_globals.spinlock); + + return memory; +} + +void worker_register(const char *name) { + if(unlikely(worker || !workers_globals.enabled)) + return; + + worker = callocz(1, sizeof(struct worker)); + worker->pid = gettid_cached(); + worker->tag = strdupz(nd_thread_tag()); + worker->workname = strdupz(name); + + usec_t now = worker_now_monotonic_usec(); + worker->statistics_last_checkpoint = now; + worker->last_action_timestamp = now; + worker->last_action = WORKER_IDLE; + + size_t name_size = strlen(name) + 1; + spinlock_lock(&workers_globals.spinlock); + + workers_globals.memory += sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1; + + Pvoid_t *PValue = JudyHSIns(&workers_globals.worknames_JudyHS, (void *)name, name_size, PJE0); + + struct workers_workname *workname = *PValue; + if(!workname) { + workname = mallocz(sizeof(struct workers_workname)); + spinlock_init(&workname->spinlock); + workname->base = NULL; + *PValue = workname; + + workers_globals.memory += sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(name_size); + } + + spinlock_lock(&workname->spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(workname->base, worker, prev, next); + spinlock_unlock(&workname->spinlock); + + spinlock_unlock(&workers_globals.spinlock); +} + +void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type) { + if(unlikely(!worker)) return; + + if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) { + netdata_log_error("WORKER_UTILIZATION: job_id %zu is too big. Max is %zu", job_id, (size_t)(WORKER_UTILIZATION_MAX_JOB_TYPES - 1)); + return; + } + + if(job_id > worker->worker_max_job_id) + worker->worker_max_job_id = job_id; + + if(worker->per_job_type[job_id].name) { + if(strcmp(string2str(worker->per_job_type[job_id].name), name) != 0 || worker->per_job_type[job_id].type != type || strcmp(string2str(worker->per_job_type[job_id].units), units) != 0) + netdata_log_error("WORKER_UTILIZATION: duplicate job registration: worker '%s' job id %zu is '%s', ignoring the later '%s'", worker->workname, job_id, string2str(worker->per_job_type[job_id].name), name); + return; + } + + worker->per_job_type[job_id].name = string_strdupz(name); + worker->per_job_type[job_id].units = string_strdupz(units); + worker->per_job_type[job_id].type = type; +} + +void worker_register_job_name(size_t job_id, const char *name) { + worker_register_job_custom_metric(job_id, name, "", WORKER_METRIC_IDLE_BUSY); +} + +void worker_unregister(void) { + if(unlikely(!worker)) return; + + size_t workname_size = strlen(worker->workname) + 1; + spinlock_lock(&workers_globals.spinlock); + Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)worker->workname, workname_size); + if(PValue) { + struct workers_workname *workname = *PValue; + spinlock_lock(&workname->spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(workname->base, worker, prev, next); + spinlock_unlock(&workname->spinlock); + + if(!workname->base) { + JudyHSDel(&workers_globals.worknames_JudyHS, (void *) worker->workname, workname_size, PJE0); + freez(workname); + workers_globals.memory -= sizeof(struct workers_workname) + JUDYHS_INDEX_SIZE_ESTIMATE(workname_size); + } + } + workers_globals.memory -= sizeof(struct worker) + strlen(worker->tag) + 1 + strlen(worker->workname) + 1; + spinlock_unlock(&workers_globals.spinlock); + + for(int i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES ;i++) { + string_freez(worker->per_job_type[i].name); + string_freez(worker->per_job_type[i].units); + } + + freez((void *)worker->tag); + freez((void *)worker->workname); + freez(worker); + + worker = NULL; +} + +static inline void worker_is_idle_with_time(usec_t now) { + usec_t delta = now - worker->last_action_timestamp; + worker->busy_time += delta; + worker->per_job_type[worker->job_id].worker_busy_time += delta; + + // the worker was busy + // set it to idle before we set the timestamp + + worker->last_action = WORKER_IDLE; + if(likely(worker->last_action_timestamp < now)) + worker->last_action_timestamp = now; +} + +void worker_is_idle(void) { + if(unlikely(!worker || worker->last_action != WORKER_BUSY)) return; + + worker_is_idle_with_time(worker_now_monotonic_usec()); +} + +void worker_is_busy(size_t job_id) { + if(unlikely(!worker || job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) + return; + + usec_t now = worker_now_monotonic_usec(); + + if(worker->last_action == WORKER_BUSY) + worker_is_idle_with_time(now); + + // the worker was idle + // set the timestamp and then set it to busy + + worker->job_id = job_id; + worker->per_job_type[job_id].worker_jobs_started++; + worker->jobs_started++; + worker->last_action_timestamp = now; + worker->last_action = WORKER_BUSY; +} + +void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) { + if(unlikely(!worker)) return; + + if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) + return; + + switch(worker->per_job_type[job_id].type) { + case WORKER_METRIC_INCREMENT: + worker->per_job_type[job_id].custom_value += value; + break; + + case WORKER_METRIC_INCREMENTAL_TOTAL: + case WORKER_METRIC_ABSOLUTE: + default: + worker->per_job_type[job_id].custom_value = value; + break; + } +} + +// statistics interface + +void workers_foreach(const char *name, void (*callback)( + void *data + , pid_t pid + , const char *thread_tag + , size_t max_job_id + , size_t utilization_usec + , size_t duration_usec + , size_t jobs_started, size_t is_running + , STRING **job_types_names + , STRING **job_types_units + , WORKER_METRIC_TYPE *job_metric_types + , size_t *job_types_jobs_started + , usec_t *job_types_busy_time + , NETDATA_DOUBLE *job_custom_values + ) + , void *data) { + if(!workers_globals.enabled) + return; + + spinlock_lock(&workers_globals.spinlock); + usec_t busy_time, delta; + size_t i, jobs_started, jobs_running; + + size_t workname_size = strlen(name) + 1; + struct workers_workname *workname; + Pvoid_t *PValue = JudyHSGet(workers_globals.worknames_JudyHS, (void *)name, workname_size); + if(PValue) { + workname = *PValue; + spinlock_lock(&workname->spinlock); + } + else + workname = NULL; + + spinlock_unlock(&workers_globals.spinlock); + + if(!workname) + return; + + struct worker *p; + DOUBLE_LINKED_LIST_FOREACH_FORWARD(workname->base, p, prev, next) { + usec_t now = worker_now_monotonic_usec(); + + // find per job type statistics + STRING *per_job_type_name[WORKER_UTILIZATION_MAX_JOB_TYPES]; + STRING *per_job_type_units[WORKER_UTILIZATION_MAX_JOB_TYPES]; + WORKER_METRIC_TYPE per_job_metric_type[WORKER_UTILIZATION_MAX_JOB_TYPES]; + size_t per_job_type_jobs_started[WORKER_UTILIZATION_MAX_JOB_TYPES]; + usec_t per_job_type_busy_time[WORKER_UTILIZATION_MAX_JOB_TYPES]; + NETDATA_DOUBLE per_job_custom_values[WORKER_UTILIZATION_MAX_JOB_TYPES]; + + size_t max_job_id = p->worker_max_job_id; + for(i = 0; i <= max_job_id ;i++) { + per_job_type_name[i] = p->per_job_type[i].name; + per_job_type_units[i] = p->per_job_type[i].units; + per_job_metric_type[i] = p->per_job_type[i].type; + + switch(p->per_job_type[i].type) { + default: + case WORKER_METRIC_EMPTY: { + per_job_type_jobs_started[i] = 0; + per_job_type_busy_time[i] = 0; + per_job_custom_values[i] = NAN; + break; + } + + case WORKER_METRIC_IDLE_BUSY: { + size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started; + per_job_type_jobs_started[i] = tmp_jobs_started - p->per_job_type[i].statistics_last_jobs_started; + p->per_job_type[i].statistics_last_jobs_started = tmp_jobs_started; + + usec_t tmp_busy_time = p->per_job_type[i].worker_busy_time; + per_job_type_busy_time[i] = tmp_busy_time - p->per_job_type[i].statistics_last_busy_time; + p->per_job_type[i].statistics_last_busy_time = tmp_busy_time; + + per_job_custom_values[i] = NAN; + break; + } + + case WORKER_METRIC_ABSOLUTE: { + per_job_type_jobs_started[i] = 0; + per_job_type_busy_time[i] = 0; + + per_job_custom_values[i] = p->per_job_type[i].custom_value; + break; + } + + case WORKER_METRIC_INCREMENTAL_TOTAL: + case WORKER_METRIC_INCREMENT: { + per_job_type_jobs_started[i] = 0; + per_job_type_busy_time[i] = 0; + + NETDATA_DOUBLE tmp_custom_value = p->per_job_type[i].custom_value; + per_job_custom_values[i] = tmp_custom_value - p->per_job_type[i].statistics_last_custom_value; + p->per_job_type[i].statistics_last_custom_value = tmp_custom_value; + + break; + } + } + } + + // get a copy of the worker variables + size_t worker_job_id = p->job_id; + usec_t worker_busy_time = p->busy_time; + size_t worker_jobs_started = p->jobs_started; + char worker_last_action = p->last_action; + usec_t worker_last_action_timestamp = p->last_action_timestamp; + + delta = now - p->statistics_last_checkpoint; + p->statistics_last_checkpoint = now; + + // this is the only variable both the worker thread and the statistics thread are writing + // we set this only when the worker is busy, so that the worker will not + // accumulate all the busy time, but only the time after the point we collected statistics + if(worker_last_action == WORKER_BUSY && p->last_action_timestamp == worker_last_action_timestamp && p->last_action == WORKER_BUSY) + p->last_action_timestamp = now; + + // calculate delta busy time + busy_time = worker_busy_time - p->statistics_last_busy_time; + p->statistics_last_busy_time = worker_busy_time; + + // calculate delta jobs done + jobs_started = worker_jobs_started - p->statistics_last_jobs_started; + p->statistics_last_jobs_started = worker_jobs_started; + + jobs_running = 0; + if(worker_last_action == WORKER_BUSY) { + // the worker is still busy with something + // let's add that busy time to the reported one + usec_t dt = now - worker_last_action_timestamp; + busy_time += dt; + per_job_type_busy_time[worker_job_id] += dt; + jobs_running = 1; + } + + callback(data + , p->pid + , p->tag + , max_job_id + , busy_time + , delta + , jobs_started + , jobs_running + , per_job_type_name + , per_job_type_units + , per_job_metric_type + , per_job_type_jobs_started + , per_job_type_busy_time + , per_job_custom_values + ); + } + + spinlock_unlock(&workname->spinlock); +} diff --git a/src/libnetdata/worker_utilization/worker_utilization.h b/src/libnetdata/worker_utilization/worker_utilization.h new file mode 100644 index 000000000..e2f46c5a6 --- /dev/null +++ b/src/libnetdata/worker_utilization/worker_utilization.h @@ -0,0 +1,49 @@ +#ifndef WORKER_UTILIZATION_H +#define WORKER_UTILIZATION_H 1 + +#include "../libnetdata.h" + +// workers interfaces + +#define WORKER_UTILIZATION_MAX_JOB_TYPES 50 + +typedef enum __attribute__((packed)) { + WORKER_METRIC_EMPTY = 0, + WORKER_METRIC_IDLE_BUSY = 1, + WORKER_METRIC_ABSOLUTE = 2, + WORKER_METRIC_INCREMENT = 3, + WORKER_METRIC_INCREMENTAL_TOTAL = 4, +} WORKER_METRIC_TYPE; + +void workers_utilization_enable(void); +size_t workers_allocated_memory(void); +void worker_register(const char *name); +void worker_register_job_name(size_t job_id, const char *name); +void worker_register_job_custom_metric(size_t job_id, const char *name, const char *units, WORKER_METRIC_TYPE type); +void worker_unregister(void); + +void worker_is_idle(void); +void worker_is_busy(size_t job_id); +void worker_set_metric(size_t job_id, NETDATA_DOUBLE value); + +// statistics interface + +void workers_foreach(const char *name, void (*callback)( + void *data + , pid_t pid + , const char *thread_tag + , size_t max_job_id + , size_t utilization_usec + , size_t duration_usec + , size_t jobs_started + , size_t is_running + , STRING **job_types_names + , STRING **job_types_units + , WORKER_METRIC_TYPE *job_metric_types + , size_t *job_types_jobs_started + , usec_t *job_types_busy_time + , NETDATA_DOUBLE *job_custom_values + ) + , void *data); + +#endif // WORKER_UTILIZATION_H |