summaryrefslogtreecommitdiffstats
path: root/storage/spider/hs_client/hstcpcli.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/spider/hs_client/hstcpcli.cpp')
-rw-r--r--storage/spider/hs_client/hstcpcli.cpp667
1 files changed, 667 insertions, 0 deletions
diff --git a/storage/spider/hs_client/hstcpcli.cpp b/storage/spider/hs_client/hstcpcli.cpp
new file mode 100644
index 00000000..4c93b5a3
--- /dev/null
+++ b/storage/spider/hs_client/hstcpcli.cpp
@@ -0,0 +1,667 @@
+
+// vim:sw=2:ai
+
+/*
+ * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved.
+ * Copyright (C) 2011-2017 Kentoku SHIBA
+ * See COPYRIGHT.txt for details.
+ */
+
+#include <my_global.h>
+#include "mysql_version.h"
+#include "hs_compat.h"
+#if MYSQL_VERSION_ID < 50500
+#include "mysql_priv.h"
+#include <mysql/plugin.h>
+#else
+#include "sql_priv.h"
+#include "probes_mysql.h"
+#include "sql_class.h"
+#endif
+
+#include "hstcpcli.hpp"
+#include "auto_file.hpp"
+#include "string_util.hpp"
+#include "auto_addrinfo.hpp"
+#include "escape.hpp"
+#include "util.hpp"
+
+/* TODO */
+#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
+#define MSG_NOSIGNAL 0
+#endif
+
+#define DBG(x)
+
+namespace dena {
+
+hstresult::hstresult()
+{
+ SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16,
+ MYF(MY_WME));
+}
+
+hstresult::~hstresult()
+{
+ delete_dynamic(&flds);
+}
+
+struct hstcpcli : public hstcpcli_i, private noncopyable {
+ hstcpcli(const socket_args& args);
+ virtual ~hstcpcli();
+ virtual void close();
+ virtual int reconnect();
+ virtual bool stable_point();
+ virtual void request_buf_open_index(size_t pst_id, const char *dbn,
+ const char *tbl, const char *idx, const char *retflds, const char *filflds);
+ virtual void request_buf_auth(const char *secret, const char *typ);
+ virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
+ const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
+ const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
+ const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
+ const string_ref *invalues, size_t invalueslen);
+ virtual size_t request_buf_append(const char *start, const char *finish);
+ virtual void request_reset();
+ virtual int request_send();
+ virtual int response_recv(size_t& num_flds_r);
+ virtual int get_result(hstresult& result);
+ virtual const string_ref *get_next_row();
+ virtual const string_ref *get_next_row_from_result(hstresult& result);
+ virtual size_t get_row_size();
+ virtual size_t get_row_size_from_result(hstresult& result);
+ virtual void response_buf_remove();
+ virtual int get_error_code();
+ virtual String& get_error();
+ virtual void clear_error();
+ virtual int set_timeout(int send_timeout, int recv_timeout);
+ virtual size_t get_num_req_bufd() { return num_req_bufd; }
+ virtual size_t get_num_req_sent() { return num_req_sent; }
+ virtual size_t get_num_req_rcvd() { return num_req_rcvd; }
+ virtual size_t get_response_end_offset() { return response_end_offset; }
+ virtual const char *get_readbuf_begin() { return readbuf.begin(); }
+ virtual const char *get_readbuf_end() { return readbuf.end(); }
+ virtual const char *get_writebuf_begin() { return writebuf.begin(); }
+ virtual size_t get_writebuf_size() { return writebuf.size(); }
+ virtual void write_error_to_log(const char *func_name, const char *file_name,
+ ulong line_no);
+ private:
+ int read_more();
+ int set_error(int code, const String& str);
+ int set_error(int code, const char *str);
+ private:
+ auto_file fd;
+ socket_args sargs;
+ string_buffer readbuf;
+ string_buffer writebuf;
+ size_t response_end_offset; /* incl newline */
+ size_t cur_row_offset;
+ size_t cur_row_size;
+ size_t num_flds;
+ size_t num_req_bufd; /* buffered but not yet sent */
+ size_t num_req_sent; /* sent but not yet received */
+ size_t num_req_rcvd; /* received but not yet removed */
+ int error_code;
+ String error_str;
+ DYNAMIC_ARRAY flds;
+ int errno_buf;
+};
+
+hstcpcli::hstcpcli(const socket_args& args)
+ : sargs(args), response_end_offset(0), cur_row_offset(0), cur_row_size(0),
+ num_flds(0), num_req_bufd(0), num_req_sent(0), num_req_rcvd(0),
+ error_code(0), errno_buf(0)
+{
+ String err;
+ SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME));
+ if (socket_connect(fd, sargs, err) != 0) {
+ set_error(-1, err);
+ }
+}
+
+hstcpcli::~hstcpcli()
+{
+ delete_dynamic(&flds);
+}
+
+void
+hstcpcli::close()
+{
+ fd.close();
+ readbuf.clear();
+ writebuf.clear();
+ response_end_offset = 0;
+ cur_row_offset = 0;
+ num_flds = 0;
+ num_req_bufd = 0;
+ num_req_sent = 0;
+ num_req_rcvd = 0;
+}
+
+int
+hstcpcli::reconnect()
+{
+ clear_error();
+ close();
+ String err;
+ if (socket_connect(fd, sargs, err) != 0) {
+ set_error(-1, err);
+ }
+ return error_code;
+}
+
+int
+hstcpcli::set_timeout(int send_timeout, int recv_timeout)
+{
+ String err;
+ sargs.send_timeout = send_timeout;
+ sargs.recv_timeout = recv_timeout;
+ if (socket_set_timeout(fd, sargs, err) != 0) {
+ set_error(-1, err);
+ }
+ return error_code;
+}
+
+bool
+hstcpcli::stable_point()
+{
+ /* returns true if cli can send a new request */
+ return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
+ num_req_rcvd == 0 && response_end_offset == 0;
+}
+
+int
+hstcpcli::get_error_code()
+{
+ return error_code;
+}
+
+String&
+hstcpcli::get_error()
+{
+ return error_str;
+}
+
+int
+hstcpcli::read_more()
+{
+ const size_t block_size = 4096; // FIXME
+ char *const wp = readbuf.make_space(block_size);
+ int rlen;
+ errno = 0;
+ while ((rlen = read(fd.get(), wp, block_size)) <= 0) {
+ errno_buf = errno;
+ if (rlen < 0) {
+ if (errno == EINTR || errno == EAGAIN)
+ {
+ errno = 0;
+ continue;
+ }
+ error_str = String("read: failed", &my_charset_bin);
+ } else {
+ error_str = String("read: eof", &my_charset_bin);
+ }
+ return rlen;
+ }
+ readbuf.space_wrote(rlen);
+ return rlen;
+}
+
+void
+hstcpcli::clear_error()
+{
+ DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
+ error_code = 0;
+ error_str.length(0);
+}
+
+int
+hstcpcli::set_error(int code, const String& str)
+{
+ DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
+ error_code = code;
+ error_str = str;
+ return error_code;
+}
+
+int
+hstcpcli::set_error(int code, const char *str)
+{
+ uint32 str_len = strlen(str);
+ DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
+ error_code = code;
+ error_str.length(0);
+ if (error_str.reserve(str_len + 1))
+ return 0;
+ error_str.q_append(str, str_len);
+ error_str.c_ptr_safe();
+ return error_code;
+}
+
+void
+hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
+ const char *tbl, const char *idx, const char *retflds, const char *filflds)
+{
+/*
+ if (num_req_sent > 0 || num_req_rcvd > 0) {
+*/
+ if (num_req_rcvd > 0) {
+ close();
+ set_error(-1, "request_buf_open_index: protocol out of sync");
+ return;
+ }
+ const string_ref dbn_ref(dbn, strlen(dbn));
+ const string_ref tbl_ref(tbl, strlen(tbl));
+ const string_ref idx_ref(idx, strlen(idx));
+ const string_ref rfs_ref(retflds, strlen(retflds));
+ writebuf.append_literal("P\t");
+ append_uint32(writebuf, pst_id); // FIXME size_t ?
+ writebuf.append_literal("\t");
+ writebuf.append(dbn_ref.begin(), dbn_ref.end());
+ writebuf.append_literal("\t");
+ writebuf.append(tbl_ref.begin(), tbl_ref.end());
+ writebuf.append_literal("\t");
+ writebuf.append(idx_ref.begin(), idx_ref.end());
+ writebuf.append_literal("\t");
+ writebuf.append(rfs_ref.begin(), rfs_ref.end());
+ if (filflds != 0) {
+ const string_ref fls_ref(filflds, strlen(filflds));
+ writebuf.append_literal("\t");
+ writebuf.append(fls_ref.begin(), fls_ref.end());
+ }
+ writebuf.append_literal("\n");
+ ++num_req_bufd;
+}
+
+void
+hstcpcli::request_buf_auth(const char *secret, const char *typ)
+{
+/*
+ if (num_req_sent > 0 || num_req_rcvd > 0) {
+*/
+ if (num_req_rcvd > 0) {
+ close();
+ set_error(-1, "request_buf_auth: protocol out of sync");
+ return;
+ }
+ if (typ == 0) {
+ typ = "1";
+ }
+ const string_ref typ_ref(typ, strlen(typ));
+ const string_ref secret_ref(secret, strlen(secret));
+ writebuf.append_literal("A\t");
+ writebuf.append(typ_ref.begin(), typ_ref.end());
+ writebuf.append_literal("\t");
+ writebuf.append(secret_ref.begin(), secret_ref.end());
+ writebuf.append_literal("\n");
+ ++num_req_bufd;
+}
+
+namespace {
+
+void
+append_delim_value(string_buffer& buf, const char *start, const char *finish)
+{
+ if (start == 0) {
+ /* null */
+ const char t[] = "\t\0";
+ buf.append(t, t + 2);
+ } else {
+ /* non-null */
+ buf.append_literal("\t");
+ escape_string(buf, start, finish);
+ }
+}
+
+};
+
+void
+hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
+ const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
+ const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
+ const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
+ const string_ref *invalues, size_t invalueslen)
+{
+/*
+ if (num_req_sent > 0 || num_req_rcvd > 0) {
+*/
+ if (num_req_rcvd > 0) {
+ close();
+ set_error(-1, "request_buf_exec_generic: protocol out of sync");
+ return;
+ }
+ append_uint32(writebuf, pst_id); // FIXME size_t ?
+ writebuf.append_literal("\t");
+ writebuf.append(op.begin(), op.end());
+ writebuf.append_literal("\t");
+ append_uint32(writebuf, kvslen); // FIXME size_t ?
+ for (size_t i = 0; i < kvslen; ++i) {
+ const string_ref& kv = kvs[i];
+ append_delim_value(writebuf, kv.begin(), kv.end());
+ }
+ if (limit != 0 || skip != 0 || invalues_keypart >= 0 ||
+ mod_op.size() != 0 || filslen != 0) {
+ /* has more option */
+ writebuf.append_literal("\t");
+ append_uint32(writebuf, limit); // FIXME size_t ?
+ if (skip != 0 || invalues_keypart >= 0 ||
+ mod_op.size() != 0 || filslen != 0) {
+ writebuf.append_literal("\t");
+ append_uint32(writebuf, skip); // FIXME size_t ?
+ }
+ if (invalues_keypart >= 0) {
+ writebuf.append_literal("\t@\t");
+ append_uint32(writebuf, invalues_keypart);
+ writebuf.append_literal("\t");
+ append_uint32(writebuf, invalueslen);
+ for (size_t i = 0; i < invalueslen; ++i) {
+ const string_ref& s = invalues[i];
+ append_delim_value(writebuf, s.begin(), s.end());
+ }
+ }
+ for (size_t i = 0; i < filslen; ++i) {
+ const hstcpcli_filter& f = fils[i];
+ writebuf.append_literal("\t");
+ writebuf.append(f.filter_type.begin(), f.filter_type.end());
+ writebuf.append_literal("\t");
+ writebuf.append(f.op.begin(), f.op.end());
+ writebuf.append_literal("\t");
+ append_uint32(writebuf, f.ff_offset);
+ append_delim_value(writebuf, f.val.begin(), f.val.end());
+ }
+ if (mod_op.size() != 0) {
+ writebuf.append_literal("\t");
+ writebuf.append(mod_op.begin(), mod_op.end());
+ for (size_t i = 0; i < mvslen; ++i) {
+ const string_ref& mv = mvs[i];
+ append_delim_value(writebuf, mv.begin(), mv.end());
+ }
+ }
+ }
+ writebuf.append_literal("\n");
+ ++num_req_bufd;
+}
+
+size_t
+hstcpcli::request_buf_append(const char *start, const char *finish)
+{
+/*
+ if (num_req_sent > 0 || num_req_rcvd > 0) {
+*/
+ if (num_req_rcvd > 0) {
+ close();
+ set_error(-1, "request_buf_append: protocol out of sync");
+ return 0;
+ }
+ const char *nl = start;
+ size_t num_req = 0;
+ while ((nl = memchr_char(nl, '\n', finish - nl))) {
+ if (nl == finish)
+ break;
+ num_req++;
+ nl++;
+ }
+ num_req++;
+ writebuf.append(start, finish);
+ if (*(finish - 1) != '\n')
+ writebuf.append_literal("\n");
+ num_req_bufd += num_req;
+ return num_req;
+}
+
+void
+hstcpcli::request_reset()
+{
+ if (num_req_bufd) {
+ writebuf.erase_front(writebuf.size());
+ num_req_bufd = 0;
+ }
+}
+
+int
+hstcpcli::request_send()
+{
+ if (error_code < 0) {
+ return error_code;
+ }
+ clear_error();
+ if (fd.get() < 0) {
+ close();
+ return set_error(-1, "write: closed");
+ }
+/*
+ if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
+*/
+ if (num_req_bufd == 0 || num_req_rcvd > 0) {
+ close();
+ return set_error(-1, "request_send: protocol out of sync");
+ }
+ const size_t wrlen = writebuf.size();
+ const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
+ if (r <= 0) {
+ close();
+ return set_error(-1, r < 0 ? "write: failed" : "write: eof");
+ }
+ writebuf.erase_front(r);
+ if (static_cast<size_t>(r) != wrlen) {
+ close();
+ return set_error(-1, "write: incomplete");
+ }
+ num_req_sent += num_req_bufd;
+ num_req_bufd = 0;
+ DBG(fprintf(stderr, "REQSEND 0\n"));
+ return 0;
+}
+
+int
+hstcpcli::response_recv(size_t& num_flds_r)
+{
+ if (error_code < 0) {
+ return error_code;
+ }
+ clear_error();
+ if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
+ response_end_offset != 0) {
+ close();
+ return set_error(-1, "response_recv: protocol out of sync");
+ }
+ cur_row_offset = 0;
+ num_flds_r = num_flds = 0;
+ if (fd.get() < 0) {
+ return set_error(-1, "read: closed");
+ }
+ size_t offset = 0;
+ while (true) {
+ const char *const lbegin = readbuf.begin() + offset;
+ const char *const lend = readbuf.end();
+ if (lbegin < lend)
+ {
+ const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
+ if (nl != 0) {
+ offset += (nl + 1) - lbegin;
+ break;
+ }
+ offset += lend - lbegin;
+ }
+ if (read_more() <= 0) {
+ close();
+ error_code = -1;
+ return error_code;
+ }
+ }
+ response_end_offset = offset;
+ --num_req_sent;
+ ++num_req_rcvd;
+ char *start = readbuf.begin();
+ char *const finish = start + response_end_offset - 1;
+ const size_t resp_code = read_ui32(start, finish);
+ skip_one(start, finish);
+ num_flds_r = num_flds = read_ui32(start, finish);
+ if (resp_code != 0) {
+ skip_one(start, finish);
+ char *const err_begin = start;
+ read_token(start, finish);
+ char *const err_end = start;
+ String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin);
+ if (!e.length()) {
+ e = String("unknown_error", &my_charset_bin);
+ }
+ return set_error(resp_code, e);
+ }
+ cur_row_size = 0;
+ cur_row_offset = start - readbuf.begin();
+ DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
+ String(readbuf.begin(), readbuf.begin() + response_end_offset)
+ .c_str(),
+ cur_row_offset, response_end_offset));
+ DBG(fprintf(stderr, "RES 0\n"));
+ if (flds.max_element < num_flds)
+ {
+ if (allocate_dynamic(&flds, num_flds))
+ return set_error(-1, "out of memory");
+ }
+ flds.elements = num_flds;
+ return 0;
+}
+
+int
+hstcpcli::get_result(hstresult& result)
+{
+/*
+ readbuf.swap(result.readbuf);
+*/
+ char *const wp = result.readbuf.make_space(response_end_offset);
+ memcpy(wp, readbuf.begin(), response_end_offset);
+ result.readbuf.space_wrote(response_end_offset);
+ result.response_end_offset = response_end_offset;
+ result.num_flds = num_flds;
+ result.cur_row_size = cur_row_size;
+ result.cur_row_offset = cur_row_offset;
+ if (result.flds.max_element < num_flds)
+ {
+ if (allocate_dynamic(&result.flds, num_flds))
+ return set_error(-1, "out of memory");
+ }
+ result.flds.elements = num_flds;
+ return 0;
+}
+
+const string_ref *
+hstcpcli::get_next_row()
+{
+ if (num_flds == 0 || flds.elements < num_flds) {
+ DBG(fprintf(stderr, "GNR NF 0\n"));
+ return 0;
+ }
+ char *start = readbuf.begin() + cur_row_offset;
+ char *const finish = readbuf.begin() + response_end_offset - 1;
+ if (start >= finish) { /* start[0] == nl */
+ DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
+ return 0;
+ }
+ for (size_t i = 0; i < num_flds; ++i) {
+ skip_one(start, finish);
+ char *const fld_begin = start;
+ read_token(start, finish);
+ char *const fld_end = start;
+ char *wp = fld_begin;
+ if (is_null_expression(fld_begin, fld_end)) {
+ /* null */
+ ((string_ref *) flds.buffer)[i] = string_ref();
+ } else {
+ unescape_string(wp, fld_begin, fld_end); /* in-place */
+ ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp);
+ }
+ }
+ cur_row_size = start - (readbuf.begin() + cur_row_offset);
+ cur_row_offset = start - readbuf.begin();
+ return (string_ref *) flds.buffer;
+}
+
+const string_ref *
+hstcpcli::get_next_row_from_result(hstresult& result)
+{
+ if (result.num_flds == 0 || result.flds.elements < result.num_flds) {
+ DBG(fprintf(stderr, "GNR NF 0\n"));
+ return 0;
+ }
+ char *start = result.readbuf.begin() + result.cur_row_offset;
+ char *const finish = result.readbuf.begin() + result.response_end_offset - 1;
+ if (start >= finish) { /* start[0] == nl */
+ DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
+ return 0;
+ }
+ for (size_t i = 0; i < result.num_flds; ++i) {
+ skip_one(start, finish);
+ char *const fld_begin = start;
+ read_token(start, finish);
+ char *const fld_end = start;
+ char *wp = fld_begin;
+ if (is_null_expression(fld_begin, fld_end)) {
+ /* null */
+ ((string_ref *) result.flds.buffer)[i] = string_ref();
+ } else {
+ unescape_string(wp, fld_begin, fld_end); /* in-place */
+ ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp);
+ }
+ }
+ result.cur_row_size =
+ start - (result.readbuf.begin() + result.cur_row_offset);
+ result.cur_row_offset = start - result.readbuf.begin();
+ return (string_ref *) result.flds.buffer;
+}
+
+size_t
+hstcpcli::get_row_size()
+{
+ return cur_row_size;
+}
+
+size_t
+hstcpcli::get_row_size_from_result(hstresult& result)
+{
+ return result.cur_row_size;
+}
+
+void
+hstcpcli::response_buf_remove()
+{
+ if (response_end_offset == 0) {
+ close();
+ set_error(-1, "response_buf_remove: protocol out of sync");
+ return;
+ }
+ readbuf.erase_front(response_end_offset);
+ response_end_offset = 0;
+ --num_req_rcvd;
+ cur_row_offset = 0;
+ num_flds = 0;
+}
+
+void
+hstcpcli::write_error_to_log(
+ const char *func_name,
+ const char *file_name,
+ ulong line_no
+) {
+ if (errno_buf) {
+ time_t cur_time = (time_t) time((time_t*) 0);
+ struct tm lt;
+ struct tm *l_time = localtime_r(&cur_time, &lt);
+ fprintf(stderr,
+ "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]"
+ " [%s][%s][%lu] errno=%d\n",
+ l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday,
+ l_time->tm_hour, l_time->tm_min, l_time->tm_sec,
+ error_code, error_str.c_ptr_safe(),
+ func_name, file_name, line_no, errno_buf);
+ }
+}
+
+hstcpcli_ptr
+hstcpcli_i::create(const socket_args& args)
+{
+ return hstcpcli_ptr(new hstcpcli(args));
+}
+
+};
+