summaryrefslogtreecommitdiffstats
path: root/plugin/handler_socket/client
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /plugin/handler_socket/client
parentInitial commit. (diff)
downloadmariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.tar.xz
mariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/handler_socket/client')
-rw-r--r--plugin/handler_socket/client/Makefile.am24
-rw-r--r--plugin/handler_socket/client/hsclient.cpp88
-rw-r--r--plugin/handler_socket/client/hslongrun.cpp1041
-rwxr-xr-xplugin/handler_socket/client/hspool_test.pl224
-rw-r--r--plugin/handler_socket/client/hstest.cpp1532
-rwxr-xr-xplugin/handler_socket/client/hstest.pl228
-rwxr-xr-xplugin/handler_socket/client/hstest_hs.sh4
-rwxr-xr-xplugin/handler_socket/client/hstest_hs_more50.sh4
-rwxr-xr-xplugin/handler_socket/client/hstest_md.sh7
-rwxr-xr-xplugin/handler_socket/client/hstest_my.sh3
-rwxr-xr-xplugin/handler_socket/client/hstest_my_more50.sh3
11 files changed, 3158 insertions, 0 deletions
diff --git a/plugin/handler_socket/client/Makefile.am b/plugin/handler_socket/client/Makefile.am
new file mode 100644
index 00000000..e89727a7
--- /dev/null
+++ b/plugin/handler_socket/client/Makefile.am
@@ -0,0 +1,24 @@
+CXXFLAGS += -fimplicit-templates
+AM_INCLUDES= -I$(srcdir)/../libhsclient
+bin_PROGRAMS=hsclient
+hsclient_SOURCES= hsclient.cpp
+hsclient_LDFLAGS= -static -L../libhsclient -lhsclient
+hsclient_CXXFLAGS= $(AM_INCLUDES)
+
+hstest: hstest.o
+ $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hstest.o \
+ -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \
+ -o hstest
+
+hstest.o: hstest.cpp
+ $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \
+ -c hstest.cpp
+
+hslongrun: hslongrun.o
+ $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hslongrun.o \
+ -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \
+ -o hslongrun
+
+hslongrun.o: hslongrun.cpp
+ $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \
+ -c hslongrun.cpp
diff --git a/plugin/handler_socket/client/hsclient.cpp b/plugin/handler_socket/client/hsclient.cpp
new file mode 100644
index 00000000..0dd8332e
--- /dev/null
+++ b/plugin/handler_socket/client/hsclient.cpp
@@ -0,0 +1,88 @@
+
+// vim:sw=2:ai
+
+#include "hstcpcli.hpp"
+#include "string_util.hpp"
+
+namespace dena {
+
+int
+hstcpcli_main(int argc, char **argv)
+{
+ config conf;
+ parse_args(argc, argv, conf);
+ socket_args sockargs;
+ sockargs.set(conf);
+ hstcpcli_ptr cli = hstcpcli_i::create(sockargs);
+ const std::string dbname = conf.get_str("dbname", "hstest");
+ const std::string table = conf.get_str("table", "hstest_table1");
+ const std::string index = conf.get_str("index", "PRIMARY");
+ const std::string fields = conf.get_str("fields", "k,v");
+ const int limit = conf.get_int("limit", 0);
+ const int skip = conf.get_int("skip", 0);
+ std::vector<std::string> keys;
+ std::vector<string_ref> keyrefs;
+ size_t num_keys = 0;
+ while (true) {
+ const std::string conf_key = std::string("k") + to_stdstring(num_keys);
+ const std::string k = conf.get_str(conf_key, "");
+ const std::string kx = conf.get_str(conf_key, "x");
+ if (k.empty() && kx == "x") {
+ break;
+ }
+ ++num_keys;
+ keys.push_back(k);
+ }
+ for (size_t i = 0; i < keys.size(); ++i) {
+ const string_ref ref(keys[i].data(), keys[i].size());
+ keyrefs.push_back(ref);
+ }
+ const std::string op = conf.get_str("op", "=");
+ const string_ref op_ref(op.data(), op.size());
+ cli->request_buf_open_index(0, dbname.c_str(), table.c_str(),
+ index.c_str(), fields.c_str());
+ cli->request_buf_exec_generic(0, op_ref, num_keys == 0 ? 0 : &keyrefs[0],
+ num_keys, limit, skip, string_ref(), 0, 0);
+ int code = 0;
+ size_t numflds = 0;
+ do {
+ if (cli->request_send() != 0) {
+ fprintf(stderr, "request_send: %s\n", cli->get_error().c_str());
+ break;
+ }
+ if ((code = cli->response_recv(numflds)) != 0) {
+ fprintf(stderr, "response_recv: %s\n", cli->get_error().c_str());
+ break;
+ }
+ } while (false);
+ cli->response_buf_remove();
+ do {
+ if ((code = cli->response_recv(numflds)) != 0) {
+ fprintf(stderr, "response_recv: %s\n", cli->get_error().c_str());
+ break;
+ }
+ while (true) {
+ const string_ref *const row = cli->get_next_row();
+ if (row == 0) {
+ break;
+ }
+ printf("REC:");
+ for (size_t i = 0; i < numflds; ++i) {
+ const std::string val(row[i].begin(), row[i].size());
+ printf(" %s", val.c_str());
+ }
+ printf("\n");
+ }
+ } while (false);
+ cli->response_buf_remove();
+ return 0;
+}
+
+};
+
+int
+main(int argc, char **argv)
+{
+ return dena::hstcpcli_main(argc, argv);
+}
+
diff --git a/plugin/handler_socket/client/hslongrun.cpp b/plugin/handler_socket/client/hslongrun.cpp
new file mode 100644
index 00000000..b7c02951
--- /dev/null
+++ b/plugin/handler_socket/client/hslongrun.cpp
@@ -0,0 +1,1041 @@
+
+// vim:sw=2:ai
+
+#include <signal.h>
+#include <sys/time.h>
+#include <stdio.h>
+#include <string.h>
+#include <vector>
+#include <map>
+#include <stdlib.h>
+#include <memory>
+#include <errno.h>
+#include <mysql.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "util.hpp"
+#include "auto_ptrcontainer.hpp"
+#include "socket.hpp"
+#include "hstcpcli.hpp"
+#include "string_util.hpp"
+#include "mutex.hpp"
+
+namespace dena {
+
+struct auto_mysql : private noncopyable {
+ auto_mysql() : db(0) {
+ reset();
+ }
+ ~auto_mysql() {
+ if (db) {
+ mysql_close(db);
+ }
+ }
+ void reset() {
+ if (db) {
+ mysql_close(db);
+ }
+ if ((db = mysql_init(0)) == 0) {
+ fatal_abort("failed to initialize mysql client");
+ }
+ }
+ operator MYSQL *() const { return db; }
+ private:
+ MYSQL *db;
+};
+
+struct auto_mysql_res : private noncopyable {
+ auto_mysql_res(MYSQL *db) {
+ res = mysql_store_result(db);
+ }
+ ~auto_mysql_res() {
+ if (res) {
+ mysql_free_result(res);
+ }
+ }
+ operator MYSQL_RES *() const { return res; }
+ private:
+ MYSQL_RES *res;
+};
+
+struct auto_mysql_stmt : private noncopyable {
+ auto_mysql_stmt(MYSQL *db) {
+ stmt = mysql_stmt_init(db);
+ }
+ ~auto_mysql_stmt() {
+ if (stmt) {
+ mysql_stmt_close(stmt);
+ }
+ }
+ operator MYSQL_STMT *() const { return stmt; }
+ private:
+ MYSQL_STMT *stmt;
+};
+
+double
+gettimeofday_double()
+{
+ struct timeval tv = { };
+ if (gettimeofday(&tv, 0) != 0) {
+ fatal_abort("gettimeofday");
+ }
+ return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
+}
+
+struct record_value {
+ mutex lock;
+ bool deleted;
+ bool unknown_state;
+ std::string key;
+ std::vector<std::string> values;
+ record_value() : deleted(true), unknown_state(false) { }
+};
+
+struct hs_longrun_shared {
+ config conf;
+ socket_args arg;
+ int verbose;
+ long num_threads;
+ int usleep;
+ volatile mutable int running;
+ auto_ptrcontainer< std::vector<record_value *> > records;
+ hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { }
+};
+
+struct thread_base {
+ thread_base() : need_join(false), stack_size(256 * 1024) { }
+ virtual ~thread_base() {
+ join();
+ }
+ virtual void run() = 0;
+ void start() {
+ if (!start_nothrow()) {
+ fatal_abort("thread::start");
+ }
+ }
+ bool start_nothrow() {
+ if (need_join) {
+ return need_join; /* true */
+ }
+ void *const arg = this;
+ pthread_attr_t attr;
+ if (pthread_attr_init(&attr) != 0) {
+ fatal_abort("pthread_attr_init");
+ }
+ if (pthread_attr_setstacksize(&attr, stack_size) != 0) {
+ fatal_abort("pthread_attr_setstacksize");
+ }
+ const int r = pthread_create(&thr, &attr, thread_main, arg);
+ if (pthread_attr_destroy(&attr) != 0) {
+ fatal_abort("pthread_attr_destroy");
+ }
+ if (r != 0) {
+ return need_join; /* false */
+ }
+ need_join = true;
+ return need_join; /* true */
+ }
+ void join() {
+ if (!need_join) {
+ return;
+ }
+ int e = 0;
+ if ((e = pthread_join(thr, 0)) != 0) {
+ fatal_abort("pthread_join");
+ }
+ need_join = false;
+ }
+ private:
+ static void *thread_main(void *arg) {
+ thread_base *p = static_cast<thread_base *>(arg);
+ p->run();
+ return 0;
+ }
+ private:
+ pthread_t thr;
+ bool need_join;
+ size_t stack_size;
+};
+
+struct hs_longrun_stat {
+ unsigned long long verify_error_count;
+ unsigned long long runtime_error_count;
+ unsigned long long unknown_count;
+ unsigned long long success_count;
+ hs_longrun_stat()
+ : verify_error_count(0), runtime_error_count(0),
+ unknown_count(0), success_count(0) { }
+ void add(const hs_longrun_stat& x) {
+ verify_error_count += x.verify_error_count;
+ runtime_error_count += x.runtime_error_count;
+ unknown_count += x.unknown_count;
+ success_count += x.success_count;
+ }
+};
+
+struct hs_longrun_thread_base : public thread_base {
+ struct arg_type {
+ int id;
+ std::string worker_type;
+ char op;
+ int lock_flag;
+ const hs_longrun_shared& sh;
+ arg_type(int id, const std::string& worker_type, char op, int lock_flag,
+ const hs_longrun_shared& sh)
+ : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag),
+ sh(sh) { }
+ };
+ arg_type arg;
+ hs_longrun_stat stat;
+ drand48_data randbuf;
+ unsigned int seed;
+ hs_longrun_thread_base(const arg_type& arg)
+ : arg(arg), seed(0) {
+ seed = time(0) + arg.id + 1;
+ srand48_r(seed, &randbuf);
+ }
+ virtual ~hs_longrun_thread_base() { }
+ virtual void run() = 0;
+ size_t rand_record() {
+ double v = 0;
+ drand48_r(&randbuf, &v);
+ const size_t sz = arg.sh.records.size();
+ size_t r = size_t(v * sz);
+ if (r >= sz) {
+ r = 0;
+ }
+ return r;
+ }
+ int verify_update(const std::string& k, const std::string& v1,
+ const std::string& v2, const std::string& v3, record_value& rec,
+ uint32_t num_rows, bool cur_unknown_state);
+ int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds,
+ const std::string rrec[4], record_value& rec);
+ int verify_readnolock(const std::string& k, uint32_t num_rows,
+ uint32_t num_flds, const std::string rrec[4]);
+};
+
+int
+hs_longrun_thread_base::verify_update(const std::string& k,
+ const std::string& v1, const std::string& v2, const std::string& v3,
+ record_value& rec, uint32_t num_rows, bool cur_unknown_state)
+{
+ const bool op_success = num_rows == 1;
+ int ret = 0;
+ if (!rec.unknown_state) {
+ if (!rec.deleted && !op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_update_failure\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ } else if (rec.deleted && op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_update_success\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ }
+ }
+ if (op_success) {
+ rec.values.resize(4);
+ rec.values[0] = k;
+ rec.values[1] = v1;
+ rec.values[2] = v2;
+ rec.values[3] = v3;
+ if (ret == 0 && !rec.unknown_state) {
+ ++stat.success_count;
+ }
+ }
+ rec.unknown_state = cur_unknown_state;
+ if (arg.sh.verbose >= 100 && ret == 0) {
+ fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(),
+ k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
+ }
+ return ret;
+}
+
+int
+hs_longrun_thread_base::verify_read(const std::string& k,
+ uint32_t num_rows, uint32_t num_flds, const std::string rrec[4],
+ record_value& rec)
+{
+ const bool op_success = num_rows != 0;
+ int ret = 0;
+ if (!rec.unknown_state) {
+ if (!rec.deleted && !op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_read_failure\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ } else if (rec.deleted && op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_read_success\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ } else if (num_flds != 4) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_read_fldnum %d\n",
+ arg.worker_type.c_str(), arg.id, k.c_str(),
+ static_cast<int>(num_flds));
+ }
+ ret = 1;
+ } else if (rec.deleted) {
+ /* nothing to verify */
+ } else {
+ int diff = 0;
+ for (size_t i = 0; i < 4; ++i) {
+ if (rec.values[i] == rrec[i]) {
+ /* ok */
+ } else {
+ diff = 1;
+ }
+ }
+ if (diff) {
+ std::string mess;
+ for (size_t i = 0; i < 4; ++i) {
+ const std::string& expected = rec.values[i];
+ const std::string& val = rrec[i];
+ mess += " " + val + "/" + expected;
+ }
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_read_value %s\n",
+ arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str());
+ }
+ ret = 1;
+ }
+ }
+ }
+ if (arg.sh.verbose >= 100 && ret == 0) {
+ fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str());
+ }
+ if (ret == 0 && !rec.unknown_state) {
+ ++stat.success_count;
+ }
+ return ret;
+}
+
+int
+hs_longrun_thread_base::verify_readnolock(const std::string& k,
+ uint32_t num_rows, uint32_t num_flds, const std::string rrec[4])
+{
+ int ret = 0;
+ if (num_rows != 1 || num_flds != 4) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_read_failure\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ }
+ if (arg.sh.verbose >= 100 && ret == 0) {
+ fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(),
+ k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(),
+ rrec[3].c_str());
+ }
+ if (ret == 0) {
+ ++stat.success_count;
+ }
+ return ret;
+}
+
+struct hs_longrun_thread_hs : public hs_longrun_thread_base {
+ hs_longrun_thread_hs(const arg_type& arg)
+ : hs_longrun_thread_base(arg) { }
+ void run();
+ int check_hs_error(const char *mess, record_value *rec);
+ int op_insert(record_value& rec);
+ int op_delete(record_value& rec);
+ int op_update(record_value& rec);
+ int op_read(record_value& rec);
+ int op_readnolock(int k);
+ hstcpcli_ptr cli;
+ socket_args sockargs;
+};
+
+struct lock_guard : noncopyable {
+ lock_guard(mutex& mtx) : mtx(mtx) {
+ mtx.lock();
+ }
+ ~lock_guard() {
+ mtx.unlock();
+ }
+ mutex& mtx;
+};
+
+string_ref
+to_string_ref(const std::string& s)
+{
+ return string_ref(s.data(), s.size());
+}
+
+std::string
+to_string(const string_ref& s)
+{
+ return std::string(s.begin(), s.size());
+}
+
+void
+hs_longrun_thread_hs::run()
+{
+ config c = arg.sh.conf;
+ if (arg.op == 'R' || arg.op == 'N') {
+ c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998));
+ } else {
+ c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999));
+ }
+ sockargs.set(c);
+
+ while (arg.sh.running) {
+ if (cli.get() == 0 || !cli->stable_point()) {
+ cli = hstcpcli_i::create(sockargs);
+ if (check_hs_error("connect", 0) != 0) {
+ cli.reset();
+ continue;
+ }
+ cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY",
+ "k,v1,v2,v3", "k,v1,v2,v3");
+ cli->request_send();
+ if (check_hs_error("openindex_send", 0) != 0) {
+ cli.reset();
+ continue;
+ }
+ size_t num_flds = 0;
+ cli->response_recv(num_flds);
+ if (check_hs_error("openindex_recv", 0) != 0) {
+ cli.reset();
+ continue;
+ }
+ cli->response_buf_remove();
+ }
+ const size_t rec_id = rand_record();
+ if (arg.lock_flag) {
+ record_value& rec = *arg.sh.records[rec_id];
+ lock_guard g(rec.lock);
+ int e = 0;
+ switch (arg.op) {
+ case 'I':
+ e = op_insert(rec);
+ break;
+ case 'D':
+ e = op_delete(rec);
+ break;
+ case 'U':
+ e = op_update(rec);
+ break;
+ case 'R':
+ e = op_read(rec);
+ break;
+ default:
+ break;
+ }
+ } else {
+ int e = 0;
+ switch (arg.op) {
+ case 'N':
+ e = op_readnolock(rec_id);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+}
+
+int
+hs_longrun_thread_hs::op_insert(record_value& rec)
+{
+ const std::string k = rec.key;
+ const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id);
+ const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id);
+ const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id);
+ const string_ref op_ref("+", 1);
+ const string_ref op_args[4] = {
+ to_string_ref(k),
+ to_string_ref(v1),
+ to_string_ref(v2),
+ to_string_ref(v3)
+ };
+ cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0,
+ string_ref(), 0, 0, 0, 0);
+ cli->request_send();
+ if (check_hs_error("op_insert_send", &rec) != 0) { return 1; }
+ size_t numflds = 0;
+ cli->response_recv(numflds);
+ if (arg.sh.verbose > 10) {
+ const string_ref *row = cli->get_next_row();
+ fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(),
+ row ? to_string(row[0]).c_str() : "");
+ }
+ const bool op_success = cli->get_error_code() == 0;
+ int ret = 0;
+ if (!rec.unknown_state) {
+ if (rec.deleted && !op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_insert_failure\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ } else if (!rec.deleted && op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_insert_success\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ }
+ } else {
+ ++stat.unknown_count;
+ }
+ if (op_success) {
+ rec.values.resize(4);
+ rec.values[0] = k;
+ rec.values[1] = v1;
+ rec.values[2] = v2;
+ rec.values[3] = v3;
+ rec.deleted = false;
+ if (arg.sh.verbose >= 100 && ret == 0) {
+ fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(),
+ v2.c_str(), v3.c_str());
+ }
+ if (ret == 0 && !rec.unknown_state) {
+ ++stat.success_count;
+ }
+ rec.unknown_state = false;
+ }
+ cli->response_buf_remove();
+ return ret;
+}
+
+int
+hs_longrun_thread_hs::op_delete(record_value& rec)
+{
+ const std::string k = rec.key;
+ const string_ref op_ref("=", 1);
+ const string_ref op_args[1] = {
+ to_string_ref(k),
+ };
+ const string_ref modop_ref("D", 1);
+ cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
+ modop_ref, 0, 0, 0, 0);
+ cli->request_send();
+ if (check_hs_error("op_delete_send", &rec) != 0) { return 1; }
+ size_t numflds = 0;
+ cli->response_recv(numflds);
+ if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; }
+ const string_ref *row = cli->get_next_row();
+ const bool op_success = (numflds > 0 && row != 0 &&
+ to_string(row[0]) == "1");
+ int ret = 0;
+ if (!rec.unknown_state) {
+ if (!rec.deleted && !op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_delete_failure\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ } else if (rec.deleted && op_success) {
+ ++stat.verify_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s "
+ "unexpected_delete_success\n",
+ arg.worker_type.c_str(), arg.id, k.c_str());
+ }
+ ret = 1;
+ }
+ }
+ cli->response_buf_remove();
+ if (op_success) {
+ rec.deleted = true;
+ if (ret == 0 && !rec.unknown_state) {
+ ++stat.success_count;
+ }
+ rec.unknown_state = false;
+ }
+ if (arg.sh.verbose >= 100 && ret == 0) {
+ fprintf(stderr, "HS_DELETE %s\n", k.c_str());
+ }
+ return ret;
+}
+
+int
+hs_longrun_thread_hs::op_update(record_value& rec)
+{
+ const std::string k = rec.key;
+ const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id);
+ const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id);
+ const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id);
+ const string_ref op_ref("=", 1);
+ const string_ref op_args[1] = {
+ to_string_ref(k),
+ };
+ const string_ref modop_ref("U", 1);
+ const string_ref modop_args[4] = {
+ to_string_ref(k),
+ to_string_ref(v1),
+ to_string_ref(v2),
+ to_string_ref(v3)
+ };
+ cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
+ modop_ref, modop_args, 4, 0, 0);
+ cli->request_send();
+ if (check_hs_error("op_update_send", &rec) != 0) { return 1; }
+ size_t numflds = 0;
+ cli->response_recv(numflds);
+ if (check_hs_error("op_update_recv", &rec) != 0) { return 1; }
+ const string_ref *row = cli->get_next_row();
+ uint32_t num_rows = row
+ ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0;
+ cli->response_buf_remove();
+ const bool cur_unknown_state = (num_rows == 1);
+ return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state);
+}
+
+int
+hs_longrun_thread_hs::op_read(record_value& rec)
+{
+ const std::string k = rec.key;
+ const string_ref op_ref("=", 1);
+ const string_ref op_args[1] = {
+ to_string_ref(k),
+ };
+ cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
+ string_ref(), 0, 0, 0, 0);
+ cli->request_send();
+ if (check_hs_error("op_read_send", 0) != 0) { return 1; }
+ size_t num_flds = 0;
+ size_t num_rows = 0;
+ cli->response_recv(num_flds);
+ if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
+ const string_ref *row = cli->get_next_row();
+ std::string rrec[4];
+ if (row != 0 && num_flds == 4) {
+ for (int i = 0; i < 4; ++i) {
+ rrec[i] = to_string(row[i]);
+ }
+ ++num_rows;
+ }
+ row = cli->get_next_row();
+ if (row != 0) {
+ ++num_rows;
+ }
+ cli->response_buf_remove();
+ return verify_read(k, num_rows, num_flds, rrec, rec);
+}
+
+int
+hs_longrun_thread_hs::op_readnolock(int key)
+{
+ const std::string k = to_stdstring(key);
+ const string_ref op_ref("=", 1);
+ const string_ref op_args[1] = {
+ to_string_ref(k),
+ };
+ cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0,
+ string_ref(), 0, 0, 0, 0);
+ cli->request_send();
+ if (check_hs_error("op_read_send", 0) != 0) { return 1; }
+ size_t num_flds = 0;
+ size_t num_rows = 0;
+ cli->response_recv(num_flds);
+ if (check_hs_error("op_read_recv", 0) != 0) { return 1; }
+ const string_ref *row = cli->get_next_row();
+ std::string rrec[4];
+ if (row != 0 && num_flds == 4) {
+ for (int i = 0; i < 4; ++i) {
+ rrec[i] = to_string(row[i]);
+ }
+ ++num_rows;
+ }
+ row = cli->get_next_row();
+ if (row != 0) {
+ ++num_rows;
+ }
+ cli->response_buf_remove();
+ return verify_readnolock(k, num_rows, num_flds, rrec);
+}
+
+int
+hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec)
+{
+ const int err = cli->get_error_code();
+ if (err == 0) {
+ return 0;
+ }
+ ++stat.runtime_error_count;
+ if (arg.sh.verbose > 0) {
+ const std::string estr = cli->get_error();
+ fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n",
+ arg.op, arg.id, mess, err, estr.c_str());
+ }
+ if (rec) {
+ rec->unknown_state = true;
+ }
+ return 1;
+}
+
+struct hs_longrun_thread_my : public hs_longrun_thread_base {
+ hs_longrun_thread_my(const arg_type& arg)
+ : hs_longrun_thread_base(arg), connected(false) { }
+ void run();
+ void show_mysql_error(const char *mess, record_value *rec);
+ int op_insert(record_value& rec);
+ int op_delete(record_value& rec);
+ int op_update(record_value& rec);
+ int op_delins(record_value& rec);
+ int op_read(record_value& rec);
+ auto_mysql db;
+ bool connected;
+};
+
+void
+hs_longrun_thread_my::run()
+{
+ const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
+ const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
+ const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
+ const std::string mysql_dbname = "hstestdb";
+
+ while (arg.sh.running) {
+ if (!connected) {
+ if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
+ mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
+ show_mysql_error("mysql_real_connect", 0);
+ continue;
+ }
+ }
+ connected = true;
+ const size_t rec_id = rand_record();
+ record_value& rec = *arg.sh.records[rec_id];
+ lock_guard g(rec.lock);
+ int e = 0;
+ switch (arg.op) {
+ #if 0
+ case 'I':
+ e = op_insert(rec);
+ break;
+ case 'D':
+ e = op_delete(rec);
+ break;
+ case 'U':
+ e = op_update(rec);
+ break;
+ #endif
+ case 'T':
+ e = op_delins(rec);
+ break;
+ case 'R':
+ e = op_read(rec);
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+int
+hs_longrun_thread_my::op_delins(record_value& rec)
+{
+ const std::string k = rec.key;
+ const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id);
+ const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id);
+ const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id);
+ int success = 0;
+ bool cur_unknown_state = false;
+ do {
+ char query[1024];
+ #if 1
+ if (mysql_query(db, "begin") != 0) {
+ if (arg.sh.verbose >= 20) {
+ fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin");
+ }
+ break;
+ }
+ #endif
+ cur_unknown_state = true;
+ snprintf(query, 1024,
+ "delete from hstesttbl where k = '%s'", k.c_str());
+ if (mysql_query(db, query) != 0) {
+ if (arg.sh.verbose >= 20) {
+ fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
+ }
+ break;
+ }
+ if (mysql_affected_rows(db) != 1) {
+ if (arg.sh.verbose >= 20) {
+ fprintf(stderr, "mysql: notfound: [%s]\n", query);
+ }
+ break;
+ }
+ snprintf(query, 1024,
+ "insert into hstesttbl values ('%s', '%s', '%s', '%s')",
+ k.c_str(), v1.c_str(), v2.c_str(), v3.c_str());
+ if (mysql_query(db, query) != 0) {
+ if (arg.sh.verbose >= 20) {
+ fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
+ }
+ break;
+ }
+ #if 1
+ if (mysql_query(db, "commit") != 0) {
+ if (arg.sh.verbose >= 20) {
+ fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit");
+ }
+ break;
+ }
+ #endif
+ success = true;
+ cur_unknown_state = false;
+ } while (false);
+ return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state);
+}
+
+int
+hs_longrun_thread_my::op_read(record_value& rec)
+{
+ const std::string k = rec.key;
+ char query[1024] = { 0 };
+ const int len = snprintf(query, 1024,
+ "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str());
+ const int r = mysql_real_query(db, query, len > 0 ? len : 0);
+ if (r != 0) {
+ show_mysql_error(query, 0);
+ return 1;
+ }
+ MYSQL_ROW row = 0;
+ unsigned long *lengths = 0;
+ unsigned int num_rows = 0;
+ unsigned int num_flds = 0;
+ auto_mysql_res res(db);
+ std::string rrec[4];
+ if (res != 0) {
+ num_flds = mysql_num_fields(res);
+ row = mysql_fetch_row(res);
+ if (row != 0) {
+ lengths = mysql_fetch_lengths(res);
+ if (num_flds == 4) {
+ for (int i = 0; i < 4; ++i) {
+ rrec[i] = std::string(row[i], lengths[i]);
+ }
+ }
+ ++num_rows;
+ row = mysql_fetch_row(res);
+ if (row != 0) {
+ ++num_rows;
+ }
+ }
+ }
+ return verify_read(k, num_rows, num_flds, rrec, rec);
+}
+
+void
+hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec)
+{
+ ++stat.runtime_error_count;
+ if (arg.sh.verbose > 0) {
+ fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n",
+ arg.op, arg.id, mess, mysql_error(db));
+ }
+ if (rec) {
+ rec->unknown_state = true;
+ }
+ db.reset();
+ connected = false;
+}
+
+void
+mysql_do(MYSQL *db, const char *query)
+{
+ if (mysql_real_query(db, query, strlen(query)) != 0) {
+ fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query);
+ fatal_abort("mysql_do");
+ }
+}
+
+void
+hs_longrun_init_table(const config& conf, int num_prepare,
+ hs_longrun_shared& shared)
+{
+ const std::string mysql_host = conf.get_str("host", "localhost");
+ const std::string mysql_user = conf.get_str("mysqluser", "root");
+ const std::string mysql_passwd = conf.get_str("mysqlpass", "");
+ const std::string mysql_dbname = "";
+ auto_mysql db;
+ if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(),
+ mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) {
+ fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db));
+ fatal_abort("hs_longrun_init_table");
+ }
+ mysql_do(db, "drop database if exists hstestdb");
+ mysql_do(db, "create database hstestdb");
+ mysql_do(db, "use hstestdb");
+ mysql_do(db,
+ "create table hstesttbl ("
+ "k int primary key,"
+ "v1 varchar(32) not null,"
+ "v2 varchar(32) not null,"
+ "v3 varchar(32) not null"
+ ") character set utf8 collate utf8_bin engine = innodb");
+ for (int i = 0; i < num_prepare; ++i) {
+ const std::string i_str = to_stdstring(i);
+ const std::string v1 = "pv1_" + i_str;
+ const std::string v2 = "pv2_" + i_str;
+ const std::string v3 = "pv3_" + i_str;
+ char buf[1024];
+ snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values"
+ "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str());
+ mysql_do(db, buf);
+ record_value *rec = shared.records[i];
+ rec->key = i_str;
+ rec->values.resize(4);
+ rec->values[0] = i_str;
+ rec->values[1] = v1;
+ rec->values[2] = v2;
+ rec->values[3] = v3;
+ rec->deleted = false;
+ }
+}
+
+int
+hs_longrun_main(int argc, char **argv)
+{
+ hs_longrun_shared shared;
+ parse_args(argc, argv, shared.conf);
+ shared.conf["host"] = shared.conf.get_str("host", "localhost");
+ shared.verbose = shared.conf.get_int("verbose", 1);
+ const int table_size = shared.conf.get_int("table_size", 10000);
+ for (int i = 0; i < table_size; ++i) {
+ std::auto_ptr<record_value> rec(new record_value());
+ rec->key = to_stdstring(i);
+ shared.records.push_back_ptr(rec);
+ }
+ mysql_library_init(0, 0, 0);
+ const int duration = shared.conf.get_int("duration", 10);
+ const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10);
+ const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10);
+ const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10);
+ const int num_hsread = shared.conf.get_int("num_hsread", 10);
+ const int num_myread = shared.conf.get_int("num_myread", 10);
+ const int num_mydelins = shared.conf.get_int("num_mydelins", 10);
+ int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10);
+ const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0);
+ if (!always_filled) {
+ num_hsreadnolock = 0;
+ }
+ hs_longrun_init_table(shared.conf, always_filled ? table_size : 0,
+ shared);
+ /* create worker threads */
+ static const struct thrtmpl_type {
+ const char *type; char op; int num; int hs; int lock;
+ } thrtmpl[] = {
+ { "hsinsert", 'I', num_hsinsert, 1, 1 },
+ { "hsdelete", 'D', num_hsdelete, 1, 1 },
+ { "hsupdate", 'U', num_hsupdate, 1, 1 },
+ { "hsread", 'R', num_hsread, 1, 1 },
+ { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 },
+ { "myread", 'R', num_myread, 0, 1 },
+ { "mydelins", 'T', num_mydelins, 0, 1 },
+ };
+ typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type;
+ thrs_type thrs;
+ for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) {
+ const thrtmpl_type& e = thrtmpl[i];
+ for (int j = 0; j < e.num; ++j) {
+ int id = thrs.size();
+ const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock,
+ shared);
+ std::auto_ptr<hs_longrun_thread_base> thr;
+ if (e.hs) {
+ thr.reset(new hs_longrun_thread_hs(arg));
+ } else {
+ thr.reset(new hs_longrun_thread_my(arg));
+ }
+ thrs.push_back_ptr(thr);
+ }
+ }
+ shared.num_threads = thrs.size();
+ /* start threads */
+ fprintf(stderr, "START\n");
+ shared.running = 1;
+ for (size_t i = 0; i < thrs.size(); ++i) {
+ thrs[i]->start();
+ }
+ /* wait */
+ sleep(duration);
+ /* stop thread */
+ shared.running = 0;
+ for (size_t i = 0; i < thrs.size(); ++i) {
+ thrs[i]->join();
+ }
+ fprintf(stderr, "DONE\n");
+ /* summary */
+ typedef std::map<std::string, hs_longrun_stat> stat_map;
+ stat_map sm;
+ for (size_t i = 0; i < thrs.size(); ++i) {
+ hs_longrun_thread_base *const thr = thrs[i];
+ const std::string wt = thr->arg.worker_type;
+ hs_longrun_stat& v = sm[wt];
+ v.add(thr->stat);
+ }
+ hs_longrun_stat total;
+ for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) {
+ if (i->second.verify_error_count != 0) {
+ fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(),
+ i->second.verify_error_count);
+ }
+ if (i->second.runtime_error_count) {
+ fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(),
+ i->second.runtime_error_count);
+ }
+ if (i->second.unknown_count) {
+ fprintf(stderr, "%s unknown %llu\n", i->first.c_str(),
+ i->second.unknown_count);
+ }
+ fprintf(stderr, "%s success %llu\n", i->first.c_str(),
+ i->second.success_count);
+ total.add(i->second);
+ }
+ if (total.verify_error_count != 0) {
+ fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count);
+ }
+ if (total.runtime_error_count != 0) {
+ fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count);
+ }
+ if (total.unknown_count != 0) {
+ fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count);
+ }
+ fprintf(stderr, "TOTAL success %llu\n", total.success_count);
+ mysql_library_end();
+ return 0;
+}
+
+};
+
+int
+main(int argc, char **argv)
+{
+ return dena::hs_longrun_main(argc, argv);
+}
+
diff --git a/plugin/handler_socket/client/hspool_test.pl b/plugin/handler_socket/client/hspool_test.pl
new file mode 100755
index 00000000..03227e31
--- /dev/null
+++ b/plugin/handler_socket/client/hspool_test.pl
@@ -0,0 +1,224 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+use DB::HandlerSocket::Pool;
+use DBI;
+
+my %conf = ();
+for my $i (@ARGV) {
+ my ($k, $v) = split(/=/, $i);
+ $conf{$k} = $v;
+}
+
+my $verbose = get_conf("verbose", 0);
+my $actions_str = get_conf("actions",
+ "create,insert,verify,verify2,verify3,verify4,clean");
+my $tablesize = get_conf("tablesize", 1000);
+my $db = get_conf("db", "hstestdb");
+my $table = get_conf("table", "testtbl");
+my $table_schema = get_conf("table_schema", undef);
+my $engine = get_conf("engine", "innodb");
+my $host = get_conf("host", "localhost");
+my $mysqlport = get_conf("mysqlport", 3306);
+my $hsport_rd = get_conf("hsport_rd", 9998);
+my $hsport_wr = get_conf("hsport_wr", 9999);
+my $loop = get_conf("loop", 10000);
+my $op = get_conf("op", "=");
+my $ssps = get_conf("ssps", 0);
+my $num_moreflds = get_conf("moreflds", 0);
+my $moreflds_prefix = get_conf("moreflds_prefix", "f");
+my $mysql_user = 'root';
+my $mysql_password = '';
+
+my $dsn = "DBI:MariaDB:database=;host=$host;port=$mysqlport"
+ . ";mariadb_server_prepare=$ssps";
+my $dbh = DBI->connect($dsn, $mysql_user, $mysql_password,
+ { RaiseError => 1 });
+my $hsargs = { 'host' => $host, 'port' => $hsport_rd };
+my $hspool = new DB::HandlerSocket::Pool({
+ hostmap => {
+ "$db.$table" => {
+ host => $host,
+ port => $hsport_rd,
+ },
+ },
+ resolve => undef,
+ error => undef,
+});
+$table_schema = "(k int primary key, fc30 varchar(30), ft text)"
+ if (!defined($table_schema));
+
+my @actions = split(/,/, $actions_str);
+for my $action (@actions) {
+ print "ACTION: $action\n";
+ eval "hstest_$action()";
+ if ($@) {
+ die $@;
+ }
+ print "ACTION: $action DONE\n";
+}
+
+sub get_conf {
+ my ($key, $def) = @_;
+ my $val = $conf{$key};
+ if ($val) {
+ print "$key=$val\n";
+ } else {
+ $val = $def;
+ my $defstr = $def || "(undef)";
+ print "$key=$defstr(default)\n";
+ }
+ return $val;
+}
+
+sub hstest_create {
+ $dbh->do("drop database if exists $db");
+ $dbh->do("create database $db");
+ $dbh->do("use $db");
+ $dbh->do("create table $table $table_schema engine=$engine");
+}
+
+sub hstest_dump {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("select * from $table");
+ $sth->execute();
+ my $arr = $sth->fetchall_arrayref();
+ for my $rec (@$arr) {
+ print "REC:";
+ for my $row (@$rec) {
+ print " $row";
+ }
+ print "\n";
+ }
+}
+
+sub hstest_insert {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("insert into $table values (?, ?, ?)");
+ for (my $k = 0; $k < $tablesize; ++$k) {
+ my $fc30 = "fc30_$k";
+ my $ft = "ft_$k";
+ $sth->execute($k, $fc30, $ft);
+ }
+}
+
+sub hstest_verify {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("select * from $table order by k");
+ $sth->execute();
+ my $arr = $sth->fetchall_arrayref();
+ my $hsres = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft",
+ ">=", [ 0 ], $tablesize, 0);
+ for (my $i = 0; $i < $tablesize; ++$i) {
+ my $rec = $arr->[$i];
+ my $differ = 0;
+ print "REC:" if $verbose;
+ for (my $j = 0; $j < 3; ++$j) {
+ my $fld = $rec->[$j];
+ my $hsidx = $i * 3 + $j;
+ my $hsfld = $hsres->[$hsidx];
+ if ($hsfld ne $fld) {
+ $differ = 1;
+ }
+ if ($differ) {
+ print " $fld:$hsfld" if $verbose;
+ } else {
+ print " $hsfld" if $verbose;
+ }
+ }
+ print "\n" if $verbose;
+ if ($differ) {
+ die "verification failed";
+ }
+ }
+}
+
+sub hstest_verify2 {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("select * from $table order by k");
+ $sth->execute();
+ my $arr = $sth->fetchall_arrayref();
+ my $hsresa = $hspool->index_find_multi($db, $table, "PRIMARY",
+ "k,fc30,ft", [ [ -1, ">=", [ 0 ], $tablesize, 0 ] ]);
+ my $hsres = $hsresa->[0];
+ for (my $i = 0; $i < $tablesize; ++$i) {
+ my $rec = $arr->[$i];
+ my $differ = 0;
+ print "REC:" if $verbose;
+ for (my $j = 0; $j < 3; ++$j) {
+ my $fld = $rec->[$j];
+ my $hsidx = $i * 3 + $j;
+ my $hsfld = $hsres->[$hsidx];
+ if ($hsfld ne $fld) {
+ $differ = 1;
+ }
+ if ($differ) {
+ print " $fld:$hsfld" if $verbose;
+ } else {
+ print " $hsfld" if $verbose;
+ }
+ }
+ print "\n" if $verbose;
+ if ($differ) {
+ die "verification failed";
+ }
+ }
+}
+
+sub hashref_to_str {
+ my $href = $_[0];
+ my $r = '';
+ for my $k (sort keys %$href) {
+ my $v = $href->{$k};
+ $r .= "($k=>$v)";
+ }
+ return $r;
+}
+
+sub hstest_verify3 {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("select * from $table order by k");
+ $sth->execute();
+ my $hsres_t = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft",
+ ">=", [ 0 ], $tablesize, 0);
+ my $hsres = DB::HandlerSocket::Pool::result_single_to_hasharr(
+ [ 'k', 'fc30', 'ft' ], $hsres_t);
+ for (my $i = 0; $i < $tablesize; ++$i) {
+ my $mystr = hashref_to_str($sth->fetchrow_hashref());
+ my $hsstr = hashref_to_str($hsres->[$i]);
+ if ($mystr ne $hsstr) {
+ print "DIFF my=[$mystr] hs=[$hsstr]\n" if $verbose;
+ die "verification failed";
+ } else {
+ print "OK $hsstr\n" if $verbose;
+ }
+ }
+}
+
+sub hstest_verify4 {
+ $dbh->do("use $db");
+ my $sth = $dbh->prepare("select * from $table order by k");
+ $sth->execute();
+ my $hsres_t = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft",
+ ">=", [ 0 ], $tablesize, 0);
+ my $hsres = DB::HandlerSocket::Pool::result_single_to_hashhash(
+ [ 'k', 'fc30', 'ft' ], 'k', $hsres_t);
+ my $rechash = $sth->fetchall_hashref('k');
+ while (my ($k, $href) = each (%$rechash)) {
+ my $mystr = hashref_to_str($href);
+ my $hsstr = hashref_to_str($hsres->{$k});
+ if ($mystr ne $hsstr) {
+ print "DIFF my=[$mystr] hs=[$hsstr]\n" if $verbose;
+ die "verification failed";
+ } else {
+ print "OK $hsstr\n" if $verbose;
+ }
+ }
+}
+
+sub hstest_clean {
+ $hspool->clear_pool();
+ $dbh->do("drop database if exists $db");
+}
+
diff --git a/plugin/handler_socket/client/hstest.cpp b/plugin/handler_socket/client/hstest.cpp
new file mode 100644
index 00000000..b5551fed
--- /dev/null
+++ b/plugin/handler_socket/client/hstest.cpp
@@ -0,0 +1,1532 @@
+
+// vim:sw=2:ai
+
+#include <signal.h>
+#include <sys/time.h>
+#include <stdio.h>
+#include <string.h>
+#include <vector>
+#include <stdlib.h>
+#include <memory>
+#include <errno.h>
+#include <mysql.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include "util.hpp"
+#include "auto_ptrcontainer.hpp"
+#include "socket.hpp"
+#include "thread.hpp"
+#include "hstcpcli.hpp"
+
+#if __GNUC__ >= 4
+long atomic_exchange_and_add(volatile long *valp, long c)
+{
+ return __sync_fetch_and_add(valp, c);
+}
+#else
+#include <bits/atomicity.h>
+using namespace __gnu_cxx;
+long atomic_exchange_and_add(volatile long *valp, long c)
+{
+ return __exchange_and_add((volatile _Atomic_word *)valp, c);
+}
+#endif
+
+namespace dena {
+
+struct auto_mysql : private noncopyable {
+ auto_mysql() : db(0) {
+ reset();
+ }
+ ~auto_mysql() {
+ if (db) {
+ mysql_close(db);
+ }
+ }
+ void reset() {
+ if (db) {
+ mysql_close(db);
+ }
+ if ((db = mysql_init(0)) == 0) {
+ fatal_abort("failed to initialize mysql client");
+ }
+ }
+ operator MYSQL *() const { return db; }
+ private:
+ MYSQL *db;
+};
+
+struct auto_mysql_res : private noncopyable {
+ auto_mysql_res(MYSQL *db) {
+ res = mysql_store_result(db);
+ }
+ ~auto_mysql_res() {
+ if (res) {
+ mysql_free_result(res);
+ }
+ }
+ operator MYSQL_RES *() const { return res; }
+ private:
+ MYSQL_RES *res;
+};
+
+struct auto_mysql_stmt : private noncopyable {
+ auto_mysql_stmt(MYSQL *db) {
+ stmt = mysql_stmt_init(db);
+ }
+ ~auto_mysql_stmt() {
+ if (stmt) {
+ mysql_stmt_close(stmt);
+ }
+ }
+ operator MYSQL_STMT *() const { return stmt; }
+ private:
+ MYSQL_STMT *stmt;
+};
+
+namespace {
+
+double
+gettimeofday_double()
+{
+ struct timeval tv;
+ if (gettimeofday(&tv, 0) != 0) {
+ fatal_abort("gettimeofday");
+ }
+ return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec;
+}
+
+// unused
+void
+wait_close(int fd)
+{
+ char buf[1024];
+ while (true) {
+ int r = read(fd, buf, sizeof(buf));
+ if (r <= 0) {
+ break;
+ }
+ }
+}
+
+// unused
+void
+gentle_close(int fd)
+{
+ int r = shutdown(fd, SHUT_WR);
+ if (r != 0) {
+ return;
+ }
+ wait_close(fd);
+}
+
+};
+
+struct hstest_shared {
+ config conf;
+ socket_args arg;
+ int verbose;
+ size_t loop;
+ size_t pipe;
+ char op;
+ long num_threads;
+ mutable volatile long count;
+ mutable volatile long conn_count;
+ long wait_conn;
+ volatile char *keygen;
+ long keygen_size;
+ mutable volatile int enable_timing;
+ int usleep;
+ int dump;
+ hstest_shared() : verbose(0), loop(0), pipe(0), op('G'), num_threads(0),
+ count(0), conn_count(0), wait_conn(0), keygen(0), keygen_size(0),
+ enable_timing(0), usleep(0), dump(0) { }
+ void increment_count(unsigned int c = 1) const volatile {
+ atomic_exchange_and_add(&count, c);
+ }
+ void increment_conn(unsigned int c) const volatile {
+ atomic_exchange_and_add(&conn_count, c);
+ while (wait_conn != 0 && conn_count < wait_conn) {
+ sleep(1);
+ }
+ // fprintf(stderr, "wait_conn=%ld done\n", wait_conn);
+ }
+};
+
+struct hstest_thread {
+ struct arg_type {
+ size_t id;
+ const hstest_shared& sh;
+ bool watch_flag;
+ arg_type(size_t i, const hstest_shared& s, bool w)
+ : id(i), sh(s), watch_flag(w) { }
+ };
+ hstest_thread(const arg_type& a) : arg(a), io_success_count(0),
+ op_success_count(0), response_min(99999), response_max(0),
+ response_sum(0), response_avg(0) { }
+ void operator ()();
+ void test_1();
+ void test_2_3(int test_num);
+ void test_4_5(int test_num);
+ void test_6(int test_num);
+ void test_7(int test_num);
+ void test_8(int test_num);
+ void test_9(int test_num);
+ void test_10(int test_num);
+ void test_11(int test_num);
+ void test_12(int test_num);
+ void test_21(int test_num);
+ void test_22(int test_num);
+ void test_watch();
+ void sleep_if();
+ void set_timing(double time_spent);
+ arg_type arg;
+ auto_file fd;
+ size_t io_success_count;
+ size_t op_success_count;
+ double response_min, response_max, response_sum, response_avg;
+};
+
+void
+hstest_thread::test_1()
+{
+ char buf[1024];
+ unsigned int seed = arg.id;
+ seed ^= arg.sh.conf.get_int("seed_xor", 0);
+ std::string err;
+ if (socket_connect(fd, arg.sh.arg, err) != 0) {
+ fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
+ return;
+ }
+ const char op = arg.sh.op;
+ const int tablesize = arg.sh.conf.get_int("tablesize", 0);
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, v = 0, len = 0;
+ if (op == 'G') {
+ k = rand_r(&seed);
+ v = rand_r(&seed); /* unused */
+ if (tablesize != 0) {
+ k &= tablesize;
+ }
+ len = snprintf(buf, sizeof(buf), "%c\tk%d\n", op, k);
+ } else {
+ k = rand_r(&seed);
+ v = rand_r(&seed);
+ if (tablesize != 0) {
+ k &= tablesize;
+ }
+ len = snprintf(buf, sizeof(buf), "%c\tk%d\tv%d\n", op, k, v);
+ }
+ const int wlen = write(fd.get(), buf, len);
+ if (wlen != len) {
+ return;
+ }
+ }
+ size_t read_cnt = 0;
+ size_t read_pos = 0;
+ while (read_cnt < arg.sh.pipe) {
+ const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
+ if (rlen <= 0) {
+ return;
+ }
+ read_pos += rlen;
+ while (true) {
+ const char *const p = static_cast<const char *>(memchr(buf, '\n',
+ read_pos));
+ if (p == 0) {
+ break;
+ }
+ ++read_cnt;
+ ++io_success_count;
+ arg.sh.increment_count();
+ if (p != buf && buf[0] == '=') {
+ ++op_success_count;
+ }
+ const size_t rest_size = buf + read_pos - (p + 1);
+ if (rest_size != 0) {
+ memmove(buf, p + 1, rest_size);
+ }
+ read_pos = rest_size;
+ }
+ }
+ }
+}
+
+void
+hstest_thread::test_2_3(int test_num)
+{
+#if 0
+ char buf_k[128], buf_v[128];
+ unsigned int seed = arg.id;
+ op_base_t op = static_cast<op_base_t>(arg.sh.op);
+ micli_ptr hnd;
+ if (test_num == 2) {
+ hnd = micli_i::create_remote(arg.sh.conf);
+ } else if (test_num == 3) {
+ // hnd = micli_i::create_inproc(arg.sh.localdb);
+ }
+ if (hnd.get() == 0) {
+ return;
+ }
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, v = 0, klen = 0, vlen = 0;
+ k = rand_r(&seed);
+ klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
+ v = rand_r(&seed); /* unused */
+ vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
+ string_ref arr[2];
+ arr[0] = string_ref(buf_k, klen);
+ arr[1] = string_ref(buf_v, vlen);
+ pstrarr_ptr rec(arr, 2);
+ if (hnd->execute(op, 0, 0, rec.get_const())) {
+ ++io_success_count;
+ arg.sh.increment_count();
+ const dataset& res = hnd->get_result_ref();
+ if (res.size() == 1) {
+ ++op_success_count;
+ }
+ }
+ }
+ }
+#endif
+}
+
+void
+hstest_thread::test_4_5(int test_num)
+{
+#if 0
+ char buf_k[128], buf_v[8192];
+ memset(buf_v, ' ', sizeof(buf_v));
+ unsigned int seed = arg.id;
+ op_base_t op = static_cast<op_base_t>(arg.sh.op);
+ micli_ptr hnd;
+ if (test_num == 4) {
+ hnd = micli_i::create_remote(arg.sh.conf);
+ } else if (test_num == 5) {
+ hnd = micli_i::create_inproc(arg.sh.localdb);
+ }
+ if (hnd.get() == 0) {
+ return;
+ }
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, klen = 0, vlen = 0;
+ k = i & 0x0000ffffUL;
+ if (k == 0) {
+ fprintf(stderr, "k=0\n");
+ }
+ klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
+ vlen = rand_r(&seed) % 8192;
+ string_ref arr[2];
+ arr[0] = string_ref(buf_k, klen);
+ arr[1] = string_ref(buf_v, vlen);
+ pstrarr_ptr rec(arr, 2);
+ if (hnd->execute(op, 0, 0, rec.get_const())) {
+ ++io_success_count;
+ const dataset& res = hnd->get_result_ref();
+ if (res.size() == 1) {
+ ++op_success_count;
+ }
+ }
+ }
+ }
+#endif
+}
+
+void
+hstest_thread::test_6(int test_num)
+{
+ int count = arg.sh.conf.get_int("count", 1);
+ auto_file fds[count];
+ for (int i = 0; i < count; ++i) {
+ const double t1 = gettimeofday_double();
+ std::string err;
+ if (socket_connect(fds[i], arg.sh.arg, err) != 0) {
+ fprintf(stderr, "id=%zu i=%d err=%s\n", arg.id, i, err.c_str());
+ }
+ const double t2 = gettimeofday_double();
+ if (t2 - t1 > 1) {
+ fprintf(stderr, "id=%zu i=%d time %f\n", arg.id, i, t2 - t1);
+ }
+ }
+}
+
+void
+hstest_thread::test_7(int num)
+{
+ /*
+ set foo 0 0 10
+ 0123456789
+ STORED
+ get foo
+ VALUE foo 0 10
+ 0123456789
+ END
+ get var
+ END
+ */
+ char buf[1024];
+ const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
+ unsigned int seed = arg.id;
+ seed ^= arg.sh.conf.get_int("seed_xor", 0);
+ const int tablesize = arg.sh.conf.get_int("tablesize", 0);
+ const char op = arg.sh.op;
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ const double tm1 = gettimeofday_double();
+ std::string err;
+ if (fd.get() < 0 && socket_connect(fd, arg.sh.arg, err) != 0) {
+ fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
+ return;
+ }
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, v = 0, len = 0;
+ if (op == 'G') {
+ k = rand_r(&seed);
+ v = rand_r(&seed); /* unused */
+ if (tablesize != 0) {
+ k &= tablesize;
+ }
+ len = snprintf(buf, sizeof(buf), "get k%d\r\n", k);
+ } else {
+ k = rand_r(&seed);
+ v = rand_r(&seed);
+ if (tablesize != 0) {
+ k &= tablesize;
+ }
+ char vbuf[1024];
+ int vlen = snprintf(vbuf, sizeof(vbuf),
+ "v%d"
+ // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+ , v);
+ len = snprintf(buf, sizeof(buf), "set k%d 0 0 %d\r\n%s\r\n",
+ k, vlen, vbuf);
+ }
+ const int wlen = write(fd.get(), buf, len);
+ if (wlen != len) {
+ return;
+ }
+ }
+ size_t read_cnt = 0;
+ size_t read_pos = 0;
+ bool read_response_done = false;
+ bool expect_value = false;
+ while (!read_response_done) {
+ const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos);
+ if (rlen <= 0) {
+ return;
+ }
+ read_pos += rlen;
+ while (true) {
+ const char *const p = static_cast<const char *>(memchr(buf, '\n',
+ read_pos));
+ if (p == 0) {
+ break;
+ }
+ ++read_cnt;
+ if (expect_value) {
+ expect_value = false;
+ } else if (p >= buf + 6 && memcmp(buf, "VALUE ", 6) == 0) {
+ expect_value = true;
+ ++op_success_count;
+ } else {
+ if (p == buf + 7 && memcmp(buf, "STORED\r", 7) == 0) {
+ ++op_success_count;
+ }
+ read_response_done = true;
+ }
+ const size_t rest_size = buf + read_pos - (p + 1);
+ if (rest_size != 0) {
+ memmove(buf, p + 1, rest_size);
+ }
+ read_pos = rest_size;
+ }
+ ++io_success_count;
+ }
+ arg.sh.increment_count();
+ if (!keep_connection) {
+ fd.close();
+ }
+ const double tm2 = gettimeofday_double();
+ set_timing(tm2 - tm1);
+ sleep_if();
+ }
+}
+
+struct rec {
+ std::string key;
+ std::string value;
+};
+
+void
+hstest_thread::test_8(int test_num)
+{
+#if 0
+ char buf_k[128], buf_v[128];
+ unsigned int seed = arg.id;
+ // op_base_t op = static_cast<op_base_t>(arg.sh.op);
+ using namespace boost::multi_index;
+ typedef member<rec, std::string, &rec::key> rec_get_key;
+ typedef ordered_unique<rec_get_key> oui;
+ typedef multi_index_container< rec, indexed_by<oui> > mic;
+ #if 0
+ typedef std::map<std::string, std::string> m_type;
+ m_type m;
+ #endif
+ mic m;
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, v = 0, klen = 0, vlen = 0;
+ k = rand_r(&seed);
+ klen = snprintf(buf_k, sizeof(buf_k), "k%d", k);
+ v = rand_r(&seed); /* unused */
+ vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v);
+ const std::string ks(buf_k, klen);
+ const std::string vs(buf_v, vlen);
+ rec r;
+ r.key = ks;
+ r.value = vs;
+ m.insert(r);
+ // m.insert(std::make_pair(ks, vs));
+ ++io_success_count;
+ ++op_success_count;
+ arg.sh.increment_count();
+ }
+ }
+#endif
+}
+
+struct mysqltest_thread_initobj : private noncopyable {
+ mysqltest_thread_initobj() {
+ mysql_thread_init();
+ }
+ ~mysqltest_thread_initobj() {
+ mysql_thread_end();
+ }
+};
+
+void
+hstest_thread::test_9(int test_num)
+{
+ /* create table hstest
+ * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
+ * engine = innodb; */
+ auto_mysql db;
+ // mysqltest_thread_initobj initobj;
+ std::string err;
+ const char op = arg.sh.op;
+ const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
+ unsigned long long err_cnt = 0;
+ unsigned long long query_cnt = 0;
+ #if 0
+ my_bool reconnect = 0;
+ if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
+ err = "mysql_options() failed";
+ ++err_cnt;
+ return;
+ }
+ #endif
+ unsigned int seed = time(0) + arg.id + 1;
+ seed ^= arg.sh.conf.get_int("seed_xor", 0);
+ drand48_data randbuf;
+ srand48_r(seed, &randbuf);
+ const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
+ const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
+ const int num = arg.sh.loop;
+ const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
+ const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
+ const std::string mysql_dbname = arg.sh.conf.get_str("dbname", "hstest");
+ const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
+ const int verbose = arg.sh.conf.get_int("verbose", 1);
+ const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
+ const int moreflds = arg.sh.conf.get_int("moreflds", 0);
+ const std::string moreflds_prefix = arg.sh.conf.get_str(
+ "moreflds_prefix", "column0123456789_");
+ const int use_handler = arg.sh.conf.get_int("handler", 0);
+ const int sched_flag = arg.sh.conf.get_int("sched", 0);
+ const int use_in = arg.sh.conf.get_int("in", 0);
+ const int ssps = use_in ? 0 : arg.sh.conf.get_int("ssps", 0);
+ std::string flds = "v";
+ for (int i = 0; i < moreflds; ++i) {
+ char buf[1024];
+ snprintf(buf, sizeof(buf), ",%s%d", moreflds_prefix.c_str(), i);
+ flds += std::string(buf);
+ }
+ int connected = 0;
+ std::auto_ptr<auto_mysql_stmt> stmt;
+ string_buffer wbuf;
+ for (int i = 0; i < num; ++i) {
+ const double tm1 = gettimeofday_double();
+ const int flags = 0;
+ if (connected == 0) {
+ if (!mysql_real_connect(db, mysql_host.c_str(),
+ mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
+ mysql_dbname.c_str(), mysql_port, 0, flags)) {
+ err = "failed to connect: " + std::string(mysql_error(db));
+ if (verbose >= 1) {
+ fprintf(stderr, "e=[%s]\n", err.c_str());
+ }
+ ++err_cnt;
+ return;
+ }
+ arg.sh.increment_conn(1);
+ }
+ int r = 0;
+ if (connected == 0 && use_handler) {
+ const char *const q = "handler hstest_table1 open";
+ r = mysql_real_query(db, q, strlen(q));
+ if (r != 0) {
+ err = 1;
+ }
+ }
+ if (connected == 0 && ssps) {
+ stmt.reset(new auto_mysql_stmt(db));
+ const char *const q = "select v from hstest_table1 where k = ?";
+ r = mysql_stmt_prepare(*stmt, q, strlen(q));
+ if (r != 0) {
+ fprintf(stderr, "ssps err\n");
+ ++err_cnt;
+ return;
+ }
+ }
+ connected = 1;
+ std::string result_str;
+ unsigned int err = 0;
+ unsigned int num_flds = 0, num_affected_rows = 0;
+ int got_data = 0;
+ char buf_query[16384];
+ int buf_query_len = 0;
+ int k = 0, v = 0;
+ {
+ double kf = 0, vf = 0;
+ drand48_r(&randbuf, &kf);
+ drand48_r(&randbuf, &vf);
+ k = int(kf * tablesize);
+ v = int(vf * tablesize);
+ #if 0
+ k = rand_r(&seed);
+ v = rand_r(&seed);
+ if (tablesize != 0) {
+ k %= tablesize;
+ }
+ #endif
+ if (op == 'G') {
+ if (use_handler) {
+ buf_query_len = snprintf(buf_query, sizeof(buf_query),
+ "handler hstest_table1 read `primary` = ( '%d' )", k);
+ // TODO: moreflds
+ } else if (ssps) {
+ //
+ } else if (use_in) {
+ wbuf.clear();
+ char *p = wbuf.make_space(1024);
+ int len = snprintf(p, 1024, "select %s from hstest_table1 where k in ('%d'", flds.c_str(), k);
+ wbuf.space_wrote(len);
+ for (int j = 1; j < use_in; ++j) {
+ /* generate more key */
+ drand48_r(&randbuf, &kf);
+ k = int(kf * tablesize);
+ p = wbuf.make_space(1024);
+ int len = snprintf(p, 1024, ", '%d'", k);
+ wbuf.space_wrote(len);
+ }
+ wbuf.append_literal(")");
+ } else {
+ buf_query_len = snprintf(buf_query, sizeof(buf_query),
+ "select %s from hstest_table1 where k = '%d'", flds.c_str(), k);
+ }
+ } else if (op == 'U') {
+ buf_query_len = snprintf(buf_query, sizeof(buf_query),
+ "update hstest_table1 set v = '%d_%d%s' where k = '%d'",
+ v, k, suffix.c_str(), k);
+ } else if (op == 'R') {
+ buf_query_len = snprintf(buf_query, sizeof(buf_query),
+ "replace into hstest_table1 values ('%d', 'v%d')", k, v);
+ // TODO: moreflds
+ }
+ }
+ if (r == 0) {
+ if (ssps) {
+ MYSQL_BIND bind[1] = { };
+ bind[0].buffer_type = MYSQL_TYPE_LONG;
+ bind[0].buffer = (char *)&k;
+ bind[0].is_null = 0;
+ bind[0].length = 0;
+ if (mysql_stmt_bind_param(*stmt, bind)) {
+ fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
+ ++err_cnt;
+ return;
+ }
+ r = mysql_stmt_execute(*stmt);
+ // fprintf(stderr, "stmt exec\n");
+ } else if (use_in) {
+ r = mysql_real_query(db, wbuf.begin(), wbuf.size());
+ } else {
+ r = mysql_real_query(db, buf_query, buf_query_len);
+ // fprintf(stderr, "real query\n");
+ }
+ ++query_cnt;
+ }
+ if (r != 0) {
+ err = 1;
+ } else if (ssps) {
+ if (verbose >= 0) {
+ char resbuf[1024];
+ unsigned long res_len = 0;
+ MYSQL_BIND bind[1] = { };
+ bind[0].buffer_type = MYSQL_TYPE_STRING;
+ bind[0].buffer = resbuf;
+ bind[0].buffer_length = sizeof(resbuf);
+ bind[0].length = &res_len;
+ if (mysql_stmt_bind_result(*stmt, bind)) {
+ fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
+ ++err_cnt;
+ return;
+ }
+ if (mysql_stmt_fetch(*stmt)) {
+ fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt));
+ ++err_cnt;
+ return;
+ }
+ if (!result_str.empty()) {
+ result_str += " ";
+ }
+ result_str += std::string(resbuf, res_len);
+ // fprintf(stderr, "SSPS RES: %s\n", result_str.c_str());
+ got_data = 1;
+ } else {
+ got_data = 1;
+ }
+ } else {
+ auto_mysql_res res(db);
+ if (res != 0) {
+ if (verbose >= 0) {
+ num_flds = mysql_num_fields(res);
+ MYSQL_ROW row = 0;
+ while ((row = mysql_fetch_row(res)) != 0) {
+ got_data += 1;
+ unsigned long *const lengths = mysql_fetch_lengths(res);
+ if (verbose >= 2) {
+ for (unsigned int i = 0; i < num_flds; ++i) {
+ if (!result_str.empty()) {
+ result_str += " ";
+ }
+ result_str += std::string(row[i], lengths[i]);
+ }
+ }
+ }
+ } else {
+ MYSQL_ROW row = 0;
+ while ((row = mysql_fetch_row(res)) != 0) {
+ got_data += 1;
+ }
+ }
+ } else {
+ if (mysql_field_count(db) == 0) {
+ num_affected_rows = mysql_affected_rows(db);
+ } else {
+ err = 1;
+ }
+ }
+ }
+ if (verbose >= 2 || (verbose >= 1 && err != 0)) {
+ if (err) {
+ ++err_cnt;
+ const char *const errstr = mysql_error(db);
+ fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
+ num_affected_rows, buf_query);
+ } else {
+ fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows, buf_query,
+ result_str.c_str());
+ }
+ }
+ if (err == 0) {
+ ++io_success_count;
+ if (num_affected_rows > 0 || got_data > 0) {
+ op_success_count += got_data;
+ } else {
+ if (verbose >= 1) {
+ fprintf(stderr, "k=%d numaff=%u gotdata=%d\n",
+ k, num_affected_rows, got_data);
+ }
+ }
+ arg.sh.increment_count();
+ }
+ if (!keep_connection) {
+ if (stmt.get() != 0) {
+ stmt.reset();
+ }
+ db.reset();
+ connected = 0;
+ }
+ const double tm2 = gettimeofday_double();
+ set_timing(tm2 - tm1);
+ sleep_if();
+ if (sched_flag) {
+ sched_yield();
+ }
+ }
+ if (verbose >= 1) {
+ fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
+ }
+}
+
+void
+hstest_thread::test_10(int test_num)
+{
+ const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
+ unsigned int seed = time(0) + arg.id + 1;
+ seed ^= arg.sh.conf.get_int("seed_xor", 0);
+ drand48_data randbuf;
+ srand48_r(seed, &randbuf);
+ std::string err;
+ int keepconn_count = 0;
+ const char op = arg.sh.op;
+ const int verbose = arg.sh.conf.get_int("verbose", 1);
+ const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd");
+ const int tablesize = arg.sh.conf.get_int("tablesize", 10000);
+ const int firstkey = arg.sh.conf.get_int("firstkey", 0);
+ const int sched_flag = arg.sh.conf.get_int("sched", 0);
+ const int moreflds = arg.sh.conf.get_int("moreflds", 0);
+ const std::string dbname = arg.sh.conf.get_str("dbname", "hstest");
+ const std::string table = arg.sh.conf.get_str("table", "hstest_table1");
+ const std::string index = arg.sh.conf.get_str("index", "PRIMARY");
+ const std::string field = arg.sh.conf.get_str("field", "v");
+ const int use_in = arg.sh.conf.get_int("in", 0);
+ const std::string moreflds_prefix = arg.sh.conf.get_str(
+ "moreflds_prefix", "column0123456789_");
+ const int dump = arg.sh.dump;
+ const int nodup = arg.sh.conf.get_int("nodup", 0);
+ std::string moreflds_str;
+ for (int i = 0; i < moreflds; ++i) {
+ char sbuf[1024];
+ snprintf(sbuf, sizeof(sbuf), ",%s%d", moreflds_prefix.c_str(), i);
+ moreflds_str += std::string(sbuf);
+ }
+ string_buffer wbuf;
+ char rbuf[16384];
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ int len = 0, rlen = 0, wlen = 0;
+ #if 0
+ const double tm1 = gettimeofday_double();
+ #endif
+ if (fd.get() < 0) {
+ if (socket_connect(fd, arg.sh.arg, err) != 0) {
+ fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
+ return;
+ }
+ char *wp = wbuf.make_space(1024);
+ len = snprintf(wp, 1024,
+ "P\t1\t%s\t%s\tPRIMARY\t%s%s\n", dbname.c_str(), table.c_str(),
+ field.c_str(), moreflds_str.c_str());
+ /* pst_num, db, table, index, retflds */
+ wbuf.space_wrote(len);
+ wlen = write(fd.get(), wbuf.begin(), len);
+ if (len != wlen) {
+ fprintf(stderr, "write: %d %d\n", len, wlen);
+ return;
+ }
+ wbuf.clear();
+ rlen = read(fd.get(), rbuf, sizeof(rbuf));
+ if (rlen <= 0 || rbuf[rlen - 1] != '\n') {
+ fprintf(stderr, "read: rlen=%d errno=%d\n", rlen, errno);
+ return;
+ }
+ if (rbuf[0] != '0') {
+ fprintf(stderr, "failed to open table\n");
+ return;
+ }
+ arg.sh.increment_conn(1);
+ }
+ const double tm1 = gettimeofday_double();
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ int k = 0, v = 0;
+ {
+ while (true) {
+ double kf = 0, vf = 0;
+ drand48_r(&randbuf, &kf);
+ drand48_r(&randbuf, &vf);
+ k = int(kf * tablesize) + firstkey;
+ v = int(vf * tablesize) + firstkey;
+ if (k - firstkey < arg.sh.keygen_size) {
+ volatile char *const ptr = arg.sh.keygen + (k - firstkey);
+ // int oldv = __sync_fetch_and_or(ptr, 1);
+ int oldv = *ptr;
+ *ptr += 1;
+ if (nodup && oldv != 0) {
+ if (dump) {
+ fprintf(stderr, "retry\n");
+ }
+ continue;
+ }
+ } else {
+ if (nodup) {
+ if (dump) {
+ fprintf(stderr, "retry2\n");
+ }
+ continue;
+ }
+ }
+ size_t len = 0;
+ if (op == 'G') {
+ if (use_in) {
+ char *wp = wbuf.make_space(1024);
+ len = snprintf(wp, 1024, "1\t=\t1\t\t%d\t0\t@\t0\t%d\t%d",
+ use_in, use_in, k);
+ wbuf.space_wrote(len);
+ for (int j = 1; j < use_in; ++j) {
+ drand48_r(&randbuf, &kf);
+ k = int(kf * tablesize) + firstkey;
+ char *wp = wbuf.make_space(1024);
+ len = snprintf(wp, 1024, "\t%d", k);
+ wbuf.space_wrote(len);
+ }
+ wbuf.append_literal("\n");
+ } else {
+ char *wp = wbuf.make_space(1024);
+ len = snprintf(wp, 1024, "1\t=\t1\t%d\n", k);
+ wbuf.space_wrote(len);
+ }
+ } else if (op == 'U') {
+ char *wp = wbuf.make_space(1024);
+ len = snprintf(wp, 1024,
+ "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str());
+ wbuf.space_wrote(len);
+ }
+ break;
+ }
+ }
+ }
+ wlen = write(fd.get(), wbuf.begin(), wbuf.size());
+ if ((size_t) wlen != wbuf.size()) {
+ fprintf(stderr, "write: %d %d\n", (int)wbuf.size(), wlen);
+ return;
+ }
+ wbuf.clear();
+ size_t read_cnt = 0;
+ size_t read_pos = 0;
+ while (read_cnt < arg.sh.pipe) {
+ rlen = read(fd.get(), rbuf + read_pos, sizeof(rbuf) - read_pos);
+ if (rlen <= 0) {
+ fprintf(stderr, "read: %d\n", rlen);
+ return;
+ }
+ read_pos += rlen;
+ while (true) {
+ const char *const nl = static_cast<const char *>(memchr(rbuf, '\n',
+ read_pos));
+ if (nl == 0) {
+ break;
+ }
+ ++read_cnt;
+ ++io_success_count;
+ const char *t1 = static_cast<const char *>(memchr(rbuf, '\t',
+ nl - rbuf));
+ if (t1 == 0) {
+ fprintf(stderr, "error \n");
+ break;
+ }
+ ++t1;
+ const char *t2 = static_cast<const char *>(memchr(t1, '\t',
+ nl - t1));
+ if (t2 == 0) {
+ if (verbose > 1) {
+ fprintf(stderr, "key: notfound \n");
+ }
+ break;
+ }
+ ++t2;
+ if (t1 == rbuf + 2 && rbuf[0] == '0') {
+ if (op == 'G') {
+ ++op_success_count;
+ arg.sh.increment_count();
+ } else if (op == 'U') {
+ const char *t3 = t2;
+ while (t3 != nl && t3[0] >= 0x10) {
+ ++t3;
+ }
+ if (t3 != t2 + 1 || t2[0] != '1') {
+ const std::string mess(t2, t3);
+ fprintf(stderr, "mod: %s\n", mess.c_str());
+ } else {
+ ++op_success_count;
+ arg.sh.increment_count();
+ if (arg.sh.dump && arg.sh.pipe == 1) {
+ fwrite(wbuf.begin(), wbuf.size(), 1, stderr);
+ }
+ }
+ }
+ } else {
+ const char *t3 = t2;
+ while (t3 != nl && t3[0] >= 0x10) {
+ ++t3;
+ }
+ const std::string mess(t2, t3);
+ fprintf(stderr, "err: %s\n", mess.c_str());
+ }
+ const size_t rest_size = rbuf + read_pos - (nl + 1);
+ if (rest_size != 0) {
+ memmove(rbuf, nl + 1, rest_size);
+ }
+ read_pos = rest_size;
+ }
+ }
+ if (!keep_connection) {
+ fd.reset();
+ arg.sh.increment_conn(-1);
+ } else if (keep_connection > 1 && ++keepconn_count > keep_connection) {
+ keepconn_count = 0;
+ fd.reset();
+ arg.sh.increment_conn(-1);
+ }
+ const double tm2 = gettimeofday_double();
+ set_timing(tm2 - tm1);
+ sleep_if();
+ if (sched_flag) {
+ sched_yield();
+ }
+ }
+ if (dump) {
+ fprintf(stderr, "done\n");
+ }
+}
+
+void
+hstest_thread::sleep_if()
+{
+ if (arg.sh.usleep) {
+ struct timespec ts = {
+ arg.sh.usleep / 1000000,
+ (arg.sh.usleep % 1000000) * 1000
+ };
+ nanosleep(&ts, 0);
+ }
+}
+
+void
+hstest_thread::set_timing(double time_spent)
+{
+ response_min = std::min(response_min, time_spent);
+ response_max = std::max(response_max, time_spent);
+ response_sum += time_spent;
+ if (op_success_count != 0) {
+ response_avg = response_sum / op_success_count;
+ }
+}
+
+void
+hstest_thread::test_11(int test_num)
+{
+ const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
+ const int tablesize = arg.sh.conf.get_int("tablesize", 0);
+ unsigned int seed = arg.id;
+ seed ^= arg.sh.conf.get_int("seed_xor", 0);
+ std::string err;
+ hstcpcli_ptr cli;
+ for (size_t i = 0; i < arg.sh.loop; ++i) {
+ if (cli.get() == 0) {
+ cli = hstcpcli_i::create(arg.sh.arg);
+ cli->request_buf_open_index(0, "hstest", "hstest_table1", "", "v");
+ /* pst_num, db, table, index, retflds */
+ if (cli->request_send() != 0) {
+ fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
+ return;
+ }
+ size_t num_flds = 0;
+ if (cli->response_recv(num_flds) != 0) {
+ fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
+ return;
+ }
+ cli->response_buf_remove();
+ }
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ char buf[256];
+ int k = 0, v = 0, len = 0;
+ {
+ k = rand_r(&seed);
+ v = rand_r(&seed); /* unused */
+ if (tablesize != 0) {
+ k &= tablesize;
+ }
+ len = snprintf(buf, sizeof(buf), "%d", k);
+ }
+ const string_ref key(buf, len);
+ const string_ref op("=", 1);
+ cli->request_buf_exec_generic(0, op, &key, 1, 1, 0, string_ref(), 0, 0);
+ }
+ if (cli->request_send() != 0) {
+ fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str());
+ return;
+ }
+ size_t read_cnt = 0;
+ for (size_t j = 0; j < arg.sh.pipe; ++j) {
+ size_t num_flds = 0;
+ if (cli->response_recv(num_flds) != 0) {
+ fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str());
+ return;
+ }
+ {
+ ++read_cnt;
+ ++io_success_count;
+ arg.sh.increment_count();
+ {
+ ++op_success_count;
+ }
+ }
+ cli->response_buf_remove();
+ }
+ if (!keep_connection) {
+ cli.reset();
+ }
+ }
+}
+
+void
+hstest_thread::test_watch()
+{
+ const int timelimit = arg.sh.conf.get_int("timelimit", 0);
+ const int timelimit_offset = timelimit / 2;
+ int loop = 0;
+ double t1 = 0, t2 = 0;
+ size_t cnt_t1 = 0, cnt_t2 = 0;
+ size_t prev_cnt = 0;
+ double now_f = 0;
+ while (true) {
+ sleep(1);
+ const size_t cnt = arg.sh.count;
+ const size_t df = cnt - prev_cnt;
+ prev_cnt = cnt;
+ const double now_prev = now_f;
+ now_f = gettimeofday_double();
+ if (now_prev != 0) {
+ const double rps = static_cast<double>(df) / (now_f - now_prev);
+ fprintf(stderr, "now: %zu cntdiff: %zu tdiff: %f rps: %f\n",
+ static_cast<size_t>(now_f), df, now_f - now_prev, rps);
+ }
+ if (timelimit != 0) {
+ if (arg.sh.wait_conn == 0 || arg.sh.conn_count >= arg.sh.wait_conn) {
+ ++loop;
+ }
+ if (loop == timelimit_offset) {
+ t1 = gettimeofday_double();
+ cnt_t1 = cnt;
+ arg.sh.enable_timing = 1;
+ fprintf(stderr, "start timing\n");
+ } else if (loop == timelimit_offset + timelimit) {
+ t2 = gettimeofday_double();
+ cnt_t2 = cnt;
+ const size_t cnt_diff = cnt_t2 - cnt_t1;
+ const double tdiff = t2 - t1;
+ const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
+ fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
+ t1, cnt_t1, t2, cnt_t2, qps);
+ size_t keycnt = 0;
+ for (int i = 0; i < arg.sh.keygen_size; ++i) {
+ if (arg.sh.keygen[i]) {
+ ++keycnt;
+ }
+ }
+ fprintf(stderr, "keygen=%zu\n", keycnt);
+ break;
+ }
+ }
+ }
+#if 0
+ int loop = 0;
+ double t1 = 0, t2 = 0;
+ size_t cnt_t1 = 0, cnt_t2 = 0;
+ size_t prev_cnt = 0;
+ while (true) {
+ sleep(1);
+ const size_t cnt = arg.sh.count;
+ const size_t df = cnt - prev_cnt;
+ prev_cnt = cnt;
+ const size_t now = time(0);
+ fprintf(stderr, "%zu %zu\n", now, df);
+ if (timelimit != 0) {
+ ++loop;
+ if (loop == timelimit_offset) {
+ t1 = gettimeofday_double();
+ cnt_t1 = cnt;
+ } else if (loop == timelimit_offset + timelimit) {
+ t2 = gettimeofday_double();
+ cnt_t2 = cnt;
+ const size_t cnt_diff = cnt_t2 - cnt_t1;
+ const double tdiff = t2 - t1;
+ const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1);
+ fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n",
+ t1, cnt_t1, t2, cnt_t2, qps);
+ size_t keycnt = 0;
+ for (int i = 0; i < arg.sh.keygen_size; ++i) {
+ if (arg.sh.keygen[i]) {
+ ++keycnt;
+ }
+ }
+ fprintf(stderr, "keygen=%zu\n", keycnt);
+ _exit(0);
+ }
+ }
+ }
+#endif
+}
+
+void
+hstest_thread::test_12(int test_num)
+{
+ /* NOTE: num_threads should be 1 */
+ /* create table hstest
+ * ( k varchar(255) not null, v varchar(255) not null, primary key(k))
+ * engine = innodb; */
+ mysqltest_thread_initobj initobj;
+ auto_mysql db;
+ std::string err;
+ unsigned long long err_cnt = 0;
+ unsigned long long query_cnt = 0;
+ #if 0
+ my_bool reconnect = 0;
+ if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) {
+ err = "mysql_options() failed";
+ ++err_cnt;
+ return;
+ }
+ #endif
+ const std::string mysql_host = arg.sh.conf.get_str("host", "localhost");
+ const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306);
+ const unsigned int num = arg.sh.loop;
+ const size_t pipe = arg.sh.pipe;
+ const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root");
+ const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", "");
+ const std::string mysql_dbname = arg.sh.conf.get_str("db", "hstest");
+ const int keep_connection = arg.sh.conf.get_int("keep_connection", 1);
+ const int verbose = arg.sh.conf.get_int("verbose", 1);
+ const int use_handler = arg.sh.conf.get_int("handler", 0);
+ int connected = 0;
+ unsigned int k = 0;
+ string_buffer buf;
+ for (unsigned int i = 0; i < num; ++i) {
+ const int flags = 0;
+ if (connected == 0 && !mysql_real_connect(db, mysql_host.c_str(),
+ mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(),
+ mysql_dbname.c_str(), mysql_port, 0, flags)) {
+ err = "failed to connect: " + std::string(mysql_error(db));
+ if (verbose >= 1) {
+ fprintf(stderr, "e=[%s]\n", err.c_str());
+ }
+ ++err_cnt;
+ return;
+ }
+ int r = 0;
+ if (connected == 0 && use_handler) {
+ const char *const q = "handler hstest open";
+ r = mysql_real_query(db, q, strlen(q));
+ if (r != 0) {
+ err = 1;
+ }
+ }
+ connected = 1;
+ std::string result_str;
+ unsigned int err = 0;
+ unsigned int num_flds = 0, num_affected_rows = 0;
+ int got_data = 0;
+ buf.clear();
+ buf.append_literal("insert into hstest values ");
+ for (size_t j = 0; j < pipe; ++j) {
+ const unsigned int v = ~k;
+ if (j != 0) {
+ buf.append_literal(",");
+ }
+ char *wp = buf.make_space(64);
+ int buf_query_len = snprintf(wp, 64, "('k%u', 'v%u')", k, v);
+ buf.space_wrote(buf_query_len);
+ ++k;
+ }
+ if (r == 0) {
+ r = mysql_real_query(db, buf.begin(), buf.size());
+ ++query_cnt;
+ }
+ if (r != 0) {
+ err = 1;
+ } else {
+ auto_mysql_res res(db);
+ if (res != 0) {
+ if (verbose >= 0) {
+ num_flds = mysql_num_fields(res);
+ MYSQL_ROW row = 0;
+ while ((row = mysql_fetch_row(res)) != 0) {
+ got_data = 1;
+ unsigned long *const lengths = mysql_fetch_lengths(res);
+ if (verbose >= 2) {
+ for (unsigned int i = 0; i < num_flds; ++i) {
+ if (!result_str.empty()) {
+ result_str += " ";
+ }
+ result_str += std::string(row[i], lengths[i]);
+ }
+ }
+ }
+ }
+ } else {
+ if (mysql_field_count(db) == 0) {
+ num_affected_rows = mysql_affected_rows(db);
+ } else {
+ err = 1;
+ }
+ }
+ }
+ if (verbose >= 2 || (verbose >= 1 && err != 0)) {
+ if (err) {
+ ++err_cnt;
+ const char *const errstr = mysql_error(db);
+ fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr,
+ num_affected_rows, std::string(buf.begin(), buf.size()).c_str());
+ } else {
+ fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows,
+ std::string(buf.begin(), buf.size()).c_str(),
+ result_str.c_str());
+ }
+ }
+ if (err == 0) {
+ ++io_success_count;
+ if (num_affected_rows > 0 || got_data > 0) {
+ ++op_success_count;
+ }
+ arg.sh.increment_count(pipe);
+ }
+ if (!keep_connection) {
+ db.reset();
+ connected = 0;
+ }
+ }
+ if (verbose >= 1) {
+ fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt);
+ }
+}
+
+void
+hstest_thread::test_21(int num)
+{
+ /* fsync test */
+ unsigned int id = arg.id;
+ std::string err;
+ #if 0
+ if (socket_connect(fd, arg.sh.arg, err) != 0) {
+ fprintf(stderr, "connect: %d %s\n", errno, strerror(errno));
+ return;
+ }
+ #endif
+ auto_file logfd;
+ char fname[1024];
+ snprintf(fname, sizeof(fname), "synctest_%u", id);
+ int open_flags = O_WRONLY | O_CREAT | O_TRUNC | O_APPEND;
+ logfd.reset(open(fname, open_flags, 0644));
+ if (logfd.get() < 0) {
+ fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
+ return;
+ }
+ char buf[1024];
+ unsigned long long count = 0;
+ while (true) {
+ snprintf(buf, sizeof(buf), "%u %llu\n", id, count);
+ const size_t len = strlen(buf);
+ if (write(logfd.get(), buf, len) != (ssize_t)len) {
+ fprintf(stderr, "write: %s: %d %s\n", fname, errno, strerror(errno));
+ return;
+ }
+ #if 0
+ if (write(fd.get(), buf, len) != (ssize_t)len) {
+ fprintf(stderr, "write(sock): %d %s\n", errno, strerror(errno));
+ return;
+ }
+ #endif
+ if (fdatasync(logfd.get()) != 0) {
+ fprintf(stderr, "fsync: %s: %d %s\n", fname, errno, strerror(errno));
+ return;
+ }
+ ++count;
+ ++op_success_count;
+ arg.sh.increment_count();
+ }
+}
+
+void
+hstest_thread::test_22(int num)
+{
+ /* dd if=/dev/zero of=dummy.dat bs=1024M count=100 */
+ unsigned int id = arg.id;
+ std::string err;
+ auto_file filefd;
+ char fname[1024];
+ snprintf(fname, sizeof(fname), "dummy.dat");
+ int open_flags = O_RDONLY | O_DIRECT;
+ filefd.reset(open(fname, open_flags, 0644));
+ if (filefd.get() < 0) {
+ fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno));
+ return;
+ }
+ char buf_x[4096 * 2];
+ char *const buf = (char *)(size_t(buf_x + 4096) / 4096 * 4096);
+ unsigned long long count = 0;
+ drand48_data randbuf;
+ unsigned long long seed = time(0);
+ seed *= 10;
+ seed += id;
+ srand48_r(seed, &randbuf);
+ for (unsigned int i = 0; i < arg.sh.loop; ++i) {
+ double kf = 0;
+ drand48_r(&randbuf, &kf);
+ kf *= (209715200 / 1);
+ // fprintf(stderr, "v=%f\n", kf);
+ off_t v = static_cast<off_t>(kf);
+ v %= (209715200 / 1);
+ v *= (512 * 1);
+ const double tm1 = gettimeofday_double();
+ const ssize_t r = pread(filefd.get(), buf, (512 * 1), v);
+ const double tm2 = gettimeofday_double();
+ if (r < 0) {
+ fprintf(stderr, "pread: %s: %d %s\n", fname, errno, strerror(errno));
+ return;
+ }
+ ++count;
+ ++op_success_count;
+ arg.sh.increment_count();
+ set_timing(tm2 - tm1);
+ }
+}
+
+void
+hstest_thread::operator ()()
+{
+ if (arg.watch_flag) {
+ return test_watch();
+ }
+ int test_num = arg.sh.conf.get_int("test", 1);
+ if (test_num == 1) {
+ test_1();
+ } else if (test_num == 2 || test_num == 3) {
+ test_2_3(test_num);
+ } else if (test_num == 4 || test_num == 5) {
+ test_4_5(test_num);
+ } else if (test_num == 6) {
+ test_6(test_num);
+ } else if (test_num == 7) {
+ test_7(test_num);
+ } else if (test_num == 8) {
+ test_8(test_num);
+ } else if (test_num == 9) {
+ test_9(test_num);
+ } else if (test_num == 10) {
+ test_10(test_num);
+ } else if (test_num == 11) {
+ test_11(test_num);
+ } else if (test_num == 12) {
+ test_12(test_num);
+ } else if (test_num == 21) {
+ test_21(test_num);
+ } else if (test_num == 22) {
+ test_22(test_num);
+ }
+ const int halt = arg.sh.conf.get_int("halt", 0);
+ if (halt) {
+ fprintf(stderr, "thread halted\n");
+ while (true) {
+ sleep(100000);
+ }
+ }
+ fprintf(stderr, "thread finished\n");
+}
+
+int
+hstest_main(int argc, char **argv)
+{
+ ignore_sigpipe();
+ hstest_shared shared;
+ parse_args(argc, argv, shared.conf);
+ shared.conf["port"] = shared.conf["hsport"];
+ shared.arg.set(shared.conf);
+ shared.loop = shared.conf.get_int("num", 1000);
+ shared.pipe = shared.conf.get_int("pipe", 1);
+ shared.verbose = shared.conf.get_int("verbose", 1);
+ const int tablesize = shared.conf.get_int("tablesize", 0);
+ std::vector<char> keygen(tablesize);
+ shared.keygen = &keygen[0];
+ shared.keygen_size = tablesize;
+ shared.usleep = shared.conf.get_int("usleep", 0);
+ shared.dump = shared.conf.get_int("dump", 0);
+ shared.num_threads = shared.conf.get_int("num_threads", 10);
+ shared.wait_conn = shared.conf.get_int("wait_conn", 0);
+ const std::string op = shared.conf.get_str("op", "G");
+ if (op.size() > 0) {
+ shared.op = op[0];
+ }
+ #if 0
+ const int localdb_flag = shared.conf.get_int("local", 0);
+ if (localdb_flag) {
+ shared.localdb = database_i::create(shared.conf);
+ }
+ #endif
+ const int num_thrs = shared.num_threads;
+ typedef thread<hstest_thread> thread_type;
+ typedef std::auto_ptr<thread_type> thread_ptr;
+ typedef auto_ptrcontainer< std::vector<thread_type *> > thrs_type;
+ thrs_type thrs;
+ for (int i = 0; i < num_thrs; ++i) {
+ const hstest_thread::arg_type arg(i, shared, false);
+ thread_ptr thr(new thread<hstest_thread>(arg));
+ thrs.push_back_ptr(thr);
+ }
+ for (size_t i = 0; i < thrs.size(); ++i) {
+ thrs[i]->start();
+ }
+ thread_ptr watch_thread;
+ const int timelimit = shared.conf.get_int("timelimit", 0);
+ {
+ const hstest_thread::arg_type arg(0, shared, true);
+ watch_thread = thread_ptr(new thread<hstest_thread>(arg));
+ watch_thread->start();
+ }
+ size_t iocnt = 0, opcnt = 0;
+ double respmin = 999999, respmax = 0;
+ double respsum = 0;
+ if (timelimit != 0) {
+ watch_thread->join();
+ }
+ for (size_t i = 0; i < thrs.size(); ++i) {
+ if (timelimit == 0) {
+ thrs[i]->join();
+ }
+ iocnt += (*thrs[i])->io_success_count;
+ opcnt += (*thrs[i])->op_success_count;
+ respmin = std::min(respmin, (*thrs[i])->response_min);
+ respmax = std::max(respmax, (*thrs[i])->response_max);
+ respsum += (*thrs[i])->response_sum;
+ }
+ fprintf(stderr, "io_success_count=%zu op_success_count=%zu\n", iocnt, opcnt);
+ fprintf(stderr, "respmin=%f respmax=%f respsum=%f respavg=%f\n",
+ respmin, respmax, respsum, respsum / opcnt);
+ size_t keycnt = 0;
+ for (size_t i = 0; i < keygen.size(); ++i) {
+ if (keygen[i]) {
+ ++keycnt;
+ }
+ }
+ fprintf(stderr, "keycnt=%zu\n", keycnt);
+ _exit(0);
+ return 0;
+}
+
+};
+
+int
+main(int argc, char **argv)
+{
+ return dena::hstest_main(argc, argv);
+}
+
diff --git a/plugin/handler_socket/client/hstest.pl b/plugin/handler_socket/client/hstest.pl
new file mode 100755
index 00000000..1363e153
--- /dev/null
+++ b/plugin/handler_socket/client/hstest.pl
@@ -0,0 +1,228 @@
+#!/usr/bin/env perl
+
+# vim:sw=8:ai:ts=8
+
+use strict;
+use warnings;
+
+use DBI;
+use Net::HandlerSocket;
+
+my %conf = ();
+for my $i (@ARGV) {
+ my ($k, $v) = split(/=/, $i);
+ $conf{$k} = $v;
+}
+
+my $verbose = get_conf("verbose", 0);
+my $actions_str = get_conf("actions", "hsread");
+my $tablesize = get_conf("tablesize", 10000);
+my $db = get_conf("db", "hstest");
+my $table = get_conf("table", "hstest_table1");
+my $engine = get_conf("engine", "innodb");
+my $host = get_conf("host", "localhost");
+my $mysqlport = get_conf("mysqlport", 3306);
+my $mysqluser = get_conf("mysqluser", "root");
+my $mysqlpass = get_conf("mysqlpass", "");
+my $hsport = get_conf("hsport", 9999);
+my $loop = get_conf("loop", 10000);
+my $op = get_conf("op", "=");
+my $ssps = get_conf("ssps", 0);
+my $num_moreflds = get_conf("moreflds", 0);
+my $moreflds_prefix = get_conf("moreflds_prefix", "column0123456789_");
+my $keytype = get_conf("keytype", "varchar(32)");
+my $file = get_conf("file", undef);
+
+my $dsn = "DBI:MariaDB:database=;host=$host;port=$mysqlport"
+ . ";mariadb_server_prepare=$ssps";
+my $dbh = DBI->connect($dsn, $mysqluser, $mysqlpass, { RaiseError => 1 });
+my $hsargs = { 'host' => $host, 'port' => $hsport };
+my $cli = new Net::HandlerSocket($hsargs);
+
+my @actions = split(/,/, $actions_str);
+for my $action (@actions) {
+ if ($action eq "table") {
+ print("TABLE $db.$table\n");
+ $dbh->do("drop database if exists $db");
+ $dbh->do("create database $db");
+ $dbh->do("use $db");
+ my $moreflds = get_createtbl_moreflds_str();
+ $dbh->do(
+ "create table $table (" .
+ "k $keytype primary key" .
+ ",v varchar(32) not null" .
+ $moreflds .
+ ") character set utf8 collate utf8_bin " .
+ "engine = $engine");
+ } elsif ($action eq "insert") {
+ print("INSERT $db.$table tablesize=$tablesize\n");
+ $dbh->do("use $db");
+ my $moreflds = get_insert_moreflds_str();
+ for (my $i = 0; $i < $tablesize; $i += 100) {
+ my $qstr = "insert into $db.$table values";
+ for (my $j = 0; $j < 100; ++$j) {
+ if ($j != 0) {
+ $qstr .= ",";
+ }
+ my $k = "" . ($i + $j);
+ my $v = "v" . int(rand(1000)) . ($i + $j);
+ $qstr .= "('$k', '$v'";
+ for (my $j = 0; $j < $num_moreflds; ++$j) {
+ $qstr .= ",'$j'";
+ }
+ $qstr .= ")";
+ }
+ $dbh->do($qstr);
+ print "$i/$tablesize\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "read") {
+ print("READ $db.$table op=$op loop=$loop\n");
+ $dbh->do("use $db");
+ my $moreflds = get_select_moreflds_str();
+ my $sth = $dbh->prepare(
+ "select k,v$moreflds from $db.$table where k = ?");
+ for (my $i = 0; $i < $loop; ++$i) {
+ my $k = "" . int(rand($tablesize));
+ # print "k=$k\n";
+ $sth->execute($k);
+ if ($verbose >= 10) {
+ print "RET:";
+ while (my $ref = $sth->fetchrow_arrayref()) {
+ my $rk = $ref->[0];
+ my $rv = $ref->[1];
+ print " $rk $rv";
+ }
+ print "\n";
+ }
+ print "$i/$loop\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "hsinsert") {
+ print("HSINSERT $db.$table tablesize=$tablesize\n");
+ $cli->open_index(1, $db, $table, '', 'k,v');
+ for (my $i = 0; $i < $tablesize; ++$i) {
+ my $k = "" . $i;
+ my $v = "v" . int(rand(1000)) . $i;
+ my $r = $cli->execute_insert(1, [ $k, $v ]);
+ if ($r->[0] != 0) {
+ die;
+ }
+ print "$i/$tablesize\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "hsread") {
+ print("HSREAD $db.$table op=$op loop=$loop\n");
+ my $moreflds = get_select_moreflds_str();
+ $cli->open_index(1, $db, $table, '', "k,v$moreflds");
+ for (my $i = 0; $i < $loop; ++$i) {
+ my $k = "" . int(rand($tablesize));
+ # print "k=$k\n";
+ my $r = $cli->execute_find(1, $op, [ $k ], 1, 0);
+ if ($verbose >= 10) {
+ my $len = scalar(@{$r});
+ print "LEN=$len";
+ for my $e (@{$r}) {
+ print " [$e]";
+ }
+ print "\n";
+ }
+ print "$i/$loop\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "hsupdate") {
+ my $vbase = "v" . int(rand(1000));
+ print("HSUPDATE $db.$table op=$op loop=$loop vbase=$vbase\n");
+ $cli->open_index(1, $db, $table, '', 'v');
+ for (my $i = 0; $i < $loop; ++$i) {
+ my $k = "" . int(rand($tablesize));
+ my $v = $vbase . $i;
+ print "k=$k v=$v\n";
+ my $r = $cli->execute_update(1, $op, [ $k ], 1, 0,
+ [ $v ]);
+ if ($verbose >= 10) {
+ print "UP k=$k v=$v\n";
+ }
+ print "$i/$loop\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "hsdelete") {
+ print("HSDELETE $db.$table op=$op loop=$loop\n");
+ $cli->open_index(1, $db, $table, '', '');
+ for (my $i = 0; $i < $loop; ++$i) {
+ my $k = "" . int(rand($tablesize));
+ print "k=$k\n";
+ my $r = $cli->execute_delete(1, $op, [ $k ], 1, 0);
+ if ($verbose >= 10) {
+ print "DEL k=$k\n";
+ }
+ print "$i/$loop\n" if $i % 1000 == 0;
+ }
+ } elsif ($action eq "verify") {
+ verify_do();
+ }
+}
+
+sub verify_do {
+ my ($fail_cnt, $ok_cnt) = (0, 0);
+ my $sth = $dbh->prepare("select v from $db.$table where k = ?");
+ use FileHandle;
+ my $fh = new FileHandle($file, "r");
+ while (my $line = <$fh>) {
+ chomp($line);
+ my @vec = split(/\t/, $line);
+ my $k = $vec[3];
+ my $v = $vec[7];
+ next if (!defined($k) || !defined($v));
+ # print "$k $v\n";
+ $sth->execute($k);
+ my $aref = $sth->fetchrow_arrayref();
+ if (!defined($aref)) {
+ print "FAILED: $k notfound\n";
+ ++$fail_cnt;
+ } else {
+ my $gv = $aref->[0];
+ if ($gv ne $v) {
+ print "FAILED: $k got=$gv expected=$v\n";
+ ++$fail_cnt;
+ } else {
+ print "OK: $k $v $gv\n" if $verbose >= 10;
+ ++$ok_cnt;
+ }
+ }
+ }
+ print "OK=$ok_cnt FAIL=$fail_cnt\n";
+}
+
+sub get_conf {
+ my ($key, $def) = @_;
+ my $val = $conf{$key};
+ if ($val) {
+ print "$key=$val\n";
+ } else {
+ $val = $def;
+ $def ||= '';
+ print "$key=$def(default)\n";
+ }
+ return $val;
+}
+
+sub get_createtbl_moreflds_str {
+ my $s = "";
+ for (my $j = 0; $j < $num_moreflds; ++$j) {
+ $s .= ",$moreflds_prefix$j varchar(30)";
+ }
+ return $s;
+}
+
+sub get_select_moreflds_str {
+ my $s = "";
+ for (my $i = 0; $i < $num_moreflds; ++$i) {
+ $s .= ",$moreflds_prefix$i";
+ }
+ return $s;
+}
+
+sub get_insert_moreflds_str {
+ my $s = "";
+ for (my $i = 0; $i < $num_moreflds; ++$i) {
+ $s .= ",?";
+ }
+ return $s;
+}
+
diff --git a/plugin/handler_socket/client/hstest_hs.sh b/plugin/handler_socket/client/hstest_hs.sh
new file mode 100755
index 00000000..1b9eee18
--- /dev/null
+++ b/plugin/handler_socket/client/hstest_hs.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+exec ./hstest test=10 tablesize=10000 host=localhost hsport=9998 num=10000000 \
+ num_threads=100 timelimit=10 $@
diff --git a/plugin/handler_socket/client/hstest_hs_more50.sh b/plugin/handler_socket/client/hstest_hs_more50.sh
new file mode 100755
index 00000000..b7539c52
--- /dev/null
+++ b/plugin/handler_socket/client/hstest_hs_more50.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+exec ./hstest test=10 key_mask=9999 host=localhost port=9998 num=10000000 \
+ num_threads=100 timelimit=10 moreflds=50 $@
diff --git a/plugin/handler_socket/client/hstest_md.sh b/plugin/handler_socket/client/hstest_md.sh
new file mode 100755
index 00000000..8129f884
--- /dev/null
+++ b/plugin/handler_socket/client/hstest_md.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+./hstest test=7 key_mask=9999 host=localhost port=11211 num=10000 \
+ num_threads=10 timelimit=10 op=R $@
+./hstest test=7 key_mask=9999 host=localhost port=11211 num=1000000 \
+ num_threads=100 timelimit=10 op=G $@
+
diff --git a/plugin/handler_socket/client/hstest_my.sh b/plugin/handler_socket/client/hstest_my.sh
new file mode 100755
index 00000000..cf917cf4
--- /dev/null
+++ b/plugin/handler_socket/client/hstest_my.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+exec ./hstest test=9 tablesize=9999 host=localhost mysqlport=3306 num=1000000 \
+ num_threads=100 verbose=1 timelimit=10 $@
diff --git a/plugin/handler_socket/client/hstest_my_more50.sh b/plugin/handler_socket/client/hstest_my_more50.sh
new file mode 100755
index 00000000..6782b5e8
--- /dev/null
+++ b/plugin/handler_socket/client/hstest_my_more50.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+exec ./hstest test=9 key_mask=9999 host=localhost port=3306 num=1000000 \
+ num_threads=100 verbose=1 timelimit=10 moreflds=50 $@