diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /plugin/handler_socket/handlersocket | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.tar.xz mariadb-10.5-a175314c3e5827eb193872241446f2f8f5c9d33c.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/handler_socket/handlersocket')
-rw-r--r-- | plugin/handler_socket/handlersocket/COPYRIGHT.txt | 27 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/Makefile.am | 8 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/Makefile.plain.template | 31 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/database.cpp | 1180 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/database.hpp | 142 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/handlersocket.cpp | 217 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/handlersocket.spec.template | 29 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/hstcpsvr.cpp | 147 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/hstcpsvr.hpp | 58 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp | 957 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp | 35 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/mysql_incl.hpp | 55 |
12 files changed, 2886 insertions, 0 deletions
diff --git a/plugin/handler_socket/handlersocket/COPYRIGHT.txt b/plugin/handler_socket/handlersocket/COPYRIGHT.txt new file mode 100644 index 00000000..41dda127 --- /dev/null +++ b/plugin/handler_socket/handlersocket/COPYRIGHT.txt @@ -0,0 +1,27 @@ + + Copyright (c) 2010 DeNA Co.,Ltd. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of DeNA Co.,Ltd. nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY DeNA Co.,Ltd. "AS IS" AND ANY EXPRESS OR + IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + EVENT SHALL DeNA Co.,Ltd. BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/plugin/handler_socket/handlersocket/Makefile.am b/plugin/handler_socket/handlersocket/Makefile.am new file mode 100644 index 00000000..7e47209b --- /dev/null +++ b/plugin/handler_socket/handlersocket/Makefile.am @@ -0,0 +1,8 @@ +pkgplugindir = $(PLUGIN_DIR) +CXXFLAGS += -fimplicit-templates +noinst_HEADERS = database.hpp hstcpsvr.hpp hstcpsvr_worker.hpp mysql_incl.hpp +pkgplugin_LTLIBRARIES = handlersocket.la +handlersocket_la_LDFLAGS = -module ../libhsclient/libhsclient.la -L$(top_builddir)/libservices -lmysqlservices +handlersocket_la_CXXFLAGS = $(MYSQL_INC) $(MYSQL_CFLAGS) $(AM_CXXFLAGS) -I$(srcdir)/../libhsclient +handlersocket_la_SOURCES = database.cpp handlersocket.cpp \ + hstcpsvr_worker.cpp hstcpsvr.cpp diff --git a/plugin/handler_socket/handlersocket/Makefile.plain.template b/plugin/handler_socket/handlersocket/Makefile.plain.template new file mode 100644 index 00000000..4d5f8c10 --- /dev/null +++ b/plugin/handler_socket/handlersocket/Makefile.plain.template @@ -0,0 +1,31 @@ + +MYSQL_INC = HANDLERSOCKET_MYSQL_INC +MYSQL_LIB = HANDLERSOCKET_MYSQL_LIB + +CXX = g++ -Wall -g -fno-rtti -fno-exceptions -fPIC -DPIC +LIBS = $(MYSQL_LIB) -lhsclient -lpthread -lz +CXXFLAGS = -I/usr/include/handlersocket $(MYSQL_INC) +LDFLAGS = + +CXXFLAGS += -O3 -DNDEBUG + +HANDLERSOCKET_OBJS = database.o hstcpsvr.o hstcpsvr_worker.o + +all: handlersocket.so + +handlersocket.so: $(HANDLERSOCKET_OBJS) handlersocket.cpp + $(CXX) $(CXXFLAGS) -fno-strict-aliasing -shared $^ -o $@ $(LDFLAGS) \ + -Wl,-soname -Wl,$@ $(LIBS) +clean: + rm -f *.a *.so *.o + +LIBDIR = $(shell \ + if [ -e /usr/lib64/mysql ]; then echo /usr/lib64; else echo /usr/lib; fi) + +install: handlersocket.so + sudo sh -c 'ulimit -c unlimited ; /etc/init.d/mysql stop ; \ + cp handlersocket.so handlersocket.so.cpy && \ + mv handlersocket.so.cpy \ + $(LIBDIR)/mysql/plugin/handlersocket.so && \ + /etc/init.d/mysql start' + diff --git a/plugin/handler_socket/handlersocket/database.cpp b/plugin/handler_socket/handlersocket/database.cpp new file mode 100644 index 00000000..937b1177 --- /dev/null +++ b/plugin/handler_socket/handlersocket/database.cpp @@ -0,0 +1,1180 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include <string.h> + +#include "database.hpp" +#include "string_util.hpp" +#include "escape.hpp" +#include "mysql_incl.hpp" + +#define DBG_KEY(x) +#define DBG_SHUT(x) +#define DBG_LOCK(x) +#define DBG_THR(x) +#define DBG_CMP(x) +#define DBG_FLD(x) +#define DBG_FILTER(x) +#define DBG_REFCNT(x) +#define DBG_KEYLEN(x) +#define DBG_DELETED + +/* status variables */ +unsigned long long int open_tables_count; +unsigned long long int close_tables_count; +unsigned long long int lock_tables_count; +unsigned long long int unlock_tables_count; +unsigned long long int index_exec_count; + +namespace dena { + +prep_stmt::prep_stmt() + : dbctx(0), table_id(static_cast<size_t>(-1)), + idxnum(static_cast<size_t>(-1)) +{ +} +prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx, + const fields_type& rf, const fields_type& ff) + : dbctx(c), table_id(tbl), idxnum(idx), ret_fields(rf), filter_fields(ff) +{ + if (dbctx) { + dbctx->table_addref(table_id); + } +} +prep_stmt::~prep_stmt() +{ + if (dbctx) { + dbctx->table_release(table_id); + } +} + +prep_stmt::prep_stmt(const prep_stmt& x) + : dbctx(x.dbctx), table_id(x.table_id), idxnum(x.idxnum), + ret_fields(x.ret_fields), filter_fields(x.filter_fields) +{ + if (dbctx) { + dbctx->table_addref(table_id); + } +} + +prep_stmt& +prep_stmt::operator =(const prep_stmt& x) +{ + if (this != &x) { + if (dbctx) { + dbctx->table_release(table_id); + } + dbctx = x.dbctx; + table_id = x.table_id; + idxnum = x.idxnum; + ret_fields = x.ret_fields; + filter_fields = x.filter_fields; + if (dbctx) { + dbctx->table_addref(table_id); + } + } + return *this; +} + +struct database : public database_i, private noncopyable { + database(const config& c); + virtual ~database(); + virtual dbcontext_ptr create_context(bool for_write) volatile; + virtual void stop() volatile; + virtual const config& get_conf() const volatile; + public: + int child_running; + private: + config conf; +}; + +struct tablevec_entry { + TABLE *table; + size_t refcount; + bool modified; + tablevec_entry() : table(0), refcount(0), modified(false) { } +}; + +struct expr_user_lock : private noncopyable { + expr_user_lock(THD *thd, int timeout) + : lck_key(thd, "handlersocket_wr", 16, &my_charset_latin1), + lck_timeout(thd, timeout), + lck_func_get_lock(thd, &lck_key, &lck_timeout), + lck_func_release_lock(thd, &lck_key) + { + lck_key.fix_fields(thd, 0); + lck_timeout.fix_fields(thd, 0); + lck_func_get_lock.fix_fields(thd, 0); + lck_func_release_lock.fix_fields(thd, 0); + } + long long get_lock() { + return lck_func_get_lock.val_int(); + } + long long release_lock() { + return lck_func_release_lock.val_int(); + } + private: + Item_string lck_key; + Item_int lck_timeout; + Item_func_get_lock lck_func_get_lock; + Item_func_release_lock lck_func_release_lock; +}; + +struct dbcontext : public dbcontext_i, private noncopyable { + dbcontext(volatile database *d, bool for_write); + virtual ~dbcontext(); + virtual void init_thread(const void *stack_botton, + volatile int& shutdown_flag); + virtual void term_thread(); + virtual bool check_alive(); + virtual void lock_tables_if(); + virtual void unlock_tables_if(); + virtual bool get_commit_error(); + virtual void clear_error(); + virtual void close_tables_if(); + virtual void table_addref(size_t tbl_id); + virtual void table_release(size_t tbl_id); + virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args); + virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args); + virtual void set_statistics(size_t num_conns, size_t num_active); + private: + int set_thread_message(const char *fmt, ...) + __attribute__((format (printf, 2, 3))); + bool parse_fields(TABLE *const table, const char *str, + prep_stmt::fields_type& flds); + void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst, + const string_ref *fvals, size_t fvalslen); + void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst, + const string_ref *fvals, size_t fvalslen); + void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst, + ha_rkey_function find_flag, const cmd_exec_args& args); + size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst, + const record_filter *filters); + bool fill_filter_buf(TABLE *table, const prep_stmt& pst, + const record_filter *filters, uchar *filter_buf, size_t len); + int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst, + const record_filter *filters, const uchar *filter_buf); + void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst); + void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst); + int modify_record(dbcallback_i& cb, TABLE *const table, + const prep_stmt& pst, const cmd_exec_args& args, char mod_op, + size_t& modified_count); + private: + typedef std::vector<tablevec_entry> table_vec_type; + typedef std::pair<std::string, std::string> table_name_type; + typedef std::map<table_name_type, size_t> table_map_type; + private: + volatile database *const dbref; + bool for_write_flag; + THD *thd; + MYSQL_LOCK *lock; + bool lock_failed; + std::auto_ptr<expr_user_lock> user_lock; + int user_level_lock_timeout; + bool user_level_lock_locked; + bool commit_error; + std::vector<char> info_message_buf; + table_vec_type table_vec; + table_map_type table_map; +}; + +database::database(const config& c) + : child_running(1), conf(c) +{ +} + +database::~database() +{ +} + +dbcontext_ptr +database::create_context(bool for_write) volatile +{ + return dbcontext_ptr(new dbcontext(this, for_write)); +} + +void +database::stop() volatile +{ + child_running = false; +} + +const config& +database::get_conf() const volatile +{ + return const_cast<const config&>(conf); +} + +database_ptr +database_i::create(const config& conf) +{ + return database_ptr(new database(conf)); +} + +dbcontext::dbcontext(volatile database *d, bool for_write) + : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false), + user_level_lock_timeout(0), user_level_lock_locked(false), + commit_error(false) +{ + info_message_buf.resize(8192); + user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12); +} + +dbcontext::~dbcontext() +{ +} + +namespace { + +int +wait_server_to_start(THD *thd, volatile int& shutdown_flag) +{ + int r = 0; + DBG_SHUT(fprintf(stderr, "HNDSOCK wsts\n")); + pthread_mutex_lock(&LOCK_server_started); + while (!mysqld_server_started) { + timespec abstime; + set_timespec(abstime, 1); + pthread_cond_timedwait(&COND_server_started, &LOCK_server_started, + &abstime); + pthread_mutex_unlock(&LOCK_server_started); + pthread_mutex_lock(&thd->mysys_var->mutex); + killed_state st = thd->killed; + pthread_mutex_unlock(&thd->mysys_var->mutex); + DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d\n", (int)st)); + pthread_mutex_lock(&LOCK_server_started); + if (st != NOT_KILLED) { + DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst %d break\n", (int)st)); + r = -1; + break; + } + if (shutdown_flag) { + DBG_SHUT(fprintf(stderr, "HNDSOCK wsts kst shut break\n")); + r = -1; + break; + } + } + pthread_mutex_unlock(&LOCK_server_started); + DBG_SHUT(fprintf(stderr, "HNDSOCK wsts done\n")); + return r; +} + +}; // namespace + +#define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd) + +void +dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag) +{ + DBG_THR(fprintf(stderr, "HNDSOCK init thread\n")); + { + my_thread_init(); + thd = new THD(0); + thd->thread_stack = (char *)stack_bottom; + DBG_THR(fprintf(stderr, + "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu " + "O: %zu %zu %zu %zu %zu %zu %zu\n", + thd->thread_stack, sizeof(THD), sizeof(mysql_mutex_t), + DENA_THR_OFFSETOF(mdl_context), + DENA_THR_OFFSETOF(net), + DENA_THR_OFFSETOF(LOCK_thd_data), + DENA_THR_OFFSETOF(mysys_var), + DENA_THR_OFFSETOF(stmt_arena), + DENA_THR_OFFSETOF(limit_found_rows), + DENA_THR_OFFSETOF(locked_tables_list))); + thd->store_globals(); + thd->system_thread = static_cast<enum_thread_type>(1<<30UL); + memset(&thd->net, 0, sizeof(thd->net)); + if (for_write_flag) { + #if MYSQL_VERSION_ID >= 50505 + thd->variables.option_bits |= OPTION_BIN_LOG; + #else + thd->options |= OPTION_BIN_LOG; + #endif + safeFree((char*) thd->db.str); + thd->db.str= my_strdup(PSI_NOT_INSTRUMENTED, "handlersocket", MYF(0)); + thd->db.length= sizeof("handlersocket")-1; + } + thd->variables.option_bits |= OPTION_TABLE_LOCK; + set_current_thd(thd); + DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd)); + } + { + thd->thread_id = next_thread_id(); + server_threads.insert(thd); + } + + DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n")); + wait_server_to_start(thd, shutdown_flag); + DBG_THR(fprintf(stderr, "HNDSOCK init thread done\n")); + + thd_proc_info(thd, &info_message_buf[0]); + set_thread_message("hs:listening"); + DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd)); + + lex_start(thd); + + user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout)); +} + +int +dbcontext::set_thread_message(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(), + fmt, ap); + va_end(ap); + return n; +} + +void +dbcontext::term_thread() +{ + DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd)); + close_tables_if(); + set_current_thd(nullptr); + { + delete thd; + thd = 0; + my_thread_end(); + } +} + +bool +dbcontext::check_alive() +{ + pthread_mutex_lock(&thd->mysys_var->mutex); + killed_state st = thd->killed; + pthread_mutex_unlock(&thd->mysys_var->mutex); + DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %zu\n", thd, &thd->killed, + (int)st, sizeof(*thd))); + if (st != NOT_KILLED) { + DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", (int)st)); + return false; + } + return true; +} + +void +dbcontext::lock_tables_if() +{ + if (lock_failed) { + return; + } + if (for_write_flag && !user_level_lock_locked) { + if (user_lock->get_lock()) { + user_level_lock_locked = true; + } else { + lock_failed = true; + return; + } + } + if (lock == 0) { + const size_t num_max = table_vec.size(); + TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1); + size_t num_open = 0; + for (size_t i = 0; i < num_max; ++i) { + if (table_vec[i].refcount > 0) { + tables[num_open++] = table_vec[i].table; + } + table_vec[i].modified = false; + } + #if MYSQL_VERSION_ID >= 50505 + lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0); + #else + bool need_reopen= false; + lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, + MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen); + #endif + statistic_increment(lock_tables_count, &LOCK_status); + thd_proc_info(thd, &info_message_buf[0]); + DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n", + thd, lock, num_max, num_open)); + if (lock == 0) { + lock_failed = true; + DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n", + thd)); + } + if (for_write_flag) { + #if MYSQL_VERSION_ID >= 50505 + thd->set_current_stmt_binlog_format_row(); + #else + thd->current_stmt_binlog_row_based = 1; + #endif + } + DENA_ALLOCA_FREE(tables); + } + DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum)); +} + +void +dbcontext::unlock_tables_if() +{ + if (lock != 0) { + DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n", + thd, thd->lock)); + if (for_write_flag) { + for (size_t i = 0; i < table_vec.size(); ++i) { + if (table_vec[i].modified) { + query_cache_invalidate3(thd, table_vec[i].table, 1); + table_vec[i].table->file->ha_release_auto_increment(); + } + } + } + { + bool suc = true; + #if MYSQL_VERSION_ID >= 50505 + suc = (trans_commit_stmt(thd) == 0); + #else + suc = (ha_autocommit_or_rollback(thd, 0) == 0); + #endif + if (!suc) { + commit_error = true; + DENA_VERBOSE(10, fprintf(stderr, + "HNDSOCK unlock tables: commit failed\n")); + } + } + mysql_unlock_tables(thd, lock); + lock = thd->lock = 0; + statistic_increment(unlock_tables_count, &LOCK_status); + } + if (user_level_lock_locked) { + if (user_lock->release_lock()) { + user_level_lock_locked = false; + } + } +} + +bool +dbcontext::get_commit_error() +{ + return commit_error; +} + +void +dbcontext::clear_error() +{ + lock_failed = false; + commit_error = false; +} + +void +dbcontext::close_tables_if() +{ + unlock_tables_if(); + DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n")); + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(thd); + if (!table_vec.empty()) { + statistic_increment(close_tables_count, &LOCK_status); + table_vec.clear(); + table_map.clear(); + } +} + +void +dbcontext::table_addref(size_t tbl_id) +{ + table_vec[tbl_id].refcount += 1; + DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id, + table_vec[tbl_id].refcount)); +} + +void +dbcontext::table_release(size_t tbl_id) +{ + table_vec[tbl_id].refcount -= 1; + DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id, + table_vec[tbl_id].refcount)); +} + +void +dbcontext::resp_record(dbcallback_i& cb, TABLE *const table, + const prep_stmt& pst) +{ + char rwpstr_buf[64]; + String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin); + const prep_stmt::fields_type& rf = pst.get_ret_fields(); + const size_t n = rf.size(); + for (size_t i = 0; i < n; ++i) { + uint32_t fn = rf[i]; + Field *const fld = table->field[fn]; + DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn)); + if (fld->is_null()) { + /* null */ + cb.dbcb_resp_entry(0, 0); + } else { + fld->val_str(&rwpstr, &rwpstr); + const size_t len = rwpstr.length(); + if (len != 0) { + /* non-empty */ + cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length()); + } else { + /* empty */ + static const char empty_str[] = ""; + cb.dbcb_resp_entry(empty_str, 0); + } + } + } +} + +void +dbcontext::dump_record(dbcallback_i& cb, TABLE *const table, + const prep_stmt& pst) +{ + char rwpstr_buf[64]; + String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin); + const prep_stmt::fields_type& rf = pst.get_ret_fields(); + const size_t n = rf.size(); + for (size_t i = 0; i < n; ++i) { + uint32_t fn = rf[i]; + Field *const fld = table->field[fn]; + if (fld->is_null()) { + /* null */ + fprintf(stderr, "NULL"); + } else { + fld->val_str(&rwpstr, &rwpstr); + const std::string s(rwpstr.ptr(), rwpstr.length()); + fprintf(stderr, "[%s]", s.c_str()); + } + } + fprintf(stderr, "\n"); +} + +int +dbcontext::modify_record(dbcallback_i& cb, TABLE *const table, + const prep_stmt& pst, const cmd_exec_args& args, char mod_op, + size_t& modified_count) +{ + if (mod_op == 'U') { + /* update */ + handler *const hnd = table->file; + uchar *const buf = table->record[0]; + store_record(table, record[1]); + const prep_stmt::fields_type& rf = pst.get_ret_fields(); + const size_t n = rf.size(); + for (size_t i = 0; i < n; ++i) { + const string_ref& nv = args.uvals[i]; + uint32_t fn = rf[i]; + Field *const fld = table->field[fn]; + if (nv.begin() == 0) { + fld->set_null(); + } else { + fld->set_notnull(); + fld->store(nv.begin(), nv.size(), &my_charset_bin); + } + } + table_vec[pst.get_table_id()].modified = true; + const int r = hnd->ha_update_row(table->record[1], buf); + if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) { + return r; + } + ++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */ + } else if (mod_op == 'D') { + /* delete */ + handler *const hnd = table->file; + table_vec[pst.get_table_id()].modified = true; + const int r = hnd->ha_delete_row(table->record[0]); + if (r != 0) { + return r; + } + ++modified_count; + } else if (mod_op == '+' || mod_op == '-') { + /* increment/decrement */ + handler *const hnd = table->file; + uchar *const buf = table->record[0]; + store_record(table, record[1]); + const prep_stmt::fields_type& rf = pst.get_ret_fields(); + const size_t n = rf.size(); + size_t i = 0; + for (i = 0; i < n; ++i) { + const string_ref& nv = args.uvals[i]; + uint32_t fn = rf[i]; + Field *const fld = table->field[fn]; + if (fld->is_null() || nv.begin() == 0) { + continue; + } + const long long pval = fld->val_int(); + const long long llv = atoll_nocheck(nv.begin(), nv.end()); + /* TODO: llv == 0? */ + long long nval = 0; + if (mod_op == '+') { + /* increment */ + nval = pval + llv; + } else { + /* decrement */ + nval = pval - llv; + if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) { + break; /* don't modify */ + } + } + fld->store(nval, false); + } + if (i == n) { + /* modify */ + table_vec[pst.get_table_id()].modified = true; + const int r = hnd->ha_update_row(table->record[1], buf); + if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) { + return r; + } + ++modified_count; + } + } + return 0; +} + +void +dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst, + const string_ref *fvals, size_t fvalslen) +{ + if (!for_write_flag) { + return cb.dbcb_resp_short(2, "readonly"); + } + lock_tables_if(); + if (lock == 0) { + return cb.dbcb_resp_short(1, "lock_tables"); + } + if (pst.get_table_id() >= table_vec.size()) { + return cb.dbcb_resp_short(2, "tblnum"); + } + TABLE *const table = table_vec[pst.get_table_id()].table; + handler *const hnd = table->file; + uchar *const buf = table->record[0]; + empty_record(table); + memset(buf, 0, table->s->null_bytes); /* clear null flags */ + const prep_stmt::fields_type& rf = pst.get_ret_fields(); + const size_t n = std::min(rf.size(), fvalslen); + for (size_t i = 0; i < n; ++i) { + uint32_t fn = rf[i]; + Field *const fld = table->field[fn]; + if (fvals[i].begin() == 0) { + fld->set_null(); + } else { + fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin); + } + } + table->next_number_field = table->found_next_number_field; + /* FIXME: test */ + const int r = hnd->ha_write_row(buf); + const ulonglong insert_id = table->file->insert_id_for_cur_row; + table->next_number_field = 0; + table_vec[pst.get_table_id()].modified = true; + if (r == 0 && table->found_next_number_field != 0) { + return cb.dbcb_resp_short_num64(0, insert_id); + } + if (r != 0) { + return cb.dbcb_resp_short_num(1, r); + } + return cb.dbcb_resp_short(0, ""); +} + +void +dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst, + const string_ref *fvals, size_t fvalslen) +{ + if (fvalslen < 1) { + return cb.dbcb_resp_short(2, "syntax"); + } + return cb.dbcb_resp_short(2, "notimpl"); +} + +static size_t +prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table, + KEY& kinfo, size_t invalues_index) +{ + size_t kplen_sum = 0; + DBG_KEY(fprintf(stderr, "SLOW\n")); + for (size_t i = 0; i < args.kvalslen; ++i) { + const KEY_PART_INFO & kpt = kinfo.key_part[i]; + string_ref kval = args.kvals[i]; + if (args.invalues_keypart >= 0 && + static_cast<size_t>(args.invalues_keypart) == i) { + kval = args.invalues[invalues_index]; + } + if (kval.begin() == 0) { + kpt.field->set_null(); + } else { + kpt.field->set_notnull(); + } + kpt.field->store(kval.begin(), kval.size(), &my_charset_bin); + kplen_sum += kpt.store_length; + DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length, + kpt.store_length)); + } + key_copy(key_buf, table->record[0], &kinfo, kplen_sum); + DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum, + kinfo.key_length)); + return kplen_sum; +} + +void +dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst, + ha_rkey_function find_flag, const cmd_exec_args& args) +{ + const bool debug_out = (verbose_level >= 100); + bool need_resp_record = true; + char mod_op = 0; + const string_ref& mod_op_str = args.mod_op; + if (mod_op_str.size() != 0) { + if (!for_write_flag) { + return cb.dbcb_resp_short(2, "readonly"); + } + mod_op = mod_op_str.begin()[0]; + need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?'; + switch (mod_op) { + case 'U': /* update */ + case 'D': /* delete */ + case '+': /* increment */ + case '-': /* decrement */ + break; + default: + if (debug_out) { + fprintf(stderr, "unknown modop: %c\n", mod_op); + } + return cb.dbcb_resp_short(2, "modop"); + } + } + lock_tables_if(); + if (lock == 0) { + return cb.dbcb_resp_short(1, "lock_tables"); + } + if (pst.get_table_id() >= table_vec.size()) { + return cb.dbcb_resp_short(2, "tblnum"); + } + TABLE *const table = table_vec[pst.get_table_id()].table; + /* keys */ + if (pst.get_idxnum() >= table->s->keys) { + return cb.dbcb_resp_short(2, "idxnum"); + } + KEY& kinfo = table->key_info[pst.get_idxnum()]; + if (args.kvalslen > kinfo.user_defined_key_parts) { + return cb.dbcb_resp_short(2, "kpnum"); + } + uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length); + size_t invalues_idx = 0; + size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx); + /* filters */ + uchar *filter_buf = 0; + if (args.filters != 0) { + const size_t filter_buf_len = calc_filter_buf_size(table, pst, + args.filters); + filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len); + if (!fill_filter_buf(table, pst, args.filters, filter_buf, + filter_buf_len)) { + return cb.dbcb_resp_short(2, "filterblob"); + } + } + /* handler */ + table->read_set = &table->s->all_set; + handler *const hnd = table->file; + if (!for_write_flag) { + hnd->init_table_handle_for_HANDLER(); + } + hnd->ha_index_or_rnd_end(); + hnd->ha_index_init(pst.get_idxnum(), 1); + if (need_resp_record) { + cb.dbcb_resp_begin(pst.get_ret_fields().size()); + } + const uint32_t limit = args.limit ? args.limit : 1; + uint32_t skip = args.skip; + size_t modified_count = 0; + int r = 0; + bool is_first = true; + for (uint32_t cnt = 0; cnt < limit + skip;) { + if (is_first) { + is_first = false; + const key_part_map kpm = (1U << args.kvalslen) - 1; + r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag); + } else if (args.invalues_keypart >= 0) { + if (++invalues_idx >= args.invalueslen) { + break; + } + kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx); + const key_part_map kpm = (1U << args.kvalslen) - 1; + r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag); + } else { + switch (find_flag) { + case HA_READ_BEFORE_KEY: + case HA_READ_KEY_OR_PREV: + r = hnd->ha_index_prev(table->record[0]); + break; + case HA_READ_AFTER_KEY: + case HA_READ_KEY_OR_NEXT: + r = hnd->ha_index_next(table->record[0]); + break; + case HA_READ_KEY_EXACT: + r = hnd->ha_index_next_same(table->record[0], key_buf, kplen_sum); + break; + default: + r = HA_ERR_END_OF_FILE; /* to finish the loop */ + break; + } + } + if (debug_out) { + fprintf(stderr, "r=%d\n", r); + if (r == 0 || r == HA_ERR_RECORD_DELETED) { + dump_record(cb, table, pst); + } + } + int filter_res = 0; + if (r != 0) { + /* no-count */ + } else if (args.filters != 0 && (filter_res = check_filter(cb, table, + pst, args.filters, filter_buf)) != 0) { + if (filter_res < 0) { + break; + } + } else if (skip > 0) { + --skip; + } else { + /* hit */ + if (need_resp_record) { + resp_record(cb, table, pst); + } + if (mod_op != 0) { + r = modify_record(cb, table, pst, args, mod_op, modified_count); + } + ++cnt; + } + if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) { + continue; + } + if (r != 0 && r != HA_ERR_RECORD_DELETED) { + break; + } + } + hnd->ha_index_or_rnd_end(); + if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND && + r != HA_ERR_END_OF_FILE) { + /* failed */ + if (need_resp_record) { + /* revert dbcb_resp_begin() and dbcb_resp_entry() */ + cb.dbcb_resp_cancel(); + } + cb.dbcb_resp_short_num(1, r); + } else { + /* succeeded */ + if (need_resp_record) { + cb.dbcb_resp_end(); + } else { + cb.dbcb_resp_short_num(0, modified_count); + } + } + DENA_ALLOCA_FREE(filter_buf); + DENA_ALLOCA_FREE(key_buf); +} + +size_t +dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst, + const record_filter *filters) +{ + size_t filter_buf_len = 0; + for (const record_filter *f = filters; f->op.begin() != 0; ++f) { + if (f->val.begin() == 0) { + continue; + } + const uint32_t fn = pst.get_filter_fields()[f->ff_offset]; + filter_buf_len += table->field[fn]->pack_length(); + } + ++filter_buf_len; + /* Field_medium::cmp() calls uint3korr(), which may read 4 bytes. + Allocate 1 more byte for safety. */ + return filter_buf_len; +} + +bool +dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst, + const record_filter *filters, uchar *filter_buf, size_t len) +{ + memset(filter_buf, 0, len); + size_t pos = 0; + for (const record_filter *f = filters; f->op.begin() != 0; ++f) { + if (f->val.begin() == 0) { + continue; + } + const uint32_t fn = pst.get_filter_fields()[f->ff_offset]; + Field *const fld = table->field[fn]; + if ((fld->flags & BLOB_FLAG) != 0) { + return false; + } + fld->store(f->val.begin(), f->val.size(), &my_charset_bin); + const size_t packlen = fld->pack_length(); + memcpy(filter_buf + pos, fld->ptr, packlen); + pos += packlen; + } + return true; +} + +int +dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst, + const record_filter *filters, const uchar *filter_buf) +{ + DBG_FILTER(fprintf(stderr, "check_filter\n")); + size_t pos = 0; + for (const record_filter *f = filters; f->op.begin() != 0; ++f) { + const string_ref& op = f->op; + const string_ref& val = f->val; + const uint32_t fn = pst.get_filter_fields()[f->ff_offset]; + Field *const fld = table->field[fn]; + const size_t packlen = fld->pack_length(); + const uchar *const bval = filter_buf + pos; + int cv = 0; + if (fld->is_null()) { + cv = (val.begin() == 0) ? 0 : -1; + } else { + cv = (val.begin() == 0) ? 1 : fld->cmp(bval); + } + DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv)); + bool cond = true; + if (op.size() == 1) { + switch (op.begin()[0]) { + case '>': + DBG_FILTER(fprintf(stderr, "check_filter op: >\n")); + cond = (cv > 0); + break; + case '<': + DBG_FILTER(fprintf(stderr, "check_filter op: <\n")); + cond = (cv < 0); + break; + case '=': + DBG_FILTER(fprintf(stderr, "check_filter op: =\n")); + cond = (cv == 0); + break; + default: + DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n")); + cond = false; /* FIXME: error */ + break; + } + } else if (op.size() == 2 && op.begin()[1] == '=') { + switch (op.begin()[0]) { + case '>': + DBG_FILTER(fprintf(stderr, "check_filter op: >=\n")); + cond = (cv >= 0); + break; + case '<': + DBG_FILTER(fprintf(stderr, "check_filter op: <=\n")); + cond = (cv <= 0); + break; + case '!': + DBG_FILTER(fprintf(stderr, "check_filter op: !=\n")); + cond = (cv != 0); + break; + default: + DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n")); + cond = false; /* FIXME: error */ + break; + } + } + DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond)); + if (!cond) { + return (f->filter_type == record_filter_type_skip) ? 1 : -1; + } + if (val.begin() != 0) { + pos += packlen; + } + } + return 0; +} + +void +dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg) +{ + unlock_tables_if(); + const table_name_type k = std::make_pair(std::string(arg.dbn), + std::string(arg.tbl)); + const table_map_type::const_iterator iter = table_map.find(k); + uint32_t tblnum = 0; + if (iter != table_map.end()) { + tblnum = iter->second; + DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(), + (int)tblnum)); + } else { + TABLE_LIST tables; + TABLE *table = 0; + bool refresh = true; + const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ; + #if MYSQL_VERSION_ID >= 50505 + LEX_CSTRING db_name= { arg.dbn, strlen(arg.dbn) }; + LEX_CSTRING tbl_name= { arg.tbl, strlen(arg.tbl) }; + tables.init_one_table(&db_name, &tbl_name, 0, lock_type); + MDL_REQUEST_INIT(&tables.mdl_request, MDL_key::TABLE, arg.dbn, arg.tbl, + for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION); + Open_table_context ot_act(thd, 0); + if (!open_table(thd, &tables, &ot_act)) { + table = tables.table; + } + #else + tables.init_one_table(arg.dbn, arg.tbl, lock_type); + table = open_table(thd, &tables, thd->mem_root, &refresh, + OPEN_VIEW_NO_PARSE); + #endif + if (table == 0) { + DENA_VERBOSE(20, fprintf(stderr, + "HNDSOCK failed to open %p [%s] [%s] [%d]\n", + thd, arg.dbn, arg.tbl, static_cast<int>(refresh))); + return cb.dbcb_resp_short(1, "open_table"); + } + statistic_increment(open_tables_count, &LOCK_status); + table->reginfo.lock_type = lock_type; + table->use_all_columns(); + tblnum = table_vec.size(); + tablevec_entry e; + e.table = table; + table_vec.push_back(e); + table_map[k] = tblnum; + } + size_t idxnum = static_cast<size_t>(-1); + if (arg.idx[0] >= '0' && arg.idx[0] <= '9') { + /* numeric */ + TABLE *const table = table_vec[tblnum].table; + idxnum = atoi(arg.idx); + if (idxnum >= table->s->keys) { + return cb.dbcb_resp_short(2, "idxnum"); + } + } else { + const char *const idx_name_to_open = + arg.idx[0] == '\0' ? "PRIMARY" : arg.idx; + TABLE *const table = table_vec[tblnum].table; + for (uint i = 0; i < table->s->keys; ++i) { + KEY& kinfo = table->key_info[i]; + if (strcmp(kinfo.name.str, idx_name_to_open) == 0) { + idxnum = i; + break; + } + } + } + if (idxnum == size_t(-1)) { + return cb.dbcb_resp_short(2, "idxnum"); + } + prep_stmt::fields_type rf; + prep_stmt::fields_type ff; + if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) { + return cb.dbcb_resp_short(2, "fld"); + } + if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) { + return cb.dbcb_resp_short(2, "fld"); + } + prep_stmt p(this, tblnum, idxnum, rf, ff); + cb.dbcb_set_prep_stmt(arg.pst_id, p); + return cb.dbcb_resp_short(0, ""); +} + +bool +dbcontext::parse_fields(TABLE *const table, const char *str, + prep_stmt::fields_type& flds) +{ + string_ref flds_sr(str, strlen(str)); + std::vector<string_ref> fldnms; + if (flds_sr.size() != 0) { + split(',', flds_sr, fldnms); + } + for (size_t i = 0; i < fldnms.size(); ++i) { + Field **fld = 0; + size_t j = 0; + for (fld = table->field; *fld; ++fld, ++j) { + DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name.str)); + string_ref fn((*fld)->field_name.str, (*fld)->field_name.length); + if (fn == fldnms[i]) { + break; + } + } + if (*fld == 0) { + DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds, + std::string(fldnms[i].begin(), fldnms[i].size()).c_str())); + return false; + } + DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name.str, j)); + flds.push_back(j); + } + return true; +} + +enum db_write_op { + db_write_op_none = 0, + db_write_op_insert = 1, + db_write_op_sql = 2, +}; + +void +dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args) +{ + const prep_stmt& p = *args.pst; + if (p.get_table_id() == static_cast<size_t>(-1)) { + return cb.dbcb_resp_short(2, "stmtnum"); + } + ha_rkey_function find_flag = HA_READ_KEY_EXACT; + db_write_op wrop = db_write_op_none; + if (args.op.size() == 1) { + switch (args.op.begin()[0]) { + case '=': + find_flag = HA_READ_KEY_EXACT; + break; + case '>': + find_flag = HA_READ_AFTER_KEY; + break; + case '<': + find_flag = HA_READ_BEFORE_KEY; + break; + case '+': + wrop = db_write_op_insert; + break; + case 'S': + wrop = db_write_op_sql; + break; + default: + return cb.dbcb_resp_short(2, "op"); + } + } else if (args.op.size() == 2 && args.op.begin()[1] == '=') { + switch (args.op.begin()[0]) { + case '>': + find_flag = HA_READ_KEY_OR_NEXT; + break; + case '<': + find_flag = HA_READ_KEY_OR_PREV; + break; + default: + return cb.dbcb_resp_short(2, "op"); + } + } else { + return cb.dbcb_resp_short(2, "op"); + } + if (args.kvalslen <= 0) { + return cb.dbcb_resp_short(2, "klen"); + } + switch (wrop) { + case db_write_op_none: + return cmd_find_internal(cb, p, find_flag, args); + case db_write_op_insert: + return cmd_insert_internal(cb, p, args.kvals, args.kvalslen); + case db_write_op_sql: + return cmd_sql_internal(cb, p, args.kvals, args.kvalslen); + } +} + +void +dbcontext::set_statistics(size_t num_conns, size_t num_active) +{ + if (for_write_flag) { + set_thread_message("handlersocket: mode=wr, %zu conns, %zu active", + num_conns, num_active); + } else { + set_thread_message("handlersocket: mode=rd, %zu conns, %zu active", + num_conns, num_active); + } + /* + Don't set message buf if it's already in use. This saves slow call to + thd_proc_info() (if profiling is enabled) + */ + if (thd->proc_info != &info_message_buf[0]) + thd_proc_info(thd, &info_message_buf[0]); +} + +}; + diff --git a/plugin/handler_socket/handlersocket/database.hpp b/plugin/handler_socket/handlersocket/database.hpp new file mode 100644 index 00000000..a4aee087 --- /dev/null +++ b/plugin/handler_socket/handlersocket/database.hpp @@ -0,0 +1,142 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#ifndef DENA_DATABASE_HPP +#define DENA_DATABASE_HPP + +#include <string> +#include <memory> +#include <vector> +#include <stdint.h> + +#include "string_buffer.hpp" +#include "string_ref.hpp" +#include "config.hpp" + +namespace dena { + +struct database_i; +typedef std::auto_ptr<volatile database_i> database_ptr; + +struct dbcontext_i; +typedef std::auto_ptr<dbcontext_i> dbcontext_ptr; + +struct database_i { + virtual ~database_i() { } + virtual dbcontext_ptr create_context(bool for_write) volatile = 0; + virtual void stop() volatile = 0; + virtual const config& get_conf() const volatile = 0; + static database_ptr create(const config& conf); +}; + +struct prep_stmt { + typedef std::vector<uint32_t> fields_type; + private: + dbcontext_i *dbctx; /* must be valid while *this is alive */ + size_t table_id; /* a prep_stmt object holds a refcount of the table */ + size_t idxnum; + fields_type ret_fields; + fields_type filter_fields; + public: + prep_stmt(); + prep_stmt(dbcontext_i *c, size_t tbl, size_t idx, const fields_type& rf, + const fields_type& ff); + ~prep_stmt(); + prep_stmt(const prep_stmt& x); + prep_stmt& operator =(const prep_stmt& x); + public: + size_t get_table_id() const { return table_id; } + size_t get_idxnum() const { return idxnum; } + const fields_type& get_ret_fields() const { return ret_fields; } + const fields_type& get_filter_fields() const { return filter_fields; } +}; + +struct dbcallback_i { + virtual ~dbcallback_i () { } + virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) = 0; + virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const = 0; + virtual void dbcb_resp_short(uint32_t code, const char *msg) = 0; + virtual void dbcb_resp_short_num(uint32_t code, uint32_t value) = 0; + virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value) = 0; + virtual void dbcb_resp_begin(size_t num_flds) = 0; + virtual void dbcb_resp_entry(const char *fld, size_t fldlen) = 0; + virtual void dbcb_resp_end() = 0; + virtual void dbcb_resp_cancel() = 0; +}; + +enum record_filter_type { + record_filter_type_skip = 0, + record_filter_type_break = 1, +}; + +struct record_filter { + record_filter_type filter_type; + string_ref op; + uint32_t ff_offset; /* offset in filter_fields */ + string_ref val; + record_filter() : filter_type(record_filter_type_skip), ff_offset(0) { } +}; + +struct cmd_open_args { + size_t pst_id; + const char *dbn; + const char *tbl; + const char *idx; + const char *retflds; + const char *filflds; + cmd_open_args() : pst_id(0), dbn(0), tbl(0), idx(0), retflds(0), + filflds(0) { } +}; + +struct cmd_exec_args { + const prep_stmt *pst; + string_ref op; + const string_ref *kvals; + size_t kvalslen; + uint32_t limit; + uint32_t skip; + string_ref mod_op; + const string_ref *uvals; /* size must be pst->retfieelds.size() */ + const record_filter *filters; + int invalues_keypart; + const string_ref *invalues; + size_t invalueslen; + cmd_exec_args() : pst(0), kvals(0), kvalslen(0), limit(0), skip(0), + uvals(0), filters(0), invalues_keypart(-1), invalues(0), invalueslen(0) { } +}; + +struct dbcontext_i { + virtual ~dbcontext_i() { } + virtual void init_thread(const void *stack_bottom, + volatile int& shutdown_flag) = 0; + virtual void term_thread() = 0; + virtual bool check_alive() = 0; + virtual void lock_tables_if() = 0; + virtual void unlock_tables_if() = 0; + virtual bool get_commit_error() = 0; + virtual void clear_error() = 0; + virtual void close_tables_if() = 0; + virtual void table_addref(size_t tbl_id) = 0; /* TODO: hide */ + virtual void table_release(size_t tbl_id) = 0; /* TODO: hide */ + virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args) = 0; + virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args) = 0; + virtual void set_statistics(size_t num_conns, size_t num_active) = 0; +}; + +}; + +extern unsigned long long int open_tables_count; +extern unsigned long long int close_tables_count; +extern unsigned long long int lock_tables_count; +extern unsigned long long int unlock_tables_count; +#if 0 +extern unsigned long long int index_exec_count; +#endif + +#endif + diff --git a/plugin/handler_socket/handlersocket/handlersocket.cpp b/plugin/handler_socket/handlersocket/handlersocket.cpp new file mode 100644 index 00000000..81334970 --- /dev/null +++ b/plugin/handler_socket/handlersocket/handlersocket.cpp @@ -0,0 +1,217 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include <memory> +#include <string> +#include <stdio.h> + +#include "config.hpp" +#include "hstcpsvr.hpp" +#include "string_util.hpp" +#include "mysql_incl.hpp" + +#define DBG_LOG \ + if (dena::verbose_level >= 100) { \ + fprintf(stderr, "%s %p\n", __PRETTY_FUNCTION__, this); \ + } +#define DBG_DO(x) if (dena::verbose_level >= 100) { x; } + +#define DBG_DIR(x) + +using namespace dena; + +static char *handlersocket_address = 0; +static char *handlersocket_port = 0; +static char *handlersocket_port_wr = 0; +static unsigned int handlersocket_epoll = 1; +static unsigned int handlersocket_threads = 32; +static unsigned int handlersocket_threads_wr = 1; +static unsigned int handlersocket_timeout = 30; +static unsigned int handlersocket_backlog = 32768; +static unsigned int handlersocket_sndbuf = 0; +static unsigned int handlersocket_rcvbuf = 0; +static unsigned int handlersocket_readsize = 0; +static unsigned int handlersocket_accept_balance = 0; +static unsigned int handlersocket_wrlock_timeout = 0; +static char *handlersocket_plain_secret = 0; +static char *handlersocket_plain_secret_wr = 0; + +struct daemon_handlersocket_data { + hstcpsvr_ptr hssvr_rd; + hstcpsvr_ptr hssvr_wr; +}; + +static int +daemon_handlersocket_init(void *p) +{ + DENA_VERBOSE(10, fprintf(stderr, "handlersocket: initialized\n")); + config conf; + conf["use_epoll"] = handlersocket_epoll ? "1" : "0"; + if (handlersocket_address) { + conf["host"] = handlersocket_address; + } + if (handlersocket_port) { + conf["port"] = handlersocket_port; + } + /* + * unix domain socket + * conf["host"] = "/"; + * conf["port"] = "/tmp/handlersocket"; + */ + if (handlersocket_threads > 0) { + conf["num_threads"] = to_stdstring(handlersocket_threads); + } else { + conf["num_threads"] = "1"; + } + conf["timeout"] = to_stdstring(handlersocket_timeout); + conf["listen_backlog"] = to_stdstring(handlersocket_backlog); + conf["sndbuf"] = to_stdstring(handlersocket_sndbuf); + conf["rcvbuf"] = to_stdstring(handlersocket_rcvbuf); + conf["readsize"] = to_stdstring(handlersocket_readsize); + conf["accept_balance"] = to_stdstring(handlersocket_accept_balance); + conf["wrlock_timeout"] = to_stdstring(handlersocket_wrlock_timeout); + std::auto_ptr<daemon_handlersocket_data> ap(new daemon_handlersocket_data); + if (handlersocket_port != 0 && handlersocket_port_wr != handlersocket_port) { + conf["port"] = handlersocket_port; + if (handlersocket_plain_secret) { + conf["plain_secret"] = handlersocket_plain_secret; + } + ap->hssvr_rd = hstcpsvr_i::create(conf); + ap->hssvr_rd->start_listen(); + } + if (handlersocket_port_wr != 0) { + if (handlersocket_threads_wr > 0) { + conf["num_threads"] = to_stdstring(handlersocket_threads_wr); + } + conf["port"] = handlersocket_port_wr; + conf["for_write"] = "1"; + conf["plain_secret"] = ""; + if (handlersocket_plain_secret_wr) { + conf["plain_secret"] = handlersocket_plain_secret_wr; + } + ap->hssvr_wr = hstcpsvr_i::create(conf); + ap->hssvr_wr->start_listen(); + } + st_plugin_int *const plugin = static_cast<st_plugin_int *>(p); + plugin->data = ap.release(); + return 0; +} + +static int +daemon_handlersocket_deinit(void *p) +{ + DENA_VERBOSE(10, fprintf(stderr, "handlersocket: terminated\n")); + st_plugin_int *const plugin = static_cast<st_plugin_int *>(p); + daemon_handlersocket_data *ptr = + static_cast<daemon_handlersocket_data *>(plugin->data); + delete ptr; + return 0; +} + +static struct st_mysql_daemon daemon_handlersocket_plugin = { + MYSQL_DAEMON_INTERFACE_VERSION +}; + +static MYSQL_SYSVAR_UINT(verbose, dena::verbose_level, 0, + "0..10000", 0, 0, 10 /* default */, 0, 10000, 0); +static MYSQL_SYSVAR_UINT(epoll, handlersocket_epoll, PLUGIN_VAR_READONLY, + "0..1", 0, 0, 1 /* default */, 0, 1, 0); +static MYSQL_SYSVAR_STR(address, handlersocket_address, + PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL); +static MYSQL_SYSVAR_STR(port, handlersocket_port, + PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL); +static MYSQL_SYSVAR_STR(port_wr, handlersocket_port_wr, + PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL); +static MYSQL_SYSVAR_UINT(threads, handlersocket_threads, PLUGIN_VAR_READONLY, + "1..3000", 0, 0, 16 /* default */, 1, 3000, 0); +static MYSQL_SYSVAR_UINT(threads_wr, handlersocket_threads_wr, + PLUGIN_VAR_READONLY, "1..3000", 0, 0, 1 /* default */, 1, 3000, 0); +static MYSQL_SYSVAR_UINT(timeout, handlersocket_timeout, PLUGIN_VAR_READONLY, + "30..3600", 0, 0, 300 /* default */, 30, 3600, 0); +static MYSQL_SYSVAR_UINT(backlog, handlersocket_backlog, PLUGIN_VAR_READONLY, + "5..1000000", 0, 0, 32768 /* default */, 5, 1000000, 0); +static MYSQL_SYSVAR_UINT(sndbuf, handlersocket_sndbuf, PLUGIN_VAR_READONLY, + "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0); +static MYSQL_SYSVAR_UINT(rcvbuf, handlersocket_rcvbuf, PLUGIN_VAR_READONLY, + "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0); +static MYSQL_SYSVAR_UINT(readsize, handlersocket_readsize, PLUGIN_VAR_READONLY, + "0..16777216", 0, 0, 0 /* default */, 0, 16777216, 0); +static MYSQL_SYSVAR_UINT(accept_balance, handlersocket_accept_balance, + PLUGIN_VAR_READONLY, "0..10000", 0, 0, 0 /* default */, 0, 10000, 0); +static MYSQL_SYSVAR_UINT(wrlock_timeout, handlersocket_wrlock_timeout, + PLUGIN_VAR_READONLY, "0..3600", 0, 0, 12 /* default */, 0, 3600, 0); +static MYSQL_SYSVAR_STR(plain_secret, handlersocket_plain_secret, + PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL); +static MYSQL_SYSVAR_STR(plain_secret_wr, handlersocket_plain_secret_wr, + PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, "", NULL, NULL, NULL); + + +/* warning: type-punning to incomplete type might break strict-aliasing + * rules */ +static struct st_mysql_sys_var *daemon_handlersocket_system_variables[] = { + MYSQL_SYSVAR(verbose), + MYSQL_SYSVAR(address), + MYSQL_SYSVAR(port), + MYSQL_SYSVAR(port_wr), + MYSQL_SYSVAR(epoll), + MYSQL_SYSVAR(threads), + MYSQL_SYSVAR(threads_wr), + MYSQL_SYSVAR(timeout), + MYSQL_SYSVAR(backlog), + MYSQL_SYSVAR(sndbuf), + MYSQL_SYSVAR(rcvbuf), + MYSQL_SYSVAR(readsize), + MYSQL_SYSVAR(accept_balance), + MYSQL_SYSVAR(wrlock_timeout), + MYSQL_SYSVAR(plain_secret), + MYSQL_SYSVAR(plain_secret_wr), + 0 +}; + +static SHOW_VAR hs_status_variables[] = { + {"table_open", (char*) &open_tables_count, SHOW_LONGLONG}, + {"table_close", (char*) &close_tables_count, SHOW_LONGLONG}, + {"table_lock", (char*) &lock_tables_count, SHOW_LONGLONG}, + {"table_unlock", (char*) &unlock_tables_count, SHOW_LONGLONG}, + #if 0 + {"index_exec", (char*) &index_exec_count, SHOW_LONGLONG}, + #endif + {NullS, NullS, SHOW_LONG} +}; + +static int show_hs_vars(THD *thd, SHOW_VAR *var, char *buff) +{ + var->type= SHOW_ARRAY; + var->value= (char *) &hs_status_variables; + return 0; +} + +static SHOW_VAR daemon_handlersocket_status_variables[] = { + {"Hs", (char*) show_hs_vars, SHOW_FUNC}, + {NullS, NullS, SHOW_LONG} +}; + + +maria_declare_plugin(handlersocket) +{ + MYSQL_DAEMON_PLUGIN, + &daemon_handlersocket_plugin, + "handlersocket", + "higuchi dot akira at dena dot jp", + "Direct access into InnoDB", + PLUGIN_LICENSE_BSD, + daemon_handlersocket_init, + daemon_handlersocket_deinit, + 0x0100 /* 1.0 */, + daemon_handlersocket_status_variables, + daemon_handlersocket_system_variables, + "1.0", + MariaDB_PLUGIN_MATURITY_BETA +} +maria_declare_plugin_end; diff --git a/plugin/handler_socket/handlersocket/handlersocket.spec.template b/plugin/handler_socket/handlersocket/handlersocket.spec.template new file mode 100644 index 00000000..0ce8c0cb --- /dev/null +++ b/plugin/handler_socket/handlersocket/handlersocket.spec.template @@ -0,0 +1,29 @@ +Summary: handlersocket plugin for mysql +Name: handlersocket +Version: HANDLERSOCKET_VERSION +Release: 1%{?dist} +Group: System Environment/Libraries +License: BSD +Source: handlersocket.tar.gz +Packager: Akira Higuchi <higuchi dot akira at dena dot jp> +BuildRoot: /var/tmp/%{name}-%{version}-root + +%description + +%prep +%setup -n %{name} + +%define _use_internal_dependency_generator 0 + +%build +make -f Makefile.plain + +%install +rm -rf $RPM_BUILD_ROOT +mkdir -p $RPM_BUILD_ROOT/%{_libdir}/mysql/plugin +install -m 755 handlersocket.so $RPM_BUILD_ROOT/%{_libdir}/mysql/plugin/ + +%files +%defattr(-, root, root) +%{_libdir}/mysql/plugin/*.so + diff --git a/plugin/handler_socket/handlersocket/hstcpsvr.cpp b/plugin/handler_socket/handlersocket/hstcpsvr.cpp new file mode 100644 index 00000000..250ef2c7 --- /dev/null +++ b/plugin/handler_socket/handlersocket/hstcpsvr.cpp @@ -0,0 +1,147 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include <vector> +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <sys/resource.h> + +#include "hstcpsvr.hpp" +#include "hstcpsvr_worker.hpp" +#include "thread.hpp" +#include "fatal.hpp" +#include "auto_ptrcontainer.hpp" + +#define DBG(x) + +namespace dena { + +struct worker_throbj { + worker_throbj(const hstcpsvr_worker_arg& arg) + : worker(hstcpsvr_worker_i::create(arg)) { } + void operator ()() { + worker->run(); + } + hstcpsvr_worker_ptr worker; +}; + +struct hstcpsvr : public hstcpsvr_i, private noncopyable { + hstcpsvr(const config& c); + ~hstcpsvr(); + virtual std::string start_listen(); + private: + hstcpsvr_shared_c cshared; + volatile hstcpsvr_shared_v vshared; + typedef thread<worker_throbj> worker_thread_type; + typedef auto_ptrcontainer< std::vector<worker_thread_type *> > threads_type; + threads_type threads; + std::vector<unsigned int> thread_num_conns_vec; + private: + void stop_workers(); +}; + +namespace { + +void +check_nfile(size_t nfile) +{ + struct rlimit rl; + const int r = getrlimit(RLIMIT_NOFILE, &rl); + if (r != 0) { + fatal_abort("check_nfile: getrlimit failed"); + } + if (rl.rlim_cur < static_cast<rlim_t>(nfile + 1000)) { + fprintf(stderr, + "[Warning] handlersocket: open_files_limit is too small.\n"); + } +} + +}; + +hstcpsvr::hstcpsvr(const config& c) + : cshared(), vshared() +{ + vshared.shutdown = 0; + cshared.conf = c; /* copy */ + if (cshared.conf["port"] == "") { + cshared.conf["port"] = "9999"; + } + cshared.num_threads = cshared.conf.get_int("num_threads", 32); + cshared.sockargs.nonblocking = cshared.conf.get_int("nonblocking", 1); + cshared.sockargs.use_epoll = cshared.conf.get_int("use_epoll", 1); + if (cshared.sockargs.use_epoll) { + cshared.sockargs.nonblocking = 1; + } + cshared.readsize = cshared.conf.get_int("readsize", 1); + cshared.nb_conn_per_thread = cshared.conf.get_int("conn_per_thread", 1024); + cshared.for_write_flag = cshared.conf.get_int("for_write", 0); + cshared.plain_secret = cshared.conf.get_str("plain_secret", ""); + cshared.require_auth = !cshared.plain_secret.empty(); + cshared.sockargs.set(cshared.conf); + cshared.dbptr = database_i::create(c); + check_nfile(cshared.num_threads * cshared.nb_conn_per_thread); + thread_num_conns_vec.resize(cshared.num_threads); + cshared.thread_num_conns = thread_num_conns_vec.empty() + ? 0 : &thread_num_conns_vec[0]; +} + +hstcpsvr::~hstcpsvr() +{ + stop_workers(); +} + +std::string +hstcpsvr::start_listen() +{ + std::string err; + if (threads.size() != 0) { + return "start_listen: already running"; + } + if (socket_bind(cshared.listen_fd, cshared.sockargs, err) != 0) { + return "bind: " + err; + } + DENA_VERBOSE(20, fprintf(stderr, "bind done\n")); + const size_t stack_size = std::max( + cshared.conf.get_int("stack_size", 1 * 1024LL * 1024), 8 * 1024LL * 1024); + for (long i = 0; i < cshared.num_threads; ++i) { + hstcpsvr_worker_arg arg; + arg.cshared = &cshared; + arg.vshared = &vshared; + arg.worker_id = i; + std::auto_ptr< thread<worker_throbj> > thr( + new thread<worker_throbj>(arg, stack_size)); + threads.push_back_ptr(thr); + } + DENA_VERBOSE(20, fprintf(stderr, "threads created\n")); + for (size_t i = 0; i < threads.size(); ++i) { + threads[i]->start(); + } + DENA_VERBOSE(20, fprintf(stderr, "threads started\n")); + return std::string(); +} + +void +hstcpsvr::stop_workers() +{ + vshared.shutdown = 1; + for (size_t i = 0; i < threads.size(); ++i) { + threads[i]->join(); + } + threads.clear(); +} + +hstcpsvr_ptr +hstcpsvr_i::create(const config& conf) +{ + return hstcpsvr_ptr(new hstcpsvr(conf)); +} + +}; + diff --git a/plugin/handler_socket/handlersocket/hstcpsvr.hpp b/plugin/handler_socket/handlersocket/hstcpsvr.hpp new file mode 100644 index 00000000..811bfa25 --- /dev/null +++ b/plugin/handler_socket/handlersocket/hstcpsvr.hpp @@ -0,0 +1,58 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#ifndef DENA_HSTCPSVR_HPP +#define DENA_HSTCPSVR_HPP + +#include <memory> +#include <string> +#include <map> + +#include "mutex.hpp" +#include "auto_file.hpp" +#include "database.hpp" +#include "config.hpp" +#include "socket.hpp" + +namespace dena { + +struct hstcpsvr_shared_c { + config conf; + long num_threads; + long nb_conn_per_thread; + bool for_write_flag; + bool require_auth; + std::string plain_secret; + int readsize; + socket_args sockargs; + auto_file listen_fd; + database_ptr dbptr; + volatile unsigned int *thread_num_conns; /* 0 .. num_threads-1 */ + hstcpsvr_shared_c() : num_threads(0), nb_conn_per_thread(100), + for_write_flag(false), require_auth(false), readsize(0), + thread_num_conns(0) { } +}; + +struct hstcpsvr_shared_v : public mutex { + int shutdown; + hstcpsvr_shared_v() : shutdown(0) { } +}; + +struct hstcpsvr_i; +typedef std::auto_ptr<hstcpsvr_i> hstcpsvr_ptr; + +struct hstcpsvr_i { + virtual ~hstcpsvr_i() { } + virtual std::string start_listen() = 0; + static hstcpsvr_ptr create(const config& conf); +}; + +}; + +#endif + diff --git a/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp new file mode 100644 index 00000000..9863602a --- /dev/null +++ b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp @@ -0,0 +1,957 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include <netinet/in.h> +#include <errno.h> +#include <poll.h> +#include <unistd.h> +#include <stdexcept> +#include <signal.h> +#include <list> +#if __linux__ +#include <sys/epoll.h> +#endif +#ifdef HAVE_ALLOCA_H +#include <alloca.h> +#endif + +#include "hstcpsvr_worker.hpp" +#include "string_buffer.hpp" +#include "auto_ptrcontainer.hpp" +#include "string_util.hpp" +#include "escape.hpp" + +#define DBG_FD(x) +#define DBG_TR(x) +#define DBG_EP(x) +#define DBG_MULTI(x) + +/* TODO */ +#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) +#define MSG_NOSIGNAL 0 +#endif + +namespace dena { + +struct dbconnstate { + string_buffer readbuf; + string_buffer writebuf; + std::vector<prep_stmt> prep_stmts; + size_t resp_begin_pos; + size_t find_nl_pos; + void reset() { + readbuf.clear(); + writebuf.clear(); + prep_stmts.clear(); + resp_begin_pos = 0; + find_nl_pos = 0; + } + dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { } +}; + +struct hstcpsvr_conn; +typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type; + +struct hstcpsvr_conn : public dbcallback_i { + public: + auto_file fd; + sockaddr_storage addr; + size_socket addr_len; + dbconnstate cstate; + std::string err; + size_t readsize; + bool nonblocking; + bool read_finished; + bool write_finished; + time_t nb_last_io; + hstcpsvr_conns_type::iterator conns_iter; + bool authorized; + public: + bool closed() const; + bool ok_to_close() const; + void reset(); + int accept(const hstcpsvr_shared_c& cshared); + bool write_more(bool *more_r = 0); + bool read_more(bool *more_r = 0); + public: + virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v); + virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const; + virtual void dbcb_resp_short(uint32_t code, const char *msg); + virtual void dbcb_resp_short_num(uint32_t code, uint32_t value); + virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value); + virtual void dbcb_resp_begin(size_t num_flds); + virtual void dbcb_resp_entry(const char *fld, size_t fldlen); + virtual void dbcb_resp_end(); + virtual void dbcb_resp_cancel(); + public: + hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096), + nonblocking(false), read_finished(false), write_finished(false), + nb_last_io(0), authorized(false) { } +}; + +bool +hstcpsvr_conn::closed() const +{ + return fd.get() < 0; +} + +bool +hstcpsvr_conn::ok_to_close() const +{ + return write_finished || (read_finished && cstate.writebuf.size() == 0); +} + +void +hstcpsvr_conn::reset() +{ + addr = sockaddr_storage(); + addr_len = sizeof(addr); + cstate.reset(); + fd.reset(); + read_finished = false; + write_finished = false; +} + +int +hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared) +{ + reset(); + return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr, + addr_len, err); +} + +bool +hstcpsvr_conn::write_more(bool *more_r) +{ + if (write_finished || cstate.writebuf.size() == 0) { + return false; + } + const size_t wlen = cstate.writebuf.size(); + ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL); + if (len <= 0) { + if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { + cstate.writebuf.clear(); + write_finished = true; + } + return false; + } + cstate.writebuf.erase_front(len); + /* FIXME: reallocate memory if too large */ + if (more_r) { + *more_r = (static_cast<size_t>(len) == wlen); + } + return true; +} + +bool +hstcpsvr_conn::read_more(bool *more_r) +{ + if (read_finished) { + return false; + } + const size_t block_size = readsize > 4096 ? readsize : 4096; + char *wp = cstate.readbuf.make_space(block_size); + const ssize_t len = read(fd.get(), wp, block_size); + if (len <= 0) { + if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { + read_finished = true; + } + return false; + } + cstate.readbuf.space_wrote(len); + if (more_r) { + *more_r = (static_cast<size_t>(len) == block_size); + } + return true; +} + +void +hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) +{ + if (cstate.prep_stmts.size() <= pst_id) { + cstate.prep_stmts.resize(pst_id + 1); + } + cstate.prep_stmts[pst_id] = v; +} + +const prep_stmt * +hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const +{ + if (cstate.prep_stmts.size() <= pst_id) { + return 0; + } + return &cstate.prep_stmts[pst_id]; +} + +void +hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg) +{ + write_ui32(cstate.writebuf, code); + const size_t msglen = strlen(msg); + if (msglen != 0) { + cstate.writebuf.append_literal("\t1\t"); + cstate.writebuf.append(msg, msg + msglen); + } else { + cstate.writebuf.append_literal("\t1"); + } + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value) +{ + write_ui32(cstate.writebuf, code); + cstate.writebuf.append_literal("\t1\t"); + write_ui32(cstate.writebuf, value); + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value) +{ + write_ui32(cstate.writebuf, code); + cstate.writebuf.append_literal("\t1\t"); + write_ui64(cstate.writebuf, value); + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_begin(size_t num_flds) +{ + cstate.resp_begin_pos = cstate.writebuf.size(); + cstate.writebuf.append_literal("0\t"); + write_ui32(cstate.writebuf, num_flds); +} + +void +hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen) +{ + if (fld != 0) { + cstate.writebuf.append_literal("\t"); + escape_string(cstate.writebuf, fld, fld + fldlen); + } else { + static const char t[] = "\t\0"; + cstate.writebuf.append(t, t + 2); + } +} + +void +hstcpsvr_conn::dbcb_resp_end() +{ + cstate.writebuf.append_literal("\n"); + cstate.resp_begin_pos = 0; +} + +void +hstcpsvr_conn::dbcb_resp_cancel() +{ + cstate.writebuf.resize(cstate.resp_begin_pos); + cstate.resp_begin_pos = 0; +} + +struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable { + hstcpsvr_worker(const hstcpsvr_worker_arg& arg); + virtual void run(); + private: + const hstcpsvr_shared_c& cshared; + volatile hstcpsvr_shared_v& vshared; + long worker_id; + dbcontext_ptr dbctx; + hstcpsvr_conns_type conns; /* conns refs dbctx */ + time_t last_check_time; + std::vector<pollfd> pfds; + #ifdef __linux__ + std::vector<epoll_event> events_vec; + auto_file epoll_fd; + #endif + bool accept_enabled; + int accept_balance; + std::vector<string_ref> invalues_work; + std::vector<record_filter> filters_work; + private: + int run_one_nb(); + int run_one_ep(); + void execute_lines(hstcpsvr_conn& conn); + void execute_line(char *start, char *finish, hstcpsvr_conn& conn); + void do_open_index(char *start, char *finish, hstcpsvr_conn& conn); + void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, + char *finish, hstcpsvr_conn& conn); + void do_authorization(char *start, char *finish, hstcpsvr_conn& conn); +}; + +hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg) + : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id), + dbctx(cshared.dbptr->create_context(cshared.for_write_flag)), + last_check_time(time(0)), accept_enabled(true), accept_balance(0) +{ + #ifdef __linux__ + if (cshared.sockargs.use_epoll) { + epoll_fd.reset(epoll_create(10)); + if (epoll_fd.get() < 0) { + fatal_abort("epoll_create"); + } + epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.ptr = 0; + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + events_vec.resize(10240); + } + #endif + accept_balance = cshared.conf.get_int("accept_balance", 0); +} + +namespace { + +struct thr_init { + thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) { + dbctx->init_thread(this, shutdown_flag); + } + ~thr_init() { + dbctx->term_thread(); + } + const dbcontext_ptr& dbctx; +}; + +}; // namespace + +void +hstcpsvr_worker::run() +{ + thr_init initobj(dbctx, vshared.shutdown); + + #ifdef __linux__ + if (cshared.sockargs.use_epoll) { + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_ep(); + } + } else if (cshared.sockargs.nonblocking) { + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_nb(); + } + } else { + /* UNUSED */ + fatal_abort("run_one"); + } + #else + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_nb(); + } + #endif +} + +int +hstcpsvr_worker::run_one_nb() +{ + size_t nfds = 0; + /* CLIENT SOCKETS */ + for (hstcpsvr_conns_type::const_iterator i = conns.begin(); + i != conns.end(); ++i) { + if (pfds.size() <= nfds) { + pfds.resize(nfds + 1); + } + pollfd& pfd = pfds[nfds++]; + pfd.fd = (*i)->fd.get(); + short ev = 0; + if ((*i)->cstate.writebuf.size() != 0) { + ev = POLLOUT; + } else { + ev = POLLIN; + } + pfd.events = pfd.revents = ev; + } + /* LISTENER */ + { + const size_t cpt = cshared.nb_conn_per_thread; + const short ev = (cpt > nfds) ? POLLIN : 0; + if (pfds.size() <= nfds) { + pfds.resize(nfds + 1); + } + pollfd& pfd = pfds[nfds++]; + pfd.fd = cshared.listen_fd.get(); + pfd.events = pfd.revents = ev; + } + /* POLL */ + const int npollev = poll(&pfds[0], nfds, 1 * 1000); + dbctx->set_statistics(conns.size(), npollev); + const time_t now = time(0); + size_t j = 0; + const short mask_in = ~POLLOUT; + const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL; + /* READ */ + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++i, ++j) { + pollfd& pfd = pfds[j]; + if ((pfd.revents & mask_in) == 0) { + continue; + } + hstcpsvr_conn& conn = **i; + if (conn.read_more()) { + if (conn.cstate.readbuf.size() > 0) { + const char ch = conn.cstate.readbuf.begin()[0]; + if (ch == 'Q') { + vshared.shutdown = 1; + } else if (ch == '/') { + conn.cstate.readbuf.clear(); + conn.cstate.find_nl_pos = 0; + conn.cstate.writebuf.clear(); + conn.read_finished = true; + conn.write_finished = true; + } + } + conn.nb_last_io = now; + } + } + /* EXECUTE */ + j = 0; + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++i, ++j) { + pollfd& pfd = pfds[j]; + if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) { + continue; + } + execute_lines(**i); + } + /* COMMIT */ + dbctx->unlock_tables_if(); + const bool commit_error = dbctx->get_commit_error(); + dbctx->clear_error(); + /* WRITE/CLOSE */ + j = 0; + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++j) { + pollfd& pfd = pfds[j]; + hstcpsvr_conn& conn = **i; + hstcpsvr_conns_type::iterator icur = i; + ++i; + if (commit_error) { + conn.reset(); + continue; + } + if ((pfd.revents & (mask_out | mask_in)) != 0) { + if (conn.write_more()) { + conn.nb_last_io = now; + } + } + if (cshared.sockargs.timeout != 0 && + conn.nb_last_io + cshared.sockargs.timeout < now) { + conn.reset(); + } + if (conn.closed() || conn.ok_to_close()) { + conns.erase_ptr(icur); + } + } + /* ACCEPT */ + { + pollfd& pfd = pfds[nfds - 1]; + if ((pfd.revents & mask_in) != 0) { + std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); + c->nonblocking = true; + c->readsize = cshared.readsize; + c->accept(cshared); + if (c->fd.get() >= 0) { + if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { + fatal_abort("F_SETFL O_NONBLOCK"); + } + c->nb_last_io = now; + conns.push_back_ptr(c); + } else { + /* errno == 11 (EAGAIN) is not a fatal error. */ + DENA_VERBOSE(100, fprintf(stderr, + "accept failed: errno=%d (not fatal)\n", errno)); + } + } + } + DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds, + conns.size())); + if (conns.empty()) { + dbctx->close_tables_if(); + } + dbctx->set_statistics(conns.size(), 0); + return 0; +} + +#ifdef __linux__ +int +hstcpsvr_worker::run_one_ep() +{ + epoll_event *const events = &events_vec[0]; + const size_t num_events = events_vec.size(); + const time_t now = time(0); + size_t in_count = 0, out_count = 0, accept_count = 0; + int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000); + /* READ/ACCEPT */ + dbctx->set_statistics(conns.size(), nfds); + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + if ((ev.events & EPOLLIN) == 0) { + continue; + } + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (conn == 0) { + /* listener */ + ++accept_count; + DBG_EP(fprintf(stderr, "IN listener\n")); + std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); + c->nonblocking = true; + c->readsize = cshared.readsize; + c->accept(cshared); + if (c->fd.get() >= 0) { + if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { + fatal_abort("F_SETFL O_NONBLOCK"); + } + epoll_event cev; + memset(&cev, 0, sizeof(cev)); + cev.events = EPOLLIN | EPOLLOUT | EPOLLET; + cev.data.ptr = c.get(); + c->nb_last_io = now; + const int fd = c->fd.get(); + conns.push_back_ptr(c); + conns.back()->conns_iter = --conns.end(); + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } else { + DENA_VERBOSE(100, fprintf(stderr, + "accept failed: errno=%d (not fatal)\n", errno)); + } + } else { + /* client connection */ + ++in_count; + DBG_EP(fprintf(stderr, "IN client\n")); + bool more_data = false; + while (conn->read_more(&more_data)) { + DBG_EP(fprintf(stderr, "IN client read_more\n")); + conn->nb_last_io = now; + if (!more_data) { + break; + } + } + } + } + /* EXECUTE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if ((ev.events & EPOLLIN) == 0 || conn == 0 || + conn->cstate.readbuf.size() == 0) { + continue; + } + const char ch = conn->cstate.readbuf.begin()[0]; + if (ch == 'Q') { + vshared.shutdown = 1; + } else if (ch == '/') { + conn->cstate.readbuf.clear(); + conn->cstate.find_nl_pos = 0; + conn->cstate.writebuf.clear(); + conn->read_finished = true; + conn->write_finished = true; + } else { + execute_lines(*conn); + } + } + /* COMMIT */ + dbctx->unlock_tables_if(); + const bool commit_error = dbctx->get_commit_error(); + dbctx->clear_error(); + /* WRITE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (commit_error && conn != 0) { + conn->reset(); + continue; + } + if ((ev.events & EPOLLOUT) == 0) { + continue; + } + ++out_count; + if (conn == 0) { + /* listener */ + DBG_EP(fprintf(stderr, "OUT listener\n")); + } else { + /* client connection */ + DBG_EP(fprintf(stderr, "OUT client\n")); + bool more_data = false; + while (conn->write_more(&more_data)) { + DBG_EP(fprintf(stderr, "OUT client write_more\n")); + conn->nb_last_io = now; + if (!more_data) { + break; + } + } + } + } + /* CLOSE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (conn != 0 && conn->ok_to_close()) { + DBG_EP(fprintf(stderr, "CLOSE close\n")); + conns.erase_ptr(conn->conns_iter); + } + } + /* TIMEOUT & cleanup */ + if (last_check_time + 10 < now) { + for (hstcpsvr_conns_type::iterator i = conns.begin(); + i != conns.end(); ) { + hstcpsvr_conns_type::iterator icur = i; + ++i; + if (cshared.sockargs.timeout != 0 && + (*icur)->nb_last_io + cshared.sockargs.timeout < now) { + conns.erase_ptr((*icur)->conns_iter); + } + } + last_check_time = now; + DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds, + conns.size())); + } + DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n", + this, in_count, out_count, accept_count, conns.size())); + if (conns.empty()) { + dbctx->close_tables_if(); + } + /* STATISTICS */ + const size_t num_conns = conns.size(); + dbctx->set_statistics(num_conns, 0); + /* ENABLE/DISABLE ACCEPT */ + if (accept_balance != 0) { + cshared.thread_num_conns[worker_id] = num_conns; + size_t total_num_conns = 0; + for (long i = 0; i < cshared.num_threads; ++i) { + total_num_conns += cshared.thread_num_conns[i]; + } + bool e_acc = false; + if (num_conns < 10 || + total_num_conns * 2 > num_conns * cshared.num_threads) { + e_acc = true; + } + epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.ptr = 0; + if (e_acc == accept_enabled) { + } else if (e_acc) { + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } else { + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } + accept_enabled = e_acc; + } + return 0; +} +#endif + +void +hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn) +{ + DBG_MULTI(int cnt = 0); + dbconnstate& cstate = conn.cstate; + char *buf_end = cstate.readbuf.end(); + char *line_begin = cstate.readbuf.begin(); + char *find_pos = line_begin + cstate.find_nl_pos; + while (true) { + char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos); + if (nl == 0) { + break; + } + char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl; + DBG_MULTI(cnt++); + execute_line(line_begin, lf, conn); + find_pos = line_begin = nl + 1; + } + cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin()); + cstate.find_nl_pos = cstate.readbuf.size(); + DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt)); +} + +void +hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn) +{ + /* safe to modify, safe to dereference 'finish' */ + char *const cmd_begin = start; + read_token(start, finish); + char *const cmd_end = start; + skip_one(start, finish); + if (cmd_begin == cmd_end) { + return conn.dbcb_resp_short(2, "cmd"); + } + if (cmd_begin + 1 == cmd_end) { + if (cmd_begin[0] == 'P') { + if (cshared.require_auth && !conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } + return do_open_index(start, finish, conn); + } + if (cmd_begin[0] == 'A') { + return do_authorization(start, finish, conn); + } + } + if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') { + if (cshared.require_auth && !conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } + return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn); + } + return conn.dbcb_resp_short(2, "cmd"); +} + +void +hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn) +{ + const size_t pst_id = read_ui32(start, finish); + skip_one(start, finish); + /* dbname */ + char *const dbname_begin = start; + read_token(start, finish); + char *const dbname_end = start; + skip_one(start, finish); + /* tblname */ + char *const tblname_begin = start; + read_token(start, finish); + char *const tblname_end = start; + skip_one(start, finish); + /* idxname */ + char *const idxname_begin = start; + read_token(start, finish); + char *const idxname_end = start; + skip_one(start, finish); + /* retfields */ + char *const retflds_begin = start; + read_token(start, finish); + char *const retflds_end = start; + skip_one(start, finish); + /* filfields */ + char *const filflds_begin = start; + read_token(start, finish); + char *const filflds_end = start; + dbname_end[0] = 0; + tblname_end[0] = 0; + idxname_end[0] = 0; + retflds_end[0] = 0; + filflds_end[0] = 0; + cmd_open_args args; + args.pst_id = pst_id; + args.dbn = dbname_begin; + args.tbl = tblname_begin; + args.idx = idxname_begin; + args.retflds = retflds_begin; + args.filflds = filflds_begin; + return dbctx->cmd_open(conn, args); +} + +void +hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, + char *finish, hstcpsvr_conn& conn) +{ + cmd_exec_args args; + const size_t pst_id = read_ui32(cmd_begin, cmd_end); + if (pst_id >= conn.cstate.prep_stmts.size()) { + return conn.dbcb_resp_short(2, "stmtnum"); + } + args.pst = &conn.cstate.prep_stmts[pst_id]; + char *const op_begin = start; + read_token(start, finish); + char *const op_end = start; + args.op = string_ref(op_begin, op_end); + skip_one(start, finish); + const uint32_t fldnum = read_ui32(start, finish); + string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum); + auto_alloca_free<string_ref> flds_autofree(flds); + args.kvals = flds; + args.kvalslen = fldnum; + for (size_t i = 0; i < fldnum; ++i) { + skip_one(start, finish); + char *const f_begin = start; + read_token(start, finish); + char *const f_end = start; + if (is_null_expression(f_begin, f_end)) { + /* null */ + flds[i] = string_ref(); + } else { + /* non-null */ + char *wp = f_begin; + unescape_string(wp, f_begin, f_end); + flds[i] = string_ref(f_begin, wp - f_begin); + } + } + skip_one(start, finish); + args.limit = read_ui32(start, finish); + skip_one(start, finish); + args.skip = read_ui32(start, finish); + if (start == finish) { + /* simple query */ + return dbctx->cmd_exec(conn, args); + } + /* has more options */ + skip_one(start, finish); + /* in-clause */ + if (start[0] == '@') { + read_token(start, finish); /* '@' */ + skip_one(start, finish); + args.invalues_keypart = read_ui32(start, finish); + skip_one(start, finish); + args.invalueslen = read_ui32(start, finish); + if (args.invalueslen <= 0) { + return conn.dbcb_resp_short(2, "invalueslen"); + } + if (invalues_work.size() < args.invalueslen) { + invalues_work.resize(args.invalueslen); + } + args.invalues = &invalues_work[0]; + for (uint32_t i = 0; i < args.invalueslen; ++i) { + skip_one(start, finish); + char *const invalue_begin = start; + read_token(start, finish); + char *const invalue_end = start; + char *wp = invalue_begin; + unescape_string(wp, invalue_begin, invalue_end); + invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin); + } + skip_one(start, finish); + } + if (start == finish) { + /* no more options */ + return dbctx->cmd_exec(conn, args); + } + /* filters */ + size_t filters_count = 0; + while (start != finish && (start[0] == 'W' || start[0] == 'F')) { + char *const filter_type_begin = start; + read_token(start, finish); + char *const filter_type_end = start; + skip_one(start, finish); + char *const filter_op_begin = start; + read_token(start, finish); + char *const filter_op_end = start; + skip_one(start, finish); + const uint32_t ff_offset = read_ui32(start, finish); + skip_one(start, finish); + char *const filter_val_begin = start; + read_token(start, finish); + char *const filter_val_end = start; + skip_one(start, finish); + if (filters_work.size() <= filters_count) { + filters_work.resize(filters_count + 1); + } + record_filter& fi = filters_work[filters_count]; + if (filter_type_end != filter_type_begin + 1) { + return conn.dbcb_resp_short(2, "filtertype"); + } + fi.filter_type = (filter_type_begin[0] == 'W') + ? record_filter_type_break : record_filter_type_skip; + const uint32_t num_filflds = args.pst->get_filter_fields().size(); + if (ff_offset >= num_filflds) { + return conn.dbcb_resp_short(2, "filterfld"); + } + fi.op = string_ref(filter_op_begin, filter_op_end); + fi.ff_offset = ff_offset; + if (is_null_expression(filter_val_begin, filter_val_end)) { + /* null */ + fi.val = string_ref(); + } else { + /* non-null */ + char *wp = filter_val_begin; + unescape_string(wp, filter_val_begin, filter_val_end); + fi.val = string_ref(filter_val_begin, wp - filter_val_begin); + } + ++filters_count; + } + if (filters_count > 0) { + if (filters_work.size() <= filters_count) { + filters_work.resize(filters_count + 1); + } + filters_work[filters_count].op = string_ref(); /* sentinel */ + args.filters = &filters_work[0]; + } else { + args.filters = 0; + } + if (start == finish) { + /* no modops */ + return dbctx->cmd_exec(conn, args); + } + /* has modops */ + char *const mod_op_begin = start; + read_token(start, finish); + char *const mod_op_end = start; + args.mod_op = string_ref(mod_op_begin, mod_op_end); + const size_t num_uvals = args.pst->get_ret_fields().size(); + string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals); + auto_alloca_free<string_ref> uflds_autofree(uflds); + for (size_t i = 0; i < num_uvals; ++i) { + skip_one(start, finish); + char *const f_begin = start; + read_token(start, finish); + char *const f_end = start; + if (is_null_expression(f_begin, f_end)) { + /* null */ + uflds[i] = string_ref(); + } else { + /* non-null */ + char *wp = f_begin; + unescape_string(wp, f_begin, f_end); + uflds[i] = string_ref(f_begin, wp - f_begin); + } + } + args.uvals = uflds; + return dbctx->cmd_exec(conn, args); +} + +void +hstcpsvr_worker::do_authorization(char *start, char *finish, + hstcpsvr_conn& conn) +{ + /* auth type */ + char *const authtype_begin = start; + read_token(start, finish); + char *const authtype_end = start; + const size_t authtype_len = authtype_end - authtype_begin; + skip_one(start, finish); + /* key */ + char *const key_begin = start; + read_token(start, finish); + char *const key_end = start; + const size_t key_len = key_end - key_begin; + authtype_end[0] = 0; + key_end[0] = 0; + char *wp = key_begin; + unescape_string(wp, key_begin, key_end); + if (authtype_len != 1 || authtype_begin[0] != '1') { + return conn.dbcb_resp_short(3, "authtype"); + } + if (cshared.plain_secret.size() == key_len && + memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) { + conn.authorized = true; + } else { + conn.authorized = false; + } + if (!conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } else { + return conn.dbcb_resp_short(0, ""); + } +} + +hstcpsvr_worker_ptr +hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg) +{ + return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg)); +} + +}; + diff --git a/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp b/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp new file mode 100644 index 00000000..497581c2 --- /dev/null +++ b/plugin/handler_socket/handlersocket/hstcpsvr_worker.hpp @@ -0,0 +1,35 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#ifndef DENA_HSTCPSVR_WORKER_HPP +#define DENA_HSTCPSVR_WORKER_HPP + +#include "hstcpsvr.hpp" + +namespace dena { + +struct hstcpsvr_worker_i; +typedef std::auto_ptr<hstcpsvr_worker_i> hstcpsvr_worker_ptr; + +struct hstcpsvr_worker_arg { + const hstcpsvr_shared_c *cshared; + volatile hstcpsvr_shared_v *vshared; + long worker_id; + hstcpsvr_worker_arg() : cshared(0), vshared(0), worker_id(0) { } +}; + +struct hstcpsvr_worker_i { + virtual ~hstcpsvr_worker_i() { } + virtual void run() = 0; + static hstcpsvr_worker_ptr create(const hstcpsvr_worker_arg& arg); +}; + +}; + +#endif + diff --git a/plugin/handler_socket/handlersocket/mysql_incl.hpp b/plugin/handler_socket/handlersocket/mysql_incl.hpp new file mode 100644 index 00000000..0d056a7e --- /dev/null +++ b/plugin/handler_socket/handlersocket/mysql_incl.hpp @@ -0,0 +1,55 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#ifndef DENA_MYSQL_INCL_HPP +#define DENA_MYSQL_INCL_HPP + +#ifndef HAVE_CONFIG_H +#define HAVE_CONFIG_H +#endif + +#ifndef MYSQL_DYNAMIC_PLUGIN +#define MYSQL_DYNAMIC_PLUGIN +#endif + +#define MYSQL_SERVER 1 + +#include <my_global.h> +#include <mysql_version.h> + +#if MYSQL_VERSION_ID >= 50505 +#include <my_pthread.h> +#include <sql_priv.h> +#include "sql_class.h" +#include "unireg.h" +#include "lock.h" +#include "key.h" // key_copy() +#include <my_global.h> +#include <mysql/plugin.h> +#include <transaction.h> +#include <sql_base.h> +// FIXME FIXME FIXME +#define safeFree(X) my_free(X) +#undef pthread_cond_timedwait +#undef pthread_mutex_lock +#undef pthread_mutex_unlock +#define pthread_cond_timedwait mysql_cond_timedwait +#define pthread_mutex_lock mysql_mutex_lock +#define pthread_mutex_unlock mysql_mutex_unlock +#define current_stmt_binlog_row_based is_current_stmt_binlog_format_row +#define clear_current_stmt_binlog_row_based clear_current_stmt_binlog_format_row + +#else +#include "mysql_priv.h" +#endif + +#undef min +#undef max + +#endif + |