summaryrefslogtreecommitdiffstats
path: root/storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc')
-rw-r--r--storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc416
1 files changed, 416 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc b/storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc
new file mode 100644
index 00000000..a4e6f842
--- /dev/null
+++ b/storage/tokudb/PerconaFT/util/tests/test_partitioned_counter.cc
@@ -0,0 +1,416 @@
+/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
+#ident "$Id$"
+/*======
+This file is part of PerconaFT.
+
+
+Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+
+----------------------------------------
+
+ PerconaFT is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License, version 3,
+ as published by the Free Software Foundation.
+
+ PerconaFT is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
+======= */
+
+#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
+
+/* This code can either test the PARTITIONED_COUNTER abstraction or it can time various implementations. */
+
+/* Try to make counter that requires no cache misses to increment, and to get the value can be slow.
+ * I don't care much about races between the readers and writers on the counter.
+ *
+ * The problem: We observed that incrementing a counter with multiple threads is quite expensive.
+ * Here are some performance numbers:
+ * Machines: mork or mindy (Intel Xeon L5520 2.27GHz)
+ * bradley's 4-core laptop laptop (Intel Core i7-2640M 2.80GHz) sandy bridge
+ * alf 16-core server (xeon E5-2665 2.4GHz) sandybridge
+ *
+ * mork mindy bradley alf
+ * 1.22ns 1.07ns 1.27ns 0.61ns to do a ++, but it's got a race in it.
+ * 27.11ns 20.47ns 18.75ns 34.15ns to do a sync_fetch_and_add().
+ * 0.26ns 0.29ns 0.71ns 0.19ns to do with a single version of a counter
+ * 0.35ns 0.33ns 0.69ns 0.18ns pure thread-local variable (no way to add things up)
+ * 0.76ns 1.50ns 0.35ns partitioned_counter.c (using link-time optimization, otherwise the function all overwhelms everything)
+ * 2.21ns 3.32ns 0.70ns partitioned_counter.c (using gcc, the C version at r46097, not C++) This one is a little slower because it has an extra branch in it.
+ *
+ * Surprisingly, compiling this code without -fPIC doesn't make it any faster (even the pure thread-local variable is the same). -fPIC access to
+ * thread-local variables look slower since they have a function all, but they don't seem to be any slower in practice. In fact, even the puretl-ptr test
+ * which simply increments a thread-local pointer is basically the same speed as accessing thread_local variable.
+ *
+ * How it works. Each thread has a thread-local counter structure with an integer in it. To increment, we increment the thread-local structure.
+ * The other operation is to query the counters to get the sum of all the thread-local variables.
+ * The first time a pthread increments the variable we add the variable to a linked list.
+ * When a pthread ends, we use the pthread_key destructor to remove the variable from the linked list. We also have to remember the sum of everything.
+ * that has been removed from the list.
+ * To get the sum we add the sum of the destructed items, plus everything in the list.
+ *
+ */
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <toku_race_tools.h>
+#include <toku_assert.h>
+#include <portability/toku_atomic.h>
+#include <memory.h>
+#include <util/partitioned_counter.h>
+#include "test.h"
+
+// The test code includes the fastest version I could figure out to make, implemented below.
+
+struct counter_s {
+ bool inited;
+ volatile int counter;
+ struct counter_s *prev, *next;
+ int myid;
+};
+static __thread struct counter_s counter = {false,0, NULL,NULL,0};
+
+static int finished_counter=0; // counter for all threads that are done.
+
+// We use a single mutex for anything complex. We'd like to use a mutex per partitioned counter, but we must cope with the possibility of a race between
+// a terminating pthread (which calls destroy_counter()), and a call to the counter destructor. So we use a global mutex.
+static pthread_mutex_t pc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct counter_s *head=NULL;
+static pthread_key_t counter_key;
+
+static void pc_lock (void)
+// Effect: Lock the pc mutex.
+{
+ int r = pthread_mutex_lock(&pc_mutex);
+ assert(r==0);
+}
+
+static void pc_unlock (void)
+// Effect: Unlock the pc mutex.
+{
+ int r = pthread_mutex_unlock(&pc_mutex);
+ assert(r==0);
+}
+
+static void destroy_counter (void *counterp)
+// Effect: This is the function passed to pthread_key_create that is to run whenever a thread terminates.
+// The thread-local part of the counter must be copied into the shared state, and the thread-local part of the counter must be
+// removed from the linked list of all thread-local parts.
+{
+ assert((struct counter_s*)counterp==&counter);
+ pc_lock();
+ if (counter.prev==NULL) {
+ assert(head==&counter);
+ head = counter.next;
+ } else {
+ counter.prev->next = counter.next;
+ }
+ if (counter.next!=NULL) {
+ counter.next->prev = counter.prev;
+ }
+ finished_counter += counter.counter;
+ TOKU_VALGRIND_HG_ENABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // stop ignoring races
+ //printf("finished counter now %d\n", finished_counter);
+ pc_unlock();
+}
+
+static int idcounter=0;
+
+static inline void increment (void) {
+ if (!counter.inited) {
+ pc_lock();
+ struct counter_s *cp = &counter;
+ { int r = pthread_setspecific(counter_key, cp); assert(r==0); }
+ cp->prev = NULL;
+ cp->next = head;
+ if (head!=NULL) {
+ head->prev = cp;
+ }
+ head = cp;
+ cp->counter = 0;
+ cp->inited = true;
+ cp->myid = idcounter++;
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // the counter increment is kind of racy.
+ pc_unlock();
+ }
+ counter.counter++;
+}
+
+static int getvals (void) {
+ pc_lock();
+ int sum=finished_counter;
+ for (struct counter_s *p=head; p; p=p->next) {
+ sum+=p->counter;
+ }
+ pc_unlock();
+ return sum;
+}
+
+/**********************************************************************************/
+/* And now for some actual test code. */
+/**********************************************************************************/
+
+static const int N=10000000;
+static const int T=20;
+
+
+PARTITIONED_COUNTER pc;
+static void *pc_doit (void *v) {
+ for (int i=0; i<N; i++) {
+ increment_partitioned_counter(pc, 1);
+ }
+ //printf("val=%ld\n", read_partitioned_counter(pc));
+ return v;
+}
+
+static void* new_doit (void* v) {
+ for (int i=0; i<N; i++) {
+ increment();
+ //if (i%0x2000 == 0) sched_yield();
+ }
+ if (0) printf("done id=%d, getvals=%d\n", counter.myid, getvals());
+ return v;
+}
+
+static int oldcounter=0;
+
+static void* old_doit (void* v) {
+ for (int i=0; i<N; i++) {
+ (void)toku_sync_fetch_and_add(&oldcounter, 1);
+ //if (i%0x1000 == 0) sched_yield();
+ }
+ return v;
+}
+
+static volatile int oldcounter_nonatomic=0;
+
+static void* old_doit_nonatomic (void* v) {
+ for (int i=0; i<N; i++) {
+ oldcounter_nonatomic++;
+ //if (i%0x1000 == 0) sched_yield();
+ }
+ return v;
+}
+
+static __thread volatile int thread_local_counter=0;
+static void* tl_doit (void *v) {
+ for (int i=0; i<N; i++) {
+ thread_local_counter++;
+ }
+ return v;
+}
+
+static float tdiff (struct timeval *start, struct timeval *end) {
+ return (end->tv_sec-start->tv_sec) +1e-6*(end->tv_usec - start->tv_usec);
+}
+
+static void pt_create (pthread_t *thread, void *(*f)(void*), void *extra) {
+ int r = pthread_create(thread, NULL, f, extra);
+ assert(r==0);
+}
+
+static void pt_join (pthread_t thread, void *expect_extra) {
+ void *result;
+ int r = pthread_join(thread, &result);
+ assert(r==0);
+ assert(result==expect_extra);
+}
+
+static void timeit (const char *description, void* (*f)(void*)) {
+ struct timeval start, end;
+ pthread_t threads[T];
+ gettimeofday(&start, 0);
+ for (int i=0; i<T; i++) {
+ pt_create(&threads[i], f, NULL);
+ }
+ for (int i=0; i<T; i++) {
+ pt_join(threads[i], NULL);
+ }
+ gettimeofday(&end, 0);
+ printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
+}
+
+// Do a measurement where it really is only a pointer dereference to increment the variable, which is thread local.
+static void* tl_doit_ptr (void *v) {
+ volatile uint64_t *p = (uint64_t *)v;
+ for (int i=0; i<N; i++) {
+ (*p)++;
+ }
+ return v;
+}
+
+
+static void timeit_with_thread_local_pointer (const char *description, void* (*f)(void*)) {
+ struct timeval start, end;
+ pthread_t threads[T];
+ struct { uint64_t values[8] __attribute__((__aligned__(64))); } values[T]; // pad to different cache lines.
+ gettimeofday(&start, 0);
+ for (int i=0; i<T; i++) {
+ values[i].values[0]=0;
+ pt_create(&threads[i], f, &values[i].values[0]);
+ }
+ for (int i=0; i<T; i++) {
+ pt_join(threads[i], &values[i].values[0]);
+ }
+ gettimeofday(&end, 0);
+ printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
+}
+
+static int verboseness_cmdarg=0;
+static bool time_cmdarg=false;
+
+static void parse_args (int argc, const char *argv[]) {
+ const char *progname = argv[1];
+ argc--; argv++;
+ while (argc>0) {
+ if (strcmp(argv[0], "-v")==0) verboseness_cmdarg++;
+ else if (strcmp(argv[0], "--time")==0) time_cmdarg=true;
+ else {
+ printf("Usage: %s [-v] [--time]\n Default is to run tests. --time produces timing output.\n", progname);
+ exit(1);
+ }
+ argc--; argv++;
+ }
+}
+
+static void do_timeit (void) {
+ { int r = pthread_key_create(&counter_key, destroy_counter); assert(r==0); }
+ printf("%d threads\n%d increments per thread\n", T, N);
+ timeit("++", old_doit_nonatomic);
+ timeit("atomic++", old_doit);
+ timeit("fast", new_doit);
+ timeit("puretl", tl_doit);
+ timeit_with_thread_local_pointer("puretl-ptr", tl_doit_ptr);
+ pc = create_partitioned_counter();
+ timeit("pc", pc_doit);
+ destroy_partitioned_counter(pc);
+}
+
+struct test_arguments {
+ PARTITIONED_COUNTER pc;
+ uint64_t limit;
+ uint64_t total_increment_per_writer;
+ volatile uint64_t unfinished_count;
+};
+
+static void *reader_test_fun (void *ta_v) {
+ struct test_arguments *ta = (struct test_arguments *)ta_v;
+ uint64_t lastval = 0;
+ while (ta->unfinished_count>0) {
+ uint64_t thisval = read_partitioned_counter(ta->pc);
+ assert(lastval <= thisval);
+ assert(thisval <= ta->limit+2);
+ lastval = thisval;
+ if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("ufc=%" PRIu64 " Thisval=%" PRIu64 "\n", ta->unfinished_count,thisval);
+ }
+ uint64_t thisval = read_partitioned_counter(ta->pc);
+ assert(thisval==ta->limit+2); // we incremented two extra times in the test
+ return ta_v;
+}
+
+static void *writer_test_fun (void *ta_v) {
+ struct test_arguments *ta = (struct test_arguments *)ta_v;
+ for (uint64_t i=0; i<ta->total_increment_per_writer; i++) {
+ if (i%1000 == 0) sched_yield();
+ increment_partitioned_counter(ta->pc, 1);
+ }
+ uint64_t c __attribute__((__unused__)) = toku_sync_fetch_and_sub(&ta->unfinished_count, 1);
+ return ta_v;
+}
+
+
+static void do_testit (void) {
+ const int NGROUPS = 2;
+ uint64_t limits[NGROUPS];
+ limits [0] = 2000000;
+ limits [1] = 1000000;
+ uint64_t n_writers[NGROUPS];
+ n_writers[0] = 20;
+ n_writers[1] = 40;
+ struct test_arguments tas[NGROUPS];
+ pthread_t reader_threads[NGROUPS];
+ pthread_t *writer_threads[NGROUPS];
+ for (int i=0; i<NGROUPS; i++) {
+ tas[i].pc = create_partitioned_counter();
+ tas[i].limit = limits[i];
+ tas[i].unfinished_count = n_writers[i];
+ tas[i].total_increment_per_writer = limits[i]/n_writers[i];
+ assert(tas[i].total_increment_per_writer * n_writers[i] == limits[i]);
+ pt_create(&reader_threads[i], reader_test_fun, &tas[i]);
+ increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
+ MALLOC_N(n_writers[i], writer_threads[i]);
+ for (uint64_t j=0; j<n_writers[i] ; j++) {
+ pt_create(&writer_threads[i][j], writer_test_fun, &tas[i]);
+ }
+ increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
+ }
+ for (int i=0; i<NGROUPS; i++) {
+ pt_join(reader_threads[i], &tas[i]);
+ for (uint64_t j=0; j<n_writers[i] ; j++) {
+ pt_join(writer_threads[i][j], &tas[i]);
+ }
+ toku_free(writer_threads[i]);
+ destroy_partitioned_counter(tas[i].pc);
+ }
+}
+
+volatile int spinwait=0;
+static void* test2_fun (void* mypc_v) {
+ PARTITIONED_COUNTER mypc = (PARTITIONED_COUNTER)mypc_v;
+ increment_partitioned_counter(mypc, 3);
+ spinwait=1;
+ while (spinwait==1);
+ // mypc no longer points at a valid data structure.
+ return NULL;
+}
+
+static void do_testit2 (void)
+// This test checks to see what happens if a thread is still live when we destruct a counter.
+// A thread increments the counter, then lets us know through a spin wait, then waits until we destroy the counter.
+{
+ pthread_t t;
+ TOKU_VALGRIND_HG_DISABLE_CHECKING(&spinwait, sizeof(spinwait)); // this is a racy volatile variable.
+ {
+ PARTITIONED_COUNTER mypc = create_partitioned_counter();
+ increment_partitioned_counter(mypc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
+ pt_create(&t, test2_fun, mypc);
+ while(spinwait==0); // wait until he incremented the counter.
+ increment_partitioned_counter(mypc, -1);
+ assert(read_partitioned_counter(mypc)==3);
+ destroy_partitioned_counter(mypc);
+ } // leave scope, so the counter goes away.
+ spinwait=2; // tell the other guy to finish up.
+ pt_join(t, NULL);
+}
+
+int test_main (int argc, const char *argv[]) {
+ parse_args(argc, argv);
+ if (time_cmdarg) {
+ do_timeit();
+ } else {
+ do_testit();
+ do_testit2();
+ }
+ return 0;
+}