summaryrefslogtreecommitdiffstats
path: root/plugin/handler_socket/handlersocket
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:07:14 +0000
commita175314c3e5827eb193872241446f2f8f5c9d33c (patch)
treecd3d60ca99ae00829c52a6ca79150a5b6e62528b /plugin/handler_socket/handlersocket
parentInitial commit. (diff)
downloadmariadb-10.5-upstream.tar.xz
mariadb-10.5-upstream.zip
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/handler_socket/handlersocket')
-rw-r--r--plugin/handler_socket/handlersocket/COPYRIGHT.txt27
-rw-r--r--plugin/handler_socket/handlersocket/Makefile.am8
-rw-r--r--plugin/handler_socket/handlersocket/Makefile.plain.template31
-rw-r--r--plugin/handler_socket/handlersocket/database.cpp1180
-rw-r--r--plugin/handler_socket/handlersocket/database.hpp142
-rw-r--r--plugin/handler_socket/handlersocket/handlersocket.cpp217
-rw-r--r--plugin/handler_socket/handlersocket/handlersocket.spec.template29
-rw-r--r--plugin/handler_socket/handlersocket/hstcpsvr.cpp147
-rw-r--r--plugin/handler_socket/handlersocket/hstcpsvr.hpp58
-rw-r--r--plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp957
-rw-r--r--plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp35
-rw-r--r--plugin/handler_socket/handlersocket/mysql_incl.hpp55
12 files changed, 2886 insertions, 0 deletions
diff --git a/plugin/handler_socket/handlersocket/COPYRIGHT.txt b/plugin/handler_socket/handlersocket/COPYRIGHT.txt
new file mode 100644
index 00000000..41dda127
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/COPYRIGHT.txt
@@ -0,0 +1,27 @@
+
+ Copyright (c) 2010 DeNA Co.,Ltd.
+ All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of DeNA Co.,Ltd. nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY DeNA Co.,Ltd. "AS IS" AND ANY EXPRESS OR
+ IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ EVENT SHALL DeNA Co.,Ltd. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/plugin/handler_socket/handlersocket/Makefile.am b/plugin/handler_socket/handlersocket/Makefile.am
new file mode 100644
index 00000000..7e47209b
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/Makefile.am
@@ -0,0 +1,8 @@
+pkgplugindir = $(PLUGIN_DIR)
+CXXFLAGS += -fimplicit-templates
+noinst_HEADERS = database.hpp hstcpsvr.hpp hstcpsvr_worker.hpp mysql_incl.hpp
+pkgplugin_LTLIBRARIES = handlersocket.la
+handlersocket_la_LDFLAGS = -module ../libhsclient/libhsclient.la -L$(top_builddir)/libservices -lmysqlservices
+handlersocket_la_CXXFLAGS = $(MYSQL_INC) $(MYSQL_CFLAGS) $(AM_CXXFLAGS) -I$(srcdir)/../libhsclient
+handlersocket_la_SOURCES = database.cpp handlersocket.cpp \
+ hstcpsvr_worker.cpp hstcpsvr.cpp
diff --git a/plugin/handler_socket/handlersocket/Makefile.plain.template b/plugin/handler_socket/handlersocket/Makefile.plain.template
new file mode 100644
index 00000000..4d5f8c10
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/Makefile.plain.template
@@ -0,0 +1,31 @@
+
+MYSQL_INC = HANDLERSOCKET_MYSQL_INC
+MYSQL_LIB = HANDLERSOCKET_MYSQL_LIB
+
+CXX = g++ -Wall -g -fno-rtti -fno-exceptions -fPIC -DPIC
+LIBS = $(MYSQL_LIB) -lhsclient -lpthread -lz
+CXXFLAGS = -I/usr/include/handlersocket $(MYSQL_INC)
+LDFLAGS =
+
+CXXFLAGS += -O3 -DNDEBUG
+
+HANDLERSOCKET_OBJS = database.o hstcpsvr.o hstcpsvr_worker.o
+
+all: handlersocket.so
+
+handlersocket.so: $(HANDLERSOCKET_OBJS) handlersocket.cpp
+ $(CXX) $(CXXFLAGS) -fno-strict-aliasing -shared $^ -o $@ $(LDFLAGS) \
+ -Wl,-soname -Wl,$@ $(LIBS)
+clean:
+ rm -f *.a *.so *.o
+
+LIBDIR = $(shell \
+ if [ -e /usr/lib64/mysql ]; then echo /usr/lib64; else echo /usr/lib; fi)
+
+install: handlersocket.so
+ sudo sh -c 'ulimit -c unlimited ; /etc/init.d/mysql stop ; \
+ cp handlersocket.so handlersocket.so.cpy && \
+ mv handlersocket.so.cpy \
+ $(LIBDIR)/mysql/plugin/handlersocket.so && \
+ /etc/init.d/mysql start'
+
diff --git a/plugin/handler_socket/handlersocket/database.cpp b/plugin/handler_socket/handlersocket/database.cpp
new file mode 100644
index 00000000..937b1177
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/database.cpp
@@ -0,0 +1,1180 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include <string.h>
+
+#include "database.hpp"
+#include "string_util.hpp"
+#include "escape.hpp"
+#include "mysql_incl.hpp"
+
+#define DBG_KEY(x)
+#define DBG_SHUT(x)
+#define DBG_LOCK(x)
+#define DBG_THR(x)
+#define DBG_CMP(x)
+#define DBG_FLD(x)
+#define DBG_FILTER(x)
+#define DBG_REFCNT(x)
+#define DBG_KEYLEN(x)
+#define DBG_DELETED
+
+/* status variables */
+unsigned long long int open_tables_count;
+unsigned long long int close_tables_count;
+unsigned long long int lock_tables_count;
+unsigned long long int unlock_tables_count;
+unsigned long long int index_exec_count;
+
+namespace dena {
+
+prep_stmt::prep_stmt()
+ : dbctx(0), table_id(static_cast<size_t>(-1)),
+ idxnum(static_cast<size_t>(-1))
+{
+}
+prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
+ const fields_type& rf, const fields_type& ff)
+ : dbctx(c), table_id(tbl), idxnum(idx), ret_fields(rf), filter_fields(ff)
+{
+ if (dbctx) {
+ dbctx->table_addref(table_id);
+ }
+}
+prep_stmt::~prep_stmt()
+{
+ if (dbctx) {
+ dbctx->table_release(table_id);
+ }
+}
+
+prep_stmt::prep_stmt(const prep_stmt& x)
+ : dbctx(x.dbctx), table_id(x.table_id), idxnum(x.idxnum),
+ ret_fields(x.ret_fields), filter_fields(x.filter_fields)
+{
+ if (dbctx) {
+ dbctx->table_addref(table_id);
+ }
+}
+
+prep_stmt&
+prep_stmt::operator =(const prep_stmt& x)
+{
+ if (this != &x) {
+ if (dbctx) {
+ dbctx->table_release(table_id);
+ }
+ dbctx = x.dbctx;
+ table_id = x.table_id;
+ idxnum = x.idxnum;
+ ret_fields = x.ret_fields;
+ filter_fields = x.filter_fields;
+ if (dbctx) {
+ dbctx->table_addref(table_id);
+ }
+ }
+ return *this;
+}
+
+struct database : public database_i, private noncopyable {
+ database(const config& c);
+ virtual ~database();
+ virtual dbcontext_ptr create_context(bool for_write) volatile;
+ virtual void stop() volatile;
+ virtual const config& get_conf() const volatile;
+ public:
+ int child_running;
+ private:
+ config conf;
+};
+
+struct tablevec_entry {
+ TABLE *table;
+ size_t refcount;
+ bool modified;
+ tablevec_entry() : table(0), refcount(0), modified(false) { }
+};
+
+struct expr_user_lock : private noncopyable {
+ expr_user_lock(THD *thd, int timeout)
+ : lck_key(thd, "handlersocket_wr", 16, &my_charset_latin1),
+ lck_timeout(thd, timeout),
+ lck_func_get_lock(thd, &lck_key, &lck_timeout),
+ lck_func_release_lock(thd, &lck_key)
+ {
+ lck_key.fix_fields(thd, 0);
+ lck_timeout.fix_fields(thd, 0);
+ lck_func_get_lock.fix_fields(thd, 0);
+ lck_func_release_lock.fix_fields(thd, 0);
+ }
+ long long get_lock() {
+ return lck_func_get_lock.val_int();
+ }
+ long long release_lock() {
+ return lck_func_release_lock.val_int();
+ }
+ private:
+ Item_string lck_key;
+ Item_int lck_timeout;
+ Item_func_get_lock lck_func_get_lock;
+ Item_func_release_lock lck_func_release_lock;
+};
+
+struct dbcontext : public dbcontext_i, private noncopyable {
+ dbcontext(volatile database *d, bool for_write);
+ virtual ~dbcontext();
+ virtual void init_thread(const void *stack_botton,
+ volatile int& shutdown_flag);
+ virtual void term_thread();
+ virtual bool check_alive();
+ virtual void lock_tables_if();
+ virtual void unlock_tables_if();
+ virtual bool get_commit_error();
+ virtual void clear_error();
+ virtual void close_tables_if();
+ virtual void table_addref(size_t tbl_id);
+ virtual void table_release(size_t tbl_id);
+ virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
+ virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
+ virtual void set_statistics(size_t num_conns, size_t num_active);
+ private:
+ int set_thread_message(const char *fmt, ...)
+ __attribute__((format (printf, 2, 3)));
+ bool parse_fields(TABLE *const table, const char *str,
+ prep_stmt::fields_type& flds);
+ void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
+ const string_ref *fvals, size_t fvalslen);
+ void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
+ const string_ref *fvals, size_t fvalslen);
+ void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
+ ha_rkey_function find_flag, const cmd_exec_args& args);
+ size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
+ const record_filter *filters);
+ bool fill_filter_buf(TABLE *table, const prep_stmt& pst,
+ const record_filter *filters, uchar *filter_buf, size_t len);
+ int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
+ const record_filter *filters, const uchar *filter_buf);
+ void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
+ void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
+ int modify_record(dbcallback_i& cb, TABLE *const table,
+ const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
+ size_t& modified_count);
+ private:
+ typedef std::vector<tablevec_entry> table_vec_type;
+ typedef std::pair<std::string, std::string> table_name_type;
+ typedef std::map<table_name_type, size_t> table_map_type;
+ private:
+ volatile database *const dbref;
+ bool for_write_flag;
+ THD *thd;
+ MYSQL_LOCK *lock;
+ bool lock_failed;
+ std::auto_ptr<expr_user_lock> user_lock;
+ int user_level_lock_timeout;
+ bool user_level_lock_locked;
+ bool commit_error;
+ std::vector<char> info_message_buf;
+ table_vec_type table_vec;
+ table_map_type table_map;
+};
+
+database::database(const config& c)
+ : child_running(1), conf(c)
+{
+}
+
+database::~database()
+{
+}
+
+dbcontext_ptr
+database::create_context(bool for_write) volatile
+{
+ return dbcontext_ptr(new dbcontext(this, for_write));
+}
+
+void
+database::stop() volatile
+{
+ child_running = false;
+}
+
+const config&
+database::get_conf() const volatile
+{
+ return const_cast<const config&>(conf);
+}
+
+database_ptr
+database_i::create(const config& conf)
+{
+ return database_ptr(new database(conf));
+}
+
+dbcontext::dbcontext(volatile database *d, bool for_write)
+ : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
+ user_level_lock_timeout(0), user_level_lock_locked(false),
+ commit_error(false)
+{
+ info_message_buf.resize(8192);
+ user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
+}
+
+dbcontext::~dbcontext()
+{
+}
+
+namespace {
+
+int
+wait_server_to_start(THD *thd, volatile int& shutdown_flag)
+{
+ int r = 0;
+ DBG_SHUT(fprintf(stderr, "HNDSOCK wsts\n"));
+ pthread_mutex_lock(&LOCK_server_started);
+ while (!mysqld_server_started) {
+ timespec abstime;
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&COND_server_started, &LOCK_server_started,
+ &abstime);
+ pthread_mutex_unlock(&LOCK_server_started);
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ killed_state st = thd->killed;
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+ DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d\n", (int)st));
+ pthread_mutex_lock(&LOCK_server_started);
+ if (st != NOT_KILLED) {
+ DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d break\n", (int)st));
+ r = -1;
+ break;
+ }
+ if (shutdown_flag) {
+ DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst shut break\n"));
+ r = -1;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_server_started);
+ DBG_SHUT(fprintf(stderr, "HNDSOCK wsts done\n"));
+ return r;
+}
+
+}; // namespace
+
+#define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd)
+
+void
+dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
+{
+ DBG_THR(fprintf(stderr, "HNDSOCK init thread\n"));
+ {
+ my_thread_init();
+ thd = new THD(0);
+ thd->thread_stack = (char *)stack_bottom;
+ DBG_THR(fprintf(stderr,
+ "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
+ "O: %zu %zu %zu %zu %zu %zu %zu\n",
+ thd->thread_stack, sizeof(THD), sizeof(mysql_mutex_t),
+ DENA_THR_OFFSETOF(mdl_context),
+ DENA_THR_OFFSETOF(net),
+ DENA_THR_OFFSETOF(LOCK_thd_data),
+ DENA_THR_OFFSETOF(mysys_var),
+ DENA_THR_OFFSETOF(stmt_arena),
+ DENA_THR_OFFSETOF(limit_found_rows),
+ DENA_THR_OFFSETOF(locked_tables_list)));
+ thd->store_globals();
+ thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
+ memset(&thd->net, 0, sizeof(thd->net));
+ if (for_write_flag) {
+ #if MYSQL_VERSION_ID >= 50505
+ thd->variables.option_bits |= OPTION_BIN_LOG;
+ #else
+ thd->options |= OPTION_BIN_LOG;
+ #endif
+ safeFree((char*) thd->db.str);
+ thd->db.str= my_strdup(PSI_NOT_INSTRUMENTED, "handlersocket", MYF(0));
+ thd->db.length= sizeof("handlersocket")-1;
+ }
+ thd->variables.option_bits |= OPTION_TABLE_LOCK;
+ set_current_thd(thd);
+ DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd));
+ }
+ {
+ thd->thread_id = next_thread_id();
+ server_threads.insert(thd);
+ }
+
+ DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n"));
+ wait_server_to_start(thd, shutdown_flag);
+ DBG_THR(fprintf(stderr, "HNDSOCK init thread done\n"));
+
+ thd_proc_info(thd, &info_message_buf[0]);
+ set_thread_message("hs:listening");
+ DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
+
+ lex_start(thd);
+
+ user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
+}
+
+int
+dbcontext::set_thread_message(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(),
+ fmt, ap);
+ va_end(ap);
+ return n;
+}
+
+void
+dbcontext::term_thread()
+{
+ DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd));
+ close_tables_if();
+ set_current_thd(nullptr);
+ {
+ delete thd;
+ thd = 0;
+ my_thread_end();
+ }
+}
+
+bool
+dbcontext::check_alive()
+{
+ pthread_mutex_lock(&thd->mysys_var->mutex);
+ killed_state st = thd->killed;
+ pthread_mutex_unlock(&thd->mysys_var->mutex);
+ DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %zu\n", thd, &thd->killed,
+ (int)st, sizeof(*thd)));
+ if (st != NOT_KILLED) {
+ DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", (int)st));
+ return false;
+ }
+ return true;
+}
+
+void
+dbcontext::lock_tables_if()
+{
+ if (lock_failed) {
+ return;
+ }
+ if (for_write_flag && !user_level_lock_locked) {
+ if (user_lock->get_lock()) {
+ user_level_lock_locked = true;
+ } else {
+ lock_failed = true;
+ return;
+ }
+ }
+ if (lock == 0) {
+ const size_t num_max = table_vec.size();
+ TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1);
+ size_t num_open = 0;
+ for (size_t i = 0; i < num_max; ++i) {
+ if (table_vec[i].refcount > 0) {
+ tables[num_open++] = table_vec[i].table;
+ }
+ table_vec[i].modified = false;
+ }
+ #if MYSQL_VERSION_ID >= 50505
+ lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
+ #else
+ bool need_reopen= false;
+ lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open,
+ MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen);
+ #endif
+ statistic_increment(lock_tables_count, &LOCK_status);
+ thd_proc_info(thd, &info_message_buf[0]);
+ DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n",
+ thd, lock, num_max, num_open));
+ if (lock == 0) {
+ lock_failed = true;
+ DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n",
+ thd));
+ }
+ if (for_write_flag) {
+ #if MYSQL_VERSION_ID >= 50505
+ thd->set_current_stmt_binlog_format_row();
+ #else
+ thd->current_stmt_binlog_row_based = 1;
+ #endif
+ }
+ DENA_ALLOCA_FREE(tables);
+ }
+ DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
+}
+
+void
+dbcontext::unlock_tables_if()
+{
+ if (lock != 0) {
+ DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n",
+ thd, thd->lock));
+ if (for_write_flag) {
+ for (size_t i = 0; i < table_vec.size(); ++i) {
+ if (table_vec[i].modified) {
+ query_cache_invalidate3(thd, table_vec[i].table, 1);
+ table_vec[i].table->file->ha_release_auto_increment();
+ }
+ }
+ }
+ {
+ bool suc = true;
+ #if MYSQL_VERSION_ID >= 50505
+ suc = (trans_commit_stmt(thd) == 0);
+ #else
+ suc = (ha_autocommit_or_rollback(thd, 0) == 0);
+ #endif
+ if (!suc) {
+ commit_error = true;
+ DENA_VERBOSE(10, fprintf(stderr,
+ "HNDSOCK unlock tables: commit failed\n"));
+ }
+ }
+ mysql_unlock_tables(thd, lock);
+ lock = thd->lock = 0;
+ statistic_increment(unlock_tables_count, &LOCK_status);
+ }
+ if (user_level_lock_locked) {
+ if (user_lock->release_lock()) {
+ user_level_lock_locked = false;
+ }
+ }
+}
+
+bool
+dbcontext::get_commit_error()
+{
+ return commit_error;
+}
+
+void
+dbcontext::clear_error()
+{
+ lock_failed = false;
+ commit_error = false;
+}
+
+void
+dbcontext::close_tables_if()
+{
+ unlock_tables_if();
+ DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks(thd);
+ if (!table_vec.empty()) {
+ statistic_increment(close_tables_count, &LOCK_status);
+ table_vec.clear();
+ table_map.clear();
+ }
+}
+
+void
+dbcontext::table_addref(size_t tbl_id)
+{
+ table_vec[tbl_id].refcount += 1;
+ DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id,
+ table_vec[tbl_id].refcount));
+}
+
+void
+dbcontext::table_release(size_t tbl_id)
+{
+ table_vec[tbl_id].refcount -= 1;
+ DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id,
+ table_vec[tbl_id].refcount));
+}
+
+void
+dbcontext::resp_record(dbcallback_i& cb, TABLE *const table,
+ const prep_stmt& pst)
+{
+ char rwpstr_buf[64];
+ String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
+ const prep_stmt::fields_type& rf = pst.get_ret_fields();
+ const size_t n = rf.size();
+ for (size_t i = 0; i < n; ++i) {
+ uint32_t fn = rf[i];
+ Field *const fld = table->field[fn];
+ DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn));
+ if (fld->is_null()) {
+ /* null */
+ cb.dbcb_resp_entry(0, 0);
+ } else {
+ fld->val_str(&rwpstr, &rwpstr);
+ const size_t len = rwpstr.length();
+ if (len != 0) {
+ /* non-empty */
+ cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length());
+ } else {
+ /* empty */
+ static const char empty_str[] = "";
+ cb.dbcb_resp_entry(empty_str, 0);
+ }
+ }
+ }
+}
+
+void
+dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
+ const prep_stmt& pst)
+{
+ char rwpstr_buf[64];
+ String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
+ const prep_stmt::fields_type& rf = pst.get_ret_fields();
+ const size_t n = rf.size();
+ for (size_t i = 0; i < n; ++i) {
+ uint32_t fn = rf[i];
+ Field *const fld = table->field[fn];
+ if (fld->is_null()) {
+ /* null */
+ fprintf(stderr, "NULL");
+ } else {
+ fld->val_str(&rwpstr, &rwpstr);
+ const std::string s(rwpstr.ptr(), rwpstr.length());
+ fprintf(stderr, "[%s]", s.c_str());
+ }
+ }
+ fprintf(stderr, "\n");
+}
+
+int
+dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
+ const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
+ size_t& modified_count)
+{
+ if (mod_op == 'U') {
+ /* update */
+ handler *const hnd = table->file;
+ uchar *const buf = table->record[0];
+ store_record(table, record[1]);
+ const prep_stmt::fields_type& rf = pst.get_ret_fields();
+ const size_t n = rf.size();
+ for (size_t i = 0; i < n; ++i) {
+ const string_ref& nv = args.uvals[i];
+ uint32_t fn = rf[i];
+ Field *const fld = table->field[fn];
+ if (nv.begin() == 0) {
+ fld->set_null();
+ } else {
+ fld->set_notnull();
+ fld->store(nv.begin(), nv.size(), &my_charset_bin);
+ }
+ }
+ table_vec[pst.get_table_id()].modified = true;
+ const int r = hnd->ha_update_row(table->record[1], buf);
+ if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
+ return r;
+ }
+ ++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */
+ } else if (mod_op == 'D') {
+ /* delete */
+ handler *const hnd = table->file;
+ table_vec[pst.get_table_id()].modified = true;
+ const int r = hnd->ha_delete_row(table->record[0]);
+ if (r != 0) {
+ return r;
+ }
+ ++modified_count;
+ } else if (mod_op == '+' || mod_op == '-') {
+ /* increment/decrement */
+ handler *const hnd = table->file;
+ uchar *const buf = table->record[0];
+ store_record(table, record[1]);
+ const prep_stmt::fields_type& rf = pst.get_ret_fields();
+ const size_t n = rf.size();
+ size_t i = 0;
+ for (i = 0; i < n; ++i) {
+ const string_ref& nv = args.uvals[i];
+ uint32_t fn = rf[i];
+ Field *const fld = table->field[fn];
+ if (fld->is_null() || nv.begin() == 0) {
+ continue;
+ }
+ const long long pval = fld->val_int();
+ const long long llv = atoll_nocheck(nv.begin(), nv.end());
+ /* TODO: llv == 0? */
+ long long nval = 0;
+ if (mod_op == '+') {
+ /* increment */
+ nval = pval + llv;
+ } else {
+ /* decrement */
+ nval = pval - llv;
+ if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
+ break; /* don't modify */
+ }
+ }
+ fld->store(nval, false);
+ }
+ if (i == n) {
+ /* modify */
+ table_vec[pst.get_table_id()].modified = true;
+ const int r = hnd->ha_update_row(table->record[1], buf);
+ if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
+ return r;
+ }
+ ++modified_count;
+ }
+ }
+ return 0;
+}
+
+void
+dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
+ const string_ref *fvals, size_t fvalslen)
+{
+ if (!for_write_flag) {
+ return cb.dbcb_resp_short(2, "readonly");
+ }
+ lock_tables_if();
+ if (lock == 0) {
+ return cb.dbcb_resp_short(1, "lock_tables");
+ }
+ if (pst.get_table_id() >= table_vec.size()) {
+ return cb.dbcb_resp_short(2, "tblnum");
+ }
+ TABLE *const table = table_vec[pst.get_table_id()].table;
+ handler *const hnd = table->file;
+ uchar *const buf = table->record[0];
+ empty_record(table);
+ memset(buf, 0, table->s->null_bytes); /* clear null flags */
+ const prep_stmt::fields_type& rf = pst.get_ret_fields();
+ const size_t n = std::min(rf.size(), fvalslen);
+ for (size_t i = 0; i < n; ++i) {
+ uint32_t fn = rf[i];
+ Field *const fld = table->field[fn];
+ if (fvals[i].begin() == 0) {
+ fld->set_null();
+ } else {
+ fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
+ }
+ }
+ table->next_number_field = table->found_next_number_field;
+ /* FIXME: test */
+ const int r = hnd->ha_write_row(buf);
+ const ulonglong insert_id = table->file->insert_id_for_cur_row;
+ table->next_number_field = 0;
+ table_vec[pst.get_table_id()].modified = true;
+ if (r == 0 && table->found_next_number_field != 0) {
+ return cb.dbcb_resp_short_num64(0, insert_id);
+ }
+ if (r != 0) {
+ return cb.dbcb_resp_short_num(1, r);
+ }
+ return cb.dbcb_resp_short(0, "");
+}
+
+void
+dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
+ const string_ref *fvals, size_t fvalslen)
+{
+ if (fvalslen < 1) {
+ return cb.dbcb_resp_short(2, "syntax");
+ }
+ return cb.dbcb_resp_short(2, "notimpl");
+}
+
+static size_t
+prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
+ KEY& kinfo, size_t invalues_index)
+{
+ size_t kplen_sum = 0;
+ DBG_KEY(fprintf(stderr, "SLOW\n"));
+ for (size_t i = 0; i < args.kvalslen; ++i) {
+ const KEY_PART_INFO & kpt = kinfo.key_part[i];
+ string_ref kval = args.kvals[i];
+ if (args.invalues_keypart >= 0 &&
+ static_cast<size_t>(args.invalues_keypart) == i) {
+ kval = args.invalues[invalues_index];
+ }
+ if (kval.begin() == 0) {
+ kpt.field->set_null();
+ } else {
+ kpt.field->set_notnull();
+ }
+ kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
+ kplen_sum += kpt.store_length;
+ DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
+ kpt.store_length));
+ }
+ key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
+ DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
+ kinfo.key_length));
+ return kplen_sum;
+}
+
+void
+dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
+ ha_rkey_function find_flag, const cmd_exec_args& args)
+{
+ const bool debug_out = (verbose_level >= 100);
+ bool need_resp_record = true;
+ char mod_op = 0;
+ const string_ref& mod_op_str = args.mod_op;
+ if (mod_op_str.size() != 0) {
+ if (!for_write_flag) {
+ return cb.dbcb_resp_short(2, "readonly");
+ }
+ mod_op = mod_op_str.begin()[0];
+ need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?';
+ switch (mod_op) {
+ case 'U': /* update */
+ case 'D': /* delete */
+ case '+': /* increment */
+ case '-': /* decrement */
+ break;
+ default:
+ if (debug_out) {
+ fprintf(stderr, "unknown modop: %c\n", mod_op);
+ }
+ return cb.dbcb_resp_short(2, "modop");
+ }
+ }
+ lock_tables_if();
+ if (lock == 0) {
+ return cb.dbcb_resp_short(1, "lock_tables");
+ }
+ if (pst.get_table_id() >= table_vec.size()) {
+ return cb.dbcb_resp_short(2, "tblnum");
+ }
+ TABLE *const table = table_vec[pst.get_table_id()].table;
+ /* keys */
+ if (pst.get_idxnum() >= table->s->keys) {
+ return cb.dbcb_resp_short(2, "idxnum");
+ }
+ KEY& kinfo = table->key_info[pst.get_idxnum()];
+ if (args.kvalslen > kinfo.user_defined_key_parts) {
+ return cb.dbcb_resp_short(2, "kpnum");
+ }
+ uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
+ size_t invalues_idx = 0;
+ size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
+ /* filters */
+ uchar *filter_buf = 0;
+ if (args.filters != 0) {
+ const size_t filter_buf_len = calc_filter_buf_size(table, pst,
+ args.filters);
+ filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
+ if (!fill_filter_buf(table, pst, args.filters, filter_buf,
+ filter_buf_len)) {
+ return cb.dbcb_resp_short(2, "filterblob");
+ }
+ }
+ /* handler */
+ table->read_set = &table->s->all_set;
+ handler *const hnd = table->file;
+ if (!for_write_flag) {
+ hnd->init_table_handle_for_HANDLER();
+ }
+ hnd->ha_index_or_rnd_end();
+ hnd->ha_index_init(pst.get_idxnum(), 1);
+ if (need_resp_record) {
+ cb.dbcb_resp_begin(pst.get_ret_fields().size());
+ }
+ const uint32_t limit = args.limit ? args.limit : 1;
+ uint32_t skip = args.skip;
+ size_t modified_count = 0;
+ int r = 0;
+ bool is_first = true;
+ for (uint32_t cnt = 0; cnt < limit + skip;) {
+ if (is_first) {
+ is_first = false;
+ const key_part_map kpm = (1U << args.kvalslen) - 1;
+ r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
+ } else if (args.invalues_keypart >= 0) {
+ if (++invalues_idx >= args.invalueslen) {
+ break;
+ }
+ kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
+ const key_part_map kpm = (1U << args.kvalslen) - 1;
+ r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
+ } else {
+ switch (find_flag) {
+ case HA_READ_BEFORE_KEY:
+ case HA_READ_KEY_OR_PREV:
+ r = hnd->ha_index_prev(table->record[0]);
+ break;
+ case HA_READ_AFTER_KEY:
+ case HA_READ_KEY_OR_NEXT:
+ r = hnd->ha_index_next(table->record[0]);
+ break;
+ case HA_READ_KEY_EXACT:
+ r = hnd->ha_index_next_same(table->record[0], key_buf, kplen_sum);
+ break;
+ default:
+ r = HA_ERR_END_OF_FILE; /* to finish the loop */
+ break;
+ }
+ }
+ if (debug_out) {
+ fprintf(stderr, "r=%d\n", r);
+ if (r == 0 || r == HA_ERR_RECORD_DELETED) {
+ dump_record(cb, table, pst);
+ }
+ }
+ int filter_res = 0;
+ if (r != 0) {
+ /* no-count */
+ } else if (args.filters != 0 && (filter_res = check_filter(cb, table,
+ pst, args.filters, filter_buf)) != 0) {
+ if (filter_res < 0) {
+ break;
+ }
+ } else if (skip > 0) {
+ --skip;
+ } else {
+ /* hit */
+ if (need_resp_record) {
+ resp_record(cb, table, pst);
+ }
+ if (mod_op != 0) {
+ r = modify_record(cb, table, pst, args, mod_op, modified_count);
+ }
+ ++cnt;
+ }
+ if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
+ continue;
+ }
+ if (r != 0 && r != HA_ERR_RECORD_DELETED) {
+ break;
+ }
+ }
+ hnd->ha_index_or_rnd_end();
+ if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
+ r != HA_ERR_END_OF_FILE) {
+ /* failed */
+ if (need_resp_record) {
+ /* revert dbcb_resp_begin() and dbcb_resp_entry() */
+ cb.dbcb_resp_cancel();
+ }
+ cb.dbcb_resp_short_num(1, r);
+ } else {
+ /* succeeded */
+ if (need_resp_record) {
+ cb.dbcb_resp_end();
+ } else {
+ cb.dbcb_resp_short_num(0, modified_count);
+ }
+ }
+ DENA_ALLOCA_FREE(filter_buf);
+ DENA_ALLOCA_FREE(key_buf);
+}
+
+size_t
+dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
+ const record_filter *filters)
+{
+ size_t filter_buf_len = 0;
+ for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
+ if (f->val.begin() == 0) {
+ continue;
+ }
+ const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
+ filter_buf_len += table->field[fn]->pack_length();
+ }
+ ++filter_buf_len;
+ /* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
+ Allocate 1 more byte for safety. */
+ return filter_buf_len;
+}
+
+bool
+dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst,
+ const record_filter *filters, uchar *filter_buf, size_t len)
+{
+ memset(filter_buf, 0, len);
+ size_t pos = 0;
+ for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
+ if (f->val.begin() == 0) {
+ continue;
+ }
+ const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
+ Field *const fld = table->field[fn];
+ if ((fld->flags & BLOB_FLAG) != 0) {
+ return false;
+ }
+ fld->store(f->val.begin(), f->val.size(), &my_charset_bin);
+ const size_t packlen = fld->pack_length();
+ memcpy(filter_buf + pos, fld->ptr, packlen);
+ pos += packlen;
+ }
+ return true;
+}
+
+int
+dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
+ const record_filter *filters, const uchar *filter_buf)
+{
+ DBG_FILTER(fprintf(stderr, "check_filter\n"));
+ size_t pos = 0;
+ for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
+ const string_ref& op = f->op;
+ const string_ref& val = f->val;
+ const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
+ Field *const fld = table->field[fn];
+ const size_t packlen = fld->pack_length();
+ const uchar *const bval = filter_buf + pos;
+ int cv = 0;
+ if (fld->is_null()) {
+ cv = (val.begin() == 0) ? 0 : -1;
+ } else {
+ cv = (val.begin() == 0) ? 1 : fld->cmp(bval);
+ }
+ DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv));
+ bool cond = true;
+ if (op.size() == 1) {
+ switch (op.begin()[0]) {
+ case '>':
+ DBG_FILTER(fprintf(stderr, "check_filter op: >\n"));
+ cond = (cv > 0);
+ break;
+ case '<':
+ DBG_FILTER(fprintf(stderr, "check_filter op: <\n"));
+ cond = (cv < 0);
+ break;
+ case '=':
+ DBG_FILTER(fprintf(stderr, "check_filter op: =\n"));
+ cond = (cv == 0);
+ break;
+ default:
+ DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
+ cond = false; /* FIXME: error */
+ break;
+ }
+ } else if (op.size() == 2 && op.begin()[1] == '=') {
+ switch (op.begin()[0]) {
+ case '>':
+ DBG_FILTER(fprintf(stderr, "check_filter op: >=\n"));
+ cond = (cv >= 0);
+ break;
+ case '<':
+ DBG_FILTER(fprintf(stderr, "check_filter op: <=\n"));
+ cond = (cv <= 0);
+ break;
+ case '!':
+ DBG_FILTER(fprintf(stderr, "check_filter op: !=\n"));
+ cond = (cv != 0);
+ break;
+ default:
+ DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
+ cond = false; /* FIXME: error */
+ break;
+ }
+ }
+ DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond));
+ if (!cond) {
+ return (f->filter_type == record_filter_type_skip) ? 1 : -1;
+ }
+ if (val.begin() != 0) {
+ pos += packlen;
+ }
+ }
+ return 0;
+}
+
+void
+dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
+{
+ unlock_tables_if();
+ const table_name_type k = std::make_pair(std::string(arg.dbn),
+ std::string(arg.tbl));
+ const table_map_type::const_iterator iter = table_map.find(k);
+ uint32_t tblnum = 0;
+ if (iter != table_map.end()) {
+ tblnum = iter->second;
+ DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(),
+ (int)tblnum));
+ } else {
+ TABLE_LIST tables;
+ TABLE *table = 0;
+ bool refresh = true;
+ const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
+ #if MYSQL_VERSION_ID >= 50505
+ LEX_CSTRING db_name= { arg.dbn, strlen(arg.dbn) };
+ LEX_CSTRING tbl_name= { arg.tbl, strlen(arg.tbl) };
+ tables.init_one_table(&db_name, &tbl_name, 0, lock_type);
+ MDL_REQUEST_INIT(&tables.mdl_request, MDL_key::TABLE, arg.dbn, arg.tbl,
+ for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
+ Open_table_context ot_act(thd, 0);
+ if (!open_table(thd, &tables, &ot_act)) {
+ table = tables.table;
+ }
+ #else
+ tables.init_one_table(arg.dbn, arg.tbl, lock_type);
+ table = open_table(thd, &tables, thd->mem_root, &refresh,
+ OPEN_VIEW_NO_PARSE);
+ #endif
+ if (table == 0) {
+ DENA_VERBOSE(20, fprintf(stderr,
+ "HNDSOCK failed to open %p [%s] [%s] [%d]\n",
+ thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
+ return cb.dbcb_resp_short(1, "open_table");
+ }
+ statistic_increment(open_tables_count, &LOCK_status);
+ table->reginfo.lock_type = lock_type;
+ table->use_all_columns();
+ tblnum = table_vec.size();
+ tablevec_entry e;
+ e.table = table;
+ table_vec.push_back(e);
+ table_map[k] = tblnum;
+ }
+ size_t idxnum = static_cast<size_t>(-1);
+ if (arg.idx[0] >= '0' && arg.idx[0] <= '9') {
+ /* numeric */
+ TABLE *const table = table_vec[tblnum].table;
+ idxnum = atoi(arg.idx);
+ if (idxnum >= table->s->keys) {
+ return cb.dbcb_resp_short(2, "idxnum");
+ }
+ } else {
+ const char *const idx_name_to_open =
+ arg.idx[0] == '\0' ? "PRIMARY" : arg.idx;
+ TABLE *const table = table_vec[tblnum].table;
+ for (uint i = 0; i < table->s->keys; ++i) {
+ KEY& kinfo = table->key_info[i];
+ if (strcmp(kinfo.name.str, idx_name_to_open) == 0) {
+ idxnum = i;
+ break;
+ }
+ }
+ }
+ if (idxnum == size_t(-1)) {
+ return cb.dbcb_resp_short(2, "idxnum");
+ }
+ prep_stmt::fields_type rf;
+ prep_stmt::fields_type ff;
+ if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
+ return cb.dbcb_resp_short(2, "fld");
+ }
+ if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
+ return cb.dbcb_resp_short(2, "fld");
+ }
+ prep_stmt p(this, tblnum, idxnum, rf, ff);
+ cb.dbcb_set_prep_stmt(arg.pst_id, p);
+ return cb.dbcb_resp_short(0, "");
+}
+
+bool
+dbcontext::parse_fields(TABLE *const table, const char *str,
+ prep_stmt::fields_type& flds)
+{
+ string_ref flds_sr(str, strlen(str));
+ std::vector<string_ref> fldnms;
+ if (flds_sr.size() != 0) {
+ split(',', flds_sr, fldnms);
+ }
+ for (size_t i = 0; i < fldnms.size(); ++i) {
+ Field **fld = 0;
+ size_t j = 0;
+ for (fld = table->field; *fld; ++fld, ++j) {
+ DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name.str));
+ string_ref fn((*fld)->field_name.str, (*fld)->field_name.length);
+ if (fn == fldnms[i]) {
+ break;
+ }
+ }
+ if (*fld == 0) {
+ DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
+ std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
+ return false;
+ }
+ DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name.str, j));
+ flds.push_back(j);
+ }
+ return true;
+}
+
+enum db_write_op {
+ db_write_op_none = 0,
+ db_write_op_insert = 1,
+ db_write_op_sql = 2,
+};
+
+void
+dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args)
+{
+ const prep_stmt& p = *args.pst;
+ if (p.get_table_id() == static_cast<size_t>(-1)) {
+ return cb.dbcb_resp_short(2, "stmtnum");
+ }
+ ha_rkey_function find_flag = HA_READ_KEY_EXACT;
+ db_write_op wrop = db_write_op_none;
+ if (args.op.size() == 1) {
+ switch (args.op.begin()[0]) {
+ case '=':
+ find_flag = HA_READ_KEY_EXACT;
+ break;
+ case '>':
+ find_flag = HA_READ_AFTER_KEY;
+ break;
+ case '<':
+ find_flag = HA_READ_BEFORE_KEY;
+ break;
+ case '+':
+ wrop = db_write_op_insert;
+ break;
+ case 'S':
+ wrop = db_write_op_sql;
+ break;
+ default:
+ return cb.dbcb_resp_short(2, "op");
+ }
+ } else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
+ switch (args.op.begin()[0]) {
+ case '>':
+ find_flag = HA_READ_KEY_OR_NEXT;
+ break;
+ case '<':
+ find_flag = HA_READ_KEY_OR_PREV;
+ break;
+ default:
+ return cb.dbcb_resp_short(2, "op");
+ }
+ } else {
+ return cb.dbcb_resp_short(2, "op");
+ }
+ if (args.kvalslen <= 0) {
+ return cb.dbcb_resp_short(2, "klen");
+ }
+ switch (wrop) {
+ case db_write_op_none:
+ return cmd_find_internal(cb, p, find_flag, args);
+ case db_write_op_insert:
+ return cmd_insert_internal(cb, p, args.kvals, args.kvalslen);
+ case db_write_op_sql:
+ return cmd_sql_internal(cb, p, args.kvals, args.kvalslen);
+ }
+}
+
+void
+dbcontext::set_statistics(size_t num_conns, size_t num_active)
+{
+ if (for_write_flag) {
+ set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
+ num_conns, num_active);
+ } else {
+ set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
+ num_conns, num_active);
+ }
+ /*
+ Don't set message buf if it's already in use. This saves slow call to
+ thd_proc_info() (if profiling is enabled)
+ */
+ if (thd->proc_info != &info_message_buf[0])
+ thd_proc_info(thd, &info_message_buf[0]);
+}
+
+};
+
diff --git a/plugin/handler_socket/handlersocket/database.hpp b/plugin/handler_socket/handlersocket/database.hpp
new file mode 100644
index 00000000..a4aee087
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/database.hpp
@@ -0,0 +1,142 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#ifndef DENA_DATABASE_HPP
+#define DENA_DATABASE_HPP
+
+#include <string>
+#include <memory>
+#include <vector>
+#include <stdint.h>
+
+#include "string_buffer.hpp"
+#include "string_ref.hpp"
+#include "config.hpp"
+
+namespace dena {
+
+struct database_i;
+typedef std::auto_ptr<volatile database_i> database_ptr;
+
+struct dbcontext_i;
+typedef std::auto_ptr<dbcontext_i> dbcontext_ptr;
+
+struct database_i {
+ virtual ~database_i() { }
+ virtual dbcontext_ptr create_context(bool for_write) volatile = 0;
+ virtual void stop() volatile = 0;
+ virtual const config& get_conf() const volatile = 0;
+ static database_ptr create(const config& conf);
+};
+
+struct prep_stmt {
+ typedef std::vector<uint32_t> fields_type;
+ private:
+ dbcontext_i *dbctx; /* must be valid while *this is alive */
+ size_t table_id; /* a prep_stmt object holds a refcount of the table */
+ size_t idxnum;
+ fields_type ret_fields;
+ fields_type filter_fields;
+ public:
+ prep_stmt();
+ prep_stmt(dbcontext_i *c, size_t tbl, size_t idx, const fields_type& rf,
+ const fields_type& ff);
+ ~prep_stmt();
+ prep_stmt(const prep_stmt& x);
+ prep_stmt& operator =(const prep_stmt& x);
+ public:
+ size_t get_table_id() const { return table_id; }
+ size_t get_idxnum() const { return idxnum; }
+ const fields_type& get_ret_fields() const { return ret_fields; }
+ const fields_type& get_filter_fields() const { return filter_fields; }
+};
+
+struct dbcallback_i {
+ virtual ~dbcallback_i () { }
+ virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) = 0;
+ virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const = 0;
+ virtual void dbcb_resp_short(uint32_t code, const char *msg) = 0;
+ virtual void dbcb_resp_short_num(uint32_t code, uint32_t value) = 0;
+ virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value) = 0;
+ virtual void dbcb_resp_begin(size_t num_flds) = 0;
+ virtual void dbcb_resp_entry(const char *fld, size_t fldlen) = 0;
+ virtual void dbcb_resp_end() = 0;
+ virtual void dbcb_resp_cancel() = 0;
+};
+
+enum record_filter_type {
+ record_filter_type_skip = 0,
+ record_filter_type_break = 1,
+};
+
+struct record_filter {
+ record_filter_type filter_type;
+ string_ref op;
+ uint32_t ff_offset; /* offset in filter_fields */
+ string_ref val;
+ record_filter() : filter_type(record_filter_type_skip), ff_offset(0) { }
+};
+
+struct cmd_open_args {
+ size_t pst_id;
+ const char *dbn;
+ const char *tbl;
+ const char *idx;
+ const char *retflds;
+ const char *filflds;
+ cmd_open_args() : pst_id(0), dbn(0), tbl(0), idx(0), retflds(0),
+ filflds(0) { }
+};
+
+struct cmd_exec_args {
+ const prep_stmt *pst;
+ string_ref op;
+ const string_ref *kvals;
+ size_t kvalslen;
+ uint32_t limit;
+ uint32_t skip;
+ string_ref mod_op;
+ const string_ref *uvals; /* size must be pst->retfieelds.size() */
+ const record_filter *filters;
+ int invalues_keypart;
+ const string_ref *invalues;
+ size_t invalueslen;
+ cmd_exec_args() : pst(0), kvals(0), kvalslen(0), limit(0), skip(0),
+ uvals(0), filters(0), invalues_keypart(-1), invalues(0), invalueslen(0) { }
+};
+
+struct dbcontext_i {
+ virtual ~dbcontext_i() { }
+ virtual void init_thread(const void *stack_bottom,
+ volatile int& shutdown_flag) = 0;
+ virtual void term_thread() = 0;
+ virtual bool check_alive() = 0;
+ virtual void lock_tables_if() = 0;
+ virtual void unlock_tables_if() = 0;
+ virtual bool get_commit_error() = 0;
+ virtual void clear_error() = 0;
+ virtual void close_tables_if() = 0;
+ virtual void table_addref(size_t tbl_id) = 0; /* TODO: hide */
+ virtual void table_release(size_t tbl_id) = 0; /* TODO: hide */
+ virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args) = 0;
+ virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args) = 0;
+ virtual void set_statistics(size_t num_conns, size_t num_active) = 0;
+};
+
+};
+
+extern unsigned long long int open_tables_count;
+extern unsigned long long int close_tables_count;
+extern unsigned long long int lock_tables_count;
+extern unsigned long long int unlock_tables_count;
+#if 0
+extern unsigned long long int index_exec_count;
+#endif
+
+#endif
+
diff --git a/plugin/handler_socket/handlersocket/handlersocket.cpp b/plugin/handler_socket/handlersocket/handlersocket.cpp
new file mode 100644
index 00000000..81334970
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/handlersocket.cpp
@@ -0,0 +1,217 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include <memory>
+#include <string>
+#include <stdio.h>
+
+#include "config.hpp"
+#include "hstcpsvr.hpp"
+#include "string_util.hpp"
+#include "mysql_incl.hpp"
+
+#define DBG_LOG \
+ if (dena::verbose_level >= 100) { \
+ fprintf(stderr, "%s %p\n", __PRETTY_FUNCTION__, this); \
+ }
+#define DBG_DO(x) if (dena::verbose_level >= 100) { x; }
+
+#define DBG_DIR(x)
+
+using namespace dena;
+
+static char *handlersocket_address = 0;
+static char *handlersocket_port = 0;
+static char *handlersocket_port_wr = 0;
+static unsigned int handlersocket_epoll = 1;
+static unsigned int handlersocket_threads = 32;
+static unsigned int handlersocket_threads_wr = 1;
+static unsigned int handlersocket_timeout = 30;
+static unsigned int handlersocket_backlog = 32768;
+static unsigned int handlersocket_sndbuf = 0;
+static unsigned int handlersocket_rcvbuf = 0;
+static unsigned int handlersocket_readsize = 0;
+static unsigned int handlersocket_accept_balance = 0;
+static unsigned int handlersocket_wrlock_timeout = 0;
+static char *handlersocket_plain_secret = 0;
+static char *handlersocket_plain_secret_wr = 0;
+
+struct daemon_handlersocket_data {
+ hstcpsvr_ptr hssvr_rd;
+ hstcpsvr_ptr hssvr_wr;
+};
+
+static int
+daemon_handlersocket_init(void *p)
+{
+ DENA_VERBOSE(10, fprintf(stderr, "handlersocket: initialized\n"));
+ config conf;
+ conf["use_epoll"] = handlersocket_epoll ? "1" : "0";
+ if (handlersocket_address) {
+ conf["host"] = handlersocket_address;
+ }
+ if (handlersocket_port) {
+ conf["port"] = handlersocket_port;
+ }
+ /*
+ * unix domain socket
+ * conf["host"] = "/";
+ * conf["port"] = "/tmp/handlersocket";
+ */
+ if (handlersocket_threads > 0) {
+ conf["num_threads"] = to_stdstring(handlersocket_threads);
+ } else {
+ conf["num_threads"] = "1";
+ }
+ conf["timeout"] = to_stdstring(handlersocket_timeout);
+ conf["listen_backlog"] = to_stdstring(handlersocket_backlog);
+ conf["sndbuf"] = to_stdstring(handlersocket_sndbuf);
+ conf["rcvbuf"] = to_stdstring(handlersocket_rcvbuf);
+ conf["readsize"] = to_stdstring(handlersocket_readsize);
+ conf["accept_balance"] = to_stdstring(handlersocket_accept_balance);
+ conf["wrlock_timeout"] = to_stdstring(handlersocket_wrlock_timeout);
+ std::auto_ptr<daemon_handlersocket_data> ap(new daemon_handlersocket_data);
+ if (handlersocket_port != 0 && handlersocket_port_wr != handlersocket_port) {
+ conf["port"] = handlersocket_port;
+ if (handlersocket_plain_secret) {
+ conf["plain_secret"] = handlersocket_plain_secret;
+ }
+ ap->hssvr_rd = hstcpsvr_i::create(conf);
+ ap->hssvr_rd->start_listen();
+ }
+ if (handlersocket_port_wr != 0) {
+ if (handlersocket_threads_wr > 0) {
+ conf["num_threads"] = to_stdstring(handlersocket_threads_wr);
+ }
+ conf["port"] = handlersocket_port_wr;
+ conf["for_write"] = "1";
+ conf["plain_secret"] = "";
+ if (handlersocket_plain_secret_wr) {
+ conf["plain_secret"] = handlersocket_plain_secret_wr;
+ }
+ ap->hssvr_wr = hstcpsvr_i::create(conf);
+ ap->hssvr_wr->start_listen();
+ }
+ st_plugin_int *const plugin = static_cast<st_plugin_int *>(p);
+ plugin->data = ap.release();
+ return 0;
+}
+
+static int
+daemon_handlersocket_deinit(void *p)
+{
+ DENA_VERBOSE(10, fprintf(stderr, "handlersocket: terminated\n"));
+ st_plugin_int *const plugin = static_cast<st_plugin_int *>(p);
+ daemon_handlersocket_data *ptr =
+ static_cast<daemon_handlersocket_data *>(plugin->data);
+ delete ptr;
+ return 0;
+}
+
+static struct st_mysql_daemon daemon_handlersocket_plugin = {
+ MYSQL_DAEMON_INTERFACE_VERSION
+};
+
+static MYSQL_SYSVAR_UINT(verbose, dena::verbose_level, 0,
+ "0..10000", 0, 0, 10 /* default */, 0, 10000, 0);
+static MYSQL_SYSVAR_UINT(epoll, handlersocket_epoll, PLUGIN_VAR_READONLY,
+ "0..1", 0, 0, 1 /* default */, 0, 1, 0);
+static MYSQL_SYSVAR_STR(address, handlersocket_address,
+ PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL);
+static MYSQL_SYSVAR_STR(port, handlersocket_port,
+ PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL);
+static MYSQL_SYSVAR_STR(port_wr, handlersocket_port_wr,
+ PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL);
+static MYSQL_SYSVAR_UINT(threads, handlersocket_threads, PLUGIN_VAR_READONLY,
+ "1..3000", 0, 0, 16 /* default */, 1, 3000, 0);
+static MYSQL_SYSVAR_UINT(threads_wr, handlersocket_threads_wr,
+ PLUGIN_VAR_READONLY, "1..3000", 0, 0, 1 /* default */, 1, 3000, 0);
+static MYSQL_SYSVAR_UINT(timeout, handlersocket_timeout, PLUGIN_VAR_READONLY,
+ "30..3600", 0, 0, 300 /* default */, 30, 3600, 0);
+static MYSQL_SYSVAR_UINT(backlog, handlersocket_backlog, PLUGIN_VAR_READONLY,
+ "5..1000000", 0, 0, 32768 /* default */, 5, 1000000, 0);
+static MYSQL_SYSVAR_UINT(sndbuf, handlersocket_sndbuf, PLUGIN_VAR_READONLY,
+ "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0);
+static MYSQL_SYSVAR_UINT(rcvbuf, handlersocket_rcvbuf, PLUGIN_VAR_READONLY,
+ "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0);
+static MYSQL_SYSVAR_UINT(readsize, handlersocket_readsize, PLUGIN_VAR_READONLY,
+ "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0);
+static MYSQL_SYSVAR_UINT(accept_balance, handlersocket_accept_balance,
+ PLUGIN_VAR_READONLY, "0..10000", 0, 0, 0 /* default */, 0, 10000, 0);
+static MYSQL_SYSVAR_UINT(wrlock_timeout, handlersocket_wrlock_timeout,
+ PLUGIN_VAR_READONLY, "0..3600", 0, 0, 12 /* default */, 0, 3600, 0);
+static MYSQL_SYSVAR_STR(plain_secret, handlersocket_plain_secret,
+ PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL);
+static MYSQL_SYSVAR_STR(plain_secret_wr, handlersocket_plain_secret_wr,
+ PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL);
+
+
+/* warning: type-punning to incomplete type might break strict-aliasing
+ * rules */
+static struct st_mysql_sys_var *daemon_handlersocket_system_variables[] = {
+ MYSQL_SYSVAR(verbose),
+ MYSQL_SYSVAR(address),
+ MYSQL_SYSVAR(port),
+ MYSQL_SYSVAR(port_wr),
+ MYSQL_SYSVAR(epoll),
+ MYSQL_SYSVAR(threads),
+ MYSQL_SYSVAR(threads_wr),
+ MYSQL_SYSVAR(timeout),
+ MYSQL_SYSVAR(backlog),
+ MYSQL_SYSVAR(sndbuf),
+ MYSQL_SYSVAR(rcvbuf),
+ MYSQL_SYSVAR(readsize),
+ MYSQL_SYSVAR(accept_balance),
+ MYSQL_SYSVAR(wrlock_timeout),
+ MYSQL_SYSVAR(plain_secret),
+ MYSQL_SYSVAR(plain_secret_wr),
+ 0
+};
+
+static SHOW_VAR hs_status_variables[] = {
+ {"table_open", (char*) &open_tables_count, SHOW_LONGLONG},
+ {"table_close", (char*) &close_tables_count, SHOW_LONGLONG},
+ {"table_lock", (char*) &lock_tables_count, SHOW_LONGLONG},
+ {"table_unlock", (char*) &unlock_tables_count, SHOW_LONGLONG},
+ #if 0
+ {"index_exec", (char*) &index_exec_count, SHOW_LONGLONG},
+ #endif
+ {NullS, NullS, SHOW_LONG}
+};
+
+static int show_hs_vars(THD *thd, SHOW_VAR *var, char *buff)
+{
+ var->type= SHOW_ARRAY;
+ var->value= (char *) &hs_status_variables;
+ return 0;
+}
+
+static SHOW_VAR daemon_handlersocket_status_variables[] = {
+ {"Hs", (char*) show_hs_vars, SHOW_FUNC},
+ {NullS, NullS, SHOW_LONG}
+};
+
+
+maria_declare_plugin(handlersocket)
+{
+ MYSQL_DAEMON_PLUGIN,
+ &daemon_handlersocket_plugin,
+ "handlersocket",
+ "higuchi dot akira at dena dot jp",
+ "Direct access into InnoDB",
+ PLUGIN_LICENSE_BSD,
+ daemon_handlersocket_init,
+ daemon_handlersocket_deinit,
+ 0x0100 /* 1.0 */,
+ daemon_handlersocket_status_variables,
+ daemon_handlersocket_system_variables,
+ "1.0",
+ MariaDB_PLUGIN_MATURITY_BETA
+}
+maria_declare_plugin_end;
diff --git a/plugin/handler_socket/handlersocket/handlersocket.spec.template b/plugin/handler_socket/handlersocket/handlersocket.spec.template
new file mode 100644
index 00000000..0ce8c0cb
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/handlersocket.spec.template
@@ -0,0 +1,29 @@
+Summary: handlersocket plugin for mysql
+Name: handlersocket
+Version: HANDLERSOCKET_VERSION
+Release: 1%{?dist}
+Group: System Environment/Libraries
+License: BSD
+Source: handlersocket.tar.gz
+Packager: Akira Higuchi <higuchi dot akira at dena dot jp>
+BuildRoot: /var/tmp/%{name}-%{version}-root
+
+%description
+
+%prep
+%setup -n %{name}
+
+%define _use_internal_dependency_generator 0
+
+%build
+make -f Makefile.plain
+
+%install
+rm -rf $RPM_BUILD_ROOT
+mkdir -p $RPM_BUILD_ROOT/%{_libdir}/mysql/plugin
+install -m 755 handlersocket.so $RPM_BUILD_ROOT/%{_libdir}/mysql/plugin/
+
+%files
+%defattr(-, root, root)
+%{_libdir}/mysql/plugin/*.so
+
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr.cpp b/plugin/handler_socket/handlersocket/hstcpsvr.cpp
new file mode 100644
index 00000000..250ef2c7
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/hstcpsvr.cpp
@@ -0,0 +1,147 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include <vector>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <sys/resource.h>
+
+#include "hstcpsvr.hpp"
+#include "hstcpsvr_worker.hpp"
+#include "thread.hpp"
+#include "fatal.hpp"
+#include "auto_ptrcontainer.hpp"
+
+#define DBG(x)
+
+namespace dena {
+
+struct worker_throbj {
+ worker_throbj(const hstcpsvr_worker_arg& arg)
+ : worker(hstcpsvr_worker_i::create(arg)) { }
+ void operator ()() {
+ worker->run();
+ }
+ hstcpsvr_worker_ptr worker;
+};
+
+struct hstcpsvr : public hstcpsvr_i, private noncopyable {
+ hstcpsvr(const config& c);
+ ~hstcpsvr();
+ virtual std::string start_listen();
+ private:
+ hstcpsvr_shared_c cshared;
+ volatile hstcpsvr_shared_v vshared;
+ typedef thread<worker_throbj> worker_thread_type;
+ typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type;
+ threads_type threads;
+ std::vector<unsigned int> thread_num_conns_vec;
+ private:
+ void stop_workers();
+};
+
+namespace {
+
+void
+check_nfile(size_t nfile)
+{
+ struct rlimit rl;
+ const int r = getrlimit(RLIMIT_NOFILE, &rl);
+ if (r != 0) {
+ fatal_abort("check_nfile: getrlimit failed");
+ }
+ if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) {
+ fprintf(stderr,
+ "[Warning] handlersocket: open_files_limit is too small.\n");
+ }
+}
+
+};
+
+hstcpsvr::hstcpsvr(const config& c)
+ : cshared(), vshared()
+{
+ vshared.shutdown = 0;
+ cshared.conf = c; /* copy */
+ if (cshared.conf["port"] == "") {
+ cshared.conf["port"] = "9999";
+ }
+ cshared.num_threads = cshared.conf.get_int("num_threads", 32);
+ cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1);
+ cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1);
+ if (cshared.sockargs.use_epoll) {
+ cshared.sockargs.nonblocking = 1;
+ }
+ cshared.readsize = cshared.conf.get_int("readsize", 1);
+ cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024);
+ cshared.for_write_flag = cshared.conf.get_int("for_write", 0);
+ cshared.plain_secret = cshared.conf.get_str("plain_secret", "");
+ cshared.require_auth = !cshared.plain_secret.empty();
+ cshared.sockargs.set(cshared.conf);
+ cshared.dbptr = database_i::create(c);
+ check_nfile(cshared.num_threads * cshared.nb_conn_per_thread);
+ thread_num_conns_vec.resize(cshared.num_threads);
+ cshared.thread_num_conns = thread_num_conns_vec.empty()
+ ? 0 : &thread_num_conns_vec[0];
+}
+
+hstcpsvr::~hstcpsvr()
+{
+ stop_workers();
+}
+
+std::string
+hstcpsvr::start_listen()
+{
+ std::string err;
+ if (threads.size() != 0) {
+ return "start_listen: already running";
+ }
+ if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) {
+ return "bind: " + err;
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "bind done\n"));
+ const size_t stack_size = std::max(
+ cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024);
+ for (long i = 0; i < cshared.num_threads; ++i) {
+ hstcpsvr_worker_arg arg;
+ arg.cshared = &cshared;
+ arg.vshared = &vshared;
+ arg.worker_id = i;
+ std::auto_ptr< thread<worker_throbj> > thr(
+ new thread<worker_throbj>(arg, stack_size));
+ threads.push_back_ptr(thr);
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "threads created\n"));
+ for (size_t i = 0; i < threads.size(); ++i) {
+ threads[i]->start();
+ }
+ DENA_VERBOSE(20, fprintf(stderr, "threads started\n"));
+ return std::string();
+}
+
+void
+hstcpsvr::stop_workers()
+{
+ vshared.shutdown = 1;
+ for (size_t i = 0; i < threads.size(); ++i) {
+ threads[i]->join();
+ }
+ threads.clear();
+}
+
+hstcpsvr_ptr
+hstcpsvr_i::create(const config& conf)
+{
+ return hstcpsvr_ptr(new hstcpsvr(conf));
+}
+
+};
+
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr.hpp b/plugin/handler_socket/handlersocket/hstcpsvr.hpp
new file mode 100644
index 00000000..811bfa25
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/hstcpsvr.hpp
@@ -0,0 +1,58 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#ifndef DENA_HSTCPSVR_HPP
+#define DENA_HSTCPSVR_HPP
+
+#include <memory>
+#include <string>
+#include <map>
+
+#include "mutex.hpp"
+#include "auto_file.hpp"
+#include "database.hpp"
+#include "config.hpp"
+#include "socket.hpp"
+
+namespace dena {
+
+struct hstcpsvr_shared_c {
+ config conf;
+ long num_threads;
+ long nb_conn_per_thread;
+ bool for_write_flag;
+ bool require_auth;
+ std::string plain_secret;
+ int readsize;
+ socket_args sockargs;
+ auto_file listen_fd;
+ database_ptr dbptr;
+ volatile unsigned int *thread_num_conns; /* 0 .. num_threads-1 */
+ hstcpsvr_shared_c() : num_threads(0), nb_conn_per_thread(100),
+ for_write_flag(false), require_auth(false), readsize(0),
+ thread_num_conns(0) { }
+};
+
+struct hstcpsvr_shared_v : public mutex {
+ int shutdown;
+ hstcpsvr_shared_v() : shutdown(0) { }
+};
+
+struct hstcpsvr_i;
+typedef std::auto_ptr<hstcpsvr_i> hstcpsvr_ptr;
+
+struct hstcpsvr_i {
+ virtual ~hstcpsvr_i() { }
+ virtual std::string start_listen() = 0;
+ static hstcpsvr_ptr create(const config& conf);
+};
+
+};
+
+#endif
+
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp
new file mode 100644
index 00000000..9863602a
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp
@@ -0,0 +1,957 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <poll.h>
+#include <unistd.h>
+#include <stdexcept>
+#include <signal.h>
+#include <list>
+#if __linux__
+#include <sys/epoll.h>
+#endif
+#ifdef HAVE_ALLOCA_H
+#include <alloca.h>
+#endif
+
+#include "hstcpsvr_worker.hpp"
+#include "string_buffer.hpp"
+#include "auto_ptrcontainer.hpp"
+#include "string_util.hpp"
+#include "escape.hpp"
+
+#define DBG_FD(x)
+#define DBG_TR(x)
+#define DBG_EP(x)
+#define DBG_MULTI(x)
+
+/* TODO */
+#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
+#define MSG_NOSIGNAL 0
+#endif
+
+namespace dena {
+
+struct dbconnstate {
+ string_buffer readbuf;
+ string_buffer writebuf;
+ std::vector<prep_stmt> prep_stmts;
+ size_t resp_begin_pos;
+ size_t find_nl_pos;
+ void reset() {
+ readbuf.clear();
+ writebuf.clear();
+ prep_stmts.clear();
+ resp_begin_pos = 0;
+ find_nl_pos = 0;
+ }
+ dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
+};
+
+struct hstcpsvr_conn;
+typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
+
+struct hstcpsvr_conn : public dbcallback_i {
+ public:
+ auto_file fd;
+ sockaddr_storage addr;
+ size_socket addr_len;
+ dbconnstate cstate;
+ std::string err;
+ size_t readsize;
+ bool nonblocking;
+ bool read_finished;
+ bool write_finished;
+ time_t nb_last_io;
+ hstcpsvr_conns_type::iterator conns_iter;
+ bool authorized;
+ public:
+ bool closed() const;
+ bool ok_to_close() const;
+ void reset();
+ int accept(const hstcpsvr_shared_c& cshared);
+ bool write_more(bool *more_r = 0);
+ bool read_more(bool *more_r = 0);
+ public:
+ virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
+ virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
+ virtual void dbcb_resp_short(uint32_t code, const char *msg);
+ virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
+ virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
+ virtual void dbcb_resp_begin(size_t num_flds);
+ virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
+ virtual void dbcb_resp_end();
+ virtual void dbcb_resp_cancel();
+ public:
+ hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
+ nonblocking(false), read_finished(false), write_finished(false),
+ nb_last_io(0), authorized(false) { }
+};
+
+bool
+hstcpsvr_conn::closed() const
+{
+ return fd.get() < 0;
+}
+
+bool
+hstcpsvr_conn::ok_to_close() const
+{
+ return write_finished || (read_finished && cstate.writebuf.size() == 0);
+}
+
+void
+hstcpsvr_conn::reset()
+{
+ addr = sockaddr_storage();
+ addr_len = sizeof(addr);
+ cstate.reset();
+ fd.reset();
+ read_finished = false;
+ write_finished = false;
+}
+
+int
+hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
+{
+ reset();
+ return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
+ addr_len, err);
+}
+
+bool
+hstcpsvr_conn::write_more(bool *more_r)
+{
+ if (write_finished || cstate.writebuf.size() == 0) {
+ return false;
+ }
+ const size_t wlen = cstate.writebuf.size();
+ ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
+ if (len <= 0) {
+ if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
+ cstate.writebuf.clear();
+ write_finished = true;
+ }
+ return false;
+ }
+ cstate.writebuf.erase_front(len);
+ /* FIXME: reallocate memory if too large */
+ if (more_r) {
+ *more_r = (static_cast<size_t>(len) == wlen);
+ }
+ return true;
+}
+
+bool
+hstcpsvr_conn::read_more(bool *more_r)
+{
+ if (read_finished) {
+ return false;
+ }
+ const size_t block_size = readsize > 4096 ? readsize : 4096;
+ char *wp = cstate.readbuf.make_space(block_size);
+ const ssize_t len = read(fd.get(), wp, block_size);
+ if (len <= 0) {
+ if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
+ read_finished = true;
+ }
+ return false;
+ }
+ cstate.readbuf.space_wrote(len);
+ if (more_r) {
+ *more_r = (static_cast<size_t>(len) == block_size);
+ }
+ return true;
+}
+
+void
+hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
+{
+ if (cstate.prep_stmts.size() <= pst_id) {
+ cstate.prep_stmts.resize(pst_id + 1);
+ }
+ cstate.prep_stmts[pst_id] = v;
+}
+
+const prep_stmt *
+hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
+{
+ if (cstate.prep_stmts.size() <= pst_id) {
+ return 0;
+ }
+ return &cstate.prep_stmts[pst_id];
+}
+
+void
+hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
+{
+ write_ui32(cstate.writebuf, code);
+ const size_t msglen = strlen(msg);
+ if (msglen != 0) {
+ cstate.writebuf.append_literal("\t1\t");
+ cstate.writebuf.append(msg, msg + msglen);
+ } else {
+ cstate.writebuf.append_literal("\t1");
+ }
+ cstate.writebuf.append_literal("\n");
+}
+
+void
+hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
+{
+ write_ui32(cstate.writebuf, code);
+ cstate.writebuf.append_literal("\t1\t");
+ write_ui32(cstate.writebuf, value);
+ cstate.writebuf.append_literal("\n");
+}
+
+void
+hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
+{
+ write_ui32(cstate.writebuf, code);
+ cstate.writebuf.append_literal("\t1\t");
+ write_ui64(cstate.writebuf, value);
+ cstate.writebuf.append_literal("\n");
+}
+
+void
+hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
+{
+ cstate.resp_begin_pos = cstate.writebuf.size();
+ cstate.writebuf.append_literal("0\t");
+ write_ui32(cstate.writebuf, num_flds);
+}
+
+void
+hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
+{
+ if (fld != 0) {
+ cstate.writebuf.append_literal("\t");
+ escape_string(cstate.writebuf, fld, fld + fldlen);
+ } else {
+ static const char t[] = "\t\0";
+ cstate.writebuf.append(t, t + 2);
+ }
+}
+
+void
+hstcpsvr_conn::dbcb_resp_end()
+{
+ cstate.writebuf.append_literal("\n");
+ cstate.resp_begin_pos = 0;
+}
+
+void
+hstcpsvr_conn::dbcb_resp_cancel()
+{
+ cstate.writebuf.resize(cstate.resp_begin_pos);
+ cstate.resp_begin_pos = 0;
+}
+
+struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
+ hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
+ virtual void run();
+ private:
+ const hstcpsvr_shared_c& cshared;
+ volatile hstcpsvr_shared_v& vshared;
+ long worker_id;
+ dbcontext_ptr dbctx;
+ hstcpsvr_conns_type conns; /* conns refs dbctx */
+ time_t last_check_time;
+ std::vector<pollfd> pfds;
+ #ifdef __linux__
+ std::vector<epoll_event> events_vec;
+ auto_file epoll_fd;
+ #endif
+ bool accept_enabled;
+ int accept_balance;
+ std::vector<string_ref> invalues_work;
+ std::vector<record_filter> filters_work;
+ private:
+ int run_one_nb();
+ int run_one_ep();
+ void execute_lines(hstcpsvr_conn& conn);
+ void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
+ void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
+ void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
+ char *finish, hstcpsvr_conn& conn);
+ void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
+};
+
+hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
+ : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
+ dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
+ last_check_time(time(0)), accept_enabled(true), accept_balance(0)
+{
+ #ifdef __linux__
+ if (cshared.sockargs.use_epoll) {
+ epoll_fd.reset(epoll_create(10));
+ if (epoll_fd.get() < 0) {
+ fatal_abort("epoll_create");
+ }
+ epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN;
+ ev.data.ptr = 0;
+ if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
+ != 0) {
+ fatal_abort("epoll_ctl EPOLL_CTL_ADD");
+ }
+ events_vec.resize(10240);
+ }
+ #endif
+ accept_balance = cshared.conf.get_int("accept_balance", 0);
+}
+
+namespace {
+
+struct thr_init {
+ thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
+ dbctx->init_thread(this, shutdown_flag);
+ }
+ ~thr_init() {
+ dbctx->term_thread();
+ }
+ const dbcontext_ptr& dbctx;
+};
+
+}; // namespace
+
+void
+hstcpsvr_worker::run()
+{
+ thr_init initobj(dbctx, vshared.shutdown);
+
+ #ifdef __linux__
+ if (cshared.sockargs.use_epoll) {
+ while (!vshared.shutdown && dbctx->check_alive()) {
+ run_one_ep();
+ }
+ } else if (cshared.sockargs.nonblocking) {
+ while (!vshared.shutdown && dbctx->check_alive()) {
+ run_one_nb();
+ }
+ } else {
+ /* UNUSED */
+ fatal_abort("run_one");
+ }
+ #else
+ while (!vshared.shutdown && dbctx->check_alive()) {
+ run_one_nb();
+ }
+ #endif
+}
+
+int
+hstcpsvr_worker::run_one_nb()
+{
+ size_t nfds = 0;
+ /* CLIENT SOCKETS */
+ for (hstcpsvr_conns_type::const_iterator i = conns.begin();
+ i != conns.end(); ++i) {
+ if (pfds.size() <= nfds) {
+ pfds.resize(nfds + 1);
+ }
+ pollfd& pfd = pfds[nfds++];
+ pfd.fd = (*i)->fd.get();
+ short ev = 0;
+ if ((*i)->cstate.writebuf.size() != 0) {
+ ev = POLLOUT;
+ } else {
+ ev = POLLIN;
+ }
+ pfd.events = pfd.revents = ev;
+ }
+ /* LISTENER */
+ {
+ const size_t cpt = cshared.nb_conn_per_thread;
+ const short ev = (cpt > nfds) ? POLLIN : 0;
+ if (pfds.size() <= nfds) {
+ pfds.resize(nfds + 1);
+ }
+ pollfd& pfd = pfds[nfds++];
+ pfd.fd = cshared.listen_fd.get();
+ pfd.events = pfd.revents = ev;
+ }
+ /* POLL */
+ const int npollev = poll(&pfds[0], nfds, 1 * 1000);
+ dbctx->set_statistics(conns.size(), npollev);
+ const time_t now = time(0);
+ size_t j = 0;
+ const short mask_in = ~POLLOUT;
+ const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
+ /* READ */
+ for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
+ ++i, ++j) {
+ pollfd& pfd = pfds[j];
+ if ((pfd.revents & mask_in) == 0) {
+ continue;
+ }
+ hstcpsvr_conn& conn = **i;
+ if (conn.read_more()) {
+ if (conn.cstate.readbuf.size() > 0) {
+ const char ch = conn.cstate.readbuf.begin()[0];
+ if (ch == 'Q') {
+ vshared.shutdown = 1;
+ } else if (ch == '/') {
+ conn.cstate.readbuf.clear();
+ conn.cstate.find_nl_pos = 0;
+ conn.cstate.writebuf.clear();
+ conn.read_finished = true;
+ conn.write_finished = true;
+ }
+ }
+ conn.nb_last_io = now;
+ }
+ }
+ /* EXECUTE */
+ j = 0;
+ for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
+ ++i, ++j) {
+ pollfd& pfd = pfds[j];
+ if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
+ continue;
+ }
+ execute_lines(**i);
+ }
+ /* COMMIT */
+ dbctx->unlock_tables_if();
+ const bool commit_error = dbctx->get_commit_error();
+ dbctx->clear_error();
+ /* WRITE/CLOSE */
+ j = 0;
+ for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
+ ++j) {
+ pollfd& pfd = pfds[j];
+ hstcpsvr_conn& conn = **i;
+ hstcpsvr_conns_type::iterator icur = i;
+ ++i;
+ if (commit_error) {
+ conn.reset();
+ continue;
+ }
+ if ((pfd.revents & (mask_out | mask_in)) != 0) {
+ if (conn.write_more()) {
+ conn.nb_last_io = now;
+ }
+ }
+ if (cshared.sockargs.timeout != 0 &&
+ conn.nb_last_io + cshared.sockargs.timeout < now) {
+ conn.reset();
+ }
+ if (conn.closed() || conn.ok_to_close()) {
+ conns.erase_ptr(icur);
+ }
+ }
+ /* ACCEPT */
+ {
+ pollfd& pfd = pfds[nfds - 1];
+ if ((pfd.revents & mask_in) != 0) {
+ std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
+ c->nonblocking = true;
+ c->readsize = cshared.readsize;
+ c->accept(cshared);
+ if (c->fd.get() >= 0) {
+ if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
+ fatal_abort("F_SETFL O_NONBLOCK");
+ }
+ c->nb_last_io = now;
+ conns.push_back_ptr(c);
+ } else {
+ /* errno == 11 (EAGAIN) is not a fatal error. */
+ DENA_VERBOSE(100, fprintf(stderr,
+ "accept failed: errno=%d (not fatal)\n", errno));
+ }
+ }
+ }
+ DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
+ conns.size()));
+ if (conns.empty()) {
+ dbctx->close_tables_if();
+ }
+ dbctx->set_statistics(conns.size(), 0);
+ return 0;
+}
+
+#ifdef __linux__
+int
+hstcpsvr_worker::run_one_ep()
+{
+ epoll_event *const events = &events_vec[0];
+ const size_t num_events = events_vec.size();
+ const time_t now = time(0);
+ size_t in_count = 0, out_count = 0, accept_count = 0;
+ int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
+ /* READ/ACCEPT */
+ dbctx->set_statistics(conns.size(), nfds);
+ for (int i = 0; i < nfds; ++i) {
+ epoll_event& ev = events[i];
+ if ((ev.events & EPOLLIN) == 0) {
+ continue;
+ }
+ hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
+ if (conn == 0) {
+ /* listener */
+ ++accept_count;
+ DBG_EP(fprintf(stderr, "IN listener\n"));
+ std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
+ c->nonblocking = true;
+ c->readsize = cshared.readsize;
+ c->accept(cshared);
+ if (c->fd.get() >= 0) {
+ if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
+ fatal_abort("F_SETFL O_NONBLOCK");
+ }
+ epoll_event cev;
+ memset(&cev, 0, sizeof(cev));
+ cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ cev.data.ptr = c.get();
+ c->nb_last_io = now;
+ const int fd = c->fd.get();
+ conns.push_back_ptr(c);
+ conns.back()->conns_iter = --conns.end();
+ if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
+ fatal_abort("epoll_ctl EPOLL_CTL_ADD");
+ }
+ } else {
+ DENA_VERBOSE(100, fprintf(stderr,
+ "accept failed: errno=%d (not fatal)\n", errno));
+ }
+ } else {
+ /* client connection */
+ ++in_count;
+ DBG_EP(fprintf(stderr, "IN client\n"));
+ bool more_data = false;
+ while (conn->read_more(&more_data)) {
+ DBG_EP(fprintf(stderr, "IN client read_more\n"));
+ conn->nb_last_io = now;
+ if (!more_data) {
+ break;
+ }
+ }
+ }
+ }
+ /* EXECUTE */
+ for (int i = 0; i < nfds; ++i) {
+ epoll_event& ev = events[i];
+ hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
+ if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
+ conn->cstate.readbuf.size() == 0) {
+ continue;
+ }
+ const char ch = conn->cstate.readbuf.begin()[0];
+ if (ch == 'Q') {
+ vshared.shutdown = 1;
+ } else if (ch == '/') {
+ conn->cstate.readbuf.clear();
+ conn->cstate.find_nl_pos = 0;
+ conn->cstate.writebuf.clear();
+ conn->read_finished = true;
+ conn->write_finished = true;
+ } else {
+ execute_lines(*conn);
+ }
+ }
+ /* COMMIT */
+ dbctx->unlock_tables_if();
+ const bool commit_error = dbctx->get_commit_error();
+ dbctx->clear_error();
+ /* WRITE */
+ for (int i = 0; i < nfds; ++i) {
+ epoll_event& ev = events[i];
+ hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
+ if (commit_error && conn != 0) {
+ conn->reset();
+ continue;
+ }
+ if ((ev.events & EPOLLOUT) == 0) {
+ continue;
+ }
+ ++out_count;
+ if (conn == 0) {
+ /* listener */
+ DBG_EP(fprintf(stderr, "OUT listener\n"));
+ } else {
+ /* client connection */
+ DBG_EP(fprintf(stderr, "OUT client\n"));
+ bool more_data = false;
+ while (conn->write_more(&more_data)) {
+ DBG_EP(fprintf(stderr, "OUT client write_more\n"));
+ conn->nb_last_io = now;
+ if (!more_data) {
+ break;
+ }
+ }
+ }
+ }
+ /* CLOSE */
+ for (int i = 0; i < nfds; ++i) {
+ epoll_event& ev = events[i];
+ hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
+ if (conn != 0 && conn->ok_to_close()) {
+ DBG_EP(fprintf(stderr, "CLOSE close\n"));
+ conns.erase_ptr(conn->conns_iter);
+ }
+ }
+ /* TIMEOUT & cleanup */
+ if (last_check_time + 10 < now) {
+ for (hstcpsvr_conns_type::iterator i = conns.begin();
+ i != conns.end(); ) {
+ hstcpsvr_conns_type::iterator icur = i;
+ ++i;
+ if (cshared.sockargs.timeout != 0 &&
+ (*icur)->nb_last_io + cshared.sockargs.timeout < now) {
+ conns.erase_ptr((*icur)->conns_iter);
+ }
+ }
+ last_check_time = now;
+ DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
+ conns.size()));
+ }
+ DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
+ this, in_count, out_count, accept_count, conns.size()));
+ if (conns.empty()) {
+ dbctx->close_tables_if();
+ }
+ /* STATISTICS */
+ const size_t num_conns = conns.size();
+ dbctx->set_statistics(num_conns, 0);
+ /* ENABLE/DISABLE ACCEPT */
+ if (accept_balance != 0) {
+ cshared.thread_num_conns[worker_id] = num_conns;
+ size_t total_num_conns = 0;
+ for (long i = 0; i < cshared.num_threads; ++i) {
+ total_num_conns += cshared.thread_num_conns[i];
+ }
+ bool e_acc = false;
+ if (num_conns < 10 ||
+ total_num_conns * 2 > num_conns * cshared.num_threads) {
+ e_acc = true;
+ }
+ epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = EPOLLIN;
+ ev.data.ptr = 0;
+ if (e_acc == accept_enabled) {
+ } else if (e_acc) {
+ if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
+ != 0) {
+ fatal_abort("epoll_ctl EPOLL_CTL_ADD");
+ }
+ } else {
+ if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
+ != 0) {
+ fatal_abort("epoll_ctl EPOLL_CTL_ADD");
+ }
+ }
+ accept_enabled = e_acc;
+ }
+ return 0;
+}
+#endif
+
+void
+hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
+{
+ DBG_MULTI(int cnt = 0);
+ dbconnstate& cstate = conn.cstate;
+ char *buf_end = cstate.readbuf.end();
+ char *line_begin = cstate.readbuf.begin();
+ char *find_pos = line_begin + cstate.find_nl_pos;
+ while (true) {
+ char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
+ if (nl == 0) {
+ break;
+ }
+ char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
+ DBG_MULTI(cnt++);
+ execute_line(line_begin, lf, conn);
+ find_pos = line_begin = nl + 1;
+ }
+ cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
+ cstate.find_nl_pos = cstate.readbuf.size();
+ DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
+}
+
+void
+hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
+{
+ /* safe to modify, safe to dereference 'finish' */
+ char *const cmd_begin = start;
+ read_token(start, finish);
+ char *const cmd_end = start;
+ skip_one(start, finish);
+ if (cmd_begin == cmd_end) {
+ return conn.dbcb_resp_short(2, "cmd");
+ }
+ if (cmd_begin + 1 == cmd_end) {
+ if (cmd_begin[0] == 'P') {
+ if (cshared.require_auth && !conn.authorized) {
+ return conn.dbcb_resp_short(3, "unauth");
+ }
+ return do_open_index(start, finish, conn);
+ }
+ if (cmd_begin[0] == 'A') {
+ return do_authorization(start, finish, conn);
+ }
+ }
+ if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
+ if (cshared.require_auth && !conn.authorized) {
+ return conn.dbcb_resp_short(3, "unauth");
+ }
+ return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
+ }
+ return conn.dbcb_resp_short(2, "cmd");
+}
+
+void
+hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
+{
+ const size_t pst_id = read_ui32(start, finish);
+ skip_one(start, finish);
+ /* dbname */
+ char *const dbname_begin = start;
+ read_token(start, finish);
+ char *const dbname_end = start;
+ skip_one(start, finish);
+ /* tblname */
+ char *const tblname_begin = start;
+ read_token(start, finish);
+ char *const tblname_end = start;
+ skip_one(start, finish);
+ /* idxname */
+ char *const idxname_begin = start;
+ read_token(start, finish);
+ char *const idxname_end = start;
+ skip_one(start, finish);
+ /* retfields */
+ char *const retflds_begin = start;
+ read_token(start, finish);
+ char *const retflds_end = start;
+ skip_one(start, finish);
+ /* filfields */
+ char *const filflds_begin = start;
+ read_token(start, finish);
+ char *const filflds_end = start;
+ dbname_end[0] = 0;
+ tblname_end[0] = 0;
+ idxname_end[0] = 0;
+ retflds_end[0] = 0;
+ filflds_end[0] = 0;
+ cmd_open_args args;
+ args.pst_id = pst_id;
+ args.dbn = dbname_begin;
+ args.tbl = tblname_begin;
+ args.idx = idxname_begin;
+ args.retflds = retflds_begin;
+ args.filflds = filflds_begin;
+ return dbctx->cmd_open(conn, args);
+}
+
+void
+hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
+ char *finish, hstcpsvr_conn& conn)
+{
+ cmd_exec_args args;
+ const size_t pst_id = read_ui32(cmd_begin, cmd_end);
+ if (pst_id >= conn.cstate.prep_stmts.size()) {
+ return conn.dbcb_resp_short(2, "stmtnum");
+ }
+ args.pst = &conn.cstate.prep_stmts[pst_id];
+ char *const op_begin = start;
+ read_token(start, finish);
+ char *const op_end = start;
+ args.op = string_ref(op_begin, op_end);
+ skip_one(start, finish);
+ const uint32_t fldnum = read_ui32(start, finish);
+ string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
+ auto_alloca_free<string_ref> flds_autofree(flds);
+ args.kvals = flds;
+ args.kvalslen = fldnum;
+ for (size_t i = 0; i < fldnum; ++i) {
+ skip_one(start, finish);
+ char *const f_begin = start;
+ read_token(start, finish);
+ char *const f_end = start;
+ if (is_null_expression(f_begin, f_end)) {
+ /* null */
+ flds[i] = string_ref();
+ } else {
+ /* non-null */
+ char *wp = f_begin;
+ unescape_string(wp, f_begin, f_end);
+ flds[i] = string_ref(f_begin, wp - f_begin);
+ }
+ }
+ skip_one(start, finish);
+ args.limit = read_ui32(start, finish);
+ skip_one(start, finish);
+ args.skip = read_ui32(start, finish);
+ if (start == finish) {
+ /* simple query */
+ return dbctx->cmd_exec(conn, args);
+ }
+ /* has more options */
+ skip_one(start, finish);
+ /* in-clause */
+ if (start[0] == '@') {
+ read_token(start, finish); /* '@' */
+ skip_one(start, finish);
+ args.invalues_keypart = read_ui32(start, finish);
+ skip_one(start, finish);
+ args.invalueslen = read_ui32(start, finish);
+ if (args.invalueslen <= 0) {
+ return conn.dbcb_resp_short(2, "invalueslen");
+ }
+ if (invalues_work.size() < args.invalueslen) {
+ invalues_work.resize(args.invalueslen);
+ }
+ args.invalues = &invalues_work[0];
+ for (uint32_t i = 0; i < args.invalueslen; ++i) {
+ skip_one(start, finish);
+ char *const invalue_begin = start;
+ read_token(start, finish);
+ char *const invalue_end = start;
+ char *wp = invalue_begin;
+ unescape_string(wp, invalue_begin, invalue_end);
+ invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
+ }
+ skip_one(start, finish);
+ }
+ if (start == finish) {
+ /* no more options */
+ return dbctx->cmd_exec(conn, args);
+ }
+ /* filters */
+ size_t filters_count = 0;
+ while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
+ char *const filter_type_begin = start;
+ read_token(start, finish);
+ char *const filter_type_end = start;
+ skip_one(start, finish);
+ char *const filter_op_begin = start;
+ read_token(start, finish);
+ char *const filter_op_end = start;
+ skip_one(start, finish);
+ const uint32_t ff_offset = read_ui32(start, finish);
+ skip_one(start, finish);
+ char *const filter_val_begin = start;
+ read_token(start, finish);
+ char *const filter_val_end = start;
+ skip_one(start, finish);
+ if (filters_work.size() <= filters_count) {
+ filters_work.resize(filters_count + 1);
+ }
+ record_filter& fi = filters_work[filters_count];
+ if (filter_type_end != filter_type_begin + 1) {
+ return conn.dbcb_resp_short(2, "filtertype");
+ }
+ fi.filter_type = (filter_type_begin[0] == 'W')
+ ? record_filter_type_break : record_filter_type_skip;
+ const uint32_t num_filflds = args.pst->get_filter_fields().size();
+ if (ff_offset >= num_filflds) {
+ return conn.dbcb_resp_short(2, "filterfld");
+ }
+ fi.op = string_ref(filter_op_begin, filter_op_end);
+ fi.ff_offset = ff_offset;
+ if (is_null_expression(filter_val_begin, filter_val_end)) {
+ /* null */
+ fi.val = string_ref();
+ } else {
+ /* non-null */
+ char *wp = filter_val_begin;
+ unescape_string(wp, filter_val_begin, filter_val_end);
+ fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
+ }
+ ++filters_count;
+ }
+ if (filters_count > 0) {
+ if (filters_work.size() <= filters_count) {
+ filters_work.resize(filters_count + 1);
+ }
+ filters_work[filters_count].op = string_ref(); /* sentinel */
+ args.filters = &filters_work[0];
+ } else {
+ args.filters = 0;
+ }
+ if (start == finish) {
+ /* no modops */
+ return dbctx->cmd_exec(conn, args);
+ }
+ /* has modops */
+ char *const mod_op_begin = start;
+ read_token(start, finish);
+ char *const mod_op_end = start;
+ args.mod_op = string_ref(mod_op_begin, mod_op_end);
+ const size_t num_uvals = args.pst->get_ret_fields().size();
+ string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
+ auto_alloca_free<string_ref> uflds_autofree(uflds);
+ for (size_t i = 0; i < num_uvals; ++i) {
+ skip_one(start, finish);
+ char *const f_begin = start;
+ read_token(start, finish);
+ char *const f_end = start;
+ if (is_null_expression(f_begin, f_end)) {
+ /* null */
+ uflds[i] = string_ref();
+ } else {
+ /* non-null */
+ char *wp = f_begin;
+ unescape_string(wp, f_begin, f_end);
+ uflds[i] = string_ref(f_begin, wp - f_begin);
+ }
+ }
+ args.uvals = uflds;
+ return dbctx->cmd_exec(conn, args);
+}
+
+void
+hstcpsvr_worker::do_authorization(char *start, char *finish,
+ hstcpsvr_conn& conn)
+{
+ /* auth type */
+ char *const authtype_begin = start;
+ read_token(start, finish);
+ char *const authtype_end = start;
+ const size_t authtype_len = authtype_end - authtype_begin;
+ skip_one(start, finish);
+ /* key */
+ char *const key_begin = start;
+ read_token(start, finish);
+ char *const key_end = start;
+ const size_t key_len = key_end - key_begin;
+ authtype_end[0] = 0;
+ key_end[0] = 0;
+ char *wp = key_begin;
+ unescape_string(wp, key_begin, key_end);
+ if (authtype_len != 1 || authtype_begin[0] != '1') {
+ return conn.dbcb_resp_short(3, "authtype");
+ }
+ if (cshared.plain_secret.size() == key_len &&
+ memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
+ conn.authorized = true;
+ } else {
+ conn.authorized = false;
+ }
+ if (!conn.authorized) {
+ return conn.dbcb_resp_short(3, "unauth");
+ } else {
+ return conn.dbcb_resp_short(0, "");
+ }
+}
+
+hstcpsvr_worker_ptr
+hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
+{
+ return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
+}
+
+};
+
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp b/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp
new file mode 100644
index 00000000..497581c2
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp
@@ -0,0 +1,35 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#ifndef DENA_HSTCPSVR_WORKER_HPP
+#define DENA_HSTCPSVR_WORKER_HPP
+
+#include "hstcpsvr.hpp"
+
+namespace dena {
+
+struct hstcpsvr_worker_i;
+typedef std::auto_ptr<hstcpsvr_worker_i> hstcpsvr_worker_ptr;
+
+struct hstcpsvr_worker_arg {
+ const hstcpsvr_shared_c *cshared;
+ volatile hstcpsvr_shared_v *vshared;
+ long worker_id;
+ hstcpsvr_worker_arg() : cshared(0), vshared(0), worker_id(0) { }
+};
+
+struct hstcpsvr_worker_i {
+ virtual ~hstcpsvr_worker_i() { }
+ virtual void run() = 0;
+ static hstcpsvr_worker_ptr create(const hstcpsvr_worker_arg& arg);
+};
+
+};
+
+#endif
+
diff --git a/plugin/handler_socket/handlersocket/mysql_incl.hpp b/plugin/handler_socket/handlersocket/mysql_incl.hpp
new file mode 100644
index 00000000..0d056a7e
--- /dev/null
+++ b/plugin/handler_socket/handlersocket/mysql_incl.hpp
@@ -0,0 +1,55 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
+ * See COPYRIGHT.txt for details.
+ */
+
+#ifndef DENA_MYSQL_INCL_HPP
+#define DENA_MYSQL_INCL_HPP
+
+#ifndef HAVE_CONFIG_H
+#define HAVE_CONFIG_H
+#endif
+
+#ifndef MYSQL_DYNAMIC_PLUGIN
+#define MYSQL_DYNAMIC_PLUGIN
+#endif
+
+#define MYSQL_SERVER 1
+
+#include <my_global.h>
+#include <mysql_version.h>
+
+#if MYSQL_VERSION_ID >= 50505
+#include <my_pthread.h>
+#include <sql_priv.h>
+#include "sql_class.h"
+#include "unireg.h"
+#include "lock.h"
+#include "key.h" // key_copy()
+#include <my_global.h>
+#include <mysql/plugin.h>
+#include <transaction.h>
+#include <sql_base.h>
+// FIXME FIXME FIXME
+#define safeFree(X) my_free(X)
+#undef pthread_cond_timedwait
+#undef pthread_mutex_lock
+#undef pthread_mutex_unlock
+#define pthread_cond_timedwait mysql_cond_timedwait
+#define pthread_mutex_lock mysql_mutex_lock
+#define pthread_mutex_unlock mysql_mutex_unlock
+#define current_stmt_binlog_row_based is_current_stmt_binlog_format_row
+#define clear_current_stmt_binlog_row_based clear_current_stmt_binlog_format_row
+
+#else
+#include "mysql_priv.h"
+#endif
+
+#undef min
+#undef max
+
+#endif
+