summaryrefslogtreecommitdiffstats
path: root/daemon/unit_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/unit_test.c')
-rw-r--r--daemon/unit_test.c312
1 files changed, 279 insertions, 33 deletions
diff --git a/daemon/unit_test.c b/daemon/unit_test.c
index 36ccd9f6..31718eee 100644
--- a/daemon/unit_test.c
+++ b/daemon/unit_test.c
@@ -1688,7 +1688,8 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS
st[i]->usec_since_last_update = USEC_PER_SEC * update_every;
for (j = 0; j < DIMS; ++j) {
- next = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
+ next = ((collected_number)i * DIMS) * REGION_POINTS[current_region] +
+ j * REGION_POINTS[current_region] + c;
rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now);
}
rrdset_done(st[i]);
@@ -1719,13 +1720,14 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
for (j = 0; j < DIMS; ++j) {
rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH * update_every);
for (k = 0; k < QUERY_BATCH; ++k) {
- last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k;
+ last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] +
+ j * REGION_POINTS[current_region] + c + k;
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved);
value = unpack_storage_number(n);
- same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
+ same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
@@ -1780,7 +1782,7 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]
last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
- same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
+ same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
@@ -1902,7 +1904,7 @@ int test_dbengine(void)
collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset;
calculated_number expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS));
- uint8_t same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0;
+ uint8_t same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
@@ -1932,20 +1934,27 @@ struct dbengine_chart_thread {
uv_thread_t thread;
RRDHOST *host;
char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
- int dset_charts; /* number of charts */
- int dset_dims; /* dimensions per chart */
- int chart_i; /* current chart offset */
+ unsigned dset_charts; /* number of charts */
+ unsigned dset_dims; /* dimensions per chart */
+ unsigned chart_i; /* current chart offset */
time_t time_present; /* current virtual time of the benchmark */
+ volatile time_t time_max; /* latest timestamp of stored values */
unsigned history_seconds; /* how far back in the past to go */
+
+ volatile long done; /* initialize to 0, set to 1 to stop thread */
+ struct completion charts_initialized;
+ unsigned long errors, stored_metrics_nr; /* statistics */
+
+ RRDSET *st;
+ RRDDIM *rd[]; /* dset_dims elements */
};
-collected_number generate_dbengine_chart_value(struct dbengine_chart_thread *thread_info, int dim_i,
- time_t time_current)
+collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t time_current)
{
collected_number value;
- value = ((collected_number)time_current) * thread_info->chart_i;
- value += ((collected_number)time_current) * dim_i;
+ value = ((collected_number)time_current) * (chart_i + 1);
+ value += ((collected_number)time_current) * (dim_i + 1);
value %= 1024LLU;
return value;
@@ -1956,44 +1965,47 @@ static void generate_dbengine_chart(void *arg)
struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg;
RRDHOST *host = thread_info->host;
char *chartname = thread_info->chartname;
- const int DSET_DIMS = thread_info->dset_dims;
+ const unsigned DSET_DIMS = thread_info->dset_dims;
unsigned history_seconds = thread_info->history_seconds;
time_t time_present = thread_info->time_present;
- int j, update_every = 1;
+ unsigned j, update_every = 1;
RRDSET *st;
RRDDIM *rd[DSET_DIMS];
char name[RRD_ID_LENGTH_MAX + 1];
time_t time_current;
// create the chart
- snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%d", thread_info->chart_i + 1);
- st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname, chartname, NULL, 1,
- update_every, RRDSET_TYPE_LINE);
+ snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%u", thread_info->chart_i + 1);
+ thread_info->st = st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname,
+ chartname, NULL, 1, update_every, RRDSET_TYPE_LINE);
for (j = 0 ; j < DSET_DIMS ; ++j) {
- snprintfz(name, RRD_ID_LENGTH_MAX, "%s%d", chartname, j);
+ snprintfz(name, RRD_ID_LENGTH_MAX, "%s%u", chartname, j + 1);
- rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ thread_info->rd[j] = rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
+ complete(&thread_info->charts_initialized);
// feed it with the test data
time_current = time_present - history_seconds;
for (j = 0 ; j < DSET_DIMS ; ++j) {
rd[j]->last_collected_time.tv_sec =
- st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current;
+ st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current - update_every;
rd[j]->last_collected_time.tv_usec =
st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0;
}
- for( ; time_current < time_present ; ++time_current) {
- st->usec_since_last_update = USEC_PER_SEC;
+ for( ; !thread_info->done && time_current < time_present ; time_current += update_every) {
+ st->usec_since_last_update = USEC_PER_SEC * update_every;
for (j = 0; j < DSET_DIMS; ++j) {
collected_number value;
- value = generate_dbengine_chart_value(thread_info, j, time_current);
+ value = generate_dbengine_chart_value(thread_info->chart_i, j, time_current);
rrddim_set_by_pointer_fake_time(rd[j], value, time_current);
+ ++thread_info->stored_metrics_nr;
}
rrdset_done(st);
+ thread_info->time_max = time_current;
}
}
@@ -2003,7 +2015,7 @@ void generate_dbengine_dataset(unsigned history_seconds)
const int DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
RRDHOST *host = NULL;
- struct dbengine_chart_thread thread_info[DSET_CHARTS];
+ struct dbengine_chart_thread **thread_info;
int i;
time_t time_present;
@@ -2021,25 +2033,259 @@ void generate_dbengine_dataset(unsigned history_seconds)
if (NULL == host)
return;
+ thread_info = mallocz(sizeof(*thread_info) * DSET_CHARTS);
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ thread_info[i] = mallocz(sizeof(*thread_info[i]) + sizeof(RRDDIM *) * DSET_DIMS);
+ }
fprintf(stderr, "\nRunning DB-engine workload generator\n");
time_present = now_realtime_sec();
for (i = 0 ; i < DSET_CHARTS ; ++i) {
- thread_info[i].host = host;
- thread_info[i].chartname = "random";
- thread_info[i].dset_charts = DSET_CHARTS;
- thread_info[i].chart_i = i;
- thread_info[i].dset_dims = DSET_DIMS;
- thread_info[i].history_seconds = history_seconds;
- thread_info[i].time_present = time_present;
- assert(0 == uv_thread_create(&thread_info[i].thread, generate_dbengine_chart, &thread_info[i]));
+ thread_info[i]->host = host;
+ thread_info[i]->chartname = "random";
+ thread_info[i]->dset_charts = DSET_CHARTS;
+ thread_info[i]->chart_i = i;
+ thread_info[i]->dset_dims = DSET_DIMS;
+ thread_info[i]->history_seconds = history_seconds;
+ thread_info[i]->time_present = time_present;
+ thread_info[i]->time_max = 0;
+ thread_info[i]->done = 0;
+ init_completion(&thread_info[i]->charts_initialized);
+ assert(0 == uv_thread_create(&thread_info[i]->thread, generate_dbengine_chart, thread_info[i]));
+ wait_for_completion(&thread_info[i]->charts_initialized);
+ destroy_completion(&thread_info[i]->charts_initialized);
}
for (i = 0 ; i < DSET_CHARTS ; ++i) {
- assert(0 == uv_thread_join(&thread_info[i].thread));
+ assert(0 == uv_thread_join(&thread_info[i]->thread));
}
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ freez(thread_info[i]);
+ }
+ freez(thread_info);
rrd_wrlock();
rrdhost_free(host);
rrd_unlock();
}
+
+struct dbengine_query_thread {
+ uv_thread_t thread;
+ RRDHOST *host;
+ char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
+ unsigned dset_charts; /* number of charts */
+ unsigned dset_dims; /* dimensions per chart */
+ time_t time_present; /* current virtual time of the benchmark */
+ unsigned history_seconds; /* how far back in the past to go */
+ volatile long done; /* initialize to 0, set to 1 to stop thread */
+ unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */
+
+ struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */
+};
+
+static void query_dbengine_chart(void *arg)
+{
+ struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
+ const int DSET_CHARTS = thread_info->dset_charts;
+ const int DSET_DIMS = thread_info->dset_dims;
+ time_t time_after, time_before, time_min, time_max, duration;
+ int i, j, update_every = 1;
+ RRDSET *st;
+ RRDDIM *rd;
+ uint8_t same;
+ time_t time_now, time_retrieved;
+ collected_number generatedv;
+ calculated_number value, expected;
+ storage_number n;
+ struct rrddim_query_handle handle;
+
+ do {
+ // pick a chart and dimension
+ i = random() % DSET_CHARTS;
+ st = thread_info->chart_threads[i]->st;
+ j = random() % DSET_DIMS;
+ rd = thread_info->chart_threads[i]->rd[j];
+
+ time_min = thread_info->time_present - thread_info->history_seconds + 1;
+ time_max = thread_info->chart_threads[i]->time_max;
+ if (!time_max) {
+ time_before = time_after = time_min;
+ } else {
+ time_after = time_min + random() % (MAX(time_max - time_min, 1));
+ duration = random() % 3600;
+ time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */
+ }
+
+ rd->state->query_ops.init(rd, &handle, time_after, time_before);
+ ++thread_info->queries_nr;
+ for (time_now = time_after ; time_now <= time_before ; time_now += update_every) {
+ generatedv = generate_dbengine_chart_value(i, j, time_now);
+ expected = unpack_storage_number(pack_storage_number((calculated_number) generatedv, SN_EXISTS));
+
+ if (unlikely(rd->state->query_ops.is_finished(&handle))) {
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
+ CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
+ st->name, rd->name, (unsigned long) time_now, expected);
+ ++thread_info->errors;
+ break;
+ }
+ n = rd->state->query_ops.next_metric(&handle, &time_retrieved);
+ if (SN_EMPTY_SLOT == n) {
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
+ CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
+ st->name, rd->name, (unsigned long) time_now, expected);
+ ++thread_info->errors;
+ break;
+ }
+ ++thread_info->queried_metrics_nr;
+ value = unpack_storage_number(n);
+
+ same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
+ if (!same) {
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
+ CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
+ st->name, rd->name, (unsigned long) time_now, expected, value);
+ ++thread_info->errors;
+ }
+ if (time_retrieved != time_now) {
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
+ st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
+ ++thread_info->errors;
+ }
+ }
+ rd->state->query_ops.finalize(&handle);
+ } while(!thread_info->done);
+}
+
+void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
+ unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB)
+{
+ const unsigned DSET_DIMS = 128;
+ const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
+ const unsigned HISTORY_SECONDS = 3600 * 24 * 365; /* 1 year of history */
+ RRDHOST *host = NULL;
+ struct dbengine_chart_thread **chart_threads;
+ struct dbengine_query_thread **query_threads;
+ unsigned i, j;
+ time_t time_start, time_end;
+
+ if (!TEST_DURATION_SEC)
+ TEST_DURATION_SEC = 10;
+ if (!DSET_CHARTS)
+ DSET_CHARTS = 1;
+ if (!QUERY_THREADS)
+ QUERY_THREADS = 1;
+ if (PAGE_CACHE_MB < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ PAGE_CACHE_MB = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+
+ default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+ default_rrdeng_page_cache_mb = PAGE_CACHE_MB;
+ // Worst case for uncompressible data
+ default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) /
+ (1024 * 1024);
+ default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
+
+ error_log_limit_unlimited();
+ debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-stress-test'");
+
+ host = dbengine_rrdhost_find_or_create("dbengine-stress-test");
+ if (NULL == host)
+ return;
+
+ chart_threads = mallocz(sizeof(*chart_threads) * DSET_CHARTS);
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i] = mallocz(sizeof(*chart_threads[i]) + sizeof(RRDDIM *) * DSET_DIMS);
+ }
+ query_threads = mallocz(sizeof(*query_threads) * QUERY_THREADS);
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i] = mallocz(sizeof(*query_threads[i]) + sizeof(struct dbengine_chart_thread *) * DSET_CHARTS);
+ }
+ fprintf(stderr, "\nRunning DB-engine stress test, %u seconds writers ramp-up time,\n"
+ "%u seconds of concurrent readers and writers, %u writer threads, %u reader threads,\n"
+ "%u MiB of page cache.\n",
+ RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB);
+
+ time_start = now_realtime_sec();
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i]->host = host;
+ chart_threads[i]->chartname = "random";
+ chart_threads[i]->dset_charts = DSET_CHARTS;
+ chart_threads[i]->chart_i = i;
+ chart_threads[i]->dset_dims = DSET_DIMS;
+ chart_threads[i]->history_seconds = HISTORY_SECONDS;
+ chart_threads[i]->time_present = time_start;
+ chart_threads[i]->time_max = 0;
+ chart_threads[i]->done = 0;
+ chart_threads[i]->errors = chart_threads[i]->stored_metrics_nr = 0;
+ init_completion(&chart_threads[i]->charts_initialized);
+ assert(0 == uv_thread_create(&chart_threads[i]->thread, generate_dbengine_chart, chart_threads[i]));
+ }
+ /* barrier so that subsequent queries can access valid chart data */
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ wait_for_completion(&chart_threads[i]->charts_initialized);
+ destroy_completion(&chart_threads[i]->charts_initialized);
+ }
+ sleep(RAMP_UP_SECONDS);
+ /* at this point data have already began being written to the database */
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i]->host = host;
+ query_threads[i]->chartname = "random";
+ query_threads[i]->dset_charts = DSET_CHARTS;
+ query_threads[i]->dset_dims = DSET_DIMS;
+ query_threads[i]->history_seconds = HISTORY_SECONDS;
+ query_threads[i]->time_present = time_start;
+ query_threads[i]->done = 0;
+ query_threads[i]->errors = query_threads[i]->queries_nr = query_threads[i]->queried_metrics_nr = 0;
+ for (j = 0 ; j < DSET_CHARTS ; ++j) {
+ query_threads[i]->chart_threads[j] = chart_threads[j];
+ }
+ assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i]));
+ }
+ sleep(TEST_DURATION_SEC);
+ /* stop workload */
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i]->done = 1;
+ }
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i]->done = 1;
+ }
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ assert(0 == uv_thread_join(&chart_threads[i]->thread));
+ }
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ assert(0 == uv_thread_join(&query_threads[i]->thread));
+ }
+ time_end = now_realtime_sec();
+ fprintf(stderr, "\nDB-engine stress test finished in %ld seconds.\n", time_end - time_start);
+ unsigned long stored_metrics_nr = 0;
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ stored_metrics_nr += chart_threads[i]->stored_metrics_nr;
+ }
+ unsigned long queries_nr = 0, queried_metrics_nr = 0;
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ queries_nr += query_threads[i]->queries_nr;
+ queried_metrics_nr += query_threads[i]->queried_metrics_nr;
+ }
+ fprintf(stderr, "%u metrics were stored (dataset size of %lu MiB) in %u charts by 1 writer thread per chart.\n",
+ DSET_CHARTS * DSET_DIMS, stored_metrics_nr * sizeof(storage_number) / (1024 * 1024), DSET_CHARTS);
+ fprintf(stderr, "Metrics were being generated per 1 emulated second and time was accelerated.\n");
+ fprintf(stderr, "%lu metric data points were queried by %u reader threads.\n", queried_metrics_nr, QUERY_THREADS);
+ fprintf(stderr, "Query starting time is randomly chosen from the beginning of the time-series up to the time of\n"
+ "the latest data point, and ending time from 1 second up to 1 hour after the starting time.\n");
+ fprintf(stderr, "Performance is %lu written data points/sec and %lu read data points/sec.\n",
+ stored_metrics_nr / (time_end - time_start), queried_metrics_nr / (time_end - time_start));
+
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ freez(chart_threads[i]);
+ }
+ freez(chart_threads);
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ freez(query_threads[i]);
+ }
+ freez(query_threads);
+ rrdeng_exit(host->rrdeng_ctx);
+ rrd_wrlock();
+ rrdhost_delete_charts(host);
+ rrd_unlock();
+}
+
#endif