// vim:sw=2:ai /* * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. * See COPYRIGHT.txt for details. */ #include #include #include #include #include #include #include #include #if __linux__ #include #endif #ifdef HAVE_ALLOCA_H #include #endif #include "hstcpsvr_worker.hpp" #include "string_buffer.hpp" #include "auto_ptrcontainer.hpp" #include "string_util.hpp" #include "escape.hpp" #define DBG_FD(x) #define DBG_TR(x) #define DBG_EP(x) #define DBG_MULTI(x) /* TODO */ #if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) #define MSG_NOSIGNAL 0 #endif namespace dena { struct dbconnstate { string_buffer readbuf; string_buffer writebuf; std::vector prep_stmts; size_t resp_begin_pos; size_t find_nl_pos; void reset() { readbuf.clear(); writebuf.clear(); prep_stmts.clear(); resp_begin_pos = 0; find_nl_pos = 0; } dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { } }; struct hstcpsvr_conn; typedef auto_ptrcontainer< std::list > hstcpsvr_conns_type; struct hstcpsvr_conn : public dbcallback_i { public: auto_file fd; sockaddr_storage addr; size_socket addr_len; dbconnstate cstate; std::string err; size_t readsize; bool nonblocking; bool read_finished; bool write_finished; time_t nb_last_io; hstcpsvr_conns_type::iterator conns_iter; bool authorized; public: bool closed() const; bool ok_to_close() const; void reset(); int accept(const hstcpsvr_shared_c& cshared); bool write_more(bool *more_r = 0); bool read_more(bool *more_r = 0); public: virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v); virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const; virtual void dbcb_resp_short(uint32_t code, const char *msg); virtual void dbcb_resp_short_num(uint32_t code, uint32_t value); virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value); virtual void dbcb_resp_begin(size_t num_flds); virtual void dbcb_resp_entry(const char *fld, size_t fldlen); virtual void dbcb_resp_end(); virtual void dbcb_resp_cancel(); public: hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096), nonblocking(false), read_finished(false), write_finished(false), nb_last_io(0), authorized(false) { } }; bool hstcpsvr_conn::closed() const { return fd.get() < 0; } bool hstcpsvr_conn::ok_to_close() const { return write_finished || (read_finished && cstate.writebuf.size() == 0); } void hstcpsvr_conn::reset() { addr = sockaddr_storage(); addr_len = sizeof(addr); cstate.reset(); fd.reset(); read_finished = false; write_finished = false; } int hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared) { reset(); return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr, addr_len, err); } bool hstcpsvr_conn::write_more(bool *more_r) { if (write_finished || cstate.writebuf.size() == 0) { return false; } const size_t wlen = cstate.writebuf.size(); ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL); if (len <= 0) { if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { cstate.writebuf.clear(); write_finished = true; } return false; } cstate.writebuf.erase_front(len); /* FIXME: reallocate memory if too large */ if (more_r) { *more_r = (static_cast(len) == wlen); } return true; } bool hstcpsvr_conn::read_more(bool *more_r) { if (read_finished) { return false; } const size_t block_size = readsize > 4096 ? readsize : 4096; char *wp = cstate.readbuf.make_space(block_size); const ssize_t len = read(fd.get(), wp, block_size); if (len <= 0) { if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { read_finished = true; } return false; } cstate.readbuf.space_wrote(len); if (more_r) { *more_r = (static_cast(len) == block_size); } return true; } void hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) { if (cstate.prep_stmts.size() <= pst_id) { cstate.prep_stmts.resize(pst_id + 1); } cstate.prep_stmts[pst_id] = v; } const prep_stmt * hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const { if (cstate.prep_stmts.size() <= pst_id) { return 0; } return &cstate.prep_stmts[pst_id]; } void hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg) { write_ui32(cstate.writebuf, code); const size_t msglen = strlen(msg); if (msglen != 0) { cstate.writebuf.append_literal("\t1\t"); cstate.writebuf.append(msg, msg + msglen); } else { cstate.writebuf.append_literal("\t1"); } cstate.writebuf.append_literal("\n"); } void hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value) { write_ui32(cstate.writebuf, code); cstate.writebuf.append_literal("\t1\t"); write_ui32(cstate.writebuf, value); cstate.writebuf.append_literal("\n"); } void hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value) { write_ui32(cstate.writebuf, code); cstate.writebuf.append_literal("\t1\t"); write_ui64(cstate.writebuf, value); cstate.writebuf.append_literal("\n"); } void hstcpsvr_conn::dbcb_resp_begin(size_t num_flds) { cstate.resp_begin_pos = cstate.writebuf.size(); cstate.writebuf.append_literal("0\t"); write_ui32(cstate.writebuf, num_flds); } void hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen) { if (fld != 0) { cstate.writebuf.append_literal("\t"); escape_string(cstate.writebuf, fld, fld + fldlen); } else { static const char t[] = "\t\0"; cstate.writebuf.append(t, t + 2); } } void hstcpsvr_conn::dbcb_resp_end() { cstate.writebuf.append_literal("\n"); cstate.resp_begin_pos = 0; } void hstcpsvr_conn::dbcb_resp_cancel() { cstate.writebuf.resize(cstate.resp_begin_pos); cstate.resp_begin_pos = 0; } struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable { hstcpsvr_worker(const hstcpsvr_worker_arg& arg); virtual void run(); private: const hstcpsvr_shared_c& cshared; volatile hstcpsvr_shared_v& vshared; long worker_id; dbcontext_ptr dbctx; hstcpsvr_conns_type conns; /* conns refs dbctx */ time_t last_check_time; std::vector pfds; #ifdef __linux__ std::vector events_vec; auto_file epoll_fd; #endif bool accept_enabled; int accept_balance; std::vector invalues_work; std::vector filters_work; private: int run_one_nb(); int run_one_ep(); void execute_lines(hstcpsvr_conn& conn); void execute_line(char *start, char *finish, hstcpsvr_conn& conn); void do_open_index(char *start, char *finish, hstcpsvr_conn& conn); void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, char *finish, hstcpsvr_conn& conn); void do_authorization(char *start, char *finish, hstcpsvr_conn& conn); }; hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg) : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id), dbctx(cshared.dbptr->create_context(cshared.for_write_flag)), last_check_time(time(0)), accept_enabled(true), accept_balance(0) { #ifdef __linux__ if (cshared.sockargs.use_epoll) { epoll_fd.reset(epoll_create(10)); if (epoll_fd.get() < 0) { fatal_abort("epoll_create"); } epoll_event ev; memset(&ev, 0, sizeof(ev)); ev.events = EPOLLIN; ev.data.ptr = 0; if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) != 0) { fatal_abort("epoll_ctl EPOLL_CTL_ADD"); } events_vec.resize(10240); } #endif accept_balance = cshared.conf.get_int("accept_balance", 0); } namespace { struct thr_init { thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) { dbctx->init_thread(this, shutdown_flag); } ~thr_init() { dbctx->term_thread(); } const dbcontext_ptr& dbctx; }; }; // namespace void hstcpsvr_worker::run() { thr_init initobj(dbctx, vshared.shutdown); #ifdef __linux__ if (cshared.sockargs.use_epoll) { while (!vshared.shutdown && dbctx->check_alive()) { run_one_ep(); } } else if (cshared.sockargs.nonblocking) { while (!vshared.shutdown && dbctx->check_alive()) { run_one_nb(); } } else { /* UNUSED */ fatal_abort("run_one"); } #else while (!vshared.shutdown && dbctx->check_alive()) { run_one_nb(); } #endif } int hstcpsvr_worker::run_one_nb() { size_t nfds = 0; /* CLIENT SOCKETS */ for (hstcpsvr_conns_type::const_iterator i = conns.begin(); i != conns.end(); ++i) { if (pfds.size() <= nfds) { pfds.resize(nfds + 1); } pollfd& pfd = pfds[nfds++]; pfd.fd = (*i)->fd.get(); short ev = 0; if ((*i)->cstate.writebuf.size() != 0) { ev = POLLOUT; } else { ev = POLLIN; } pfd.events = pfd.revents = ev; } /* LISTENER */ { const size_t cpt = cshared.nb_conn_per_thread; const short ev = (cpt > nfds) ? POLLIN : 0; if (pfds.size() <= nfds) { pfds.resize(nfds + 1); } pollfd& pfd = pfds[nfds++]; pfd.fd = cshared.listen_fd.get(); pfd.events = pfd.revents = ev; } /* POLL */ const int npollev = poll(&pfds[0], nfds, 1 * 1000); dbctx->set_statistics(conns.size(), npollev); const time_t now = time(0); size_t j = 0; const short mask_in = ~POLLOUT; const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL; /* READ */ for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); ++i, ++j) { pollfd& pfd = pfds[j]; if ((pfd.revents & mask_in) == 0) { continue; } hstcpsvr_conn& conn = **i; if (conn.read_more()) { if (conn.cstate.readbuf.size() > 0) { const char ch = conn.cstate.readbuf.begin()[0]; if (ch == 'Q') { vshared.shutdown = 1; } else if (ch == '/') { conn.cstate.readbuf.clear(); conn.cstate.find_nl_pos = 0; conn.cstate.writebuf.clear(); conn.read_finished = true; conn.write_finished = true; } } conn.nb_last_io = now; } } /* EXECUTE */ j = 0; for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); ++i, ++j) { pollfd& pfd = pfds[j]; if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) { continue; } execute_lines(**i); } /* COMMIT */ dbctx->unlock_tables_if(); const bool commit_error = dbctx->get_commit_error(); dbctx->clear_error(); /* WRITE/CLOSE */ j = 0; for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); ++j) { pollfd& pfd = pfds[j]; hstcpsvr_conn& conn = **i; hstcpsvr_conns_type::iterator icur = i; ++i; if (commit_error) { conn.reset(); continue; } if ((pfd.revents & (mask_out | mask_in)) != 0) { if (conn.write_more()) { conn.nb_last_io = now; } } if (cshared.sockargs.timeout != 0 && conn.nb_last_io + cshared.sockargs.timeout < now) { conn.reset(); } if (conn.closed() || conn.ok_to_close()) { conns.erase_ptr(icur); } } /* ACCEPT */ { pollfd& pfd = pfds[nfds - 1]; if ((pfd.revents & mask_in) != 0) { std::auto_ptr c(new hstcpsvr_conn()); c->nonblocking = true; c->readsize = cshared.readsize; c->accept(cshared); if (c->fd.get() >= 0) { if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { fatal_abort("F_SETFL O_NONBLOCK"); } c->nb_last_io = now; conns.push_back_ptr(c); } else { /* errno == 11 (EAGAIN) is not a fatal error. */ DENA_VERBOSE(100, fprintf(stderr, "accept failed: errno=%d (not fatal)\n", errno)); } } } DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds, conns.size())); if (conns.empty()) { dbctx->close_tables_if(); } dbctx->set_statistics(conns.size(), 0); return 0; } #ifdef __linux__ int hstcpsvr_worker::run_one_ep() { epoll_event *const events = &events_vec[0]; const size_t num_events = events_vec.size(); const time_t now = time(0); size_t in_count = 0, out_count = 0, accept_count = 0; int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000); /* READ/ACCEPT */ dbctx->set_statistics(conns.size(), nfds); for (int i = 0; i < nfds; ++i) { epoll_event& ev = events[i]; if ((ev.events & EPOLLIN) == 0) { continue; } hstcpsvr_conn *const conn = static_cast(ev.data.ptr); if (conn == 0) { /* listener */ ++accept_count; DBG_EP(fprintf(stderr, "IN listener\n")); std::auto_ptr c(new hstcpsvr_conn()); c->nonblocking = true; c->readsize = cshared.readsize; c->accept(cshared); if (c->fd.get() >= 0) { if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { fatal_abort("F_SETFL O_NONBLOCK"); } epoll_event cev; memset(&cev, 0, sizeof(cev)); cev.events = EPOLLIN | EPOLLOUT | EPOLLET; cev.data.ptr = c.get(); c->nb_last_io = now; const int fd = c->fd.get(); conns.push_back_ptr(c); conns.back()->conns_iter = --conns.end(); if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) { fatal_abort("epoll_ctl EPOLL_CTL_ADD"); } } else { DENA_VERBOSE(100, fprintf(stderr, "accept failed: errno=%d (not fatal)\n", errno)); } } else { /* client connection */ ++in_count; DBG_EP(fprintf(stderr, "IN client\n")); bool more_data = false; while (conn->read_more(&more_data)) { DBG_EP(fprintf(stderr, "IN client read_more\n")); conn->nb_last_io = now; if (!more_data) { break; } } } } /* EXECUTE */ for (int i = 0; i < nfds; ++i) { epoll_event& ev = events[i]; hstcpsvr_conn *const conn = static_cast(ev.data.ptr); if ((ev.events & EPOLLIN) == 0 || conn == 0 || conn->cstate.readbuf.size() == 0) { continue; } const char ch = conn->cstate.readbuf.begin()[0]; if (ch == 'Q') { vshared.shutdown = 1; } else if (ch == '/') { conn->cstate.readbuf.clear(); conn->cstate.find_nl_pos = 0; conn->cstate.writebuf.clear(); conn->read_finished = true; conn->write_finished = true; } else { execute_lines(*conn); } } /* COMMIT */ dbctx->unlock_tables_if(); const bool commit_error = dbctx->get_commit_error(); dbctx->clear_error(); /* WRITE */ for (int i = 0; i < nfds; ++i) { epoll_event& ev = events[i]; hstcpsvr_conn *const conn = static_cast(ev.data.ptr); if (commit_error && conn != 0) { conn->reset(); continue; } if ((ev.events & EPOLLOUT) == 0) { continue; } ++out_count; if (conn == 0) { /* listener */ DBG_EP(fprintf(stderr, "OUT listener\n")); } else { /* client connection */ DBG_EP(fprintf(stderr, "OUT client\n")); bool more_data = false; while (conn->write_more(&more_data)) { DBG_EP(fprintf(stderr, "OUT client write_more\n")); conn->nb_last_io = now; if (!more_data) { break; } } } } /* CLOSE */ for (int i = 0; i < nfds; ++i) { epoll_event& ev = events[i]; hstcpsvr_conn *const conn = static_cast(ev.data.ptr); if (conn != 0 && conn->ok_to_close()) { DBG_EP(fprintf(stderr, "CLOSE close\n")); conns.erase_ptr(conn->conns_iter); } } /* TIMEOUT & cleanup */ if (last_check_time + 10 < now) { for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); ) { hstcpsvr_conns_type::iterator icur = i; ++i; if (cshared.sockargs.timeout != 0 && (*icur)->nb_last_io + cshared.sockargs.timeout < now) { conns.erase_ptr((*icur)->conns_iter); } } last_check_time = now; DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds, conns.size())); } DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n", this, in_count, out_count, accept_count, conns.size())); if (conns.empty()) { dbctx->close_tables_if(); } /* STATISTICS */ const size_t num_conns = conns.size(); dbctx->set_statistics(num_conns, 0); /* ENABLE/DISABLE ACCEPT */ if (accept_balance != 0) { cshared.thread_num_conns[worker_id] = num_conns; size_t total_num_conns = 0; for (long i = 0; i < cshared.num_threads; ++i) { total_num_conns += cshared.thread_num_conns[i]; } bool e_acc = false; if (num_conns < 10 || total_num_conns * 2 > num_conns * cshared.num_threads) { e_acc = true; } epoll_event ev; memset(&ev, 0, sizeof(ev)); ev.events = EPOLLIN; ev.data.ptr = 0; if (e_acc == accept_enabled) { } else if (e_acc) { if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) != 0) { fatal_abort("epoll_ctl EPOLL_CTL_ADD"); } } else { if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev) != 0) { fatal_abort("epoll_ctl EPOLL_CTL_ADD"); } } accept_enabled = e_acc; } return 0; } #endif void hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn) { DBG_MULTI(int cnt = 0); dbconnstate& cstate = conn.cstate; char *buf_end = cstate.readbuf.end(); char *line_begin = cstate.readbuf.begin(); char *find_pos = line_begin + cstate.find_nl_pos; while (true) { char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos); if (nl == 0) { break; } char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl; DBG_MULTI(cnt++); execute_line(line_begin, lf, conn); find_pos = line_begin = nl + 1; } cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin()); cstate.find_nl_pos = cstate.readbuf.size(); DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt)); } void hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn) { /* safe to modify, safe to dereference 'finish' */ char *const cmd_begin = start; read_token(start, finish); char *const cmd_end = start; skip_one(start, finish); if (cmd_begin == cmd_end) { return conn.dbcb_resp_short(2, "cmd"); } if (cmd_begin + 1 == cmd_end) { if (cmd_begin[0] == 'P') { if (cshared.require_auth && !conn.authorized) { return conn.dbcb_resp_short(3, "unauth"); } return do_open_index(start, finish, conn); } if (cmd_begin[0] == 'A') { return do_authorization(start, finish, conn); } } if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') { if (cshared.require_auth && !conn.authorized) { return conn.dbcb_resp_short(3, "unauth"); } return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn); } return conn.dbcb_resp_short(2, "cmd"); } void hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn) { const size_t pst_id = read_ui32(start, finish); skip_one(start, finish); /* dbname */ char *const dbname_begin = start; read_token(start, finish); char *const dbname_end = start; skip_one(start, finish); /* tblname */ char *const tblname_begin = start; read_token(start, finish); char *const tblname_end = start; skip_one(start, finish); /* idxname */ char *const idxname_begin = start; read_token(start, finish); char *const idxname_end = start; skip_one(start, finish); /* retfields */ char *const retflds_begin = start; read_token(start, finish); char *const retflds_end = start; skip_one(start, finish); /* filfields */ char *const filflds_begin = start; read_token(start, finish); char *const filflds_end = start; dbname_end[0] = 0; tblname_end[0] = 0; idxname_end[0] = 0; retflds_end[0] = 0; filflds_end[0] = 0; cmd_open_args args; args.pst_id = pst_id; args.dbn = dbname_begin; args.tbl = tblname_begin; args.idx = idxname_begin; args.retflds = retflds_begin; args.filflds = filflds_begin; return dbctx->cmd_open(conn, args); } void hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, char *finish, hstcpsvr_conn& conn) { cmd_exec_args args; const size_t pst_id = read_ui32(cmd_begin, cmd_end); if (pst_id >= conn.cstate.prep_stmts.size()) { return conn.dbcb_resp_short(2, "stmtnum"); } args.pst = &conn.cstate.prep_stmts[pst_id]; char *const op_begin = start; read_token(start, finish); char *const op_end = start; args.op = string_ref(op_begin, op_end); skip_one(start, finish); const uint32_t fldnum = read_ui32(start, finish); string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum); auto_alloca_free flds_autofree(flds); args.kvals = flds; args.kvalslen = fldnum; for (size_t i = 0; i < fldnum; ++i) { skip_one(start, finish); char *const f_begin = start; read_token(start, finish); char *const f_end = start; if (is_null_expression(f_begin, f_end)) { /* null */ flds[i] = string_ref(); } else { /* non-null */ char *wp = f_begin; unescape_string(wp, f_begin, f_end); flds[i] = string_ref(f_begin, wp - f_begin); } } skip_one(start, finish); args.limit = read_ui32(start, finish); skip_one(start, finish); args.skip = read_ui32(start, finish); if (start == finish) { /* simple query */ return dbctx->cmd_exec(conn, args); } /* has more options */ skip_one(start, finish); /* in-clause */ if (start[0] == '@') { read_token(start, finish); /* '@' */ skip_one(start, finish); args.invalues_keypart = read_ui32(start, finish); skip_one(start, finish); args.invalueslen = read_ui32(start, finish); if (args.invalueslen <= 0) { return conn.dbcb_resp_short(2, "invalueslen"); } if (invalues_work.size() < args.invalueslen) { invalues_work.resize(args.invalueslen); } args.invalues = &invalues_work[0]; for (uint32_t i = 0; i < args.invalueslen; ++i) { skip_one(start, finish); char *const invalue_begin = start; read_token(start, finish); char *const invalue_end = start; char *wp = invalue_begin; unescape_string(wp, invalue_begin, invalue_end); invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin); } skip_one(start, finish); } if (start == finish) { /* no more options */ return dbctx->cmd_exec(conn, args); } /* filters */ size_t filters_count = 0; while (start != finish && (start[0] == 'W' || start[0] == 'F')) { char *const filter_type_begin = start; read_token(start, finish); char *const filter_type_end = start; skip_one(start, finish); char *const filter_op_begin = start; read_token(start, finish); char *const filter_op_end = start; skip_one(start, finish); const uint32_t ff_offset = read_ui32(start, finish); skip_one(start, finish); char *const filter_val_begin = start; read_token(start, finish); char *const filter_val_end = start; skip_one(start, finish); if (filters_work.size() <= filters_count) { filters_work.resize(filters_count + 1); } record_filter& fi = filters_work[filters_count]; if (filter_type_end != filter_type_begin + 1) { return conn.dbcb_resp_short(2, "filtertype"); } fi.filter_type = (filter_type_begin[0] == 'W') ? record_filter_type_break : record_filter_type_skip; const uint32_t num_filflds = args.pst->get_filter_fields().size(); if (ff_offset >= num_filflds) { return conn.dbcb_resp_short(2, "filterfld"); } fi.op = string_ref(filter_op_begin, filter_op_end); fi.ff_offset = ff_offset; if (is_null_expression(filter_val_begin, filter_val_end)) { /* null */ fi.val = string_ref(); } else { /* non-null */ char *wp = filter_val_begin; unescape_string(wp, filter_val_begin, filter_val_end); fi.val = string_ref(filter_val_begin, wp - filter_val_begin); } ++filters_count; } if (filters_count > 0) { if (filters_work.size() <= filters_count) { filters_work.resize(filters_count + 1); } filters_work[filters_count].op = string_ref(); /* sentinel */ args.filters = &filters_work[0]; } else { args.filters = 0; } if (start == finish) { /* no modops */ return dbctx->cmd_exec(conn, args); } /* has modops */ char *const mod_op_begin = start; read_token(start, finish); char *const mod_op_end = start; args.mod_op = string_ref(mod_op_begin, mod_op_end); const size_t num_uvals = args.pst->get_ret_fields().size(); string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals); auto_alloca_free uflds_autofree(uflds); for (size_t i = 0; i < num_uvals; ++i) { skip_one(start, finish); char *const f_begin = start; read_token(start, finish); char *const f_end = start; if (is_null_expression(f_begin, f_end)) { /* null */ uflds[i] = string_ref(); } else { /* non-null */ char *wp = f_begin; unescape_string(wp, f_begin, f_end); uflds[i] = string_ref(f_begin, wp - f_begin); } } args.uvals = uflds; return dbctx->cmd_exec(conn, args); } void hstcpsvr_worker::do_authorization(char *start, char *finish, hstcpsvr_conn& conn) { /* auth type */ char *const authtype_begin = start; read_token(start, finish); char *const authtype_end = start; const size_t authtype_len = authtype_end - authtype_begin; skip_one(start, finish); /* key */ char *const key_begin = start; read_token(start, finish); char *const key_end = start; const size_t key_len = key_end - key_begin; authtype_end[0] = 0; key_end[0] = 0; char *wp = key_begin; unescape_string(wp, key_begin, key_end); if (authtype_len != 1 || authtype_begin[0] != '1') { return conn.dbcb_resp_short(3, "authtype"); } if (cshared.plain_secret.size() == key_len && memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) { conn.authorized = true; } else { conn.authorized = false; } if (!conn.authorized) { return conn.dbcb_resp_short(3, "unauth"); } else { return conn.dbcb_resp_short(0, ""); } } hstcpsvr_worker_ptr hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg) { return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg)); } };