diff options
Diffstat (limited to 'daemon/unit_test.c')
-rw-r--r-- | daemon/unit_test.c | 312 |
1 files changed, 279 insertions, 33 deletions
diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 36ccd9f6b..31718eeea 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1688,7 +1688,8 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS st[i]->usec_since_last_update = USEC_PER_SEC * update_every; for (j = 0; j < DIMS; ++j) { - next = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c; + next = ((collected_number)i * DIMS) * REGION_POINTS[current_region] + + j * REGION_POINTS[current_region] + c; rrddim_set_by_pointer_fake_time(rd[i][j], next, time_now); } rrdset_done(st[i]); @@ -1719,13 +1720,14 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI for (j = 0; j < DIMS; ++j) { rd[i][j]->state->query_ops.init(rd[i][j], &handle, time_now, time_now + QUERY_BATCH * update_every); for (k = 0; k < QUERY_BATCH; ++k) { - last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k; + last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] + + j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved); value = unpack_storage_number(n); - same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; + same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0; if(!same) { fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", @@ -1780,7 +1782,7 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c; expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); - same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; + same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0; if(!same) { fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", @@ -1902,7 +1904,7 @@ int test_dbengine(void) collected_number last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c - point_offset; calculated_number expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_EXISTS)); - uint8_t same = (calculated_number_round(value * 10000000.0) == calculated_number_round(expected * 10000000.0)) ? 1 : 0; + uint8_t same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0; if(!same) { fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", @@ -1932,20 +1934,27 @@ struct dbengine_chart_thread { uv_thread_t thread; RRDHOST *host; char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */ - int dset_charts; /* number of charts */ - int dset_dims; /* dimensions per chart */ - int chart_i; /* current chart offset */ + unsigned dset_charts; /* number of charts */ + unsigned dset_dims; /* dimensions per chart */ + unsigned chart_i; /* current chart offset */ time_t time_present; /* current virtual time of the benchmark */ + volatile time_t time_max; /* latest timestamp of stored values */ unsigned history_seconds; /* how far back in the past to go */ + + volatile long done; /* initialize to 0, set to 1 to stop thread */ + struct completion charts_initialized; + unsigned long errors, stored_metrics_nr; /* statistics */ + + RRDSET *st; + RRDDIM *rd[]; /* dset_dims elements */ }; -collected_number generate_dbengine_chart_value(struct dbengine_chart_thread *thread_info, int dim_i, - time_t time_current) +collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t time_current) { collected_number value; - value = ((collected_number)time_current) * thread_info->chart_i; - value += ((collected_number)time_current) * dim_i; + value = ((collected_number)time_current) * (chart_i + 1); + value += ((collected_number)time_current) * (dim_i + 1); value %= 1024LLU; return value; @@ -1956,44 +1965,47 @@ static void generate_dbengine_chart(void *arg) struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg; RRDHOST *host = thread_info->host; char *chartname = thread_info->chartname; - const int DSET_DIMS = thread_info->dset_dims; + const unsigned DSET_DIMS = thread_info->dset_dims; unsigned history_seconds = thread_info->history_seconds; time_t time_present = thread_info->time_present; - int j, update_every = 1; + unsigned j, update_every = 1; RRDSET *st; RRDDIM *rd[DSET_DIMS]; char name[RRD_ID_LENGTH_MAX + 1]; time_t time_current; // create the chart - snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%d", thread_info->chart_i + 1); - st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname, chartname, NULL, 1, - update_every, RRDSET_TYPE_LINE); + snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%u", thread_info->chart_i + 1); + thread_info->st = st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname, + chartname, NULL, 1, update_every, RRDSET_TYPE_LINE); for (j = 0 ; j < DSET_DIMS ; ++j) { - snprintfz(name, RRD_ID_LENGTH_MAX, "%s%d", chartname, j); + snprintfz(name, RRD_ID_LENGTH_MAX, "%s%u", chartname, j + 1); - rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + thread_info->rd[j] = rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } + complete(&thread_info->charts_initialized); // feed it with the test data time_current = time_present - history_seconds; for (j = 0 ; j < DSET_DIMS ; ++j) { rd[j]->last_collected_time.tv_sec = - st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current; + st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current - update_every; rd[j]->last_collected_time.tv_usec = st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0; } - for( ; time_current < time_present ; ++time_current) { - st->usec_since_last_update = USEC_PER_SEC; + for( ; !thread_info->done && time_current < time_present ; time_current += update_every) { + st->usec_since_last_update = USEC_PER_SEC * update_every; for (j = 0; j < DSET_DIMS; ++j) { collected_number value; - value = generate_dbengine_chart_value(thread_info, j, time_current); + value = generate_dbengine_chart_value(thread_info->chart_i, j, time_current); rrddim_set_by_pointer_fake_time(rd[j], value, time_current); + ++thread_info->stored_metrics_nr; } rrdset_done(st); + thread_info->time_max = time_current; } } @@ -2003,7 +2015,7 @@ void generate_dbengine_dataset(unsigned history_seconds) const int DSET_DIMS = 128; const uint64_t EXPECTED_COMPRESSION_RATIO = 20; RRDHOST *host = NULL; - struct dbengine_chart_thread thread_info[DSET_CHARTS]; + struct dbengine_chart_thread **thread_info; int i; time_t time_present; @@ -2021,25 +2033,259 @@ void generate_dbengine_dataset(unsigned history_seconds) if (NULL == host) return; + thread_info = mallocz(sizeof(*thread_info) * DSET_CHARTS); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + thread_info[i] = mallocz(sizeof(*thread_info[i]) + sizeof(RRDDIM *) * DSET_DIMS); + } fprintf(stderr, "\nRunning DB-engine workload generator\n"); time_present = now_realtime_sec(); for (i = 0 ; i < DSET_CHARTS ; ++i) { - thread_info[i].host = host; - thread_info[i].chartname = "random"; - thread_info[i].dset_charts = DSET_CHARTS; - thread_info[i].chart_i = i; - thread_info[i].dset_dims = DSET_DIMS; - thread_info[i].history_seconds = history_seconds; - thread_info[i].time_present = time_present; - assert(0 == uv_thread_create(&thread_info[i].thread, generate_dbengine_chart, &thread_info[i])); + thread_info[i]->host = host; + thread_info[i]->chartname = "random"; + thread_info[i]->dset_charts = DSET_CHARTS; + thread_info[i]->chart_i = i; + thread_info[i]->dset_dims = DSET_DIMS; + thread_info[i]->history_seconds = history_seconds; + thread_info[i]->time_present = time_present; + thread_info[i]->time_max = 0; + thread_info[i]->done = 0; + init_completion(&thread_info[i]->charts_initialized); + assert(0 == uv_thread_create(&thread_info[i]->thread, generate_dbengine_chart, thread_info[i])); + wait_for_completion(&thread_info[i]->charts_initialized); + destroy_completion(&thread_info[i]->charts_initialized); } for (i = 0 ; i < DSET_CHARTS ; ++i) { - assert(0 == uv_thread_join(&thread_info[i].thread)); + assert(0 == uv_thread_join(&thread_info[i]->thread)); } + for (i = 0 ; i < DSET_CHARTS ; ++i) { + freez(thread_info[i]); + } + freez(thread_info); rrd_wrlock(); rrdhost_free(host); rrd_unlock(); } + +struct dbengine_query_thread { + uv_thread_t thread; + RRDHOST *host; + char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */ + unsigned dset_charts; /* number of charts */ + unsigned dset_dims; /* dimensions per chart */ + time_t time_present; /* current virtual time of the benchmark */ + unsigned history_seconds; /* how far back in the past to go */ + volatile long done; /* initialize to 0, set to 1 to stop thread */ + unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */ + + struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */ +}; + +static void query_dbengine_chart(void *arg) +{ + struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg; + const int DSET_CHARTS = thread_info->dset_charts; + const int DSET_DIMS = thread_info->dset_dims; + time_t time_after, time_before, time_min, time_max, duration; + int i, j, update_every = 1; + RRDSET *st; + RRDDIM *rd; + uint8_t same; + time_t time_now, time_retrieved; + collected_number generatedv; + calculated_number value, expected; + storage_number n; + struct rrddim_query_handle handle; + + do { + // pick a chart and dimension + i = random() % DSET_CHARTS; + st = thread_info->chart_threads[i]->st; + j = random() % DSET_DIMS; + rd = thread_info->chart_threads[i]->rd[j]; + + time_min = thread_info->time_present - thread_info->history_seconds + 1; + time_max = thread_info->chart_threads[i]->time_max; + if (!time_max) { + time_before = time_after = time_min; + } else { + time_after = time_min + random() % (MAX(time_max - time_min, 1)); + duration = random() % 3600; + time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */ + } + + rd->state->query_ops.init(rd, &handle, time_after, time_before); + ++thread_info->queries_nr; + for (time_now = time_after ; time_now <= time_before ; time_now += update_every) { + generatedv = generate_dbengine_chart_value(i, j, time_now); + expected = unpack_storage_number(pack_storage_number((calculated_number) generatedv, SN_EXISTS)); + + if (unlikely(rd->state->query_ops.is_finished(&handle))) { + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " + CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n", + st->name, rd->name, (unsigned long) time_now, expected); + ++thread_info->errors; + break; + } + n = rd->state->query_ops.next_metric(&handle, &time_retrieved); + if (SN_EMPTY_SLOT == n) { + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " + CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n", + st->name, rd->name, (unsigned long) time_now, expected); + ++thread_info->errors; + break; + } + ++thread_info->queried_metrics_nr; + value = unpack_storage_number(n); + + same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0; + if (!same) { + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " + CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", + st->name, rd->name, (unsigned long) time_now, expected, value); + ++thread_info->errors; + } + if (time_retrieved != time_now) { + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n", + st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved); + ++thread_info->errors; + } + } + rd->state->query_ops.finalize(&handle); + } while(!thread_info->done); +} + +void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS, + unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB) +{ + const unsigned DSET_DIMS = 128; + const uint64_t EXPECTED_COMPRESSION_RATIO = 20; + const unsigned HISTORY_SECONDS = 3600 * 24 * 365; /* 1 year of history */ + RRDHOST *host = NULL; + struct dbengine_chart_thread **chart_threads; + struct dbengine_query_thread **query_threads; + unsigned i, j; + time_t time_start, time_end; + + if (!TEST_DURATION_SEC) + TEST_DURATION_SEC = 10; + if (!DSET_CHARTS) + DSET_CHARTS = 1; + if (!QUERY_THREADS) + QUERY_THREADS = 1; + if (PAGE_CACHE_MB < RRDENG_MIN_PAGE_CACHE_SIZE_MB) + PAGE_CACHE_MB = RRDENG_MIN_PAGE_CACHE_SIZE_MB; + + default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; + default_rrdeng_page_cache_mb = PAGE_CACHE_MB; + // Worst case for uncompressible data + default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / + (1024 * 1024); + default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100; + + error_log_limit_unlimited(); + debug(D_RRDHOST, "Initializing localhost with hostname 'dbengine-stress-test'"); + + host = dbengine_rrdhost_find_or_create("dbengine-stress-test"); + if (NULL == host) + return; + + chart_threads = mallocz(sizeof(*chart_threads) * DSET_CHARTS); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i] = mallocz(sizeof(*chart_threads[i]) + sizeof(RRDDIM *) * DSET_DIMS); + } + query_threads = mallocz(sizeof(*query_threads) * QUERY_THREADS); + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i] = mallocz(sizeof(*query_threads[i]) + sizeof(struct dbengine_chart_thread *) * DSET_CHARTS); + } + fprintf(stderr, "\nRunning DB-engine stress test, %u seconds writers ramp-up time,\n" + "%u seconds of concurrent readers and writers, %u writer threads, %u reader threads,\n" + "%u MiB of page cache.\n", + RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB); + + time_start = now_realtime_sec(); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i]->host = host; + chart_threads[i]->chartname = "random"; + chart_threads[i]->dset_charts = DSET_CHARTS; + chart_threads[i]->chart_i = i; + chart_threads[i]->dset_dims = DSET_DIMS; + chart_threads[i]->history_seconds = HISTORY_SECONDS; + chart_threads[i]->time_present = time_start; + chart_threads[i]->time_max = 0; + chart_threads[i]->done = 0; + chart_threads[i]->errors = chart_threads[i]->stored_metrics_nr = 0; + init_completion(&chart_threads[i]->charts_initialized); + assert(0 == uv_thread_create(&chart_threads[i]->thread, generate_dbengine_chart, chart_threads[i])); + } + /* barrier so that subsequent queries can access valid chart data */ + for (i = 0 ; i < DSET_CHARTS ; ++i) { + wait_for_completion(&chart_threads[i]->charts_initialized); + destroy_completion(&chart_threads[i]->charts_initialized); + } + sleep(RAMP_UP_SECONDS); + /* at this point data have already began being written to the database */ + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i]->host = host; + query_threads[i]->chartname = "random"; + query_threads[i]->dset_charts = DSET_CHARTS; + query_threads[i]->dset_dims = DSET_DIMS; + query_threads[i]->history_seconds = HISTORY_SECONDS; + query_threads[i]->time_present = time_start; + query_threads[i]->done = 0; + query_threads[i]->errors = query_threads[i]->queries_nr = query_threads[i]->queried_metrics_nr = 0; + for (j = 0 ; j < DSET_CHARTS ; ++j) { + query_threads[i]->chart_threads[j] = chart_threads[j]; + } + assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i])); + } + sleep(TEST_DURATION_SEC); + /* stop workload */ + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i]->done = 1; + } + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i]->done = 1; + } + for (i = 0 ; i < DSET_CHARTS ; ++i) { + assert(0 == uv_thread_join(&chart_threads[i]->thread)); + } + for (i = 0 ; i < QUERY_THREADS ; ++i) { + assert(0 == uv_thread_join(&query_threads[i]->thread)); + } + time_end = now_realtime_sec(); + fprintf(stderr, "\nDB-engine stress test finished in %ld seconds.\n", time_end - time_start); + unsigned long stored_metrics_nr = 0; + for (i = 0 ; i < DSET_CHARTS ; ++i) { + stored_metrics_nr += chart_threads[i]->stored_metrics_nr; + } + unsigned long queries_nr = 0, queried_metrics_nr = 0; + for (i = 0 ; i < QUERY_THREADS ; ++i) { + queries_nr += query_threads[i]->queries_nr; + queried_metrics_nr += query_threads[i]->queried_metrics_nr; + } + fprintf(stderr, "%u metrics were stored (dataset size of %lu MiB) in %u charts by 1 writer thread per chart.\n", + DSET_CHARTS * DSET_DIMS, stored_metrics_nr * sizeof(storage_number) / (1024 * 1024), DSET_CHARTS); + fprintf(stderr, "Metrics were being generated per 1 emulated second and time was accelerated.\n"); + fprintf(stderr, "%lu metric data points were queried by %u reader threads.\n", queried_metrics_nr, QUERY_THREADS); + fprintf(stderr, "Query starting time is randomly chosen from the beginning of the time-series up to the time of\n" + "the latest data point, and ending time from 1 second up to 1 hour after the starting time.\n"); + fprintf(stderr, "Performance is %lu written data points/sec and %lu read data points/sec.\n", + stored_metrics_nr / (time_end - time_start), queried_metrics_nr / (time_end - time_start)); + + for (i = 0 ; i < DSET_CHARTS ; ++i) { + freez(chart_threads[i]); + } + freez(chart_threads); + for (i = 0 ; i < QUERY_THREADS ; ++i) { + freez(query_threads[i]); + } + freez(query_threads); + rrdeng_exit(host->rrdeng_ctx); + rrd_wrlock(); + rrdhost_delete_charts(host); + rrd_unlock(); +} + #endif |