diff options
Diffstat (limited to 'storage/spider/hs_client/hstcpcli.cpp')
-rw-r--r-- | storage/spider/hs_client/hstcpcli.cpp | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/storage/spider/hs_client/hstcpcli.cpp b/storage/spider/hs_client/hstcpcli.cpp new file mode 100644 index 00000000..4c93b5a3 --- /dev/null +++ b/storage/spider/hs_client/hstcpcli.cpp @@ -0,0 +1,667 @@ + +// vim:sw=2:ai + +/* + * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved. + * Copyright (C) 2011-2017 Kentoku SHIBA + * See COPYRIGHT.txt for details. + */ + +#include <my_global.h> +#include "mysql_version.h" +#include "hs_compat.h" +#if MYSQL_VERSION_ID < 50500 +#include "mysql_priv.h" +#include <mysql/plugin.h> +#else +#include "sql_priv.h" +#include "probes_mysql.h" +#include "sql_class.h" +#endif + +#include "hstcpcli.hpp" +#include "auto_file.hpp" +#include "string_util.hpp" +#include "auto_addrinfo.hpp" +#include "escape.hpp" +#include "util.hpp" + +/* TODO */ +#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) +#define MSG_NOSIGNAL 0 +#endif + +#define DBG(x) + +namespace dena { + +hstresult::hstresult() +{ + SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, + MYF(MY_WME)); +} + +hstresult::~hstresult() +{ + delete_dynamic(&flds); +} + +struct hstcpcli : public hstcpcli_i, private noncopyable { + hstcpcli(const socket_args& args); + virtual ~hstcpcli(); + virtual void close(); + virtual int reconnect(); + virtual bool stable_point(); + virtual void request_buf_open_index(size_t pst_id, const char *dbn, + const char *tbl, const char *idx, const char *retflds, const char *filflds); + virtual void request_buf_auth(const char *secret, const char *typ); + virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op, + const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip, + const string_ref& mod_op, const string_ref *mvs, size_t mvslen, + const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, + const string_ref *invalues, size_t invalueslen); + virtual size_t request_buf_append(const char *start, const char *finish); + virtual void request_reset(); + virtual int request_send(); + virtual int response_recv(size_t& num_flds_r); + virtual int get_result(hstresult& result); + virtual const string_ref *get_next_row(); + virtual const string_ref *get_next_row_from_result(hstresult& result); + virtual size_t get_row_size(); + virtual size_t get_row_size_from_result(hstresult& result); + virtual void response_buf_remove(); + virtual int get_error_code(); + virtual String& get_error(); + virtual void clear_error(); + virtual int set_timeout(int send_timeout, int recv_timeout); + virtual size_t get_num_req_bufd() { return num_req_bufd; } + virtual size_t get_num_req_sent() { return num_req_sent; } + virtual size_t get_num_req_rcvd() { return num_req_rcvd; } + virtual size_t get_response_end_offset() { return response_end_offset; } + virtual const char *get_readbuf_begin() { return readbuf.begin(); } + virtual const char *get_readbuf_end() { return readbuf.end(); } + virtual const char *get_writebuf_begin() { return writebuf.begin(); } + virtual size_t get_writebuf_size() { return writebuf.size(); } + virtual void write_error_to_log(const char *func_name, const char *file_name, + ulong line_no); + private: + int read_more(); + int set_error(int code, const String& str); + int set_error(int code, const char *str); + private: + auto_file fd; + socket_args sargs; + string_buffer readbuf; + string_buffer writebuf; + size_t response_end_offset; /* incl newline */ + size_t cur_row_offset; + size_t cur_row_size; + size_t num_flds; + size_t num_req_bufd; /* buffered but not yet sent */ + size_t num_req_sent; /* sent but not yet received */ + size_t num_req_rcvd; /* received but not yet removed */ + int error_code; + String error_str; + DYNAMIC_ARRAY flds; + int errno_buf; +}; + +hstcpcli::hstcpcli(const socket_args& args) + : sargs(args), response_end_offset(0), cur_row_offset(0), cur_row_size(0), + num_flds(0), num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), + error_code(0), errno_buf(0) +{ + String err; + SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME)); + if (socket_connect(fd, sargs, err) != 0) { + set_error(-1, err); + } +} + +hstcpcli::~hstcpcli() +{ + delete_dynamic(&flds); +} + +void +hstcpcli::close() +{ + fd.close(); + readbuf.clear(); + writebuf.clear(); + response_end_offset = 0; + cur_row_offset = 0; + num_flds = 0; + num_req_bufd = 0; + num_req_sent = 0; + num_req_rcvd = 0; +} + +int +hstcpcli::reconnect() +{ + clear_error(); + close(); + String err; + if (socket_connect(fd, sargs, err) != 0) { + set_error(-1, err); + } + return error_code; +} + +int +hstcpcli::set_timeout(int send_timeout, int recv_timeout) +{ + String err; + sargs.send_timeout = send_timeout; + sargs.recv_timeout = recv_timeout; + if (socket_set_timeout(fd, sargs, err) != 0) { + set_error(-1, err); + } + return error_code; +} + +bool +hstcpcli::stable_point() +{ + /* returns true if cli can send a new request */ + return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 && + num_req_rcvd == 0 && response_end_offset == 0; +} + +int +hstcpcli::get_error_code() +{ + return error_code; +} + +String& +hstcpcli::get_error() +{ + return error_str; +} + +int +hstcpcli::read_more() +{ + const size_t block_size = 4096; // FIXME + char *const wp = readbuf.make_space(block_size); + int rlen; + errno = 0; + while ((rlen = read(fd.get(), wp, block_size)) <= 0) { + errno_buf = errno; + if (rlen < 0) { + if (errno == EINTR || errno == EAGAIN) + { + errno = 0; + continue; + } + error_str = String("read: failed", &my_charset_bin); + } else { + error_str = String("read: eof", &my_charset_bin); + } + return rlen; + } + readbuf.space_wrote(rlen); + return rlen; +} + +void +hstcpcli::clear_error() +{ + DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code)); + error_code = 0; + error_str.length(0); +} + +int +hstcpcli::set_error(int code, const String& str) +{ + DBG(fprintf(stderr, "SET_ERROR: %d\n", code)); + error_code = code; + error_str = str; + return error_code; +} + +int +hstcpcli::set_error(int code, const char *str) +{ + uint32 str_len = strlen(str); + DBG(fprintf(stderr, "SET_ERROR: %d\n", code)); + error_code = code; + error_str.length(0); + if (error_str.reserve(str_len + 1)) + return 0; + error_str.q_append(str, str_len); + error_str.c_ptr_safe(); + return error_code; +} + +void +hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn, + const char *tbl, const char *idx, const char *retflds, const char *filflds) +{ +/* + if (num_req_sent > 0 || num_req_rcvd > 0) { +*/ + if (num_req_rcvd > 0) { + close(); + set_error(-1, "request_buf_open_index: protocol out of sync"); + return; + } + const string_ref dbn_ref(dbn, strlen(dbn)); + const string_ref tbl_ref(tbl, strlen(tbl)); + const string_ref idx_ref(idx, strlen(idx)); + const string_ref rfs_ref(retflds, strlen(retflds)); + writebuf.append_literal("P\t"); + append_uint32(writebuf, pst_id); // FIXME size_t ? + writebuf.append_literal("\t"); + writebuf.append(dbn_ref.begin(), dbn_ref.end()); + writebuf.append_literal("\t"); + writebuf.append(tbl_ref.begin(), tbl_ref.end()); + writebuf.append_literal("\t"); + writebuf.append(idx_ref.begin(), idx_ref.end()); + writebuf.append_literal("\t"); + writebuf.append(rfs_ref.begin(), rfs_ref.end()); + if (filflds != 0) { + const string_ref fls_ref(filflds, strlen(filflds)); + writebuf.append_literal("\t"); + writebuf.append(fls_ref.begin(), fls_ref.end()); + } + writebuf.append_literal("\n"); + ++num_req_bufd; +} + +void +hstcpcli::request_buf_auth(const char *secret, const char *typ) +{ +/* + if (num_req_sent > 0 || num_req_rcvd > 0) { +*/ + if (num_req_rcvd > 0) { + close(); + set_error(-1, "request_buf_auth: protocol out of sync"); + return; + } + if (typ == 0) { + typ = "1"; + } + const string_ref typ_ref(typ, strlen(typ)); + const string_ref secret_ref(secret, strlen(secret)); + writebuf.append_literal("A\t"); + writebuf.append(typ_ref.begin(), typ_ref.end()); + writebuf.append_literal("\t"); + writebuf.append(secret_ref.begin(), secret_ref.end()); + writebuf.append_literal("\n"); + ++num_req_bufd; +} + +namespace { + +void +append_delim_value(string_buffer& buf, const char *start, const char *finish) +{ + if (start == 0) { + /* null */ + const char t[] = "\t\0"; + buf.append(t, t + 2); + } else { + /* non-null */ + buf.append_literal("\t"); + escape_string(buf, start, finish); + } +} + +}; + +void +hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op, + const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip, + const string_ref& mod_op, const string_ref *mvs, size_t mvslen, + const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, + const string_ref *invalues, size_t invalueslen) +{ +/* + if (num_req_sent > 0 || num_req_rcvd > 0) { +*/ + if (num_req_rcvd > 0) { + close(); + set_error(-1, "request_buf_exec_generic: protocol out of sync"); + return; + } + append_uint32(writebuf, pst_id); // FIXME size_t ? + writebuf.append_literal("\t"); + writebuf.append(op.begin(), op.end()); + writebuf.append_literal("\t"); + append_uint32(writebuf, kvslen); // FIXME size_t ? + for (size_t i = 0; i < kvslen; ++i) { + const string_ref& kv = kvs[i]; + append_delim_value(writebuf, kv.begin(), kv.end()); + } + if (limit != 0 || skip != 0 || invalues_keypart >= 0 || + mod_op.size() != 0 || filslen != 0) { + /* has more option */ + writebuf.append_literal("\t"); + append_uint32(writebuf, limit); // FIXME size_t ? + if (skip != 0 || invalues_keypart >= 0 || + mod_op.size() != 0 || filslen != 0) { + writebuf.append_literal("\t"); + append_uint32(writebuf, skip); // FIXME size_t ? + } + if (invalues_keypart >= 0) { + writebuf.append_literal("\t@\t"); + append_uint32(writebuf, invalues_keypart); + writebuf.append_literal("\t"); + append_uint32(writebuf, invalueslen); + for (size_t i = 0; i < invalueslen; ++i) { + const string_ref& s = invalues[i]; + append_delim_value(writebuf, s.begin(), s.end()); + } + } + for (size_t i = 0; i < filslen; ++i) { + const hstcpcli_filter& f = fils[i]; + writebuf.append_literal("\t"); + writebuf.append(f.filter_type.begin(), f.filter_type.end()); + writebuf.append_literal("\t"); + writebuf.append(f.op.begin(), f.op.end()); + writebuf.append_literal("\t"); + append_uint32(writebuf, f.ff_offset); + append_delim_value(writebuf, f.val.begin(), f.val.end()); + } + if (mod_op.size() != 0) { + writebuf.append_literal("\t"); + writebuf.append(mod_op.begin(), mod_op.end()); + for (size_t i = 0; i < mvslen; ++i) { + const string_ref& mv = mvs[i]; + append_delim_value(writebuf, mv.begin(), mv.end()); + } + } + } + writebuf.append_literal("\n"); + ++num_req_bufd; +} + +size_t +hstcpcli::request_buf_append(const char *start, const char *finish) +{ +/* + if (num_req_sent > 0 || num_req_rcvd > 0) { +*/ + if (num_req_rcvd > 0) { + close(); + set_error(-1, "request_buf_append: protocol out of sync"); + return 0; + } + const char *nl = start; + size_t num_req = 0; + while ((nl = memchr_char(nl, '\n', finish - nl))) { + if (nl == finish) + break; + num_req++; + nl++; + } + num_req++; + writebuf.append(start, finish); + if (*(finish - 1) != '\n') + writebuf.append_literal("\n"); + num_req_bufd += num_req; + return num_req; +} + +void +hstcpcli::request_reset() +{ + if (num_req_bufd) { + writebuf.erase_front(writebuf.size()); + num_req_bufd = 0; + } +} + +int +hstcpcli::request_send() +{ + if (error_code < 0) { + return error_code; + } + clear_error(); + if (fd.get() < 0) { + close(); + return set_error(-1, "write: closed"); + } +/* + if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) { +*/ + if (num_req_bufd == 0 || num_req_rcvd > 0) { + close(); + return set_error(-1, "request_send: protocol out of sync"); + } + const size_t wrlen = writebuf.size(); + const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL); + if (r <= 0) { + close(); + return set_error(-1, r < 0 ? "write: failed" : "write: eof"); + } + writebuf.erase_front(r); + if (static_cast<size_t>(r) != wrlen) { + close(); + return set_error(-1, "write: incomplete"); + } + num_req_sent += num_req_bufd; + num_req_bufd = 0; + DBG(fprintf(stderr, "REQSEND 0\n")); + return 0; +} + +int +hstcpcli::response_recv(size_t& num_flds_r) +{ + if (error_code < 0) { + return error_code; + } + clear_error(); + if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 || + response_end_offset != 0) { + close(); + return set_error(-1, "response_recv: protocol out of sync"); + } + cur_row_offset = 0; + num_flds_r = num_flds = 0; + if (fd.get() < 0) { + return set_error(-1, "read: closed"); + } + size_t offset = 0; + while (true) { + const char *const lbegin = readbuf.begin() + offset; + const char *const lend = readbuf.end(); + if (lbegin < lend) + { + const char *const nl = memchr_char(lbegin, '\n', lend - lbegin); + if (nl != 0) { + offset += (nl + 1) - lbegin; + break; + } + offset += lend - lbegin; + } + if (read_more() <= 0) { + close(); + error_code = -1; + return error_code; + } + } + response_end_offset = offset; + --num_req_sent; + ++num_req_rcvd; + char *start = readbuf.begin(); + char *const finish = start + response_end_offset - 1; + const size_t resp_code = read_ui32(start, finish); + skip_one(start, finish); + num_flds_r = num_flds = read_ui32(start, finish); + if (resp_code != 0) { + skip_one(start, finish); + char *const err_begin = start; + read_token(start, finish); + char *const err_end = start; + String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin); + if (!e.length()) { + e = String("unknown_error", &my_charset_bin); + } + return set_error(resp_code, e); + } + cur_row_size = 0; + cur_row_offset = start - readbuf.begin(); + DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n", + String(readbuf.begin(), readbuf.begin() + response_end_offset) + .c_str(), + cur_row_offset, response_end_offset)); + DBG(fprintf(stderr, "RES 0\n")); + if (flds.max_element < num_flds) + { + if (allocate_dynamic(&flds, num_flds)) + return set_error(-1, "out of memory"); + } + flds.elements = num_flds; + return 0; +} + +int +hstcpcli::get_result(hstresult& result) +{ +/* + readbuf.swap(result.readbuf); +*/ + char *const wp = result.readbuf.make_space(response_end_offset); + memcpy(wp, readbuf.begin(), response_end_offset); + result.readbuf.space_wrote(response_end_offset); + result.response_end_offset = response_end_offset; + result.num_flds = num_flds; + result.cur_row_size = cur_row_size; + result.cur_row_offset = cur_row_offset; + if (result.flds.max_element < num_flds) + { + if (allocate_dynamic(&result.flds, num_flds)) + return set_error(-1, "out of memory"); + } + result.flds.elements = num_flds; + return 0; +} + +const string_ref * +hstcpcli::get_next_row() +{ + if (num_flds == 0 || flds.elements < num_flds) { + DBG(fprintf(stderr, "GNR NF 0\n")); + return 0; + } + char *start = readbuf.begin() + cur_row_offset; + char *const finish = readbuf.begin() + response_end_offset - 1; + if (start >= finish) { /* start[0] == nl */ + DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish)); + return 0; + } + for (size_t i = 0; i < num_flds; ++i) { + skip_one(start, finish); + char *const fld_begin = start; + read_token(start, finish); + char *const fld_end = start; + char *wp = fld_begin; + if (is_null_expression(fld_begin, fld_end)) { + /* null */ + ((string_ref *) flds.buffer)[i] = string_ref(); + } else { + unescape_string(wp, fld_begin, fld_end); /* in-place */ + ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp); + } + } + cur_row_size = start - (readbuf.begin() + cur_row_offset); + cur_row_offset = start - readbuf.begin(); + return (string_ref *) flds.buffer; +} + +const string_ref * +hstcpcli::get_next_row_from_result(hstresult& result) +{ + if (result.num_flds == 0 || result.flds.elements < result.num_flds) { + DBG(fprintf(stderr, "GNR NF 0\n")); + return 0; + } + char *start = result.readbuf.begin() + result.cur_row_offset; + char *const finish = result.readbuf.begin() + result.response_end_offset - 1; + if (start >= finish) { /* start[0] == nl */ + DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish)); + return 0; + } + for (size_t i = 0; i < result.num_flds; ++i) { + skip_one(start, finish); + char *const fld_begin = start; + read_token(start, finish); + char *const fld_end = start; + char *wp = fld_begin; + if (is_null_expression(fld_begin, fld_end)) { + /* null */ + ((string_ref *) result.flds.buffer)[i] = string_ref(); + } else { + unescape_string(wp, fld_begin, fld_end); /* in-place */ + ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp); + } + } + result.cur_row_size = + start - (result.readbuf.begin() + result.cur_row_offset); + result.cur_row_offset = start - result.readbuf.begin(); + return (string_ref *) result.flds.buffer; +} + +size_t +hstcpcli::get_row_size() +{ + return cur_row_size; +} + +size_t +hstcpcli::get_row_size_from_result(hstresult& result) +{ + return result.cur_row_size; +} + +void +hstcpcli::response_buf_remove() +{ + if (response_end_offset == 0) { + close(); + set_error(-1, "response_buf_remove: protocol out of sync"); + return; + } + readbuf.erase_front(response_end_offset); + response_end_offset = 0; + --num_req_rcvd; + cur_row_offset = 0; + num_flds = 0; +} + +void +hstcpcli::write_error_to_log( + const char *func_name, + const char *file_name, + ulong line_no +) { + if (errno_buf) { + time_t cur_time = (time_t) time((time_t*) 0); + struct tm lt; + struct tm *l_time = localtime_r(&cur_time, <); + fprintf(stderr, + "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]" + " [%s][%s][%lu] errno=%d\n", + l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday, + l_time->tm_hour, l_time->tm_min, l_time->tm_sec, + error_code, error_str.c_ptr_safe(), + func_name, file_name, line_no, errno_buf); + } +} + +hstcpcli_ptr +hstcpcli_i::create(const socket_args& args) +{ + return hstcpcli_ptr(new hstcpcli(args)); +} + +}; + |