diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
commit | 06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /plugin/handler_socket/client | |
parent | Initial commit. (diff) | |
download | mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/handler_socket/client')
-rw-r--r-- | plugin/handler_socket/client/Makefile.am | 24 | ||||
-rw-r--r-- | plugin/handler_socket/client/hsclient.cpp | 88 | ||||
-rw-r--r-- | plugin/handler_socket/client/hslongrun.cpp | 1041 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hspool_test.pl | 224 | ||||
-rw-r--r-- | plugin/handler_socket/client/hstest.cpp | 1532 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest.pl | 228 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest_hs.sh | 4 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest_hs_more50.sh | 4 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest_md.sh | 7 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest_my.sh | 3 | ||||
-rwxr-xr-x | plugin/handler_socket/client/hstest_my_more50.sh | 3 |
11 files changed, 3158 insertions, 0 deletions
diff --git a/plugin/handler_socket/client/Makefile.am b/plugin/handler_socket/client/Makefile.am new file mode 100644 index 00000000..e89727a7 --- /dev/null +++ b/plugin/handler_socket/client/Makefile.am @@ -0,0 +1,24 @@ +CXXFLAGS += -fimplicit-templates +AM_INCLUDES= -I$(srcdir)/../libhsclient +bin_PROGRAMS=hsclient +hsclient_SOURCES= hsclient.cpp +hsclient_LDFLAGS= -static -L../libhsclient -lhsclient +hsclient_CXXFLAGS= $(AM_INCLUDES) + +hstest: hstest.o + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hstest.o \ + -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \ + -o hstest + +hstest.o: hstest.cpp + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \ + -c hstest.cpp + +hslongrun: hslongrun.o + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(LFLAGS) hslongrun.o \ + -L../libhsclient/.libs -lhsclient $(MYSQL_LIB) \ + -o hslongrun + +hslongrun.o: hslongrun.cpp + $(CXX) $(CXXFLAGS) $(MY_CXXFLAGS) $(MYSQL_INC) $(AM_INCLUDES) \ + -c hslongrun.cpp diff --git a/plugin/handler_socket/client/hsclient.cpp b/plugin/handler_socket/client/hsclient.cpp new file mode 100644 index 00000000..0dd8332e --- /dev/null +++ b/plugin/handler_socket/client/hsclient.cpp @@ -0,0 +1,88 @@ + +// vim:sw=2:ai + +#include "hstcpcli.hpp" +#include "string_util.hpp" + +namespace dena { + +int +hstcpcli_main(int argc, char **argv) +{ + config conf; + parse_args(argc, argv, conf); + socket_args sockargs; + sockargs.set(conf); + hstcpcli_ptr cli = hstcpcli_i::create(sockargs); + const std::string dbname = conf.get_str("dbname", "hstest"); + const std::string table = conf.get_str("table", "hstest_table1"); + const std::string index = conf.get_str("index", "PRIMARY"); + const std::string fields = conf.get_str("fields", "k,v"); + const int limit = conf.get_int("limit", 0); + const int skip = conf.get_int("skip", 0); + std::vector<std::string> keys; + std::vector<string_ref> keyrefs; + size_t num_keys = 0; + while (true) { + const std::string conf_key = std::string("k") + to_stdstring(num_keys); + const std::string k = conf.get_str(conf_key, ""); + const std::string kx = conf.get_str(conf_key, "x"); + if (k.empty() && kx == "x") { + break; + } + ++num_keys; + keys.push_back(k); + } + for (size_t i = 0; i < keys.size(); ++i) { + const string_ref ref(keys[i].data(), keys[i].size()); + keyrefs.push_back(ref); + } + const std::string op = conf.get_str("op", "="); + const string_ref op_ref(op.data(), op.size()); + cli->request_buf_open_index(0, dbname.c_str(), table.c_str(), + index.c_str(), fields.c_str()); + cli->request_buf_exec_generic(0, op_ref, num_keys == 0 ? 0 : &keyrefs[0], + num_keys, limit, skip, string_ref(), 0, 0); + int code = 0; + size_t numflds = 0; + do { + if (cli->request_send() != 0) { + fprintf(stderr, "request_send: %s\n", cli->get_error().c_str()); + break; + } + if ((code = cli->response_recv(numflds)) != 0) { + fprintf(stderr, "response_recv: %s\n", cli->get_error().c_str()); + break; + } + } while (false); + cli->response_buf_remove(); + do { + if ((code = cli->response_recv(numflds)) != 0) { + fprintf(stderr, "response_recv: %s\n", cli->get_error().c_str()); + break; + } + while (true) { + const string_ref *const row = cli->get_next_row(); + if (row == 0) { + break; + } + printf("REC:"); + for (size_t i = 0; i < numflds; ++i) { + const std::string val(row[i].begin(), row[i].size()); + printf(" %s", val.c_str()); + } + printf("\n"); + } + } while (false); + cli->response_buf_remove(); + return 0; +} + +}; + +int +main(int argc, char **argv) +{ + return dena::hstcpcli_main(argc, argv); +} + diff --git a/plugin/handler_socket/client/hslongrun.cpp b/plugin/handler_socket/client/hslongrun.cpp new file mode 100644 index 00000000..b7c02951 --- /dev/null +++ b/plugin/handler_socket/client/hslongrun.cpp @@ -0,0 +1,1041 @@ + +// vim:sw=2:ai + +#include <signal.h> +#include <sys/time.h> +#include <stdio.h> +#include <string.h> +#include <vector> +#include <map> +#include <stdlib.h> +#include <memory> +#include <errno.h> +#include <mysql.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "util.hpp" +#include "auto_ptrcontainer.hpp" +#include "socket.hpp" +#include "hstcpcli.hpp" +#include "string_util.hpp" +#include "mutex.hpp" + +namespace dena { + +struct auto_mysql : private noncopyable { + auto_mysql() : db(0) { + reset(); + } + ~auto_mysql() { + if (db) { + mysql_close(db); + } + } + void reset() { + if (db) { + mysql_close(db); + } + if ((db = mysql_init(0)) == 0) { + fatal_abort("failed to initialize mysql client"); + } + } + operator MYSQL *() const { return db; } + private: + MYSQL *db; +}; + +struct auto_mysql_res : private noncopyable { + auto_mysql_res(MYSQL *db) { + res = mysql_store_result(db); + } + ~auto_mysql_res() { + if (res) { + mysql_free_result(res); + } + } + operator MYSQL_RES *() const { return res; } + private: + MYSQL_RES *res; +}; + +struct auto_mysql_stmt : private noncopyable { + auto_mysql_stmt(MYSQL *db) { + stmt = mysql_stmt_init(db); + } + ~auto_mysql_stmt() { + if (stmt) { + mysql_stmt_close(stmt); + } + } + operator MYSQL_STMT *() const { return stmt; } + private: + MYSQL_STMT *stmt; +}; + +double +gettimeofday_double() +{ + struct timeval tv = { }; + if (gettimeofday(&tv, 0) != 0) { + fatal_abort("gettimeofday"); + } + return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec; +} + +struct record_value { + mutex lock; + bool deleted; + bool unknown_state; + std::string key; + std::vector<std::string> values; + record_value() : deleted(true), unknown_state(false) { } +}; + +struct hs_longrun_shared { + config conf; + socket_args arg; + int verbose; + long num_threads; + int usleep; + volatile mutable int running; + auto_ptrcontainer< std::vector<record_value *> > records; + hs_longrun_shared() : verbose(0), num_threads(0), usleep(0), running(1) { } +}; + +struct thread_base { + thread_base() : need_join(false), stack_size(256 * 1024) { } + virtual ~thread_base() { + join(); + } + virtual void run() = 0; + void start() { + if (!start_nothrow()) { + fatal_abort("thread::start"); + } + } + bool start_nothrow() { + if (need_join) { + return need_join; /* true */ + } + void *const arg = this; + pthread_attr_t attr; + if (pthread_attr_init(&attr) != 0) { + fatal_abort("pthread_attr_init"); + } + if (pthread_attr_setstacksize(&attr, stack_size) != 0) { + fatal_abort("pthread_attr_setstacksize"); + } + const int r = pthread_create(&thr, &attr, thread_main, arg); + if (pthread_attr_destroy(&attr) != 0) { + fatal_abort("pthread_attr_destroy"); + } + if (r != 0) { + return need_join; /* false */ + } + need_join = true; + return need_join; /* true */ + } + void join() { + if (!need_join) { + return; + } + int e = 0; + if ((e = pthread_join(thr, 0)) != 0) { + fatal_abort("pthread_join"); + } + need_join = false; + } + private: + static void *thread_main(void *arg) { + thread_base *p = static_cast<thread_base *>(arg); + p->run(); + return 0; + } + private: + pthread_t thr; + bool need_join; + size_t stack_size; +}; + +struct hs_longrun_stat { + unsigned long long verify_error_count; + unsigned long long runtime_error_count; + unsigned long long unknown_count; + unsigned long long success_count; + hs_longrun_stat() + : verify_error_count(0), runtime_error_count(0), + unknown_count(0), success_count(0) { } + void add(const hs_longrun_stat& x) { + verify_error_count += x.verify_error_count; + runtime_error_count += x.runtime_error_count; + unknown_count += x.unknown_count; + success_count += x.success_count; + } +}; + +struct hs_longrun_thread_base : public thread_base { + struct arg_type { + int id; + std::string worker_type; + char op; + int lock_flag; + const hs_longrun_shared& sh; + arg_type(int id, const std::string& worker_type, char op, int lock_flag, + const hs_longrun_shared& sh) + : id(id), worker_type(worker_type), op(op), lock_flag(lock_flag), + sh(sh) { } + }; + arg_type arg; + hs_longrun_stat stat; + drand48_data randbuf; + unsigned int seed; + hs_longrun_thread_base(const arg_type& arg) + : arg(arg), seed(0) { + seed = time(0) + arg.id + 1; + srand48_r(seed, &randbuf); + } + virtual ~hs_longrun_thread_base() { } + virtual void run() = 0; + size_t rand_record() { + double v = 0; + drand48_r(&randbuf, &v); + const size_t sz = arg.sh.records.size(); + size_t r = size_t(v * sz); + if (r >= sz) { + r = 0; + } + return r; + } + int verify_update(const std::string& k, const std::string& v1, + const std::string& v2, const std::string& v3, record_value& rec, + uint32_t num_rows, bool cur_unknown_state); + int verify_read(const std::string& k, uint32_t num_rows, uint32_t num_flds, + const std::string rrec[4], record_value& rec); + int verify_readnolock(const std::string& k, uint32_t num_rows, + uint32_t num_flds, const std::string rrec[4]); +}; + +int +hs_longrun_thread_base::verify_update(const std::string& k, + const std::string& v1, const std::string& v2, const std::string& v3, + record_value& rec, uint32_t num_rows, bool cur_unknown_state) +{ + const bool op_success = num_rows == 1; + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_update_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_update_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } + if (op_success) { + rec.values.resize(4); + rec.values[0] = k; + rec.values[1] = v1; + rec.values[2] = v2; + rec.values[3] = v3; + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + } + rec.unknown_state = cur_unknown_state; + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s %s %s %s %s\n", arg.worker_type.c_str(), + k.c_str(), v1.c_str(), v2.c_str(), v3.c_str()); + } + return ret; +} + +int +hs_longrun_thread_base::verify_read(const std::string& k, + uint32_t num_rows, uint32_t num_flds, const std::string rrec[4], + record_value& rec) +{ + const bool op_success = num_rows != 0; + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (num_flds != 4) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_fldnum %d\n", + arg.worker_type.c_str(), arg.id, k.c_str(), + static_cast<int>(num_flds)); + } + ret = 1; + } else if (rec.deleted) { + /* nothing to verify */ + } else { + int diff = 0; + for (size_t i = 0; i < 4; ++i) { + if (rec.values[i] == rrec[i]) { + /* ok */ + } else { + diff = 1; + } + } + if (diff) { + std::string mess; + for (size_t i = 0; i < 4; ++i) { + const std::string& expected = rec.values[i]; + const std::string& val = rrec[i]; + mess += " " + val + "/" + expected; + } + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_value %s\n", + arg.worker_type.c_str(), arg.id, k.c_str(), mess.c_str()); + } + ret = 1; + } + } + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s %s\n", arg.worker_type.c_str(), k.c_str()); + } + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + return ret; +} + +int +hs_longrun_thread_base::verify_readnolock(const std::string& k, + uint32_t num_rows, uint32_t num_flds, const std::string rrec[4]) +{ + int ret = 0; + if (num_rows != 1 || num_flds != 4) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_read_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "%s -> %s %s %s %s %s\n", arg.worker_type.c_str(), + k.c_str(), rrec[0].c_str(), rrec[1].c_str(), rrec[2].c_str(), + rrec[3].c_str()); + } + if (ret == 0) { + ++stat.success_count; + } + return ret; +} + +struct hs_longrun_thread_hs : public hs_longrun_thread_base { + hs_longrun_thread_hs(const arg_type& arg) + : hs_longrun_thread_base(arg) { } + void run(); + int check_hs_error(const char *mess, record_value *rec); + int op_insert(record_value& rec); + int op_delete(record_value& rec); + int op_update(record_value& rec); + int op_read(record_value& rec); + int op_readnolock(int k); + hstcpcli_ptr cli; + socket_args sockargs; +}; + +struct lock_guard : noncopyable { + lock_guard(mutex& mtx) : mtx(mtx) { + mtx.lock(); + } + ~lock_guard() { + mtx.unlock(); + } + mutex& mtx; +}; + +string_ref +to_string_ref(const std::string& s) +{ + return string_ref(s.data(), s.size()); +} + +std::string +to_string(const string_ref& s) +{ + return std::string(s.begin(), s.size()); +} + +void +hs_longrun_thread_hs::run() +{ + config c = arg.sh.conf; + if (arg.op == 'R' || arg.op == 'N') { + c["port"] = to_stdstring(arg.sh.conf.get_int("hsport", 9998)); + } else { + c["port"] = to_stdstring(arg.sh.conf.get_int("hsport_wr", 9999)); + } + sockargs.set(c); + + while (arg.sh.running) { + if (cli.get() == 0 || !cli->stable_point()) { + cli = hstcpcli_i::create(sockargs); + if (check_hs_error("connect", 0) != 0) { + cli.reset(); + continue; + } + cli->request_buf_open_index(0, "hstestdb", "hstesttbl", "PRIMARY", + "k,v1,v2,v3", "k,v1,v2,v3"); + cli->request_send(); + if (check_hs_error("openindex_send", 0) != 0) { + cli.reset(); + continue; + } + size_t num_flds = 0; + cli->response_recv(num_flds); + if (check_hs_error("openindex_recv", 0) != 0) { + cli.reset(); + continue; + } + cli->response_buf_remove(); + } + const size_t rec_id = rand_record(); + if (arg.lock_flag) { + record_value& rec = *arg.sh.records[rec_id]; + lock_guard g(rec.lock); + int e = 0; + switch (arg.op) { + case 'I': + e = op_insert(rec); + break; + case 'D': + e = op_delete(rec); + break; + case 'U': + e = op_update(rec); + break; + case 'R': + e = op_read(rec); + break; + default: + break; + } + } else { + int e = 0; + switch (arg.op) { + case 'N': + e = op_readnolock(rec_id); + break; + default: + break; + } + } + } +} + +int +hs_longrun_thread_hs::op_insert(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "iv1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "iv2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "iv3_" + k + "_" + to_stdstring(arg.id); + const string_ref op_ref("+", 1); + const string_ref op_args[4] = { + to_string_ref(k), + to_string_ref(v1), + to_string_ref(v2), + to_string_ref(v3) + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 4, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_insert_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (arg.sh.verbose > 10) { + const string_ref *row = cli->get_next_row(); + fprintf(stderr, "HS op=+ errrcode=%d errmess=[%s]\n", cli->get_error_code(), + row ? to_string(row[0]).c_str() : ""); + } + const bool op_success = cli->get_error_code() == 0; + int ret = 0; + if (!rec.unknown_state) { + if (rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_insert_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (!rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_insert_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } else { + ++stat.unknown_count; + } + if (op_success) { + rec.values.resize(4); + rec.values[0] = k; + rec.values[1] = v1; + rec.values[2] = v2; + rec.values[3] = v3; + rec.deleted = false; + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "HS_INSERT %s %s %s %s\n", k.c_str(), v1.c_str(), + v2.c_str(), v3.c_str()); + } + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + rec.unknown_state = false; + } + cli->response_buf_remove(); + return ret; +} + +int +hs_longrun_thread_hs::op_delete(record_value& rec) +{ + const std::string k = rec.key; + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + const string_ref modop_ref("D", 1); + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + modop_ref, 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_delete_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (check_hs_error("op_delete_recv", &rec) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + const bool op_success = (numflds > 0 && row != 0 && + to_string(row[0]) == "1"); + int ret = 0; + if (!rec.unknown_state) { + if (!rec.deleted && !op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_delete_failure\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } else if (rec.deleted && op_success) { + ++stat.verify_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "VERIFY_ERROR: %s wid=%d k=%s " + "unexpected_delete_success\n", + arg.worker_type.c_str(), arg.id, k.c_str()); + } + ret = 1; + } + } + cli->response_buf_remove(); + if (op_success) { + rec.deleted = true; + if (ret == 0 && !rec.unknown_state) { + ++stat.success_count; + } + rec.unknown_state = false; + } + if (arg.sh.verbose >= 100 && ret == 0) { + fprintf(stderr, "HS_DELETE %s\n", k.c_str()); + } + return ret; +} + +int +hs_longrun_thread_hs::op_update(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "uv1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "uv2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "uv3_" + k + "_" + to_stdstring(arg.id); + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + const string_ref modop_ref("U", 1); + const string_ref modop_args[4] = { + to_string_ref(k), + to_string_ref(v1), + to_string_ref(v2), + to_string_ref(v3) + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + modop_ref, modop_args, 4, 0, 0); + cli->request_send(); + if (check_hs_error("op_update_send", &rec) != 0) { return 1; } + size_t numflds = 0; + cli->response_recv(numflds); + if (check_hs_error("op_update_recv", &rec) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + uint32_t num_rows = row + ? atoi_uint32_nocheck(row[0].begin(), row[0].end()) : 0; + cli->response_buf_remove(); + const bool cur_unknown_state = (num_rows == 1); + return verify_update(k, v1, v2, v3, rec, num_rows, cur_unknown_state); +} + +int +hs_longrun_thread_hs::op_read(record_value& rec) +{ + const std::string k = rec.key; + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_read_send", 0) != 0) { return 1; } + size_t num_flds = 0; + size_t num_rows = 0; + cli->response_recv(num_flds); + if (check_hs_error("op_read_recv", 0) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + std::string rrec[4]; + if (row != 0 && num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = to_string(row[i]); + } + ++num_rows; + } + row = cli->get_next_row(); + if (row != 0) { + ++num_rows; + } + cli->response_buf_remove(); + return verify_read(k, num_rows, num_flds, rrec, rec); +} + +int +hs_longrun_thread_hs::op_readnolock(int key) +{ + const std::string k = to_stdstring(key); + const string_ref op_ref("=", 1); + const string_ref op_args[1] = { + to_string_ref(k), + }; + cli->request_buf_exec_generic(0, op_ref, op_args, 1, 1, 0, + string_ref(), 0, 0, 0, 0); + cli->request_send(); + if (check_hs_error("op_read_send", 0) != 0) { return 1; } + size_t num_flds = 0; + size_t num_rows = 0; + cli->response_recv(num_flds); + if (check_hs_error("op_read_recv", 0) != 0) { return 1; } + const string_ref *row = cli->get_next_row(); + std::string rrec[4]; + if (row != 0 && num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = to_string(row[i]); + } + ++num_rows; + } + row = cli->get_next_row(); + if (row != 0) { + ++num_rows; + } + cli->response_buf_remove(); + return verify_readnolock(k, num_rows, num_flds, rrec); +} + +int +hs_longrun_thread_hs::check_hs_error(const char *mess, record_value *rec) +{ + const int err = cli->get_error_code(); + if (err == 0) { + return 0; + } + ++stat.runtime_error_count; + if (arg.sh.verbose > 0) { + const std::string estr = cli->get_error(); + fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d %s: %d %s\n", + arg.op, arg.id, mess, err, estr.c_str()); + } + if (rec) { + rec->unknown_state = true; + } + return 1; +} + +struct hs_longrun_thread_my : public hs_longrun_thread_base { + hs_longrun_thread_my(const arg_type& arg) + : hs_longrun_thread_base(arg), connected(false) { } + void run(); + void show_mysql_error(const char *mess, record_value *rec); + int op_insert(record_value& rec); + int op_delete(record_value& rec); + int op_update(record_value& rec); + int op_delins(record_value& rec); + int op_read(record_value& rec); + auto_mysql db; + bool connected; +}; + +void +hs_longrun_thread_my::run() +{ + const std::string mysql_host = arg.sh.conf.get_str("host", "localhost"); + const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = "hstestdb"; + + while (arg.sh.running) { + if (!connected) { + if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(), + mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) { + show_mysql_error("mysql_real_connect", 0); + continue; + } + } + connected = true; + const size_t rec_id = rand_record(); + record_value& rec = *arg.sh.records[rec_id]; + lock_guard g(rec.lock); + int e = 0; + switch (arg.op) { + #if 0 + case 'I': + e = op_insert(rec); + break; + case 'D': + e = op_delete(rec); + break; + case 'U': + e = op_update(rec); + break; + #endif + case 'T': + e = op_delins(rec); + break; + case 'R': + e = op_read(rec); + break; + default: + break; + } + } +} + +int +hs_longrun_thread_my::op_delins(record_value& rec) +{ + const std::string k = rec.key; + const std::string v1 = "div1_" + k + "_" + to_stdstring(arg.id); + const std::string v2 = "div2_" + k + "_" + to_stdstring(arg.id); + const std::string v3 = "div3_" + k + "_" + to_stdstring(arg.id); + int success = 0; + bool cur_unknown_state = false; + do { + char query[1024]; + #if 1 + if (mysql_query(db, "begin") != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "begin"); + } + break; + } + #endif + cur_unknown_state = true; + snprintf(query, 1024, + "delete from hstesttbl where k = '%s'", k.c_str()); + if (mysql_query(db, query) != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + } + break; + } + if (mysql_affected_rows(db) != 1) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: notfound: [%s]\n", query); + } + break; + } + snprintf(query, 1024, + "insert into hstesttbl values ('%s', '%s', '%s', '%s')", + k.c_str(), v1.c_str(), v2.c_str(), v3.c_str()); + if (mysql_query(db, query) != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + } + break; + } + #if 1 + if (mysql_query(db, "commit") != 0) { + if (arg.sh.verbose >= 20) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), "commit"); + } + break; + } + #endif + success = true; + cur_unknown_state = false; + } while (false); + return verify_update(k, v1, v2, v3, rec, (success != 0), cur_unknown_state); +} + +int +hs_longrun_thread_my::op_read(record_value& rec) +{ + const std::string k = rec.key; + char query[1024] = { 0 }; + const int len = snprintf(query, 1024, + "select k,v1,v2,v3 from hstesttbl where k='%s'", k.c_str()); + const int r = mysql_real_query(db, query, len > 0 ? len : 0); + if (r != 0) { + show_mysql_error(query, 0); + return 1; + } + MYSQL_ROW row = 0; + unsigned long *lengths = 0; + unsigned int num_rows = 0; + unsigned int num_flds = 0; + auto_mysql_res res(db); + std::string rrec[4]; + if (res != 0) { + num_flds = mysql_num_fields(res); + row = mysql_fetch_row(res); + if (row != 0) { + lengths = mysql_fetch_lengths(res); + if (num_flds == 4) { + for (int i = 0; i < 4; ++i) { + rrec[i] = std::string(row[i], lengths[i]); + } + } + ++num_rows; + row = mysql_fetch_row(res); + if (row != 0) { + ++num_rows; + } + } + } + return verify_read(k, num_rows, num_flds, rrec, rec); +} + +void +hs_longrun_thread_my::show_mysql_error(const char *mess, record_value *rec) +{ + ++stat.runtime_error_count; + if (arg.sh.verbose > 0) { + fprintf(stderr, "RUNTIME_ERROR: op=%c wid=%d [%s]: %s\n", + arg.op, arg.id, mess, mysql_error(db)); + } + if (rec) { + rec->unknown_state = true; + } + db.reset(); + connected = false; +} + +void +mysql_do(MYSQL *db, const char *query) +{ + if (mysql_real_query(db, query, strlen(query)) != 0) { + fprintf(stderr, "mysql: e=[%s] q=[%s]\n", mysql_error(db), query); + fatal_abort("mysql_do"); + } +} + +void +hs_longrun_init_table(const config& conf, int num_prepare, + hs_longrun_shared& shared) +{ + const std::string mysql_host = conf.get_str("host", "localhost"); + const std::string mysql_user = conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = ""; + auto_mysql db; + if (!mysql_real_connect(db, mysql_host.c_str(), mysql_user.c_str(), + mysql_passwd.c_str(), mysql_dbname.c_str(), mysql_port, 0, 0)) { + fprintf(stderr, "mysql: error=[%s]\n", mysql_error(db)); + fatal_abort("hs_longrun_init_table"); + } + mysql_do(db, "drop database if exists hstestdb"); + mysql_do(db, "create database hstestdb"); + mysql_do(db, "use hstestdb"); + mysql_do(db, + "create table hstesttbl (" + "k int primary key," + "v1 varchar(32) not null," + "v2 varchar(32) not null," + "v3 varchar(32) not null" + ") character set utf8 collate utf8_bin engine = innodb"); + for (int i = 0; i < num_prepare; ++i) { + const std::string i_str = to_stdstring(i); + const std::string v1 = "pv1_" + i_str; + const std::string v2 = "pv2_" + i_str; + const std::string v3 = "pv3_" + i_str; + char buf[1024]; + snprintf(buf, 1024, "insert into hstesttbl(k, v1, v2, v3) values" + "(%d, '%s', '%s', '%s')", i, v1.c_str(), v2.c_str(), v3.c_str()); + mysql_do(db, buf); + record_value *rec = shared.records[i]; + rec->key = i_str; + rec->values.resize(4); + rec->values[0] = i_str; + rec->values[1] = v1; + rec->values[2] = v2; + rec->values[3] = v3; + rec->deleted = false; + } +} + +int +hs_longrun_main(int argc, char **argv) +{ + hs_longrun_shared shared; + parse_args(argc, argv, shared.conf); + shared.conf["host"] = shared.conf.get_str("host", "localhost"); + shared.verbose = shared.conf.get_int("verbose", 1); + const int table_size = shared.conf.get_int("table_size", 10000); + for (int i = 0; i < table_size; ++i) { + std::auto_ptr<record_value> rec(new record_value()); + rec->key = to_stdstring(i); + shared.records.push_back_ptr(rec); + } + mysql_library_init(0, 0, 0); + const int duration = shared.conf.get_int("duration", 10); + const int num_hsinsert = shared.conf.get_int("num_hsinsert", 10); + const int num_hsdelete = shared.conf.get_int("num_hsdelete", 10); + const int num_hsupdate = shared.conf.get_int("num_hsupdate", 10); + const int num_hsread = shared.conf.get_int("num_hsread", 10); + const int num_myread = shared.conf.get_int("num_myread", 10); + const int num_mydelins = shared.conf.get_int("num_mydelins", 10); + int num_hsreadnolock = shared.conf.get_int("num_hsreadnolock", 10); + const bool always_filled = (num_hsinsert == 0 && num_hsdelete == 0); + if (!always_filled) { + num_hsreadnolock = 0; + } + hs_longrun_init_table(shared.conf, always_filled ? table_size : 0, + shared); + /* create worker threads */ + static const struct thrtmpl_type { + const char *type; char op; int num; int hs; int lock; + } thrtmpl[] = { + { "hsinsert", 'I', num_hsinsert, 1, 1 }, + { "hsdelete", 'D', num_hsdelete, 1, 1 }, + { "hsupdate", 'U', num_hsupdate, 1, 1 }, + { "hsread", 'R', num_hsread, 1, 1 }, + { "hsreadnolock", 'N', num_hsreadnolock, 1, 0 }, + { "myread", 'R', num_myread, 0, 1 }, + { "mydelins", 'T', num_mydelins, 0, 1 }, + }; + typedef auto_ptrcontainer< std::vector<hs_longrun_thread_base *> > thrs_type; + thrs_type thrs; + for (size_t i = 0; i < sizeof(thrtmpl)/sizeof(thrtmpl[0]); ++i) { + const thrtmpl_type& e = thrtmpl[i]; + for (int j = 0; j < e.num; ++j) { + int id = thrs.size(); + const hs_longrun_thread_hs::arg_type arg(id, e.type, e.op, e.lock, + shared); + std::auto_ptr<hs_longrun_thread_base> thr; + if (e.hs) { + thr.reset(new hs_longrun_thread_hs(arg)); + } else { + thr.reset(new hs_longrun_thread_my(arg)); + } + thrs.push_back_ptr(thr); + } + } + shared.num_threads = thrs.size(); + /* start threads */ + fprintf(stderr, "START\n"); + shared.running = 1; + for (size_t i = 0; i < thrs.size(); ++i) { + thrs[i]->start(); + } + /* wait */ + sleep(duration); + /* stop thread */ + shared.running = 0; + for (size_t i = 0; i < thrs.size(); ++i) { + thrs[i]->join(); + } + fprintf(stderr, "DONE\n"); + /* summary */ + typedef std::map<std::string, hs_longrun_stat> stat_map; + stat_map sm; + for (size_t i = 0; i < thrs.size(); ++i) { + hs_longrun_thread_base *const thr = thrs[i]; + const std::string wt = thr->arg.worker_type; + hs_longrun_stat& v = sm[wt]; + v.add(thr->stat); + } + hs_longrun_stat total; + for (stat_map::const_iterator i = sm.begin(); i != sm.end(); ++i) { + if (i->second.verify_error_count != 0) { + fprintf(stderr, "%s verify_error %llu\n", i->first.c_str(), + i->second.verify_error_count); + } + if (i->second.runtime_error_count) { + fprintf(stderr, "%s runtime_error %llu\n", i->first.c_str(), + i->second.runtime_error_count); + } + if (i->second.unknown_count) { + fprintf(stderr, "%s unknown %llu\n", i->first.c_str(), + i->second.unknown_count); + } + fprintf(stderr, "%s success %llu\n", i->first.c_str(), + i->second.success_count); + total.add(i->second); + } + if (total.verify_error_count != 0) { + fprintf(stderr, "TOTAL verify_error %llu\n", total.verify_error_count); + } + if (total.runtime_error_count != 0) { + fprintf(stderr, "TOTAL runtime_error %llu\n", total.runtime_error_count); + } + if (total.unknown_count != 0) { + fprintf(stderr, "TOTAL unknown %llu\n", total.unknown_count); + } + fprintf(stderr, "TOTAL success %llu\n", total.success_count); + mysql_library_end(); + return 0; +} + +}; + +int +main(int argc, char **argv) +{ + return dena::hs_longrun_main(argc, argv); +} + diff --git a/plugin/handler_socket/client/hspool_test.pl b/plugin/handler_socket/client/hspool_test.pl new file mode 100755 index 00000000..03227e31 --- /dev/null +++ b/plugin/handler_socket/client/hspool_test.pl @@ -0,0 +1,224 @@ +#!/usr/bin/env perl + +use strict; +use warnings; +use DB::HandlerSocket::Pool; +use DBI; + +my %conf = (); +for my $i (@ARGV) { + my ($k, $v) = split(/=/, $i); + $conf{$k} = $v; +} + +my $verbose = get_conf("verbose", 0); +my $actions_str = get_conf("actions", + "create,insert,verify,verify2,verify3,verify4,clean"); +my $tablesize = get_conf("tablesize", 1000); +my $db = get_conf("db", "hstestdb"); +my $table = get_conf("table", "testtbl"); +my $table_schema = get_conf("table_schema", undef); +my $engine = get_conf("engine", "innodb"); +my $host = get_conf("host", "localhost"); +my $mysqlport = get_conf("mysqlport", 3306); +my $hsport_rd = get_conf("hsport_rd", 9998); +my $hsport_wr = get_conf("hsport_wr", 9999); +my $loop = get_conf("loop", 10000); +my $op = get_conf("op", "="); +my $ssps = get_conf("ssps", 0); +my $num_moreflds = get_conf("moreflds", 0); +my $moreflds_prefix = get_conf("moreflds_prefix", "f"); +my $mysql_user = 'root'; +my $mysql_password = ''; + +my $dsn = "DBI:MariaDB:database=;host=$host;port=$mysqlport" + . ";mariadb_server_prepare=$ssps"; +my $dbh = DBI->connect($dsn, $mysql_user, $mysql_password, + { RaiseError => 1 }); +my $hsargs = { 'host' => $host, 'port' => $hsport_rd }; +my $hspool = new DB::HandlerSocket::Pool({ + hostmap => { + "$db.$table" => { + host => $host, + port => $hsport_rd, + }, + }, + resolve => undef, + error => undef, +}); +$table_schema = "(k int primary key, fc30 varchar(30), ft text)" + if (!defined($table_schema)); + +my @actions = split(/,/, $actions_str); +for my $action (@actions) { + print "ACTION: $action\n"; + eval "hstest_$action()"; + if ($@) { + die $@; + } + print "ACTION: $action DONE\n"; +} + +sub get_conf { + my ($key, $def) = @_; + my $val = $conf{$key}; + if ($val) { + print "$key=$val\n"; + } else { + $val = $def; + my $defstr = $def || "(undef)"; + print "$key=$defstr(default)\n"; + } + return $val; +} + +sub hstest_create { + $dbh->do("drop database if exists $db"); + $dbh->do("create database $db"); + $dbh->do("use $db"); + $dbh->do("create table $table $table_schema engine=$engine"); +} + +sub hstest_dump { + $dbh->do("use $db"); + my $sth = $dbh->prepare("select * from $table"); + $sth->execute(); + my $arr = $sth->fetchall_arrayref(); + for my $rec (@$arr) { + print "REC:"; + for my $row (@$rec) { + print " $row"; + } + print "\n"; + } +} + +sub hstest_insert { + $dbh->do("use $db"); + my $sth = $dbh->prepare("insert into $table values (?, ?, ?)"); + for (my $k = 0; $k < $tablesize; ++$k) { + my $fc30 = "fc30_$k"; + my $ft = "ft_$k"; + $sth->execute($k, $fc30, $ft); + } +} + +sub hstest_verify { + $dbh->do("use $db"); + my $sth = $dbh->prepare("select * from $table order by k"); + $sth->execute(); + my $arr = $sth->fetchall_arrayref(); + my $hsres = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft", + ">=", [ 0 ], $tablesize, 0); + for (my $i = 0; $i < $tablesize; ++$i) { + my $rec = $arr->[$i]; + my $differ = 0; + print "REC:" if $verbose; + for (my $j = 0; $j < 3; ++$j) { + my $fld = $rec->[$j]; + my $hsidx = $i * 3 + $j; + my $hsfld = $hsres->[$hsidx]; + if ($hsfld ne $fld) { + $differ = 1; + } + if ($differ) { + print " $fld:$hsfld" if $verbose; + } else { + print " $hsfld" if $verbose; + } + } + print "\n" if $verbose; + if ($differ) { + die "verification failed"; + } + } +} + +sub hstest_verify2 { + $dbh->do("use $db"); + my $sth = $dbh->prepare("select * from $table order by k"); + $sth->execute(); + my $arr = $sth->fetchall_arrayref(); + my $hsresa = $hspool->index_find_multi($db, $table, "PRIMARY", + "k,fc30,ft", [ [ -1, ">=", [ 0 ], $tablesize, 0 ] ]); + my $hsres = $hsresa->[0]; + for (my $i = 0; $i < $tablesize; ++$i) { + my $rec = $arr->[$i]; + my $differ = 0; + print "REC:" if $verbose; + for (my $j = 0; $j < 3; ++$j) { + my $fld = $rec->[$j]; + my $hsidx = $i * 3 + $j; + my $hsfld = $hsres->[$hsidx]; + if ($hsfld ne $fld) { + $differ = 1; + } + if ($differ) { + print " $fld:$hsfld" if $verbose; + } else { + print " $hsfld" if $verbose; + } + } + print "\n" if $verbose; + if ($differ) { + die "verification failed"; + } + } +} + +sub hashref_to_str { + my $href = $_[0]; + my $r = ''; + for my $k (sort keys %$href) { + my $v = $href->{$k}; + $r .= "($k=>$v)"; + } + return $r; +} + +sub hstest_verify3 { + $dbh->do("use $db"); + my $sth = $dbh->prepare("select * from $table order by k"); + $sth->execute(); + my $hsres_t = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft", + ">=", [ 0 ], $tablesize, 0); + my $hsres = DB::HandlerSocket::Pool::result_single_to_hasharr( + [ 'k', 'fc30', 'ft' ], $hsres_t); + for (my $i = 0; $i < $tablesize; ++$i) { + my $mystr = hashref_to_str($sth->fetchrow_hashref()); + my $hsstr = hashref_to_str($hsres->[$i]); + if ($mystr ne $hsstr) { + print "DIFF my=[$mystr] hs=[$hsstr]\n" if $verbose; + die "verification failed"; + } else { + print "OK $hsstr\n" if $verbose; + } + } +} + +sub hstest_verify4 { + $dbh->do("use $db"); + my $sth = $dbh->prepare("select * from $table order by k"); + $sth->execute(); + my $hsres_t = $hspool->index_find($db, $table, "PRIMARY", "k,fc30,ft", + ">=", [ 0 ], $tablesize, 0); + my $hsres = DB::HandlerSocket::Pool::result_single_to_hashhash( + [ 'k', 'fc30', 'ft' ], 'k', $hsres_t); + my $rechash = $sth->fetchall_hashref('k'); + while (my ($k, $href) = each (%$rechash)) { + my $mystr = hashref_to_str($href); + my $hsstr = hashref_to_str($hsres->{$k}); + if ($mystr ne $hsstr) { + print "DIFF my=[$mystr] hs=[$hsstr]\n" if $verbose; + die "verification failed"; + } else { + print "OK $hsstr\n" if $verbose; + } + } +} + +sub hstest_clean { + $hspool->clear_pool(); + $dbh->do("drop database if exists $db"); +} + diff --git a/plugin/handler_socket/client/hstest.cpp b/plugin/handler_socket/client/hstest.cpp new file mode 100644 index 00000000..b5551fed --- /dev/null +++ b/plugin/handler_socket/client/hstest.cpp @@ -0,0 +1,1532 @@ + +// vim:sw=2:ai + +#include <signal.h> +#include <sys/time.h> +#include <stdio.h> +#include <string.h> +#include <vector> +#include <stdlib.h> +#include <memory> +#include <errno.h> +#include <mysql.h> +#include <time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "util.hpp" +#include "auto_ptrcontainer.hpp" +#include "socket.hpp" +#include "thread.hpp" +#include "hstcpcli.hpp" + +#if __GNUC__ >= 4 +long atomic_exchange_and_add(volatile long *valp, long c) +{ + return __sync_fetch_and_add(valp, c); +} +#else +#include <bits/atomicity.h> +using namespace __gnu_cxx; +long atomic_exchange_and_add(volatile long *valp, long c) +{ + return __exchange_and_add((volatile _Atomic_word *)valp, c); +} +#endif + +namespace dena { + +struct auto_mysql : private noncopyable { + auto_mysql() : db(0) { + reset(); + } + ~auto_mysql() { + if (db) { + mysql_close(db); + } + } + void reset() { + if (db) { + mysql_close(db); + } + if ((db = mysql_init(0)) == 0) { + fatal_abort("failed to initialize mysql client"); + } + } + operator MYSQL *() const { return db; } + private: + MYSQL *db; +}; + +struct auto_mysql_res : private noncopyable { + auto_mysql_res(MYSQL *db) { + res = mysql_store_result(db); + } + ~auto_mysql_res() { + if (res) { + mysql_free_result(res); + } + } + operator MYSQL_RES *() const { return res; } + private: + MYSQL_RES *res; +}; + +struct auto_mysql_stmt : private noncopyable { + auto_mysql_stmt(MYSQL *db) { + stmt = mysql_stmt_init(db); + } + ~auto_mysql_stmt() { + if (stmt) { + mysql_stmt_close(stmt); + } + } + operator MYSQL_STMT *() const { return stmt; } + private: + MYSQL_STMT *stmt; +}; + +namespace { + +double +gettimeofday_double() +{ + struct timeval tv; + if (gettimeofday(&tv, 0) != 0) { + fatal_abort("gettimeofday"); + } + return static_cast<double>(tv.tv_usec) / 1000000 + tv.tv_sec; +} + +// unused +void +wait_close(int fd) +{ + char buf[1024]; + while (true) { + int r = read(fd, buf, sizeof(buf)); + if (r <= 0) { + break; + } + } +} + +// unused +void +gentle_close(int fd) +{ + int r = shutdown(fd, SHUT_WR); + if (r != 0) { + return; + } + wait_close(fd); +} + +}; + +struct hstest_shared { + config conf; + socket_args arg; + int verbose; + size_t loop; + size_t pipe; + char op; + long num_threads; + mutable volatile long count; + mutable volatile long conn_count; + long wait_conn; + volatile char *keygen; + long keygen_size; + mutable volatile int enable_timing; + int usleep; + int dump; + hstest_shared() : verbose(0), loop(0), pipe(0), op('G'), num_threads(0), + count(0), conn_count(0), wait_conn(0), keygen(0), keygen_size(0), + enable_timing(0), usleep(0), dump(0) { } + void increment_count(unsigned int c = 1) const volatile { + atomic_exchange_and_add(&count, c); + } + void increment_conn(unsigned int c) const volatile { + atomic_exchange_and_add(&conn_count, c); + while (wait_conn != 0 && conn_count < wait_conn) { + sleep(1); + } + // fprintf(stderr, "wait_conn=%ld done\n", wait_conn); + } +}; + +struct hstest_thread { + struct arg_type { + size_t id; + const hstest_shared& sh; + bool watch_flag; + arg_type(size_t i, const hstest_shared& s, bool w) + : id(i), sh(s), watch_flag(w) { } + }; + hstest_thread(const arg_type& a) : arg(a), io_success_count(0), + op_success_count(0), response_min(99999), response_max(0), + response_sum(0), response_avg(0) { } + void operator ()(); + void test_1(); + void test_2_3(int test_num); + void test_4_5(int test_num); + void test_6(int test_num); + void test_7(int test_num); + void test_8(int test_num); + void test_9(int test_num); + void test_10(int test_num); + void test_11(int test_num); + void test_12(int test_num); + void test_21(int test_num); + void test_22(int test_num); + void test_watch(); + void sleep_if(); + void set_timing(double time_spent); + arg_type arg; + auto_file fd; + size_t io_success_count; + size_t op_success_count; + double response_min, response_max, response_sum, response_avg; +}; + +void +hstest_thread::test_1() +{ + char buf[1024]; + unsigned int seed = arg.id; + seed ^= arg.sh.conf.get_int("seed_xor", 0); + std::string err; + if (socket_connect(fd, arg.sh.arg, err) != 0) { + fprintf(stderr, "connect: %d %s\n", errno, strerror(errno)); + return; + } + const char op = arg.sh.op; + const int tablesize = arg.sh.conf.get_int("tablesize", 0); + for (size_t i = 0; i < arg.sh.loop; ++i) { + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, v = 0, len = 0; + if (op == 'G') { + k = rand_r(&seed); + v = rand_r(&seed); /* unused */ + if (tablesize != 0) { + k &= tablesize; + } + len = snprintf(buf, sizeof(buf), "%c\tk%d\n", op, k); + } else { + k = rand_r(&seed); + v = rand_r(&seed); + if (tablesize != 0) { + k &= tablesize; + } + len = snprintf(buf, sizeof(buf), "%c\tk%d\tv%d\n", op, k, v); + } + const int wlen = write(fd.get(), buf, len); + if (wlen != len) { + return; + } + } + size_t read_cnt = 0; + size_t read_pos = 0; + while (read_cnt < arg.sh.pipe) { + const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos); + if (rlen <= 0) { + return; + } + read_pos += rlen; + while (true) { + const char *const p = static_cast<const char *>(memchr(buf, '\n', + read_pos)); + if (p == 0) { + break; + } + ++read_cnt; + ++io_success_count; + arg.sh.increment_count(); + if (p != buf && buf[0] == '=') { + ++op_success_count; + } + const size_t rest_size = buf + read_pos - (p + 1); + if (rest_size != 0) { + memmove(buf, p + 1, rest_size); + } + read_pos = rest_size; + } + } + } +} + +void +hstest_thread::test_2_3(int test_num) +{ +#if 0 + char buf_k[128], buf_v[128]; + unsigned int seed = arg.id; + op_base_t op = static_cast<op_base_t>(arg.sh.op); + micli_ptr hnd; + if (test_num == 2) { + hnd = micli_i::create_remote(arg.sh.conf); + } else if (test_num == 3) { + // hnd = micli_i::create_inproc(arg.sh.localdb); + } + if (hnd.get() == 0) { + return; + } + for (size_t i = 0; i < arg.sh.loop; ++i) { + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, v = 0, klen = 0, vlen = 0; + k = rand_r(&seed); + klen = snprintf(buf_k, sizeof(buf_k), "k%d", k); + v = rand_r(&seed); /* unused */ + vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v); + string_ref arr[2]; + arr[0] = string_ref(buf_k, klen); + arr[1] = string_ref(buf_v, vlen); + pstrarr_ptr rec(arr, 2); + if (hnd->execute(op, 0, 0, rec.get_const())) { + ++io_success_count; + arg.sh.increment_count(); + const dataset& res = hnd->get_result_ref(); + if (res.size() == 1) { + ++op_success_count; + } + } + } + } +#endif +} + +void +hstest_thread::test_4_5(int test_num) +{ +#if 0 + char buf_k[128], buf_v[8192]; + memset(buf_v, ' ', sizeof(buf_v)); + unsigned int seed = arg.id; + op_base_t op = static_cast<op_base_t>(arg.sh.op); + micli_ptr hnd; + if (test_num == 4) { + hnd = micli_i::create_remote(arg.sh.conf); + } else if (test_num == 5) { + hnd = micli_i::create_inproc(arg.sh.localdb); + } + if (hnd.get() == 0) { + return; + } + for (size_t i = 0; i < arg.sh.loop; ++i) { + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, klen = 0, vlen = 0; + k = i & 0x0000ffffUL; + if (k == 0) { + fprintf(stderr, "k=0\n"); + } + klen = snprintf(buf_k, sizeof(buf_k), "k%d", k); + vlen = rand_r(&seed) % 8192; + string_ref arr[2]; + arr[0] = string_ref(buf_k, klen); + arr[1] = string_ref(buf_v, vlen); + pstrarr_ptr rec(arr, 2); + if (hnd->execute(op, 0, 0, rec.get_const())) { + ++io_success_count; + const dataset& res = hnd->get_result_ref(); + if (res.size() == 1) { + ++op_success_count; + } + } + } + } +#endif +} + +void +hstest_thread::test_6(int test_num) +{ + int count = arg.sh.conf.get_int("count", 1); + auto_file fds[count]; + for (int i = 0; i < count; ++i) { + const double t1 = gettimeofday_double(); + std::string err; + if (socket_connect(fds[i], arg.sh.arg, err) != 0) { + fprintf(stderr, "id=%zu i=%d err=%s\n", arg.id, i, err.c_str()); + } + const double t2 = gettimeofday_double(); + if (t2 - t1 > 1) { + fprintf(stderr, "id=%zu i=%d time %f\n", arg.id, i, t2 - t1); + } + } +} + +void +hstest_thread::test_7(int num) +{ + /* + set foo 0 0 10 + 0123456789 + STORED + get foo + VALUE foo 0 10 + 0123456789 + END + get var + END + */ + char buf[1024]; + const int keep_connection = arg.sh.conf.get_int("keep_connection", 1); + unsigned int seed = arg.id; + seed ^= arg.sh.conf.get_int("seed_xor", 0); + const int tablesize = arg.sh.conf.get_int("tablesize", 0); + const char op = arg.sh.op; + for (size_t i = 0; i < arg.sh.loop; ++i) { + const double tm1 = gettimeofday_double(); + std::string err; + if (fd.get() < 0 && socket_connect(fd, arg.sh.arg, err) != 0) { + fprintf(stderr, "connect: %d %s\n", errno, strerror(errno)); + return; + } + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, v = 0, len = 0; + if (op == 'G') { + k = rand_r(&seed); + v = rand_r(&seed); /* unused */ + if (tablesize != 0) { + k &= tablesize; + } + len = snprintf(buf, sizeof(buf), "get k%d\r\n", k); + } else { + k = rand_r(&seed); + v = rand_r(&seed); + if (tablesize != 0) { + k &= tablesize; + } + char vbuf[1024]; + int vlen = snprintf(vbuf, sizeof(vbuf), + "v%d" + // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + // "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + , v); + len = snprintf(buf, sizeof(buf), "set k%d 0 0 %d\r\n%s\r\n", + k, vlen, vbuf); + } + const int wlen = write(fd.get(), buf, len); + if (wlen != len) { + return; + } + } + size_t read_cnt = 0; + size_t read_pos = 0; + bool read_response_done = false; + bool expect_value = false; + while (!read_response_done) { + const int rlen = read(fd.get(), buf + read_pos, sizeof(buf) - read_pos); + if (rlen <= 0) { + return; + } + read_pos += rlen; + while (true) { + const char *const p = static_cast<const char *>(memchr(buf, '\n', + read_pos)); + if (p == 0) { + break; + } + ++read_cnt; + if (expect_value) { + expect_value = false; + } else if (p >= buf + 6 && memcmp(buf, "VALUE ", 6) == 0) { + expect_value = true; + ++op_success_count; + } else { + if (p == buf + 7 && memcmp(buf, "STORED\r", 7) == 0) { + ++op_success_count; + } + read_response_done = true; + } + const size_t rest_size = buf + read_pos - (p + 1); + if (rest_size != 0) { + memmove(buf, p + 1, rest_size); + } + read_pos = rest_size; + } + ++io_success_count; + } + arg.sh.increment_count(); + if (!keep_connection) { + fd.close(); + } + const double tm2 = gettimeofday_double(); + set_timing(tm2 - tm1); + sleep_if(); + } +} + +struct rec { + std::string key; + std::string value; +}; + +void +hstest_thread::test_8(int test_num) +{ +#if 0 + char buf_k[128], buf_v[128]; + unsigned int seed = arg.id; + // op_base_t op = static_cast<op_base_t>(arg.sh.op); + using namespace boost::multi_index; + typedef member<rec, std::string, &rec::key> rec_get_key; + typedef ordered_unique<rec_get_key> oui; + typedef multi_index_container< rec, indexed_by<oui> > mic; + #if 0 + typedef std::map<std::string, std::string> m_type; + m_type m; + #endif + mic m; + for (size_t i = 0; i < arg.sh.loop; ++i) { + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, v = 0, klen = 0, vlen = 0; + k = rand_r(&seed); + klen = snprintf(buf_k, sizeof(buf_k), "k%d", k); + v = rand_r(&seed); /* unused */ + vlen = snprintf(buf_v, sizeof(buf_v), "v%d", v); + const std::string ks(buf_k, klen); + const std::string vs(buf_v, vlen); + rec r; + r.key = ks; + r.value = vs; + m.insert(r); + // m.insert(std::make_pair(ks, vs)); + ++io_success_count; + ++op_success_count; + arg.sh.increment_count(); + } + } +#endif +} + +struct mysqltest_thread_initobj : private noncopyable { + mysqltest_thread_initobj() { + mysql_thread_init(); + } + ~mysqltest_thread_initobj() { + mysql_thread_end(); + } +}; + +void +hstest_thread::test_9(int test_num) +{ + /* create table hstest + * ( k varchar(255) not null, v varchar(255) not null, primary key(k)) + * engine = innodb; */ + auto_mysql db; + // mysqltest_thread_initobj initobj; + std::string err; + const char op = arg.sh.op; + const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd"); + unsigned long long err_cnt = 0; + unsigned long long query_cnt = 0; + #if 0 + my_bool reconnect = 0; + if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) { + err = "mysql_options() failed"; + ++err_cnt; + return; + } + #endif + unsigned int seed = time(0) + arg.id + 1; + seed ^= arg.sh.conf.get_int("seed_xor", 0); + drand48_data randbuf; + srand48_r(seed, &randbuf); + const std::string mysql_host = arg.sh.conf.get_str("host", "localhost"); + const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306); + const int num = arg.sh.loop; + const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = arg.sh.conf.get_str("dbname", "hstest"); + const int keep_connection = arg.sh.conf.get_int("keep_connection", 1); + const int verbose = arg.sh.conf.get_int("verbose", 1); + const int tablesize = arg.sh.conf.get_int("tablesize", 10000); + const int moreflds = arg.sh.conf.get_int("moreflds", 0); + const std::string moreflds_prefix = arg.sh.conf.get_str( + "moreflds_prefix", "column0123456789_"); + const int use_handler = arg.sh.conf.get_int("handler", 0); + const int sched_flag = arg.sh.conf.get_int("sched", 0); + const int use_in = arg.sh.conf.get_int("in", 0); + const int ssps = use_in ? 0 : arg.sh.conf.get_int("ssps", 0); + std::string flds = "v"; + for (int i = 0; i < moreflds; ++i) { + char buf[1024]; + snprintf(buf, sizeof(buf), ",%s%d", moreflds_prefix.c_str(), i); + flds += std::string(buf); + } + int connected = 0; + std::auto_ptr<auto_mysql_stmt> stmt; + string_buffer wbuf; + for (int i = 0; i < num; ++i) { + const double tm1 = gettimeofday_double(); + const int flags = 0; + if (connected == 0) { + if (!mysql_real_connect(db, mysql_host.c_str(), + mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(), + mysql_dbname.c_str(), mysql_port, 0, flags)) { + err = "failed to connect: " + std::string(mysql_error(db)); + if (verbose >= 1) { + fprintf(stderr, "e=[%s]\n", err.c_str()); + } + ++err_cnt; + return; + } + arg.sh.increment_conn(1); + } + int r = 0; + if (connected == 0 && use_handler) { + const char *const q = "handler hstest_table1 open"; + r = mysql_real_query(db, q, strlen(q)); + if (r != 0) { + err = 1; + } + } + if (connected == 0 && ssps) { + stmt.reset(new auto_mysql_stmt(db)); + const char *const q = "select v from hstest_table1 where k = ?"; + r = mysql_stmt_prepare(*stmt, q, strlen(q)); + if (r != 0) { + fprintf(stderr, "ssps err\n"); + ++err_cnt; + return; + } + } + connected = 1; + std::string result_str; + unsigned int err = 0; + unsigned int num_flds = 0, num_affected_rows = 0; + int got_data = 0; + char buf_query[16384]; + int buf_query_len = 0; + int k = 0, v = 0; + { + double kf = 0, vf = 0; + drand48_r(&randbuf, &kf); + drand48_r(&randbuf, &vf); + k = int(kf * tablesize); + v = int(vf * tablesize); + #if 0 + k = rand_r(&seed); + v = rand_r(&seed); + if (tablesize != 0) { + k %= tablesize; + } + #endif + if (op == 'G') { + if (use_handler) { + buf_query_len = snprintf(buf_query, sizeof(buf_query), + "handler hstest_table1 read `primary` = ( '%d' )", k); + // TODO: moreflds + } else if (ssps) { + // + } else if (use_in) { + wbuf.clear(); + char *p = wbuf.make_space(1024); + int len = snprintf(p, 1024, "select %s from hstest_table1 where k in ('%d'", flds.c_str(), k); + wbuf.space_wrote(len); + for (int j = 1; j < use_in; ++j) { + /* generate more key */ + drand48_r(&randbuf, &kf); + k = int(kf * tablesize); + p = wbuf.make_space(1024); + int len = snprintf(p, 1024, ", '%d'", k); + wbuf.space_wrote(len); + } + wbuf.append_literal(")"); + } else { + buf_query_len = snprintf(buf_query, sizeof(buf_query), + "select %s from hstest_table1 where k = '%d'", flds.c_str(), k); + } + } else if (op == 'U') { + buf_query_len = snprintf(buf_query, sizeof(buf_query), + "update hstest_table1 set v = '%d_%d%s' where k = '%d'", + v, k, suffix.c_str(), k); + } else if (op == 'R') { + buf_query_len = snprintf(buf_query, sizeof(buf_query), + "replace into hstest_table1 values ('%d', 'v%d')", k, v); + // TODO: moreflds + } + } + if (r == 0) { + if (ssps) { + MYSQL_BIND bind[1] = { }; + bind[0].buffer_type = MYSQL_TYPE_LONG; + bind[0].buffer = (char *)&k; + bind[0].is_null = 0; + bind[0].length = 0; + if (mysql_stmt_bind_param(*stmt, bind)) { + fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt)); + ++err_cnt; + return; + } + r = mysql_stmt_execute(*stmt); + // fprintf(stderr, "stmt exec\n"); + } else if (use_in) { + r = mysql_real_query(db, wbuf.begin(), wbuf.size()); + } else { + r = mysql_real_query(db, buf_query, buf_query_len); + // fprintf(stderr, "real query\n"); + } + ++query_cnt; + } + if (r != 0) { + err = 1; + } else if (ssps) { + if (verbose >= 0) { + char resbuf[1024]; + unsigned long res_len = 0; + MYSQL_BIND bind[1] = { }; + bind[0].buffer_type = MYSQL_TYPE_STRING; + bind[0].buffer = resbuf; + bind[0].buffer_length = sizeof(resbuf); + bind[0].length = &res_len; + if (mysql_stmt_bind_result(*stmt, bind)) { + fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt)); + ++err_cnt; + return; + } + if (mysql_stmt_fetch(*stmt)) { + fprintf(stderr, "err: %s\n", mysql_stmt_error(*stmt)); + ++err_cnt; + return; + } + if (!result_str.empty()) { + result_str += " "; + } + result_str += std::string(resbuf, res_len); + // fprintf(stderr, "SSPS RES: %s\n", result_str.c_str()); + got_data = 1; + } else { + got_data = 1; + } + } else { + auto_mysql_res res(db); + if (res != 0) { + if (verbose >= 0) { + num_flds = mysql_num_fields(res); + MYSQL_ROW row = 0; + while ((row = mysql_fetch_row(res)) != 0) { + got_data += 1; + unsigned long *const lengths = mysql_fetch_lengths(res); + if (verbose >= 2) { + for (unsigned int i = 0; i < num_flds; ++i) { + if (!result_str.empty()) { + result_str += " "; + } + result_str += std::string(row[i], lengths[i]); + } + } + } + } else { + MYSQL_ROW row = 0; + while ((row = mysql_fetch_row(res)) != 0) { + got_data += 1; + } + } + } else { + if (mysql_field_count(db) == 0) { + num_affected_rows = mysql_affected_rows(db); + } else { + err = 1; + } + } + } + if (verbose >= 2 || (verbose >= 1 && err != 0)) { + if (err) { + ++err_cnt; + const char *const errstr = mysql_error(db); + fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr, + num_affected_rows, buf_query); + } else { + fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows, buf_query, + result_str.c_str()); + } + } + if (err == 0) { + ++io_success_count; + if (num_affected_rows > 0 || got_data > 0) { + op_success_count += got_data; + } else { + if (verbose >= 1) { + fprintf(stderr, "k=%d numaff=%u gotdata=%d\n", + k, num_affected_rows, got_data); + } + } + arg.sh.increment_count(); + } + if (!keep_connection) { + if (stmt.get() != 0) { + stmt.reset(); + } + db.reset(); + connected = 0; + } + const double tm2 = gettimeofday_double(); + set_timing(tm2 - tm1); + sleep_if(); + if (sched_flag) { + sched_yield(); + } + } + if (verbose >= 1) { + fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt); + } +} + +void +hstest_thread::test_10(int test_num) +{ + const int keep_connection = arg.sh.conf.get_int("keep_connection", 1); + unsigned int seed = time(0) + arg.id + 1; + seed ^= arg.sh.conf.get_int("seed_xor", 0); + drand48_data randbuf; + srand48_r(seed, &randbuf); + std::string err; + int keepconn_count = 0; + const char op = arg.sh.op; + const int verbose = arg.sh.conf.get_int("verbose", 1); + const std::string suffix = arg.sh.conf.get_str("value_suffix", "upd"); + const int tablesize = arg.sh.conf.get_int("tablesize", 10000); + const int firstkey = arg.sh.conf.get_int("firstkey", 0); + const int sched_flag = arg.sh.conf.get_int("sched", 0); + const int moreflds = arg.sh.conf.get_int("moreflds", 0); + const std::string dbname = arg.sh.conf.get_str("dbname", "hstest"); + const std::string table = arg.sh.conf.get_str("table", "hstest_table1"); + const std::string index = arg.sh.conf.get_str("index", "PRIMARY"); + const std::string field = arg.sh.conf.get_str("field", "v"); + const int use_in = arg.sh.conf.get_int("in", 0); + const std::string moreflds_prefix = arg.sh.conf.get_str( + "moreflds_prefix", "column0123456789_"); + const int dump = arg.sh.dump; + const int nodup = arg.sh.conf.get_int("nodup", 0); + std::string moreflds_str; + for (int i = 0; i < moreflds; ++i) { + char sbuf[1024]; + snprintf(sbuf, sizeof(sbuf), ",%s%d", moreflds_prefix.c_str(), i); + moreflds_str += std::string(sbuf); + } + string_buffer wbuf; + char rbuf[16384]; + for (size_t i = 0; i < arg.sh.loop; ++i) { + int len = 0, rlen = 0, wlen = 0; + #if 0 + const double tm1 = gettimeofday_double(); + #endif + if (fd.get() < 0) { + if (socket_connect(fd, arg.sh.arg, err) != 0) { + fprintf(stderr, "connect: %d %s\n", errno, strerror(errno)); + return; + } + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, + "P\t1\t%s\t%s\tPRIMARY\t%s%s\n", dbname.c_str(), table.c_str(), + field.c_str(), moreflds_str.c_str()); + /* pst_num, db, table, index, retflds */ + wbuf.space_wrote(len); + wlen = write(fd.get(), wbuf.begin(), len); + if (len != wlen) { + fprintf(stderr, "write: %d %d\n", len, wlen); + return; + } + wbuf.clear(); + rlen = read(fd.get(), rbuf, sizeof(rbuf)); + if (rlen <= 0 || rbuf[rlen - 1] != '\n') { + fprintf(stderr, "read: rlen=%d errno=%d\n", rlen, errno); + return; + } + if (rbuf[0] != '0') { + fprintf(stderr, "failed to open table\n"); + return; + } + arg.sh.increment_conn(1); + } + const double tm1 = gettimeofday_double(); + for (size_t j = 0; j < arg.sh.pipe; ++j) { + int k = 0, v = 0; + { + while (true) { + double kf = 0, vf = 0; + drand48_r(&randbuf, &kf); + drand48_r(&randbuf, &vf); + k = int(kf * tablesize) + firstkey; + v = int(vf * tablesize) + firstkey; + if (k - firstkey < arg.sh.keygen_size) { + volatile char *const ptr = arg.sh.keygen + (k - firstkey); + // int oldv = __sync_fetch_and_or(ptr, 1); + int oldv = *ptr; + *ptr += 1; + if (nodup && oldv != 0) { + if (dump) { + fprintf(stderr, "retry\n"); + } + continue; + } + } else { + if (nodup) { + if (dump) { + fprintf(stderr, "retry2\n"); + } + continue; + } + } + size_t len = 0; + if (op == 'G') { + if (use_in) { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "1\t=\t1\t\t%d\t0\t@\t0\t%d\t%d", + use_in, use_in, k); + wbuf.space_wrote(len); + for (int j = 1; j < use_in; ++j) { + drand48_r(&randbuf, &kf); + k = int(kf * tablesize) + firstkey; + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "\t%d", k); + wbuf.space_wrote(len); + } + wbuf.append_literal("\n"); + } else { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, "1\t=\t1\t%d\n", k); + wbuf.space_wrote(len); + } + } else if (op == 'U') { + char *wp = wbuf.make_space(1024); + len = snprintf(wp, 1024, + "1\t=\t1\t%d\t1\t0\tU\t%d_%d%s\n", k, v, k, suffix.c_str()); + wbuf.space_wrote(len); + } + break; + } + } + } + wlen = write(fd.get(), wbuf.begin(), wbuf.size()); + if ((size_t) wlen != wbuf.size()) { + fprintf(stderr, "write: %d %d\n", (int)wbuf.size(), wlen); + return; + } + wbuf.clear(); + size_t read_cnt = 0; + size_t read_pos = 0; + while (read_cnt < arg.sh.pipe) { + rlen = read(fd.get(), rbuf + read_pos, sizeof(rbuf) - read_pos); + if (rlen <= 0) { + fprintf(stderr, "read: %d\n", rlen); + return; + } + read_pos += rlen; + while (true) { + const char *const nl = static_cast<const char *>(memchr(rbuf, '\n', + read_pos)); + if (nl == 0) { + break; + } + ++read_cnt; + ++io_success_count; + const char *t1 = static_cast<const char *>(memchr(rbuf, '\t', + nl - rbuf)); + if (t1 == 0) { + fprintf(stderr, "error \n"); + break; + } + ++t1; + const char *t2 = static_cast<const char *>(memchr(t1, '\t', + nl - t1)); + if (t2 == 0) { + if (verbose > 1) { + fprintf(stderr, "key: notfound \n"); + } + break; + } + ++t2; + if (t1 == rbuf + 2 && rbuf[0] == '0') { + if (op == 'G') { + ++op_success_count; + arg.sh.increment_count(); + } else if (op == 'U') { + const char *t3 = t2; + while (t3 != nl && t3[0] >= 0x10) { + ++t3; + } + if (t3 != t2 + 1 || t2[0] != '1') { + const std::string mess(t2, t3); + fprintf(stderr, "mod: %s\n", mess.c_str()); + } else { + ++op_success_count; + arg.sh.increment_count(); + if (arg.sh.dump && arg.sh.pipe == 1) { + fwrite(wbuf.begin(), wbuf.size(), 1, stderr); + } + } + } + } else { + const char *t3 = t2; + while (t3 != nl && t3[0] >= 0x10) { + ++t3; + } + const std::string mess(t2, t3); + fprintf(stderr, "err: %s\n", mess.c_str()); + } + const size_t rest_size = rbuf + read_pos - (nl + 1); + if (rest_size != 0) { + memmove(rbuf, nl + 1, rest_size); + } + read_pos = rest_size; + } + } + if (!keep_connection) { + fd.reset(); + arg.sh.increment_conn(-1); + } else if (keep_connection > 1 && ++keepconn_count > keep_connection) { + keepconn_count = 0; + fd.reset(); + arg.sh.increment_conn(-1); + } + const double tm2 = gettimeofday_double(); + set_timing(tm2 - tm1); + sleep_if(); + if (sched_flag) { + sched_yield(); + } + } + if (dump) { + fprintf(stderr, "done\n"); + } +} + +void +hstest_thread::sleep_if() +{ + if (arg.sh.usleep) { + struct timespec ts = { + arg.sh.usleep / 1000000, + (arg.sh.usleep % 1000000) * 1000 + }; + nanosleep(&ts, 0); + } +} + +void +hstest_thread::set_timing(double time_spent) +{ + response_min = std::min(response_min, time_spent); + response_max = std::max(response_max, time_spent); + response_sum += time_spent; + if (op_success_count != 0) { + response_avg = response_sum / op_success_count; + } +} + +void +hstest_thread::test_11(int test_num) +{ + const int keep_connection = arg.sh.conf.get_int("keep_connection", 1); + const int tablesize = arg.sh.conf.get_int("tablesize", 0); + unsigned int seed = arg.id; + seed ^= arg.sh.conf.get_int("seed_xor", 0); + std::string err; + hstcpcli_ptr cli; + for (size_t i = 0; i < arg.sh.loop; ++i) { + if (cli.get() == 0) { + cli = hstcpcli_i::create(arg.sh.arg); + cli->request_buf_open_index(0, "hstest", "hstest_table1", "", "v"); + /* pst_num, db, table, index, retflds */ + if (cli->request_send() != 0) { + fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str()); + return; + } + size_t num_flds = 0; + if (cli->response_recv(num_flds) != 0) { + fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str()); + return; + } + cli->response_buf_remove(); + } + for (size_t j = 0; j < arg.sh.pipe; ++j) { + char buf[256]; + int k = 0, v = 0, len = 0; + { + k = rand_r(&seed); + v = rand_r(&seed); /* unused */ + if (tablesize != 0) { + k &= tablesize; + } + len = snprintf(buf, sizeof(buf), "%d", k); + } + const string_ref key(buf, len); + const string_ref op("=", 1); + cli->request_buf_exec_generic(0, op, &key, 1, 1, 0, string_ref(), 0, 0); + } + if (cli->request_send() != 0) { + fprintf(stderr, "reuqest_send: %s\n", cli->get_error().c_str()); + return; + } + size_t read_cnt = 0; + for (size_t j = 0; j < arg.sh.pipe; ++j) { + size_t num_flds = 0; + if (cli->response_recv(num_flds) != 0) { + fprintf(stderr, "reuqest_recv: %s\n", cli->get_error().c_str()); + return; + } + { + ++read_cnt; + ++io_success_count; + arg.sh.increment_count(); + { + ++op_success_count; + } + } + cli->response_buf_remove(); + } + if (!keep_connection) { + cli.reset(); + } + } +} + +void +hstest_thread::test_watch() +{ + const int timelimit = arg.sh.conf.get_int("timelimit", 0); + const int timelimit_offset = timelimit / 2; + int loop = 0; + double t1 = 0, t2 = 0; + size_t cnt_t1 = 0, cnt_t2 = 0; + size_t prev_cnt = 0; + double now_f = 0; + while (true) { + sleep(1); + const size_t cnt = arg.sh.count; + const size_t df = cnt - prev_cnt; + prev_cnt = cnt; + const double now_prev = now_f; + now_f = gettimeofday_double(); + if (now_prev != 0) { + const double rps = static_cast<double>(df) / (now_f - now_prev); + fprintf(stderr, "now: %zu cntdiff: %zu tdiff: %f rps: %f\n", + static_cast<size_t>(now_f), df, now_f - now_prev, rps); + } + if (timelimit != 0) { + if (arg.sh.wait_conn == 0 || arg.sh.conn_count >= arg.sh.wait_conn) { + ++loop; + } + if (loop == timelimit_offset) { + t1 = gettimeofday_double(); + cnt_t1 = cnt; + arg.sh.enable_timing = 1; + fprintf(stderr, "start timing\n"); + } else if (loop == timelimit_offset + timelimit) { + t2 = gettimeofday_double(); + cnt_t2 = cnt; + const size_t cnt_diff = cnt_t2 - cnt_t1; + const double tdiff = t2 - t1; + const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1); + fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n", + t1, cnt_t1, t2, cnt_t2, qps); + size_t keycnt = 0; + for (int i = 0; i < arg.sh.keygen_size; ++i) { + if (arg.sh.keygen[i]) { + ++keycnt; + } + } + fprintf(stderr, "keygen=%zu\n", keycnt); + break; + } + } + } +#if 0 + int loop = 0; + double t1 = 0, t2 = 0; + size_t cnt_t1 = 0, cnt_t2 = 0; + size_t prev_cnt = 0; + while (true) { + sleep(1); + const size_t cnt = arg.sh.count; + const size_t df = cnt - prev_cnt; + prev_cnt = cnt; + const size_t now = time(0); + fprintf(stderr, "%zu %zu\n", now, df); + if (timelimit != 0) { + ++loop; + if (loop == timelimit_offset) { + t1 = gettimeofday_double(); + cnt_t1 = cnt; + } else if (loop == timelimit_offset + timelimit) { + t2 = gettimeofday_double(); + cnt_t2 = cnt; + const size_t cnt_diff = cnt_t2 - cnt_t1; + const double tdiff = t2 - t1; + const double qps = cnt_diff / (tdiff != 0 ? tdiff : 1); + fprintf(stderr, "(%f: %zu, %f: %zu), %10.5f qps\n", + t1, cnt_t1, t2, cnt_t2, qps); + size_t keycnt = 0; + for (int i = 0; i < arg.sh.keygen_size; ++i) { + if (arg.sh.keygen[i]) { + ++keycnt; + } + } + fprintf(stderr, "keygen=%zu\n", keycnt); + _exit(0); + } + } + } +#endif +} + +void +hstest_thread::test_12(int test_num) +{ + /* NOTE: num_threads should be 1 */ + /* create table hstest + * ( k varchar(255) not null, v varchar(255) not null, primary key(k)) + * engine = innodb; */ + mysqltest_thread_initobj initobj; + auto_mysql db; + std::string err; + unsigned long long err_cnt = 0; + unsigned long long query_cnt = 0; + #if 0 + my_bool reconnect = 0; + if (mysql_options(db, MYSQL_OPT_RECONNECT, &reconnect) != 0) { + err = "mysql_options() failed"; + ++err_cnt; + return; + } + #endif + const std::string mysql_host = arg.sh.conf.get_str("host", "localhost"); + const int mysql_port = arg.sh.conf.get_int("mysqlport", 3306); + const unsigned int num = arg.sh.loop; + const size_t pipe = arg.sh.pipe; + const std::string mysql_user = arg.sh.conf.get_str("mysqluser", "root"); + const std::string mysql_passwd = arg.sh.conf.get_str("mysqlpass", ""); + const std::string mysql_dbname = arg.sh.conf.get_str("db", "hstest"); + const int keep_connection = arg.sh.conf.get_int("keep_connection", 1); + const int verbose = arg.sh.conf.get_int("verbose", 1); + const int use_handler = arg.sh.conf.get_int("handler", 0); + int connected = 0; + unsigned int k = 0; + string_buffer buf; + for (unsigned int i = 0; i < num; ++i) { + const int flags = 0; + if (connected == 0 && !mysql_real_connect(db, mysql_host.c_str(), + mysql_user.c_str(), mysql_user.empty() ? 0 : mysql_passwd.c_str(), + mysql_dbname.c_str(), mysql_port, 0, flags)) { + err = "failed to connect: " + std::string(mysql_error(db)); + if (verbose >= 1) { + fprintf(stderr, "e=[%s]\n", err.c_str()); + } + ++err_cnt; + return; + } + int r = 0; + if (connected == 0 && use_handler) { + const char *const q = "handler hstest open"; + r = mysql_real_query(db, q, strlen(q)); + if (r != 0) { + err = 1; + } + } + connected = 1; + std::string result_str; + unsigned int err = 0; + unsigned int num_flds = 0, num_affected_rows = 0; + int got_data = 0; + buf.clear(); + buf.append_literal("insert into hstest values "); + for (size_t j = 0; j < pipe; ++j) { + const unsigned int v = ~k; + if (j != 0) { + buf.append_literal(","); + } + char *wp = buf.make_space(64); + int buf_query_len = snprintf(wp, 64, "('k%u', 'v%u')", k, v); + buf.space_wrote(buf_query_len); + ++k; + } + if (r == 0) { + r = mysql_real_query(db, buf.begin(), buf.size()); + ++query_cnt; + } + if (r != 0) { + err = 1; + } else { + auto_mysql_res res(db); + if (res != 0) { + if (verbose >= 0) { + num_flds = mysql_num_fields(res); + MYSQL_ROW row = 0; + while ((row = mysql_fetch_row(res)) != 0) { + got_data = 1; + unsigned long *const lengths = mysql_fetch_lengths(res); + if (verbose >= 2) { + for (unsigned int i = 0; i < num_flds; ++i) { + if (!result_str.empty()) { + result_str += " "; + } + result_str += std::string(row[i], lengths[i]); + } + } + } + } + } else { + if (mysql_field_count(db) == 0) { + num_affected_rows = mysql_affected_rows(db); + } else { + err = 1; + } + } + } + if (verbose >= 2 || (verbose >= 1 && err != 0)) { + if (err) { + ++err_cnt; + const char *const errstr = mysql_error(db); + fprintf(stderr, "e=[%s] a=%u q=[%s]\n", errstr, + num_affected_rows, std::string(buf.begin(), buf.size()).c_str()); + } else { + fprintf(stderr, "a=%u q=[%s] r=[%s]\n", num_affected_rows, + std::string(buf.begin(), buf.size()).c_str(), + result_str.c_str()); + } + } + if (err == 0) { + ++io_success_count; + if (num_affected_rows > 0 || got_data > 0) { + ++op_success_count; + } + arg.sh.increment_count(pipe); + } + if (!keep_connection) { + db.reset(); + connected = 0; + } + } + if (verbose >= 1) { + fprintf(stderr, "thread finished (error_count=%llu)\n", err_cnt); + } +} + +void +hstest_thread::test_21(int num) +{ + /* fsync test */ + unsigned int id = arg.id; + std::string err; + #if 0 + if (socket_connect(fd, arg.sh.arg, err) != 0) { + fprintf(stderr, "connect: %d %s\n", errno, strerror(errno)); + return; + } + #endif + auto_file logfd; + char fname[1024]; + snprintf(fname, sizeof(fname), "synctest_%u", id); + int open_flags = O_WRONLY | O_CREAT | O_TRUNC | O_APPEND; + logfd.reset(open(fname, open_flags, 0644)); + if (logfd.get() < 0) { + fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno)); + return; + } + char buf[1024]; + unsigned long long count = 0; + while (true) { + snprintf(buf, sizeof(buf), "%u %llu\n", id, count); + const size_t len = strlen(buf); + if (write(logfd.get(), buf, len) != (ssize_t)len) { + fprintf(stderr, "write: %s: %d %s\n", fname, errno, strerror(errno)); + return; + } + #if 0 + if (write(fd.get(), buf, len) != (ssize_t)len) { + fprintf(stderr, "write(sock): %d %s\n", errno, strerror(errno)); + return; + } + #endif + if (fdatasync(logfd.get()) != 0) { + fprintf(stderr, "fsync: %s: %d %s\n", fname, errno, strerror(errno)); + return; + } + ++count; + ++op_success_count; + arg.sh.increment_count(); + } +} + +void +hstest_thread::test_22(int num) +{ + /* dd if=/dev/zero of=dummy.dat bs=1024M count=100 */ + unsigned int id = arg.id; + std::string err; + auto_file filefd; + char fname[1024]; + snprintf(fname, sizeof(fname), "dummy.dat"); + int open_flags = O_RDONLY | O_DIRECT; + filefd.reset(open(fname, open_flags, 0644)); + if (filefd.get() < 0) { + fprintf(stderr, "open: %s: %d %s\n", fname, errno, strerror(errno)); + return; + } + char buf_x[4096 * 2]; + char *const buf = (char *)(size_t(buf_x + 4096) / 4096 * 4096); + unsigned long long count = 0; + drand48_data randbuf; + unsigned long long seed = time(0); + seed *= 10; + seed += id; + srand48_r(seed, &randbuf); + for (unsigned int i = 0; i < arg.sh.loop; ++i) { + double kf = 0; + drand48_r(&randbuf, &kf); + kf *= (209715200 / 1); + // fprintf(stderr, "v=%f\n", kf); + off_t v = static_cast<off_t>(kf); + v %= (209715200 / 1); + v *= (512 * 1); + const double tm1 = gettimeofday_double(); + const ssize_t r = pread(filefd.get(), buf, (512 * 1), v); + const double tm2 = gettimeofday_double(); + if (r < 0) { + fprintf(stderr, "pread: %s: %d %s\n", fname, errno, strerror(errno)); + return; + } + ++count; + ++op_success_count; + arg.sh.increment_count(); + set_timing(tm2 - tm1); + } +} + +void +hstest_thread::operator ()() +{ + if (arg.watch_flag) { + return test_watch(); + } + int test_num = arg.sh.conf.get_int("test", 1); + if (test_num == 1) { + test_1(); + } else if (test_num == 2 || test_num == 3) { + test_2_3(test_num); + } else if (test_num == 4 || test_num == 5) { + test_4_5(test_num); + } else if (test_num == 6) { + test_6(test_num); + } else if (test_num == 7) { + test_7(test_num); + } else if (test_num == 8) { + test_8(test_num); + } else if (test_num == 9) { + test_9(test_num); + } else if (test_num == 10) { + test_10(test_num); + } else if (test_num == 11) { + test_11(test_num); + } else if (test_num == 12) { + test_12(test_num); + } else if (test_num == 21) { + test_21(test_num); + } else if (test_num == 22) { + test_22(test_num); + } + const int halt = arg.sh.conf.get_int("halt", 0); + if (halt) { + fprintf(stderr, "thread halted\n"); + while (true) { + sleep(100000); + } + } + fprintf(stderr, "thread finished\n"); +} + +int +hstest_main(int argc, char **argv) +{ + ignore_sigpipe(); + hstest_shared shared; + parse_args(argc, argv, shared.conf); + shared.conf["port"] = shared.conf["hsport"]; + shared.arg.set(shared.conf); + shared.loop = shared.conf.get_int("num", 1000); + shared.pipe = shared.conf.get_int("pipe", 1); + shared.verbose = shared.conf.get_int("verbose", 1); + const int tablesize = shared.conf.get_int("tablesize", 0); + std::vector<char> keygen(tablesize); + shared.keygen = &keygen[0]; + shared.keygen_size = tablesize; + shared.usleep = shared.conf.get_int("usleep", 0); + shared.dump = shared.conf.get_int("dump", 0); + shared.num_threads = shared.conf.get_int("num_threads", 10); + shared.wait_conn = shared.conf.get_int("wait_conn", 0); + const std::string op = shared.conf.get_str("op", "G"); + if (op.size() > 0) { + shared.op = op[0]; + } + #if 0 + const int localdb_flag = shared.conf.get_int("local", 0); + if (localdb_flag) { + shared.localdb = database_i::create(shared.conf); + } + #endif + const int num_thrs = shared.num_threads; + typedef thread<hstest_thread> thread_type; + typedef std::auto_ptr<thread_type> thread_ptr; + typedef auto_ptrcontainer< std::vector<thread_type *> > thrs_type; + thrs_type thrs; + for (int i = 0; i < num_thrs; ++i) { + const hstest_thread::arg_type arg(i, shared, false); + thread_ptr thr(new thread<hstest_thread>(arg)); + thrs.push_back_ptr(thr); + } + for (size_t i = 0; i < thrs.size(); ++i) { + thrs[i]->start(); + } + thread_ptr watch_thread; + const int timelimit = shared.conf.get_int("timelimit", 0); + { + const hstest_thread::arg_type arg(0, shared, true); + watch_thread = thread_ptr(new thread<hstest_thread>(arg)); + watch_thread->start(); + } + size_t iocnt = 0, opcnt = 0; + double respmin = 999999, respmax = 0; + double respsum = 0; + if (timelimit != 0) { + watch_thread->join(); + } + for (size_t i = 0; i < thrs.size(); ++i) { + if (timelimit == 0) { + thrs[i]->join(); + } + iocnt += (*thrs[i])->io_success_count; + opcnt += (*thrs[i])->op_success_count; + respmin = std::min(respmin, (*thrs[i])->response_min); + respmax = std::max(respmax, (*thrs[i])->response_max); + respsum += (*thrs[i])->response_sum; + } + fprintf(stderr, "io_success_count=%zu op_success_count=%zu\n", iocnt, opcnt); + fprintf(stderr, "respmin=%f respmax=%f respsum=%f respavg=%f\n", + respmin, respmax, respsum, respsum / opcnt); + size_t keycnt = 0; + for (size_t i = 0; i < keygen.size(); ++i) { + if (keygen[i]) { + ++keycnt; + } + } + fprintf(stderr, "keycnt=%zu\n", keycnt); + _exit(0); + return 0; +} + +}; + +int +main(int argc, char **argv) +{ + return dena::hstest_main(argc, argv); +} + diff --git a/plugin/handler_socket/client/hstest.pl b/plugin/handler_socket/client/hstest.pl new file mode 100755 index 00000000..1363e153 --- /dev/null +++ b/plugin/handler_socket/client/hstest.pl @@ -0,0 +1,228 @@ +#!/usr/bin/env perl + +# vim:sw=8:ai:ts=8 + +use strict; +use warnings; + +use DBI; +use Net::HandlerSocket; + +my %conf = (); +for my $i (@ARGV) { + my ($k, $v) = split(/=/, $i); + $conf{$k} = $v; +} + +my $verbose = get_conf("verbose", 0); +my $actions_str = get_conf("actions", "hsread"); +my $tablesize = get_conf("tablesize", 10000); +my $db = get_conf("db", "hstest"); +my $table = get_conf("table", "hstest_table1"); +my $engine = get_conf("engine", "innodb"); +my $host = get_conf("host", "localhost"); +my $mysqlport = get_conf("mysqlport", 3306); +my $mysqluser = get_conf("mysqluser", "root"); +my $mysqlpass = get_conf("mysqlpass", ""); +my $hsport = get_conf("hsport", 9999); +my $loop = get_conf("loop", 10000); +my $op = get_conf("op", "="); +my $ssps = get_conf("ssps", 0); +my $num_moreflds = get_conf("moreflds", 0); +my $moreflds_prefix = get_conf("moreflds_prefix", "column0123456789_"); +my $keytype = get_conf("keytype", "varchar(32)"); +my $file = get_conf("file", undef); + +my $dsn = "DBI:MariaDB:database=;host=$host;port=$mysqlport" + . ";mariadb_server_prepare=$ssps"; +my $dbh = DBI->connect($dsn, $mysqluser, $mysqlpass, { RaiseError => 1 }); +my $hsargs = { 'host' => $host, 'port' => $hsport }; +my $cli = new Net::HandlerSocket($hsargs); + +my @actions = split(/,/, $actions_str); +for my $action (@actions) { + if ($action eq "table") { + print("TABLE $db.$table\n"); + $dbh->do("drop database if exists $db"); + $dbh->do("create database $db"); + $dbh->do("use $db"); + my $moreflds = get_createtbl_moreflds_str(); + $dbh->do( + "create table $table (" . + "k $keytype primary key" . + ",v varchar(32) not null" . + $moreflds . + ") character set utf8 collate utf8_bin " . + "engine = $engine"); + } elsif ($action eq "insert") { + print("INSERT $db.$table tablesize=$tablesize\n"); + $dbh->do("use $db"); + my $moreflds = get_insert_moreflds_str(); + for (my $i = 0; $i < $tablesize; $i += 100) { + my $qstr = "insert into $db.$table values"; + for (my $j = 0; $j < 100; ++$j) { + if ($j != 0) { + $qstr .= ","; + } + my $k = "" . ($i + $j); + my $v = "v" . int(rand(1000)) . ($i + $j); + $qstr .= "('$k', '$v'"; + for (my $j = 0; $j < $num_moreflds; ++$j) { + $qstr .= ",'$j'"; + } + $qstr .= ")"; + } + $dbh->do($qstr); + print "$i/$tablesize\n" if $i % 1000 == 0; + } + } elsif ($action eq "read") { + print("READ $db.$table op=$op loop=$loop\n"); + $dbh->do("use $db"); + my $moreflds = get_select_moreflds_str(); + my $sth = $dbh->prepare( + "select k,v$moreflds from $db.$table where k = ?"); + for (my $i = 0; $i < $loop; ++$i) { + my $k = "" . int(rand($tablesize)); + # print "k=$k\n"; + $sth->execute($k); + if ($verbose >= 10) { + print "RET:"; + while (my $ref = $sth->fetchrow_arrayref()) { + my $rk = $ref->[0]; + my $rv = $ref->[1]; + print " $rk $rv"; + } + print "\n"; + } + print "$i/$loop\n" if $i % 1000 == 0; + } + } elsif ($action eq "hsinsert") { + print("HSINSERT $db.$table tablesize=$tablesize\n"); + $cli->open_index(1, $db, $table, '', 'k,v'); + for (my $i = 0; $i < $tablesize; ++$i) { + my $k = "" . $i; + my $v = "v" . int(rand(1000)) . $i; + my $r = $cli->execute_insert(1, [ $k, $v ]); + if ($r->[0] != 0) { + die; + } + print "$i/$tablesize\n" if $i % 1000 == 0; + } + } elsif ($action eq "hsread") { + print("HSREAD $db.$table op=$op loop=$loop\n"); + my $moreflds = get_select_moreflds_str(); + $cli->open_index(1, $db, $table, '', "k,v$moreflds"); + for (my $i = 0; $i < $loop; ++$i) { + my $k = "" . int(rand($tablesize)); + # print "k=$k\n"; + my $r = $cli->execute_find(1, $op, [ $k ], 1, 0); + if ($verbose >= 10) { + my $len = scalar(@{$r}); + print "LEN=$len"; + for my $e (@{$r}) { + print " [$e]"; + } + print "\n"; + } + print "$i/$loop\n" if $i % 1000 == 0; + } + } elsif ($action eq "hsupdate") { + my $vbase = "v" . int(rand(1000)); + print("HSUPDATE $db.$table op=$op loop=$loop vbase=$vbase\n"); + $cli->open_index(1, $db, $table, '', 'v'); + for (my $i = 0; $i < $loop; ++$i) { + my $k = "" . int(rand($tablesize)); + my $v = $vbase . $i; + print "k=$k v=$v\n"; + my $r = $cli->execute_update(1, $op, [ $k ], 1, 0, + [ $v ]); + if ($verbose >= 10) { + print "UP k=$k v=$v\n"; + } + print "$i/$loop\n" if $i % 1000 == 0; + } + } elsif ($action eq "hsdelete") { + print("HSDELETE $db.$table op=$op loop=$loop\n"); + $cli->open_index(1, $db, $table, '', ''); + for (my $i = 0; $i < $loop; ++$i) { + my $k = "" . int(rand($tablesize)); + print "k=$k\n"; + my $r = $cli->execute_delete(1, $op, [ $k ], 1, 0); + if ($verbose >= 10) { + print "DEL k=$k\n"; + } + print "$i/$loop\n" if $i % 1000 == 0; + } + } elsif ($action eq "verify") { + verify_do(); + } +} + +sub verify_do { + my ($fail_cnt, $ok_cnt) = (0, 0); + my $sth = $dbh->prepare("select v from $db.$table where k = ?"); + use FileHandle; + my $fh = new FileHandle($file, "r"); + while (my $line = <$fh>) { + chomp($line); + my @vec = split(/\t/, $line); + my $k = $vec[3]; + my $v = $vec[7]; + next if (!defined($k) || !defined($v)); + # print "$k $v\n"; + $sth->execute($k); + my $aref = $sth->fetchrow_arrayref(); + if (!defined($aref)) { + print "FAILED: $k notfound\n"; + ++$fail_cnt; + } else { + my $gv = $aref->[0]; + if ($gv ne $v) { + print "FAILED: $k got=$gv expected=$v\n"; + ++$fail_cnt; + } else { + print "OK: $k $v $gv\n" if $verbose >= 10; + ++$ok_cnt; + } + } + } + print "OK=$ok_cnt FAIL=$fail_cnt\n"; +} + +sub get_conf { + my ($key, $def) = @_; + my $val = $conf{$key}; + if ($val) { + print "$key=$val\n"; + } else { + $val = $def; + $def ||= ''; + print "$key=$def(default)\n"; + } + return $val; +} + +sub get_createtbl_moreflds_str { + my $s = ""; + for (my $j = 0; $j < $num_moreflds; ++$j) { + $s .= ",$moreflds_prefix$j varchar(30)"; + } + return $s; +} + +sub get_select_moreflds_str { + my $s = ""; + for (my $i = 0; $i < $num_moreflds; ++$i) { + $s .= ",$moreflds_prefix$i"; + } + return $s; +} + +sub get_insert_moreflds_str { + my $s = ""; + for (my $i = 0; $i < $num_moreflds; ++$i) { + $s .= ",?"; + } + return $s; +} + diff --git a/plugin/handler_socket/client/hstest_hs.sh b/plugin/handler_socket/client/hstest_hs.sh new file mode 100755 index 00000000..1b9eee18 --- /dev/null +++ b/plugin/handler_socket/client/hstest_hs.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +exec ./hstest test=10 tablesize=10000 host=localhost hsport=9998 num=10000000 \ + num_threads=100 timelimit=10 $@ diff --git a/plugin/handler_socket/client/hstest_hs_more50.sh b/plugin/handler_socket/client/hstest_hs_more50.sh new file mode 100755 index 00000000..b7539c52 --- /dev/null +++ b/plugin/handler_socket/client/hstest_hs_more50.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +exec ./hstest test=10 key_mask=9999 host=localhost port=9998 num=10000000 \ + num_threads=100 timelimit=10 moreflds=50 $@ diff --git a/plugin/handler_socket/client/hstest_md.sh b/plugin/handler_socket/client/hstest_md.sh new file mode 100755 index 00000000..8129f884 --- /dev/null +++ b/plugin/handler_socket/client/hstest_md.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +./hstest test=7 key_mask=9999 host=localhost port=11211 num=10000 \ + num_threads=10 timelimit=10 op=R $@ +./hstest test=7 key_mask=9999 host=localhost port=11211 num=1000000 \ + num_threads=100 timelimit=10 op=G $@ + diff --git a/plugin/handler_socket/client/hstest_my.sh b/plugin/handler_socket/client/hstest_my.sh new file mode 100755 index 00000000..cf917cf4 --- /dev/null +++ b/plugin/handler_socket/client/hstest_my.sh @@ -0,0 +1,3 @@ +#!/bin/bash +exec ./hstest test=9 tablesize=9999 host=localhost mysqlport=3306 num=1000000 \ + num_threads=100 verbose=1 timelimit=10 $@ diff --git a/plugin/handler_socket/client/hstest_my_more50.sh b/plugin/handler_socket/client/hstest_my_more50.sh new file mode 100755 index 00000000..6782b5e8 --- /dev/null +++ b/plugin/handler_socket/client/hstest_my_more50.sh @@ -0,0 +1,3 @@ +#!/bin/bash +exec ./hstest test=9 key_mask=9999 host=localhost port=3306 num=1000000 \ + num_threads=100 verbose=1 timelimit=10 moreflds=50 $@ |