diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-upstream.tar.xz mariadb-10.5-upstream.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp')
-rw-r--r-- | plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp | 957 |
1 files changed, 957 insertions, 0 deletions
diff --git a/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp new file mode 100644 index 00000000..9863602a --- /dev/null +++ b/plugin/handler_socket/handlersocket/hstcpsvr_worker.cpp @@ -0,0 +1,957 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include <netinet/in.h> +#include <errno.h> +#include <poll.h> +#include <unistd.h> +#include <stdexcept> +#include <signal.h> +#include <list> +#if __linux__ +#include <sys/epoll.h> +#endif +#ifdef HAVE_ALLOCA_H +#include <alloca.h> +#endif + +#include "hstcpsvr_worker.hpp" +#include "string_buffer.hpp" +#include "auto_ptrcontainer.hpp" +#include "string_util.hpp" +#include "escape.hpp" + +#define DBG_FD(x) +#define DBG_TR(x) +#define DBG_EP(x) +#define DBG_MULTI(x) + +/* TODO */ +#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) +#define MSG_NOSIGNAL 0 +#endif + +namespace dena { + +struct dbconnstate { + string_buffer readbuf; + string_buffer writebuf; + std::vector<prep_stmt> prep_stmts; + size_t resp_begin_pos; + size_t find_nl_pos; + void reset() { + readbuf.clear(); + writebuf.clear(); + prep_stmts.clear(); + resp_begin_pos = 0; + find_nl_pos = 0; + } + dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { } +}; + +struct hstcpsvr_conn; +typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type; + +struct hstcpsvr_conn : public dbcallback_i { + public: + auto_file fd; + sockaddr_storage addr; + size_socket addr_len; + dbconnstate cstate; + std::string err; + size_t readsize; + bool nonblocking; + bool read_finished; + bool write_finished; + time_t nb_last_io; + hstcpsvr_conns_type::iterator conns_iter; + bool authorized; + public: + bool closed() const; + bool ok_to_close() const; + void reset(); + int accept(const hstcpsvr_shared_c& cshared); + bool write_more(bool *more_r = 0); + bool read_more(bool *more_r = 0); + public: + virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v); + virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const; + virtual void dbcb_resp_short(uint32_t code, const char *msg); + virtual void dbcb_resp_short_num(uint32_t code, uint32_t value); + virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value); + virtual void dbcb_resp_begin(size_t num_flds); + virtual void dbcb_resp_entry(const char *fld, size_t fldlen); + virtual void dbcb_resp_end(); + virtual void dbcb_resp_cancel(); + public: + hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096), + nonblocking(false), read_finished(false), write_finished(false), + nb_last_io(0), authorized(false) { } +}; + +bool +hstcpsvr_conn::closed() const +{ + return fd.get() < 0; +} + +bool +hstcpsvr_conn::ok_to_close() const +{ + return write_finished || (read_finished && cstate.writebuf.size() == 0); +} + +void +hstcpsvr_conn::reset() +{ + addr = sockaddr_storage(); + addr_len = sizeof(addr); + cstate.reset(); + fd.reset(); + read_finished = false; + write_finished = false; +} + +int +hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared) +{ + reset(); + return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr, + addr_len, err); +} + +bool +hstcpsvr_conn::write_more(bool *more_r) +{ + if (write_finished || cstate.writebuf.size() == 0) { + return false; + } + const size_t wlen = cstate.writebuf.size(); + ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL); + if (len <= 0) { + if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { + cstate.writebuf.clear(); + write_finished = true; + } + return false; + } + cstate.writebuf.erase_front(len); + /* FIXME: reallocate memory if too large */ + if (more_r) { + *more_r = (static_cast<size_t>(len) == wlen); + } + return true; +} + +bool +hstcpsvr_conn::read_more(bool *more_r) +{ + if (read_finished) { + return false; + } + const size_t block_size = readsize > 4096 ? readsize : 4096; + char *wp = cstate.readbuf.make_space(block_size); + const ssize_t len = read(fd.get(), wp, block_size); + if (len <= 0) { + if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { + read_finished = true; + } + return false; + } + cstate.readbuf.space_wrote(len); + if (more_r) { + *more_r = (static_cast<size_t>(len) == block_size); + } + return true; +} + +void +hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) +{ + if (cstate.prep_stmts.size() <= pst_id) { + cstate.prep_stmts.resize(pst_id + 1); + } + cstate.prep_stmts[pst_id] = v; +} + +const prep_stmt * +hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const +{ + if (cstate.prep_stmts.size() <= pst_id) { + return 0; + } + return &cstate.prep_stmts[pst_id]; +} + +void +hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg) +{ + write_ui32(cstate.writebuf, code); + const size_t msglen = strlen(msg); + if (msglen != 0) { + cstate.writebuf.append_literal("\t1\t"); + cstate.writebuf.append(msg, msg + msglen); + } else { + cstate.writebuf.append_literal("\t1"); + } + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value) +{ + write_ui32(cstate.writebuf, code); + cstate.writebuf.append_literal("\t1\t"); + write_ui32(cstate.writebuf, value); + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value) +{ + write_ui32(cstate.writebuf, code); + cstate.writebuf.append_literal("\t1\t"); + write_ui64(cstate.writebuf, value); + cstate.writebuf.append_literal("\n"); +} + +void +hstcpsvr_conn::dbcb_resp_begin(size_t num_flds) +{ + cstate.resp_begin_pos = cstate.writebuf.size(); + cstate.writebuf.append_literal("0\t"); + write_ui32(cstate.writebuf, num_flds); +} + +void +hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen) +{ + if (fld != 0) { + cstate.writebuf.append_literal("\t"); + escape_string(cstate.writebuf, fld, fld + fldlen); + } else { + static const char t[] = "\t\0"; + cstate.writebuf.append(t, t + 2); + } +} + +void +hstcpsvr_conn::dbcb_resp_end() +{ + cstate.writebuf.append_literal("\n"); + cstate.resp_begin_pos = 0; +} + +void +hstcpsvr_conn::dbcb_resp_cancel() +{ + cstate.writebuf.resize(cstate.resp_begin_pos); + cstate.resp_begin_pos = 0; +} + +struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable { + hstcpsvr_worker(const hstcpsvr_worker_arg& arg); + virtual void run(); + private: + const hstcpsvr_shared_c& cshared; + volatile hstcpsvr_shared_v& vshared; + long worker_id; + dbcontext_ptr dbctx; + hstcpsvr_conns_type conns; /* conns refs dbctx */ + time_t last_check_time; + std::vector<pollfd> pfds; + #ifdef __linux__ + std::vector<epoll_event> events_vec; + auto_file epoll_fd; + #endif + bool accept_enabled; + int accept_balance; + std::vector<string_ref> invalues_work; + std::vector<record_filter> filters_work; + private: + int run_one_nb(); + int run_one_ep(); + void execute_lines(hstcpsvr_conn& conn); + void execute_line(char *start, char *finish, hstcpsvr_conn& conn); + void do_open_index(char *start, char *finish, hstcpsvr_conn& conn); + void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, + char *finish, hstcpsvr_conn& conn); + void do_authorization(char *start, char *finish, hstcpsvr_conn& conn); +}; + +hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg) + : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id), + dbctx(cshared.dbptr->create_context(cshared.for_write_flag)), + last_check_time(time(0)), accept_enabled(true), accept_balance(0) +{ + #ifdef __linux__ + if (cshared.sockargs.use_epoll) { + epoll_fd.reset(epoll_create(10)); + if (epoll_fd.get() < 0) { + fatal_abort("epoll_create"); + } + epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.ptr = 0; + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + events_vec.resize(10240); + } + #endif + accept_balance = cshared.conf.get_int("accept_balance", 0); +} + +namespace { + +struct thr_init { + thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) { + dbctx->init_thread(this, shutdown_flag); + } + ~thr_init() { + dbctx->term_thread(); + } + const dbcontext_ptr& dbctx; +}; + +}; // namespace + +void +hstcpsvr_worker::run() +{ + thr_init initobj(dbctx, vshared.shutdown); + + #ifdef __linux__ + if (cshared.sockargs.use_epoll) { + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_ep(); + } + } else if (cshared.sockargs.nonblocking) { + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_nb(); + } + } else { + /* UNUSED */ + fatal_abort("run_one"); + } + #else + while (!vshared.shutdown && dbctx->check_alive()) { + run_one_nb(); + } + #endif +} + +int +hstcpsvr_worker::run_one_nb() +{ + size_t nfds = 0; + /* CLIENT SOCKETS */ + for (hstcpsvr_conns_type::const_iterator i = conns.begin(); + i != conns.end(); ++i) { + if (pfds.size() <= nfds) { + pfds.resize(nfds + 1); + } + pollfd& pfd = pfds[nfds++]; + pfd.fd = (*i)->fd.get(); + short ev = 0; + if ((*i)->cstate.writebuf.size() != 0) { + ev = POLLOUT; + } else { + ev = POLLIN; + } + pfd.events = pfd.revents = ev; + } + /* LISTENER */ + { + const size_t cpt = cshared.nb_conn_per_thread; + const short ev = (cpt > nfds) ? POLLIN : 0; + if (pfds.size() <= nfds) { + pfds.resize(nfds + 1); + } + pollfd& pfd = pfds[nfds++]; + pfd.fd = cshared.listen_fd.get(); + pfd.events = pfd.revents = ev; + } + /* POLL */ + const int npollev = poll(&pfds[0], nfds, 1 * 1000); + dbctx->set_statistics(conns.size(), npollev); + const time_t now = time(0); + size_t j = 0; + const short mask_in = ~POLLOUT; + const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL; + /* READ */ + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++i, ++j) { + pollfd& pfd = pfds[j]; + if ((pfd.revents & mask_in) == 0) { + continue; + } + hstcpsvr_conn& conn = **i; + if (conn.read_more()) { + if (conn.cstate.readbuf.size() > 0) { + const char ch = conn.cstate.readbuf.begin()[0]; + if (ch == 'Q') { + vshared.shutdown = 1; + } else if (ch == '/') { + conn.cstate.readbuf.clear(); + conn.cstate.find_nl_pos = 0; + conn.cstate.writebuf.clear(); + conn.read_finished = true; + conn.write_finished = true; + } + } + conn.nb_last_io = now; + } + } + /* EXECUTE */ + j = 0; + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++i, ++j) { + pollfd& pfd = pfds[j]; + if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) { + continue; + } + execute_lines(**i); + } + /* COMMIT */ + dbctx->unlock_tables_if(); + const bool commit_error = dbctx->get_commit_error(); + dbctx->clear_error(); + /* WRITE/CLOSE */ + j = 0; + for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); + ++j) { + pollfd& pfd = pfds[j]; + hstcpsvr_conn& conn = **i; + hstcpsvr_conns_type::iterator icur = i; + ++i; + if (commit_error) { + conn.reset(); + continue; + } + if ((pfd.revents & (mask_out | mask_in)) != 0) { + if (conn.write_more()) { + conn.nb_last_io = now; + } + } + if (cshared.sockargs.timeout != 0 && + conn.nb_last_io + cshared.sockargs.timeout < now) { + conn.reset(); + } + if (conn.closed() || conn.ok_to_close()) { + conns.erase_ptr(icur); + } + } + /* ACCEPT */ + { + pollfd& pfd = pfds[nfds - 1]; + if ((pfd.revents & mask_in) != 0) { + std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); + c->nonblocking = true; + c->readsize = cshared.readsize; + c->accept(cshared); + if (c->fd.get() >= 0) { + if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { + fatal_abort("F_SETFL O_NONBLOCK"); + } + c->nb_last_io = now; + conns.push_back_ptr(c); + } else { + /* errno == 11 (EAGAIN) is not a fatal error. */ + DENA_VERBOSE(100, fprintf(stderr, + "accept failed: errno=%d (not fatal)\n", errno)); + } + } + } + DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds, + conns.size())); + if (conns.empty()) { + dbctx->close_tables_if(); + } + dbctx->set_statistics(conns.size(), 0); + return 0; +} + +#ifdef __linux__ +int +hstcpsvr_worker::run_one_ep() +{ + epoll_event *const events = &events_vec[0]; + const size_t num_events = events_vec.size(); + const time_t now = time(0); + size_t in_count = 0, out_count = 0, accept_count = 0; + int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000); + /* READ/ACCEPT */ + dbctx->set_statistics(conns.size(), nfds); + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + if ((ev.events & EPOLLIN) == 0) { + continue; + } + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (conn == 0) { + /* listener */ + ++accept_count; + DBG_EP(fprintf(stderr, "IN listener\n")); + std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); + c->nonblocking = true; + c->readsize = cshared.readsize; + c->accept(cshared); + if (c->fd.get() >= 0) { + if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { + fatal_abort("F_SETFL O_NONBLOCK"); + } + epoll_event cev; + memset(&cev, 0, sizeof(cev)); + cev.events = EPOLLIN | EPOLLOUT | EPOLLET; + cev.data.ptr = c.get(); + c->nb_last_io = now; + const int fd = c->fd.get(); + conns.push_back_ptr(c); + conns.back()->conns_iter = --conns.end(); + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } else { + DENA_VERBOSE(100, fprintf(stderr, + "accept failed: errno=%d (not fatal)\n", errno)); + } + } else { + /* client connection */ + ++in_count; + DBG_EP(fprintf(stderr, "IN client\n")); + bool more_data = false; + while (conn->read_more(&more_data)) { + DBG_EP(fprintf(stderr, "IN client read_more\n")); + conn->nb_last_io = now; + if (!more_data) { + break; + } + } + } + } + /* EXECUTE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if ((ev.events & EPOLLIN) == 0 || conn == 0 || + conn->cstate.readbuf.size() == 0) { + continue; + } + const char ch = conn->cstate.readbuf.begin()[0]; + if (ch == 'Q') { + vshared.shutdown = 1; + } else if (ch == '/') { + conn->cstate.readbuf.clear(); + conn->cstate.find_nl_pos = 0; + conn->cstate.writebuf.clear(); + conn->read_finished = true; + conn->write_finished = true; + } else { + execute_lines(*conn); + } + } + /* COMMIT */ + dbctx->unlock_tables_if(); + const bool commit_error = dbctx->get_commit_error(); + dbctx->clear_error(); + /* WRITE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (commit_error && conn != 0) { + conn->reset(); + continue; + } + if ((ev.events & EPOLLOUT) == 0) { + continue; + } + ++out_count; + if (conn == 0) { + /* listener */ + DBG_EP(fprintf(stderr, "OUT listener\n")); + } else { + /* client connection */ + DBG_EP(fprintf(stderr, "OUT client\n")); + bool more_data = false; + while (conn->write_more(&more_data)) { + DBG_EP(fprintf(stderr, "OUT client write_more\n")); + conn->nb_last_io = now; + if (!more_data) { + break; + } + } + } + } + /* CLOSE */ + for (int i = 0; i < nfds; ++i) { + epoll_event& ev = events[i]; + hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); + if (conn != 0 && conn->ok_to_close()) { + DBG_EP(fprintf(stderr, "CLOSE close\n")); + conns.erase_ptr(conn->conns_iter); + } + } + /* TIMEOUT & cleanup */ + if (last_check_time + 10 < now) { + for (hstcpsvr_conns_type::iterator i = conns.begin(); + i != conns.end(); ) { + hstcpsvr_conns_type::iterator icur = i; + ++i; + if (cshared.sockargs.timeout != 0 && + (*icur)->nb_last_io + cshared.sockargs.timeout < now) { + conns.erase_ptr((*icur)->conns_iter); + } + } + last_check_time = now; + DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds, + conns.size())); + } + DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n", + this, in_count, out_count, accept_count, conns.size())); + if (conns.empty()) { + dbctx->close_tables_if(); + } + /* STATISTICS */ + const size_t num_conns = conns.size(); + dbctx->set_statistics(num_conns, 0); + /* ENABLE/DISABLE ACCEPT */ + if (accept_balance != 0) { + cshared.thread_num_conns[worker_id] = num_conns; + size_t total_num_conns = 0; + for (long i = 0; i < cshared.num_threads; ++i) { + total_num_conns += cshared.thread_num_conns[i]; + } + bool e_acc = false; + if (num_conns < 10 || + total_num_conns * 2 > num_conns * cshared.num_threads) { + e_acc = true; + } + epoll_event ev; + memset(&ev, 0, sizeof(ev)); + ev.events = EPOLLIN; + ev.data.ptr = 0; + if (e_acc == accept_enabled) { + } else if (e_acc) { + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } else { + if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev) + != 0) { + fatal_abort("epoll_ctl EPOLL_CTL_ADD"); + } + } + accept_enabled = e_acc; + } + return 0; +} +#endif + +void +hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn) +{ + DBG_MULTI(int cnt = 0); + dbconnstate& cstate = conn.cstate; + char *buf_end = cstate.readbuf.end(); + char *line_begin = cstate.readbuf.begin(); + char *find_pos = line_begin + cstate.find_nl_pos; + while (true) { + char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos); + if (nl == 0) { + break; + } + char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl; + DBG_MULTI(cnt++); + execute_line(line_begin, lf, conn); + find_pos = line_begin = nl + 1; + } + cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin()); + cstate.find_nl_pos = cstate.readbuf.size(); + DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt)); +} + +void +hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn) +{ + /* safe to modify, safe to dereference 'finish' */ + char *const cmd_begin = start; + read_token(start, finish); + char *const cmd_end = start; + skip_one(start, finish); + if (cmd_begin == cmd_end) { + return conn.dbcb_resp_short(2, "cmd"); + } + if (cmd_begin + 1 == cmd_end) { + if (cmd_begin[0] == 'P') { + if (cshared.require_auth && !conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } + return do_open_index(start, finish, conn); + } + if (cmd_begin[0] == 'A') { + return do_authorization(start, finish, conn); + } + } + if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') { + if (cshared.require_auth && !conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } + return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn); + } + return conn.dbcb_resp_short(2, "cmd"); +} + +void +hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn) +{ + const size_t pst_id = read_ui32(start, finish); + skip_one(start, finish); + /* dbname */ + char *const dbname_begin = start; + read_token(start, finish); + char *const dbname_end = start; + skip_one(start, finish); + /* tblname */ + char *const tblname_begin = start; + read_token(start, finish); + char *const tblname_end = start; + skip_one(start, finish); + /* idxname */ + char *const idxname_begin = start; + read_token(start, finish); + char *const idxname_end = start; + skip_one(start, finish); + /* retfields */ + char *const retflds_begin = start; + read_token(start, finish); + char *const retflds_end = start; + skip_one(start, finish); + /* filfields */ + char *const filflds_begin = start; + read_token(start, finish); + char *const filflds_end = start; + dbname_end[0] = 0; + tblname_end[0] = 0; + idxname_end[0] = 0; + retflds_end[0] = 0; + filflds_end[0] = 0; + cmd_open_args args; + args.pst_id = pst_id; + args.dbn = dbname_begin; + args.tbl = tblname_begin; + args.idx = idxname_begin; + args.retflds = retflds_begin; + args.filflds = filflds_begin; + return dbctx->cmd_open(conn, args); +} + +void +hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, + char *finish, hstcpsvr_conn& conn) +{ + cmd_exec_args args; + const size_t pst_id = read_ui32(cmd_begin, cmd_end); + if (pst_id >= conn.cstate.prep_stmts.size()) { + return conn.dbcb_resp_short(2, "stmtnum"); + } + args.pst = &conn.cstate.prep_stmts[pst_id]; + char *const op_begin = start; + read_token(start, finish); + char *const op_end = start; + args.op = string_ref(op_begin, op_end); + skip_one(start, finish); + const uint32_t fldnum = read_ui32(start, finish); + string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum); + auto_alloca_free<string_ref> flds_autofree(flds); + args.kvals = flds; + args.kvalslen = fldnum; + for (size_t i = 0; i < fldnum; ++i) { + skip_one(start, finish); + char *const f_begin = start; + read_token(start, finish); + char *const f_end = start; + if (is_null_expression(f_begin, f_end)) { + /* null */ + flds[i] = string_ref(); + } else { + /* non-null */ + char *wp = f_begin; + unescape_string(wp, f_begin, f_end); + flds[i] = string_ref(f_begin, wp - f_begin); + } + } + skip_one(start, finish); + args.limit = read_ui32(start, finish); + skip_one(start, finish); + args.skip = read_ui32(start, finish); + if (start == finish) { + /* simple query */ + return dbctx->cmd_exec(conn, args); + } + /* has more options */ + skip_one(start, finish); + /* in-clause */ + if (start[0] == '@') { + read_token(start, finish); /* '@' */ + skip_one(start, finish); + args.invalues_keypart = read_ui32(start, finish); + skip_one(start, finish); + args.invalueslen = read_ui32(start, finish); + if (args.invalueslen <= 0) { + return conn.dbcb_resp_short(2, "invalueslen"); + } + if (invalues_work.size() < args.invalueslen) { + invalues_work.resize(args.invalueslen); + } + args.invalues = &invalues_work[0]; + for (uint32_t i = 0; i < args.invalueslen; ++i) { + skip_one(start, finish); + char *const invalue_begin = start; + read_token(start, finish); + char *const invalue_end = start; + char *wp = invalue_begin; + unescape_string(wp, invalue_begin, invalue_end); + invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin); + } + skip_one(start, finish); + } + if (start == finish) { + /* no more options */ + return dbctx->cmd_exec(conn, args); + } + /* filters */ + size_t filters_count = 0; + while (start != finish && (start[0] == 'W' || start[0] == 'F')) { + char *const filter_type_begin = start; + read_token(start, finish); + char *const filter_type_end = start; + skip_one(start, finish); + char *const filter_op_begin = start; + read_token(start, finish); + char *const filter_op_end = start; + skip_one(start, finish); + const uint32_t ff_offset = read_ui32(start, finish); + skip_one(start, finish); + char *const filter_val_begin = start; + read_token(start, finish); + char *const filter_val_end = start; + skip_one(start, finish); + if (filters_work.size() <= filters_count) { + filters_work.resize(filters_count + 1); + } + record_filter& fi = filters_work[filters_count]; + if (filter_type_end != filter_type_begin + 1) { + return conn.dbcb_resp_short(2, "filtertype"); + } + fi.filter_type = (filter_type_begin[0] == 'W') + ? record_filter_type_break : record_filter_type_skip; + const uint32_t num_filflds = args.pst->get_filter_fields().size(); + if (ff_offset >= num_filflds) { + return conn.dbcb_resp_short(2, "filterfld"); + } + fi.op = string_ref(filter_op_begin, filter_op_end); + fi.ff_offset = ff_offset; + if (is_null_expression(filter_val_begin, filter_val_end)) { + /* null */ + fi.val = string_ref(); + } else { + /* non-null */ + char *wp = filter_val_begin; + unescape_string(wp, filter_val_begin, filter_val_end); + fi.val = string_ref(filter_val_begin, wp - filter_val_begin); + } + ++filters_count; + } + if (filters_count > 0) { + if (filters_work.size() <= filters_count) { + filters_work.resize(filters_count + 1); + } + filters_work[filters_count].op = string_ref(); /* sentinel */ + args.filters = &filters_work[0]; + } else { + args.filters = 0; + } + if (start == finish) { + /* no modops */ + return dbctx->cmd_exec(conn, args); + } + /* has modops */ + char *const mod_op_begin = start; + read_token(start, finish); + char *const mod_op_end = start; + args.mod_op = string_ref(mod_op_begin, mod_op_end); + const size_t num_uvals = args.pst->get_ret_fields().size(); + string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals); + auto_alloca_free<string_ref> uflds_autofree(uflds); + for (size_t i = 0; i < num_uvals; ++i) { + skip_one(start, finish); + char *const f_begin = start; + read_token(start, finish); + char *const f_end = start; + if (is_null_expression(f_begin, f_end)) { + /* null */ + uflds[i] = string_ref(); + } else { + /* non-null */ + char *wp = f_begin; + unescape_string(wp, f_begin, f_end); + uflds[i] = string_ref(f_begin, wp - f_begin); + } + } + args.uvals = uflds; + return dbctx->cmd_exec(conn, args); +} + +void +hstcpsvr_worker::do_authorization(char *start, char *finish, + hstcpsvr_conn& conn) +{ + /* auth type */ + char *const authtype_begin = start; + read_token(start, finish); + char *const authtype_end = start; + const size_t authtype_len = authtype_end - authtype_begin; + skip_one(start, finish); + /* key */ + char *const key_begin = start; + read_token(start, finish); + char *const key_end = start; + const size_t key_len = key_end - key_begin; + authtype_end[0] = 0; + key_end[0] = 0; + char *wp = key_begin; + unescape_string(wp, key_begin, key_end); + if (authtype_len != 1 || authtype_begin[0] != '1') { + return conn.dbcb_resp_short(3, "authtype"); + } + if (cshared.plain_secret.size() == key_len && + memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) { + conn.authorized = true; + } else { + conn.authorized = false; + } + if (!conn.authorized) { + return conn.dbcb_resp_short(3, "unauth"); + } else { + return conn.dbcb_resp_short(0, ""); + } +} + +hstcpsvr_worker_ptr +hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg) +{ + return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg)); +} + +}; + |