diff options
Diffstat (limited to 'storage/mroonga/vendor/groonga/src/suggest')
13 files changed, 2448 insertions, 0 deletions
diff --git a/storage/mroonga/vendor/groonga/src/suggest/CMakeLists.txt b/storage/mroonga/vendor/groonga/src/suggest/CMakeLists.txt new file mode 100644 index 00000000..ec85c1fb --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/CMakeLists.txt @@ -0,0 +1,87 @@ +# Copyright(C) 2012-2013 Brazil +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/../../lib + ${MRUBY_INCLUDE_DIRS} + ${MESSAGE_PACK_INCLUDE_DIRS}) + +read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/create_dataset_sources.am + GROONGA_SUGGEST_CREATE_DATASET_SOURCES) +add_executable(groonga-suggest-create-dataset + ${GROONGA_SUGGEST_CREATE_DATASET_SOURCES}) +set_source_files_properties(${GROONGA_SUGGEST_CREATE_DATASET_SOURCES} + PROPERTIES + COMPILE_FLAGS "${GRN_C_COMPILE_FLAGS}") +target_link_libraries(groonga-suggest-create-dataset libgroonga) +install( + TARGETS groonga-suggest-create-dataset + DESTINATION ${BIN_DIR}) + +if(GRN_WITH_LIBEVENT AND GRN_WITH_ZEROMQ AND GRN_WITH_MESSAGE_PACK) + set(GRN_WITH_SUGGEST_LEARNER TRUE) +else() + set(GRN_WITH_SUGGEST_LEARNER FALSE) +endif() + +if(GRN_WITH_SUGGEST_LEARNER) + include_directories( + ${LIBEVENT_INCLUDE_DIRS} + ${ZEROMQ_INCLUDE_DIRS} + ${MESSAGE_PACK_INCLUDE_DIRS} + ) + link_directories( + ${LIBEVENT_LIBRARY_DIRS} + ${ZEROMQ_LIBRARY_DIRS} + ${MESSAGE_PACK_LIBRARY_DIRS} + ) + + read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/util_sources.am + GROONGA_SUGGEST_UTIL_SOURCES) + add_library(groonga-suggest-util STATIC ${GROONGA_SUGGEST_UTIL_SOURCES}) + set_source_files_properties(${GROONGA_SUGGEST_UTIL_SOURCES} + PROPERTIES + COMPILE_FLAGS "${GRN_C_COMPILE_FLAGS}") + + read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/learner_sources.am + GROONGA_SUGGEST_LEARNER_SOURCES) + add_executable(groonga-suggest-learner ${GROONGA_SUGGEST_LEARNER_SOURCES}) + set_source_files_properties(${GROONGA_SUGGEST_LEARNER_SOURCES} + PROPERTIES + COMPILE_FLAGS "${GRN_C_COMPILE_FLAGS}") + target_link_libraries(groonga-suggest-learner + groonga-suggest-util + libgroonga + ${LIBEVENT_LIBRARIES} + ${ZEROMQ_LIBRARIES} + ${MESSAGE_PACK_LIBRARIES}) + + read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/httpd_sources.am + GROONGA_SUGGEST_HTTPD_SOURCES) + add_executable(groonga-suggest-httpd ${GROONGA_SUGGEST_HTTPD_SOURCES}) + set_source_files_properties(${GROONGA_SUGGEST_HTTPD_SOURCES} + PROPERTIES + COMPILE_FLAGS "${GRN_C_COMPILE_FLAGS}") + target_link_libraries(groonga-suggest-httpd + groonga-suggest-util + libgroonga + ${LIBEVENT_LIBRARIES} + ${ZEROMQ_LIBRARIES} + ${MESSAGE_PACK_LIBRARIES}) + + install( + TARGETS groonga-suggest-learner groonga-suggest-httpd + DESTINATION ${BIN_DIR}) +endif() diff --git a/storage/mroonga/vendor/groonga/src/suggest/Makefile.am b/storage/mroonga/vendor/groonga/src/suggest/Makefile.am new file mode 100644 index 00000000..91260016 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/Makefile.am @@ -0,0 +1,74 @@ +bin_PROGRAMS = + +NONEXISTENT_CXX_SOURCE = nonexistent.cpp + +bin_PROGRAMS += \ + groonga-suggest-create-dataset + +if ENABLE_SUGGEST_LEARNER +bin_PROGRAMS += \ + groonga-suggest-learner \ + groonga-suggest-httpd +noinst_LTLIBRARIES = libutil.la +endif + +EXTRA_DIST = \ + CMakeLists.txt + +AM_CFLAGS = \ + $(NO_STRICT_ALIASING_CFLAGS) \ + $(COVERAGE_CFLAGS) \ + $(GRN_CFLAGS) \ + $(MESSAGE_PACK_CFLAGS) \ + $(MRUBY_CFLAGS) + +DEFS += $(GRN_DEFS) + +AM_LDFLAGS = -no-undefined + +DEFAULT_INCLUDES = \ + -I$(top_builddir) \ + -I$(srcdir) \ + -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib \ + $(GROONGA_INCLUDEDIR) + +include learner_sources.am +nodist_EXTRA_groonga_suggest_learner_SOURCES = $(NONEXISTENT_CXX_SOURCE) +groonga_suggest_learner_CFLAGS = \ + $(AM_CFLAGS) \ + $(LIBEVENT_CFLAGS) \ + $(LIBZMQ_CFLAGS) \ + $(MESSAGE_PACK_CFLAGS) +groonga_suggest_learner_LDADD = \ + libutil.la \ + $(top_builddir)/lib/libgroonga.la \ + $(LIBEVENT_LIBS) \ + $(LIBZMQ_LIBS) \ + $(MESSAGE_PACK_LIBS) + +include httpd_sources.am +nodist_EXTRA_groonga_suggest_httpd_SOURCES = $(NONEXISTENT_CXX_SOURCE) +groonga_suggest_httpd_CFLAGS = \ + $(AM_CFLAGS) \ + $(LIBEVENT_CFLAGS) \ + $(LIBZMQ_CFLAGS) \ + $(MESSAGE_PACK_CFLAGS) +groonga_suggest_httpd_LDADD = \ + libutil.la \ + $(top_builddir)/lib/libgroonga.la \ + $(LIBEVENT_LIBS) \ + $(LIBZMQ_LIBS) \ + $(MESSAGE_PACK_LIBS) + +include create_dataset_sources.am +nodist_EXTRA_groonga_suggest_create_dataset_SOURCES = $(NONEXISTENT_CXX_SOURCE) +groonga_suggest_create_dataset_CFLAGS = \ + $(AM_CFLAGS) +groonga_suggest_create_dataset_LDADD = \ + $(top_builddir)/lib/libgroonga.la + +include util_sources.am +libutil_la_CFLAGS = \ + $(AM_CFLAGS) \ + $(LIBEVENT_CFLAGS) diff --git a/storage/mroonga/vendor/groonga/src/suggest/create_dataset_sources.am b/storage/mroonga/vendor/groonga/src/suggest/create_dataset_sources.am new file mode 100644 index 00000000..cfecd650 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/create_dataset_sources.am @@ -0,0 +1,2 @@ +groonga_suggest_create_dataset_SOURCES = \ + groonga_suggest_create_dataset.c diff --git a/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_create_dataset.c b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_create_dataset.c new file mode 100644 index 00000000..7cec2922 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_create_dataset.c @@ -0,0 +1,223 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2010-2015 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +/* For grn_str_getopt() */ +#include <grn_str.h> + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <groonga.h> + +typedef enum { + MODE_NONE, + MODE_USAGE +} ModeFlags; + +static const char *DEFAULT_DEFAULT_TOKENIZER = "TokenBigram"; + +static void +usage(FILE *output, int argc, char **argv) +{ +#define OUTPUT(...) fprintf(output, __VA_ARGS__) + + OUTPUT("Usage: %s [OPTIONS] DB_PATH DATASET_NAME\n", argv[0]); + OUTPUT(" e.g.: %s /tmp/db shops\n", argv[0]); + OUTPUT("\n"); + OUTPUT("Options:\n"); + OUTPUT(" --default-tokenizer=TOKENIZER Use TOKENIZER as the default\n"); + OUTPUT(" tokenizer for item name\n"); + OUTPUT(" (default: %s)\n", + DEFAULT_DEFAULT_TOKENIZER); + OUTPUT(" -h, --help Show this message and exit\n"); + +#undef OUTPUT +} + +static void +output(grn_ctx *ctx) +{ + int flags = 0; + char *str; + unsigned int str_len; + + do { + grn_ctx_recv(ctx, &str, &str_len, &flags); + if (str_len > 0 || ctx->rc) { + if (ctx->rc) { + printf("ERROR (%d): %s\n", ctx->rc, ctx->errbuf); + } + if (str_len > 0) { + printf("%.*s\n", str_len, str); + } + } + } while (flags & GRN_CTX_MORE); +} + +static void +send_command(grn_ctx *ctx, grn_obj *buffer, const char *command, + const char *dataset_name) +{ + const char *p = command; + const char *dataset_place_holder = "${DATASET}"; + char *dataset_place_holder_position; + + if (ctx->rc != GRN_SUCCESS) { + return; + } + + GRN_BULK_REWIND(buffer); + while ((dataset_place_holder_position = strstr(p, dataset_place_holder))) { + GRN_TEXT_PUT(ctx, buffer, p, dataset_place_holder_position - p); + GRN_TEXT_PUTS(ctx, buffer, dataset_name); + p = dataset_place_holder_position + strlen(dataset_place_holder); + } + GRN_TEXT_PUTS(ctx, buffer, p); + printf("> %.*s\n", (int)GRN_TEXT_LEN(buffer), GRN_TEXT_VALUE(buffer)); + grn_ctx_send(ctx, GRN_TEXT_VALUE(buffer), GRN_TEXT_LEN(buffer), 0); + output(ctx); +} + + +int +main(int argc, char **argv) +{ + const char *db_path; + const char *dataset_name; + grn_ctx ctx_, *ctx; + grn_obj *db; + grn_bool success = GRN_TRUE; + int parsed_argc, rest_argc; + int flags = MODE_NONE; + const char *default_tokenizer = NULL; + static grn_str_getopt_opt opts[] = { + {'\0', "default-tokenizer", NULL, 0, GETOPT_OP_NONE}, + {'h', "help", NULL, MODE_USAGE, GETOPT_OP_UPDATE} + }; + + opts[0].arg = &default_tokenizer; + + parsed_argc = grn_str_getopt(argc, argv, opts, &flags); + if (parsed_argc < 0) { + usage(stderr, argc, argv); + return EXIT_FAILURE; + } + + if (flags & MODE_USAGE) { + usage(stdout, argc, argv); + return EXIT_SUCCESS; + } + + rest_argc = argc - parsed_argc; + if (rest_argc != 2) { + usage(stderr, argc, argv); + return EXIT_FAILURE; + } + + db_path = argv[parsed_argc]; + dataset_name = argv[parsed_argc + 1]; + + grn_init(); + + ctx = &ctx_; + grn_ctx_init(ctx, 0); + db = grn_db_open(ctx, db_path); + if (!db) { + if (ctx->rc == GRN_NO_SUCH_FILE_OR_DIRECTORY) { + db = grn_db_create(ctx, db_path, NULL); + if (!db) { + fprintf(stderr, "DB create failed (%s): %s\n", db_path, ctx->errbuf); + } + } else { + fprintf(stderr, "DB open failed (%s): %s\n", db_path, ctx->errbuf); + } + } + + if (db) { + grn_obj text; + GRN_TEXT_INIT(&text, 0); +#define SEND(string) send_command(ctx, &text, string, dataset_name) + SEND("plugin_register suggest/suggest"); + SEND("table_create event_type TABLE_HASH_KEY ShortText"); + { + grn_obj query; + GRN_TEXT_INIT(&query, 0); + GRN_TEXT_PUTS(ctx, &query, + "table_create bigram TABLE_PAT_KEY ShortText " + "--default_tokenizer "); + if (default_tokenizer) { + GRN_TEXT_PUTS(ctx, &query, default_tokenizer); + } else { + GRN_TEXT_PUTS(ctx, &query, DEFAULT_DEFAULT_TOKENIZER); + } + GRN_TEXT_PUTS(ctx, &query, " --normalizer NormalizerAuto"); + GRN_TEXT_PUTC(ctx, &query, '\0'); + SEND(GRN_TEXT_VALUE(&query)); + GRN_OBJ_FIN(ctx, &query); + } + SEND("table_create kana TABLE_PAT_KEY ShortText " + "--normalizer NormalizerAuto"); + SEND("table_create item_${DATASET} TABLE_PAT_KEY " + "ShortText --default_tokenizer TokenDelimit " + "--normalizer NormalizerAuto"); + SEND("column_create bigram item_${DATASET}_key " + "COLUMN_INDEX|WITH_POSITION item_${DATASET} _key"); + SEND("column_create item_${DATASET} kana COLUMN_VECTOR kana"); + SEND("column_create kana item_${DATASET}_kana COLUMN_INDEX " + "item_${DATASET} kana"); + SEND("column_create item_${DATASET} freq COLUMN_SCALAR Int32"); + SEND("column_create item_${DATASET} last COLUMN_SCALAR Time"); + SEND("column_create item_${DATASET} boost COLUMN_SCALAR Int32"); + SEND("column_create item_${DATASET} freq2 COLUMN_SCALAR Int32"); + SEND("column_create item_${DATASET} buzz COLUMN_SCALAR Int32"); + + SEND("table_create pair_${DATASET} TABLE_HASH_KEY UInt64"); + SEND("column_create pair_${DATASET} pre COLUMN_SCALAR item_${DATASET}"); + SEND("column_create pair_${DATASET} post COLUMN_SCALAR item_${DATASET}"); + SEND("column_create pair_${DATASET} freq0 COLUMN_SCALAR Int32"); + SEND("column_create pair_${DATASET} freq1 COLUMN_SCALAR Int32"); + SEND("column_create pair_${DATASET} freq2 COLUMN_SCALAR Int32"); + SEND("column_create item_${DATASET} co COLUMN_INDEX pair_${DATASET} pre"); + + SEND("table_create sequence_${DATASET} TABLE_HASH_KEY ShortText"); + SEND("table_create event_${DATASET} TABLE_NO_KEY"); + SEND("column_create sequence_${DATASET} events " + "COLUMN_VECTOR|RING_BUFFER event_${DATASET}"); + SEND("column_create event_${DATASET} type COLUMN_SCALAR event_type"); + SEND("column_create event_${DATASET} time COLUMN_SCALAR Time"); + SEND("column_create event_${DATASET} item COLUMN_SCALAR item_${DATASET}"); + SEND("column_create event_${DATASET} sequence COLUMN_SCALAR " + "sequence_${DATASET}"); + + SEND("table_create configuration TABLE_HASH_KEY ShortText"); + SEND("column_create configuration weight COLUMN_SCALAR UInt32"); + SEND("load --table configuration"); + SEND("["); + SEND("{\"_key\": \"${DATASET}\", \"weight\": 1}"); + SEND("]"); +#undef SEND + success = ctx->rc == GRN_SUCCESS; + GRN_OBJ_FIN(ctx, &text); + GRN_OBJ_FIN(ctx, db); + } else { + success = GRN_FALSE; + } + grn_ctx_fin(ctx); + grn_fin(); + + return success ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_ddl.txt b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_ddl.txt new file mode 100644 index 00000000..f82763f8 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_ddl.txt @@ -0,0 +1,62 @@ +suggest向けDDL解説 +================== + +suggest機能で必要となるスキーマを説明する。なお、このドキュメントはsuggest機能完成とともにdocsディレクトリ内に移動される。 + +1. 概要 + + suggestデータベースには、学習過程でのみ必要になるテーブルと、 + 学習時およびサジェスト時に必要になるテーブルがあります。 + 前者を作業用テーブルと呼び、後者を学習結果テーブルと呼びます。 + +2. 学習結果テーブル + + 学習結果テーブルは二つのファクトテーブルと二つの語彙テーブルから構成されます。 + +2.1. 学習結果ファクトテーブル + +item: suggest候補として提示する個々の文字列を格納するテーブルです。 + + 以下のカラムを用意しています。 + + _key: itemテーブルの主キーは、サジェスト対象文字列そのものとなります。 + kana: 対象文字列の読み仮名を格納します。必須ではありませんが、セットされていればローマ字入力途中の文字列に対しても補完候補を提示可能となります。 + freq: ユーザクエリにおいて入力された回数を記録します。 + last: ユーザクエリに最後に入力された時刻を記録します。 + freq2: ユーザクエリにおいてサブミットされた回数を記録します。 + boost: 当該文字列の露出を制御します。正値を指定すれば露出が頻繁になり、-1を指定すれば、露出されません。 + buzz: 入力回数の差分を記録します。 + co: 共起する他の文字列を参照するための索引です。 + +pair: item間の共起関係を管理するテーブルです。 + + 以下のカラムを用意しています。 + + _kay: preおよびpostのIDを結合した数値が主キーとなります。 + pre: 事前に出現するアイティムです。 + post: 事後に出現するアイティムです。 + freq0: completeイベントにおける共起頻度を記録します。 + freq1: correctイベントにおける共起頻度を記録します。 + freq2: suggestイベントにおける共起頻度を記録します。 + +2.2. 学習結果語彙テーブル + + bigram: itemの主キーによって全文検索を行うための語彙表です。 + kana: itemのkanaカラムの入力補完を行うための語彙表です。 + +3. 作業用テーブル + + 作業用テーブルにはeventとsequenceの2テーブルがあります。 + +event: ユーザの個々のクエリイベントに対応します。レコードの追加のみが行われます。 + 適宜dropして再作成することが望ましいです。 + + type: submitの有無を記録します。 + time: イベントが発生した時刻を記録します。 + item: ユーザが入力した文字列を記録します。 + sequence: ユーザの識別子(cookie, ipアドレス等)を記録します。 + +sequence: 同一ユーザの一連の操作を記録します。レコードの追加のみが行われます。 + 適宜dropして再作成することが望ましいです。 + + events: 当該ユーザの操作履歴を記録します。 diff --git a/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_httpd.c b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_httpd.c new file mode 100644 index 00000000..4f542f21 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_httpd.c @@ -0,0 +1,860 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2010-2015 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +/* groonga origin headers */ +#include <grn_str.h> +#include <grn_msgpack.h> + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <sys/types.h> +#include <sys/time.h> +#include <time.h> +#include <stdlib.h> +#include <unistd.h> +#include <err.h> + +#include <fcntl.h> +#include <sys/queue.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <sys/resource.h> + +#include "zmq_compatible.h" +#include <event.h> +#include <evhttp.h> +#include <groonga.h> +#include <pthread.h> + +#include "util.h" + +#define DEFAULT_PORT 8080 +#define DEFAULT_MAX_THREADS 8 + +#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0 + +#define LISTEN_BACKLOG 756 +#define MIN_MAX_FDS 2048 +#define MAX_THREADS 128 /* max 256 */ + +typedef enum { + run_mode_none = 0, + run_mode_usage, + run_mode_daemon, + run_mode_error +} run_mode; + +#define RUN_MODE_MASK 0x007f +#define RUN_MODE_ENABLE_MAX_FD_CHECK 0x0080 + + +typedef struct { + grn_ctx *ctx; + grn_obj *db; + void *zmq_sock; + grn_obj cmd_buf; + grn_obj pass_through_parameters; + pthread_t thd; + uint32_t thread_id; + struct event_base *base; + struct evhttp *httpd; + struct event pulse; + const char *log_base_path; + FILE *log_file; + uint32_t log_count; + grn_bool request_reopen_log_file; +} thd_data; + +typedef struct { + const char *db_path; + const char *recv_endpoint; + pthread_t thd; + void *zmq_ctx; +} recv_thd_data; + +#define CMD_BUF_SIZE 1024 + +static thd_data threads[MAX_THREADS]; +static uint32_t default_max_threads = DEFAULT_MAX_THREADS; +static uint32_t max_threads; +static volatile sig_atomic_t loop = 1; +static grn_obj *db; +static uint32_t n_lines_per_log_file = 1000000; + +static int +suggest_result(grn_ctx *ctx, + struct evbuffer *res_buf, const char *types, const char *query, + const char *target_name, int frequency_threshold, + double conditional_probability_threshold, int limit, + grn_obj *cmd_buf, grn_obj *pass_through_parameters) +{ + if (target_name && types && query) { + GRN_BULK_REWIND(cmd_buf); + GRN_TEXT_PUTS(ctx, cmd_buf, "/d/suggest?table=item_"); + grn_text_urlenc(ctx, cmd_buf, target_name, strlen(target_name)); + GRN_TEXT_PUTS(ctx, cmd_buf, "&column=kana&types="); + grn_text_urlenc(ctx, cmd_buf, types, strlen(types)); + GRN_TEXT_PUTS(ctx, cmd_buf, "&query="); + grn_text_urlenc(ctx, cmd_buf, query, strlen(query)); + GRN_TEXT_PUTS(ctx, cmd_buf, "&frequency_threshold="); + grn_text_itoa(ctx, cmd_buf, frequency_threshold); + GRN_TEXT_PUTS(ctx, cmd_buf, "&conditional_probability_threshold="); + grn_text_ftoa(ctx, cmd_buf, conditional_probability_threshold); + GRN_TEXT_PUTS(ctx, cmd_buf, "&limit="); + grn_text_itoa(ctx, cmd_buf, limit); + if (GRN_TEXT_LEN(pass_through_parameters) > 0) { + GRN_TEXT_PUTS(ctx, cmd_buf, "&"); + GRN_TEXT_PUT(ctx, cmd_buf, + GRN_TEXT_VALUE(pass_through_parameters), + GRN_TEXT_LEN(pass_through_parameters)); + } + { + char *res; + int flags; + unsigned int res_len; + + grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), 0); + grn_ctx_recv(ctx, &res, &res_len, &flags); + + evbuffer_add(res_buf, res, res_len); + return res_len; + } + } else { + evbuffer_add(res_buf, "{}", 2); + return 2; + } +} + +static void +log_send(struct evkeyvalq *output_headers, struct evbuffer *res_buf, + thd_data *thd, struct evkeyvalq *get_args) +{ + uint64_t millisec; + int frequency_threshold, limit; + double conditional_probability_threshold; + const char *callback, *types, *query, *client_id, *target_name, + *learn_target_name; + + GRN_BULK_REWIND(&(thd->pass_through_parameters)); + parse_keyval(thd->ctx, get_args, &query, &types, &client_id, &target_name, + &learn_target_name, &callback, &millisec, &frequency_threshold, + &conditional_probability_threshold, &limit, + &(thd->pass_through_parameters)); + + /* send data to learn client */ + if (thd->zmq_sock && millisec && client_id && query && learn_target_name) { + char c; + size_t l; + msgpack_packer pk; + msgpack_sbuffer sbuf; + int cnt, submit_flag = 0; + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + cnt = 4; + if (types && !strcmp(types, "submit")) { + cnt++; + types = NULL; + submit_flag = 1; + } + msgpack_pack_map(&pk, cnt); + + c = 'i'; + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, &c, 1); + l = strlen(client_id); + msgpack_pack_str(&pk, l); + msgpack_pack_str_body(&pk, client_id, l); + + c = 'q'; + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, &c, 1); + l = strlen(query); + msgpack_pack_str(&pk, l); + msgpack_pack_str_body(&pk, query, l); + + c = 's'; + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, &c, 1); + msgpack_pack_uint64(&pk, millisec); + + c = 'l'; + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, &c, 1); + l = strlen(learn_target_name); + msgpack_pack_str(&pk, l); + msgpack_pack_str_body(&pk, learn_target_name, l); + + if (submit_flag) { + c = 't'; + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, &c, 1); + msgpack_pack_true(&pk); + } + { + zmq_msg_t msg; + if (!zmq_msg_init_size(&msg, sbuf.size)) { + memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size); + if (zmq_msg_send(&msg, thd->zmq_sock, 0) == -1) { + print_error("zmq_msg_send() error"); + } + zmq_msg_close(&msg); + } + } + msgpack_sbuffer_destroy(&sbuf); + } + /* make result */ + { + int content_length; + if (callback) { + evhttp_add_header(output_headers, + "Content-Type", "text/javascript; charset=UTF-8"); + content_length = strlen(callback); + evbuffer_add(res_buf, callback, content_length); + evbuffer_add(res_buf, "(", 1); + content_length += suggest_result(thd->ctx, + res_buf, types, query, target_name, + frequency_threshold, + conditional_probability_threshold, + limit, + &(thd->cmd_buf), + &(thd->pass_through_parameters)) + 3; + evbuffer_add(res_buf, ");", 2); + } else { + evhttp_add_header(output_headers, + "Content-Type", "application/json; charset=UTF-8"); + content_length = suggest_result(thd->ctx, + res_buf, types, query, target_name, + frequency_threshold, + conditional_probability_threshold, + limit, + &(thd->cmd_buf), + &(thd->pass_through_parameters)); + } + if (content_length >= 0) { +#define NUM_BUF_SIZE 16 + char num_buf[NUM_BUF_SIZE]; + grn_snprintf(num_buf, NUM_BUF_SIZE, NUM_BUF_SIZE, "%d", content_length); + evhttp_add_header(output_headers, "Content-Length", num_buf); +#undef NUM_BUF_SIZE + } + } +} + +static void +cleanup_httpd_thread(thd_data *thd) { + if (thd->log_file) { + fclose(thd->log_file); + } + if (thd->httpd) { + evhttp_free(thd->httpd); + } + if (thd->zmq_sock) { + zmq_close(thd->zmq_sock); + } + grn_obj_unlink(thd->ctx, &(thd->cmd_buf)); + grn_obj_unlink(thd->ctx, &(thd->pass_through_parameters)); + if (thd->ctx) { + grn_ctx_close(thd->ctx); + } + event_base_free(thd->base); +} + +static void +close_log_file(thd_data *thread) +{ + fclose(thread->log_file); + thread->log_file = NULL; + thread->request_reopen_log_file = GRN_FALSE; +} + +static void +generic_handler(struct evhttp_request *req, void *arg) +{ + struct evkeyvalq args; + thd_data *thd = arg; + + if (!loop) { + event_base_loopexit(thd->base, NULL); + return; + } + if (!req->uri) { return; } + + evhttp_parse_query(req->uri, &args); + { + struct evbuffer *res_buf; + if (!(res_buf = evbuffer_new())) { + err(1, "failed to create response buffer"); + } + + evhttp_add_header(req->output_headers, "Connection", "close"); + + log_send(req->output_headers, res_buf, thd, &args); + evhttp_send_reply(req, HTTP_OK, "OK", res_buf); + evbuffer_free(res_buf); + /* logging */ + { + if (thd->log_base_path) { + if (thd->log_file && thd->request_reopen_log_file) { + close_log_file(thd); + } + if (!thd->log_file) { + time_t n; + struct tm *t_st; + char p[PATH_MAX + 1]; + + time(&n); + t_st = localtime(&n); + + grn_snprintf(p, + PATH_MAX, + PATH_MAX, + "%s%04d%02d%02d%02d%02d%02d-%02d", + thd->log_base_path, + t_st->tm_year + 1900, + t_st->tm_mon + 1, + t_st->tm_mday, + t_st->tm_hour, + t_st->tm_min, + t_st->tm_sec, + thd->thread_id); + + if (!(thd->log_file = fopen(p, "a"))) { + print_error("cannot open log_file %s.", p); + } else { + thd->log_count = 0; + } + } + if (thd->log_file) { + fprintf(thd->log_file, "%s\n", req->uri); + thd->log_count++; + if (n_lines_per_log_file > 0 && + thd->log_count >= n_lines_per_log_file) { + close_log_file(thd); + } + } + } + } + } + evhttp_clear_headers(&args); +} + +static int +bind_socket(int port) +{ + int nfd; + if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + print_error("cannot open socket for http."); + return -1; + } else { + int r, one = 1; + struct sockaddr_in addr; + + r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int)); + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + if ((r = bind(nfd, (struct sockaddr *)&addr, sizeof(addr))) < 0) { + print_error("cannot bind socket for http."); + return r; + } + if ((r = listen(nfd, LISTEN_BACKLOG)) < 0) { + print_error("cannot listen socket for http."); + return r; + } + if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) { + print_error("cannot fcntl socket for http."); + return -1; + } + return nfd; + } +} + +static void +signal_handler(int sig) +{ + loop = 0; +} + +static void +signal_reopen_log_file(int sig) +{ + uint32_t i; + + for (i = 0; i < max_threads; i++) { + threads[i].request_reopen_log_file = GRN_TRUE; + } +} + +void +timeout_handler(int fd, short events, void *arg) { + thd_data *thd = arg; + if (!loop) { + event_base_loopexit(thd->base, NULL); + } else { + struct timeval tv = {1, 0}; + evtimer_add(&(thd->pulse), &tv); + } +} + +static void * +dispatch(void *arg) +{ + event_base_dispatch((struct event_base *)arg); + return NULL; +} + +static void +msgpack2json(msgpack_object *o, grn_ctx *ctx, grn_obj *buf) +{ + switch (o->type) { + case MSGPACK_OBJECT_POSITIVE_INTEGER: + grn_text_ulltoa(ctx, buf, o->via.u64); + break; + case MSGPACK_OBJECT_STR: + grn_text_esc(ctx, buf, + MSGPACK_OBJECT_STR_PTR(o), + MSGPACK_OBJECT_STR_SIZE(o)); + break; + case MSGPACK_OBJECT_ARRAY: + GRN_TEXT_PUTC(ctx, buf, '['); + { + int i; + for (i = 0; i < o->via.array.size; i++) { + msgpack2json(o->via.array.ptr, ctx, buf); + } + } + GRN_TEXT_PUTC(ctx, buf, ']'); + break; + case MSGPACK_OBJECT_FLOAT: + grn_text_ftoa(ctx, buf, MSGPACK_OBJECT_FLOAT_VALUE(o)); + break; + default: + print_error("cannot handle this msgpack type."); + } +} + +static void +load_from_learner(msgpack_object *o, grn_ctx *ctx, grn_obj *cmd_buf) +{ + if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) { + msgpack_object_kv *kv; + msgpack_object *key; + msgpack_object *value; + kv = &(o->via.map.ptr[0]); + key = &(kv->key); + value = &(kv->val); + if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) == 6 && + !memcmp(MSGPACK_OBJECT_STR_PTR(key), CONST_STR_LEN("target"))) { + if (value->type == MSGPACK_OBJECT_STR) { + int i; + GRN_BULK_REWIND(cmd_buf); + GRN_TEXT_PUTS(ctx, cmd_buf, "load --table "); + GRN_TEXT_PUT(ctx, cmd_buf, + MSGPACK_OBJECT_STR_PTR(value), + MSGPACK_OBJECT_STR_SIZE(value)); + grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE); + grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE); + if (MSGPACK_OBJECT_STR_SIZE(value) > 5) { + if (!memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("item_")) || + !memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("pair_"))) { + char delim = '{'; + GRN_BULK_REWIND(cmd_buf); + for (i = 1; i < o->via.map.size; i++) { + GRN_TEXT_PUTC(ctx, cmd_buf, delim); + kv = &(o->via.map.ptr[i]); + msgpack2json(&(kv->key), ctx, cmd_buf); + GRN_TEXT_PUTC(ctx, cmd_buf, ':'); + msgpack2json(&(kv->val), ctx, cmd_buf); + delim = ','; + } + GRN_TEXT_PUTC(ctx, cmd_buf, '}'); + /* printf("msg: %.*s\n", GRN_TEXT_LEN(cmd_buf), GRN_TEXT_VALUE(cmd_buf)); */ + grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE); + } + } + grn_ctx_send(ctx, CONST_STR_LEN("]"), 0); + { + char *res; + int flags; + unsigned int res_len; + grn_ctx_recv(ctx, &res, &res_len, &flags); + } + } + } + } +} + +static void +recv_handler(grn_ctx *ctx, void *zmq_recv_sock, msgpack_zone *mempool, grn_obj *cmd_buf) +{ + zmq_msg_t msg; + + if (zmq_msg_init(&msg)) { + print_error("cannot init zmq message."); + } else { + if (zmq_msg_recv(&msg, zmq_recv_sock, 0) == -1) { + print_error("cannot recv zmq message."); + } else { + msgpack_object obj; + msgpack_unpack_return ret; + + ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj); + if (MSGPACK_UNPACK_SUCCESS == ret) { + load_from_learner(&obj, ctx, cmd_buf); + } else { + print_error("invalid recv data."); + } + msgpack_zone_clear(mempool); + } + zmq_msg_close(&msg); + } +} + +static void * +recv_from_learner(void *arg) +{ + void *zmq_recv_sock; + recv_thd_data *thd = arg; + + if ((zmq_recv_sock = zmq_socket(thd->zmq_ctx, ZMQ_SUB))) { + if (!zmq_connect(zmq_recv_sock, thd->recv_endpoint)) { + grn_ctx ctx; + if (!grn_ctx_init(&ctx, 0)) { + if ((!grn_ctx_use(&ctx, db))) { + msgpack_zone *mempool; + if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) { + grn_obj cmd_buf; + zmq_pollitem_t items[] = { + { zmq_recv_sock, 0, ZMQ_POLLIN, 0} + }; + GRN_TEXT_INIT(&cmd_buf, 0); + zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0); + while (loop) { + zmq_poll(items, 1, 10000); + if (items[0].revents & ZMQ_POLLIN) { + recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf); + } + } + grn_obj_unlink(&ctx, &cmd_buf); + msgpack_zone_free(mempool); + } else { + print_error("cannot create msgpack zone."); + } + /* db_close */ + } else { + print_error("error in grn_db_open() on recv thread."); + } + grn_ctx_fin(&ctx); + } else { + print_error("error in grn_ctx_init() on recv thread."); + } + } else { + print_error("cannot create recv zmq_socket."); + } + } else { + print_error("cannot connect zmq_socket."); + } + return NULL; +} + +static int +serve_threads(int nthreads, int port, const char *db_path, void *zmq_ctx, + const char *send_endpoint, const char *recv_endpoint, + const char *log_base_path) +{ + int nfd; + uint32_t i; + if ((nfd = bind_socket(port)) < 0) { + print_error("cannot bind socket. please check port number with netstat."); + return -1; + } + + for (i = 0; i < nthreads; i++) { + memset(&threads[i], 0, sizeof(threads[i])); + threads[i].request_reopen_log_file = GRN_FALSE; + if (!(threads[i].base = event_init())) { + print_error("error in event_init() on thread %d.", i); + } else { + if (!(threads[i].httpd = evhttp_new(threads[i].base))) { + print_error("error in evhttp_new() on thread %d.", i); + } else { + int r; + if ((r = evhttp_accept_socket(threads[i].httpd, nfd))) { + print_error("error in evhttp_accept_socket() on thread %d.", i); + } else { + if (send_endpoint) { + if (!(threads[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) { + print_error("cannot create zmq_socket."); + } else if (zmq_connect(threads[i].zmq_sock, send_endpoint)) { + print_error("cannot connect zmq_socket."); + zmq_close(threads[i].zmq_sock); + threads[i].zmq_sock = NULL; + } else { + uint64_t hwm = 1; + zmq_setsockopt(threads[i].zmq_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t)); + } + } else { + threads[i].zmq_sock = NULL; + } + if (!(threads[i].ctx = grn_ctx_open(0))) { + print_error("error in grn_ctx_open() on thread %d.", i); + } else if (grn_ctx_use(threads[i].ctx, db)) { + print_error("error in grn_db_open() on thread %d.", i); + } else { + GRN_TEXT_INIT(&(threads[i].cmd_buf), 0); + GRN_TEXT_INIT(&(threads[i].pass_through_parameters), 0); + threads[i].log_base_path = log_base_path; + threads[i].thread_id = i; + evhttp_set_gencb(threads[i].httpd, generic_handler, &threads[i]); + evhttp_set_timeout(threads[i].httpd, 10); + { + struct timeval tv = {1, 0}; + evtimer_set(&(threads[i].pulse), timeout_handler, &threads[i]); + evtimer_add(&(threads[i].pulse), &tv); + } + if ((r = pthread_create(&(threads[i].thd), NULL, dispatch, threads[i].base))) { + print_error("error in pthread_create() on thread %d.", i); + } + } + } + } + } + } + + /* recv thread from learner */ + if (recv_endpoint) { + recv_thd_data rthd; + rthd.db_path = db_path; + rthd.recv_endpoint = recv_endpoint; + rthd.zmq_ctx = zmq_ctx; + + if (pthread_create(&(rthd.thd), NULL, recv_from_learner, &rthd)) { + print_error("error in pthread_create() on thread %d.", i); + } + if (pthread_join(rthd.thd, NULL)) { + print_error("error in pthread_join() on thread %d.", i); + } + } else { + while (loop) { sleep(1); } + } + + /* join all httpd thread */ + for (i = 0; i < nthreads; i++) { + if (threads[i].thd) { + if (pthread_join(threads[i].thd, NULL)) { + print_error("error in pthread_join() on thread %d.", i); + } + } + cleanup_httpd_thread(&(threads[i])); + } + return 0; +} + +static uint32_t +get_core_number(void) +{ +#ifdef ACTUALLY_GET_CORE_NUMBER +#ifdef _SC_NPROCESSORS_CONF + return sysconf(_SC_NPROCESSORS_CONF); +#else /* _SC_NPROCESSORS_CONF */ + int n_processors; + size_t length = sizeof(n_processors); + int mib[] = {CTL_HW, HW_NCPU}; + if (sysctl(mib, sizeof(mib) / sizeof(mib[0]), + &n_processors, &length, NULL, 0) == 0 && + length == sizeof(n_processors) && + 0 < n_processors) { + return n_processors; + } else { + return 1; + } +#endif /* _SC_NPROCESSORS_CONF */ +#endif /* ACTUALLY_GET_CORE_NUMBER */ + return 0; +} + +static void +usage(FILE *output) +{ + fprintf( + output, + "Usage: groonga-suggest-httpd [options...] db_path\n" + "db_path:\n" + " specify groonga database path which is used for suggestion.\n" + "\n" + "options:\n" + " -p, --port <port number> : http server port number\n" + " (default: %d)\n" + /* + " --address <ip/hostname> : server address to listen\n" + " (default: %s)\n" + */ + " -c <thread number> : number of server threads\n" + " (deprecated. use --n-threads)\n" + " -t, --n-threads <thread number> : number of server threads\n" + " (default: %d)\n" + " -s, --send-endpoint <send endpoint> : send endpoint\n" + " (ex. tcp://example.com:1234)\n" + " -r, --receive-endpoint <receive endpoint> : receive endpoint\n" + " (ex. tcp://example.com:1235)\n" + " -l, --log-base-path <path prefix> : log path prefix\n" + " --n-lines-per-log-file <lines number> : number of lines in a log file\n" + " use 0 for disabling this\n" + " (default: %d)\n" + " -d, --daemon : daemonize\n" + " --disable-max-fd-check : disable max FD check on start\n" + " -h, --help : show this message\n", + DEFAULT_PORT, default_max_threads, n_lines_per_log_file); +} + +int +main(int argc, char **argv) +{ + int port_no = DEFAULT_PORT; + const char *max_threads_string = NULL, *port_string = NULL; + const char *address; + const char *send_endpoint = NULL, *recv_endpoint = NULL, *log_base_path = NULL; + const char *n_lines_per_log_file_string = NULL; + int n_processed_args, flags = RUN_MODE_ENABLE_MAX_FD_CHECK; + run_mode mode = run_mode_none; + + if (!(default_max_threads = get_core_number())) { + default_max_threads = DEFAULT_MAX_THREADS; + } + + /* parse options */ + { + static grn_str_getopt_opt opts[] = { + {'c', NULL, NULL, 0, GETOPT_OP_NONE}, /* deprecated */ + {'t', "n-threads", NULL, 0, GETOPT_OP_NONE}, + {'h', "help", NULL, run_mode_usage, GETOPT_OP_UPDATE}, + {'p', "port", NULL, 0, GETOPT_OP_NONE}, + {'\0', "bind-address", NULL, 0, GETOPT_OP_NONE}, /* not supported yet */ + {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE}, + {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE}, + {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE}, + {'\0', "n-lines-per-log-file", NULL, 0, GETOPT_OP_NONE}, + {'d', "daemon", NULL, run_mode_daemon, GETOPT_OP_UPDATE}, + {'\0', "disable-max-fd-check", NULL, RUN_MODE_ENABLE_MAX_FD_CHECK, + GETOPT_OP_OFF}, + {'\0', NULL, NULL, 0, 0} + }; + opts[0].arg = &max_threads_string; + opts[1].arg = &max_threads_string; + opts[3].arg = &port_string; + opts[4].arg = &address; + opts[5].arg = &send_endpoint; + opts[6].arg = &recv_endpoint; + opts[7].arg = &log_base_path; + opts[8].arg = &n_lines_per_log_file_string; + + n_processed_args = grn_str_getopt(argc, argv, opts, &flags); + } + + /* main */ + mode = (flags & RUN_MODE_MASK); + if (n_processed_args < 0 || + (argc - n_processed_args) != 1 || + mode == run_mode_error) { + usage(stderr); + return EXIT_FAILURE; + } else if (mode == run_mode_usage) { + usage(stdout); + return EXIT_SUCCESS; + } else { + grn_ctx ctx; + void *zmq_ctx; + + if (max_threads_string) { + max_threads = atoi(max_threads_string); + if (max_threads > MAX_THREADS) { + print_error("too many threads. limit to %d.", MAX_THREADS); + max_threads = MAX_THREADS; + } + } else { + max_threads = default_max_threads; + } + + if (port_string) { + port_no = atoi(port_string); + } + + if (flags & RUN_MODE_ENABLE_MAX_FD_CHECK) { + /* check environment */ + struct rlimit rlim; + if (!getrlimit(RLIMIT_NOFILE, &rlim)) { + if (rlim.rlim_max < MIN_MAX_FDS) { + print_error("too small max fds. %d required.", MIN_MAX_FDS); + return -1; + } + rlim.rlim_cur = rlim.rlim_cur; + setrlimit(RLIMIT_NOFILE, &rlim); + } + } + + if (n_lines_per_log_file_string) { + int64_t n_lines; + n_lines = grn_atoll(n_lines_per_log_file_string, + n_lines_per_log_file_string + strlen(n_lines_per_log_file_string), + NULL); + if (n_lines < 0) { + print_error("--n-lines-per-log-file must be >= 0: <%s>", + n_lines_per_log_file_string); + return(EXIT_FAILURE); + } + if (n_lines > UINT32_MAX) { + print_error("--n-lines-per-log-file must be <= %ld: <%s>", + UINT32_MAX, n_lines_per_log_file_string); + return(EXIT_FAILURE); + } + n_lines_per_log_file = (uint32_t)n_lines; + } + + if (mode == run_mode_daemon) { + daemonize(); + } + + grn_init(); + grn_ctx_init(&ctx, 0); + if ((db = grn_db_open(&ctx, argv[n_processed_args]))) { + if ((zmq_ctx = zmq_init(1))) { + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + signal(SIGQUIT, signal_handler); + signal(SIGUSR1, signal_reopen_log_file); + + serve_threads(max_threads, port_no, argv[n_processed_args], zmq_ctx, + send_endpoint, recv_endpoint, log_base_path); + zmq_term(zmq_ctx); + } else { + print_error("cannot create zmq context."); + } + grn_obj_close(&ctx, db); + } else { + print_error("cannot open db."); + } + grn_ctx_fin(&ctx); + grn_fin(); + } + return 0; +} diff --git a/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_learner.c b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_learner.c new file mode 100644 index 00000000..8109ae7f --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/groonga_suggest_learner.c @@ -0,0 +1,843 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2010-2015 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +/* for grn_str_getopt() */ +#include <grn_str.h> +#include <grn_msgpack.h> + +#include "zmq_compatible.h" +#include <stdio.h> +#include <signal.h> +#include <unistd.h> +#include <pthread.h> +#include <groonga.h> +#include <inttypes.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <dirent.h> + +#include "util.h" + +#include <evhttp.h> + +#define DEFAULT_RECV_ENDPOINT "tcp://*:1234" +#define DEFAULT_SEND_ENDPOINT "tcp://*:1235" +#define SEND_WAIT 1000 /* 0.001sec */ + +#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0 + +typedef enum { + RUN_MODE_NONE = 0x00, + RUN_MODE_USAGE = 0x01, + RUN_MODE_DAEMON = 0x02, + RUN_MODE_ERROR = 0x04 +} run_mode; + +#define RUN_MODE_MASK 0x007f + +typedef struct { + const char *db_path; + const char *send_endpoint; + pthread_t thd; + void *zmq_ctx; +} send_thd_data; + +static volatile sig_atomic_t loop = 1; + +static void +load_to_groonga(grn_ctx *ctx, + grn_obj *buf, + const char *query, uint32_t query_len, + const char *client_id, uint32_t client_id_len, + const char *learn_target_name, uint32_t learn_target_name_len, + uint64_t millisec, + int submit) +{ + GRN_BULK_REWIND(buf); + GRN_TEXT_PUTS(ctx, buf, "load --table event_"); + GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len); + GRN_TEXT_PUTS(ctx, buf, " --each 'suggest_preparer(_id,type,item,sequence,time,pair_"); + GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len); + GRN_TEXT_PUTS(ctx, buf, ")'"); + grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE); + grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE); + + GRN_BULK_REWIND(buf); + GRN_TEXT_PUTS(ctx, buf, "{\"item\":"); + grn_text_esc(ctx, buf, query, query_len); + GRN_TEXT_PUTS(ctx, buf, ",\"sequence\":"); + grn_text_esc(ctx, buf, client_id, client_id_len); + GRN_TEXT_PUTS(ctx, buf, ",\"time\":"); + grn_text_ftoa(ctx, buf, (double)millisec / 1000); + if (submit) { + GRN_TEXT_PUTS(ctx, buf, ",\"type\":\"submit\"}"); + } else { + GRN_TEXT_PUTS(ctx, buf, "}"); + } + /* printf("%.*s\n", GRN_TEXT_LEN(buf), GRN_TEXT_VALUE(buf)); */ + grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE); + + grn_ctx_send(ctx, CONST_STR_LEN("]"), 0); + + { + char *res; + int flags; + unsigned int res_len; + grn_ctx_recv(ctx, &res, &res_len, &flags); + } +} + +void +load_to_multi_targets(grn_ctx *ctx, + grn_obj *buf, + const char *query, uint32_t query_len, + const char *client_id, uint32_t client_id_len, + const char *learn_target_names, + uint32_t learn_target_names_len, + uint64_t millisec, + int submit) +{ + if (millisec && query && client_id && learn_target_names) { + unsigned int tn_len; + const char *tn, *tnp, *tne; + tn = tnp = learn_target_names; + tne = learn_target_names + learn_target_names_len; + while (tnp <= tne) { + if (tnp == tne || *tnp == '|') { + tn_len = tnp - tn; + + /* + printf("sec: %" PRIu64 " query %.*s client_id: %.*s target: %.*s\n", + millisec, + query_len, query, + client_id_len, client_id, + tn_len, tn); + */ + load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len, + tn, tn_len, millisec, submit); + + tn = ++tnp; + } else { + tnp++; + } + } + } +} + +#define PACK_KEY_FROM_ID(id) do { \ + int _k_len; \ + char _k_buf[GRN_TABLE_MAX_KEY_SIZE]; \ + _k_len = grn_table_get_key(ctx, ref_table, (id), _k_buf, GRN_TABLE_MAX_KEY_SIZE); \ + msgpack_pack_str(&pk, _k_len); \ + msgpack_pack_str_body(&pk, _k_buf, _k_len); \ +} while (0) + +#define PACK_MAP_ITEM(col_name) do { \ + grn_obj _v; \ + msgpack_pack_str(&pk, sizeof(#col_name) - 1); \ + msgpack_pack_str_body(&pk, #col_name, sizeof(#col_name) - 1); \ + switch (col_##col_name->header.type) { \ + case GRN_COLUMN_FIX_SIZE: \ + GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \ + break; \ + case GRN_COLUMN_VAR_SIZE: \ + if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \ + GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \ + } else { \ + GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \ + } \ + break; \ + } \ + grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \ + \ + switch (_v.header.type) { \ + case GRN_BULK: \ + switch (_v.header.domain) { \ + case GRN_DB_SHORT_TEXT: \ + msgpack_pack_str(&pk, GRN_TEXT_LEN(&_v)); \ + msgpack_pack_str_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \ + break; \ + case GRN_DB_INT32: \ + msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \ + break; \ + case GRN_DB_UINT32: \ + msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \ + break; \ + case GRN_DB_TIME: \ + msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \ + break; \ + default: /* ref. to ShortText key */ \ + PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \ + } \ + break; \ + case GRN_UVECTOR: /* ref.s to ShortText key */ \ + { \ + grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \ + msgpack_pack_array(&pk, _idve - _idv); \ + for (; _idv < _idve; _idv++) { \ + PACK_KEY_FROM_ID(*_idv); \ + } \ + } \ + break; \ + default: \ + print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \ + msgpack_pack_nil(&pk); \ + break; \ + } \ + grn_obj_close(ctx, &_v); \ +} while (0) + +static int +zmq_send_to_httpd(void *zmq_send_sock, void *data, size_t size) +{ + zmq_msg_t msg; + if (!zmq_msg_init_size(&msg, size)) { + memcpy((void *)zmq_msg_data(&msg), data, size); + if (zmq_msg_send(&msg, zmq_send_sock, 0) == -1) { + print_error("zmq_send() error"); + return -1; + } + zmq_msg_close(&msg); + } else { + print_error("zmq_msg_init_size() error"); + } + return 0; +} + +static void +send_handler(void *zmq_send_sock, grn_ctx *ctx) +{ + grn_table_cursor *cur; + if ((cur = grn_table_cursor_open(ctx, grn_ctx_db(ctx), NULL, 0, NULL, 0, + 0, -1, 0))) { + grn_id table_id; + while (loop && (table_id = grn_table_cursor_next(ctx, cur)) != GRN_ID_NIL) { + grn_obj *table; + if ((table = grn_ctx_at(ctx, table_id))) { + int name_len; + char name_buf[GRN_TABLE_MAX_KEY_SIZE]; + + name_len = grn_obj_name(ctx, table, name_buf, + GRN_TABLE_MAX_KEY_SIZE); + + if (name_len > 5) { + if (table->header.type == GRN_TABLE_PAT_KEY && + !memcmp(name_buf, CONST_STR_LEN("item_"))) { + /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */ + grn_obj *ref_table; + grn_table_cursor *tc; + grn_obj *col_last, *col_kana, *col_freq, *col_freq2, + *col_buzz, *col_boost; + + col_kana = grn_obj_column(ctx, table, CONST_STR_LEN("kana")); + col_freq = grn_obj_column(ctx, table, CONST_STR_LEN("freq")); + col_last = grn_obj_column(ctx, table, CONST_STR_LEN("last")); + col_boost = grn_obj_column(ctx, table, CONST_STR_LEN("boost")); + col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2")); + col_buzz = grn_obj_column(ctx, table, CONST_STR_LEN("buzz")); + + ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_kana)); + + if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL, + 0, 0, -1, 0))) { + grn_id rec_id; + while (loop && (rec_id = grn_table_cursor_next(ctx, tc)) + != GRN_ID_NIL) { + char *key; + size_t key_len; + msgpack_packer pk; + msgpack_sbuffer sbuf; + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pk, 8); + + /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */ + msgpack_pack_str(&pk, 6); + msgpack_pack_str_body(&pk, "target", strlen("target")); + msgpack_pack_str(&pk, name_len); + msgpack_pack_str_body(&pk, name_buf, name_len); + + msgpack_pack_str(&pk, 4); + msgpack_pack_str_body(&pk, + GRN_COLUMN_NAME_KEY, + GRN_COLUMN_NAME_KEY_LEN); + key_len = grn_table_cursor_get_key(ctx, tc, (void **)&key); + msgpack_pack_str(&pk, key_len); + msgpack_pack_str_body(&pk, key, key_len); + + PACK_MAP_ITEM(last); + PACK_MAP_ITEM(kana); + PACK_MAP_ITEM(freq); + PACK_MAP_ITEM(freq2); + PACK_MAP_ITEM(buzz); + PACK_MAP_ITEM(boost); + + zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size); + + usleep(SEND_WAIT); + + msgpack_sbuffer_destroy(&sbuf); + } + grn_table_cursor_close(ctx, tc); + } + } else if (table->header.type == GRN_TABLE_HASH_KEY && + !memcmp(name_buf, CONST_STR_LEN("pair_"))) { + grn_obj *ref_table; + grn_table_cursor *tc; + grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2; + + col_pre = grn_obj_column(ctx, table, CONST_STR_LEN("pre")); + col_post = grn_obj_column(ctx, table, CONST_STR_LEN("post")); + col_freq0 = grn_obj_column(ctx, table, CONST_STR_LEN("freq0")); + col_freq1 = grn_obj_column(ctx, table, CONST_STR_LEN("freq1")); + col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2")); + + ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_pre)); + + if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL, + 0, 0, -1, 0))) { + grn_id rec_id; + while (loop && (rec_id = grn_table_cursor_next(ctx, tc)) + != GRN_ID_NIL) { + uint64_t *key; + msgpack_packer pk; + msgpack_sbuffer sbuf; + + /* skip freq0 == 0 && freq1 == 0 && freq2 == 0 */ + { + grn_obj f; + grn_obj_get_value(ctx, col_freq0, rec_id, &f); + if (!GRN_INT32_VALUE(&f)) { + grn_obj_get_value(ctx, col_freq1, rec_id, &f); + if (!GRN_INT32_VALUE(&f)) { + grn_obj_get_value(ctx, col_freq2, rec_id, &f); + if (!GRN_INT32_VALUE(&f)) { continue; } + } + } + } + + /* make pair_* message */ + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pk, 7); + /* ["_key","UInt64"],["pre","item_all"],["post","item_all"],["freq2","Int32"],["freq1","Int32"],["freq0","Int32"] */ + + msgpack_pack_str(&pk, 6); + msgpack_pack_str_body(&pk, "target", strlen("target")); + msgpack_pack_str(&pk, name_len); + msgpack_pack_str_body(&pk, name_buf, name_len); + + msgpack_pack_str(&pk, 4); + msgpack_pack_str_body(&pk, + GRN_COLUMN_NAME_KEY, + GRN_COLUMN_NAME_KEY_LEN); + grn_table_cursor_get_key(ctx, tc, (void **)&key); + msgpack_pack_uint64(&pk, *key); + + PACK_MAP_ITEM(pre); + PACK_MAP_ITEM(post); + PACK_MAP_ITEM(freq0); + PACK_MAP_ITEM(freq1); + PACK_MAP_ITEM(freq2); + + zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size); + + usleep(SEND_WAIT); + + msgpack_sbuffer_destroy(&sbuf); + } + grn_table_cursor_close(ctx, tc); + } + } + } + grn_obj_unlink(ctx, table); + } + } + grn_table_cursor_close(ctx, cur); + } +} + +static void * +send_to_httpd(void *arg) +{ + send_thd_data *thd = arg; + void *zmq_send_sock; + if ((zmq_send_sock = zmq_socket(thd->zmq_ctx, ZMQ_PUB))) { + if (!zmq_bind(zmq_send_sock, thd->send_endpoint)) { + grn_ctx ctx; + if (!(grn_ctx_init(&ctx, 0))) { + grn_obj *db; + if ((db = grn_db_open(&ctx, thd->db_path))) { + uint64_t hwm = 1; + zmq_setsockopt(zmq_send_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t)); + while (loop) { + send_handler(zmq_send_sock, &ctx); + } + grn_obj_close(&ctx, db); + } else { + print_error("error in grn_db_open() on send thread."); + } + grn_ctx_fin(&ctx); + } else { + print_error("error in grn_ctx_init() on send thread."); + } + } else { + print_error("cannot bind zmq_socket."); + } + } else { + print_error("cannot create zmq_socket."); + } + return NULL; +} + +static void +handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf) +{ + int submit_flag = 0; + uint64_t millisec = 0; + const char *query = NULL, + *client_id = NULL, *learn_target_names = NULL; + uint32_t query_len = 0, client_id_len = 0, learn_target_names_len = 0; + if (obj->type == MSGPACK_OBJECT_MAP) { + int i; + for (i = 0; i < obj->via.map.size; i++) { + msgpack_object_kv *kv; + msgpack_object *key; + msgpack_object *value; + kv = &(obj->via.map.ptr[i]); + key = &(kv->key); + value = &(kv->val); + if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) > 0) { + switch (MSGPACK_OBJECT_STR_PTR(key)[0]) { + case 'i': + if (value->type == MSGPACK_OBJECT_STR) { + client_id_len = MSGPACK_OBJECT_STR_SIZE(value); + client_id = MSGPACK_OBJECT_STR_PTR(value); + } + break; + case 'q': + if (value->type == MSGPACK_OBJECT_STR) { + query_len = MSGPACK_OBJECT_STR_SIZE(value); + query = MSGPACK_OBJECT_STR_PTR(value); + } + break; + case 'l': + if (value->type == MSGPACK_OBJECT_STR) { + learn_target_names_len = MSGPACK_OBJECT_STR_SIZE(value); + learn_target_names = MSGPACK_OBJECT_STR_PTR(value); + } + break; + case 's': + if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + millisec = kv->val.via.u64; + } + break; + case 't': + if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) { + submit_flag = (kv->val.via.boolean ? 1 : 0); + } + break; + default: + break; + } + } + } + load_to_multi_targets(ctx, buf, query, query_len, + client_id, client_id_len, + learn_target_names, learn_target_names_len, + millisec, submit_flag); + } +} + +static void +recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx) +{ + grn_obj buf; + zmq_pollitem_t items[] = { + { zmq_sock, 0, ZMQ_POLLIN, 0} + }; + GRN_TEXT_INIT(&buf, 0); + while (loop) { + zmq_poll(items, 1, 10000); + if (items[0].revents & ZMQ_POLLIN) { /* always true */ + zmq_msg_t msg; + if (zmq_msg_init(&msg)) { + print_error("cannot init zmq message."); + } else { + if (zmq_msg_recv(&msg, zmq_sock, 0) == -1) { + print_error("cannot recv zmq message."); + } else { + msgpack_object obj; + msgpack_unpack_return ret; + ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj); + if (MSGPACK_UNPACK_SUCCESS == ret) { + /* msgpack_object_print(stdout, obj); */ + handle_msg(&obj, ctx, &buf); + } + msgpack_zone_clear(mempool); + } + zmq_msg_close(&msg); + } + } + } + grn_obj_unlink(ctx, &buf); +} + +struct _suggest_log_file { + FILE *fp; + char *path; + uint64_t line; + /* datas from one line */ + int submit; + char *query; + uint64_t millisec; + char *client_id; + char *learn_target_name; + /* link list */ + struct _suggest_log_file *next; +}; +typedef struct _suggest_log_file suggest_log_file; + +#if 0 +static void +print_log_file_list(suggest_log_file *list) +{ + while (list) { + printf("fp:%p millisec:%" PRIu64 " next:%p\n", + list->fp, list->millisec, list->next); + list = list->next; + } +} +#endif + +static void +free_log_line_data(suggest_log_file *l) +{ + if (l->query) { + free(l->query); + l->query = NULL; + } + if (l->client_id) { + free(l->client_id); + l->client_id = NULL; + } + if (l->learn_target_name) { + free(l->learn_target_name); + l->learn_target_name = NULL; + } +} + +#define MAX_LOG_LENGTH 0x2000 + +static void +read_log_line(suggest_log_file **list) +{ + suggest_log_file *t = *list; + char line_buf[MAX_LOG_LENGTH]; + while (1) { + free_log_line_data(t); + if (fgets(line_buf, MAX_LOG_LENGTH, t->fp)) { + char *eol; + t->line++; + if ((eol = strrchr(line_buf, '\n'))) { + const char *query, *types, *client_id, *learn_target_name; + struct evkeyvalq get_args; + *eol = '\0'; + evhttp_parse_query(line_buf, &get_args); + parse_keyval(NULL, + &get_args, &query, &types, &client_id, NULL, + &learn_target_name, NULL, &(t->millisec), NULL, NULL, NULL, + NULL); + if (query && client_id && learn_target_name && t->millisec) { + t->query = evhttp_decode_uri(query); + t->submit = (types && !strcmp(types, "submit")); + t->client_id = evhttp_decode_uri(client_id); + t->learn_target_name = evhttp_decode_uri(learn_target_name); + evhttp_clear_headers(&get_args); + break; + } + print_error("invalid line path:%s line:%" PRIu64, + t->path, t->line); + evhttp_clear_headers(&get_args); + } else { + /* read until new line */ + while (1) { + int c = fgetc(t->fp); + if (c == '\n' || c == EOF) { break; } + } + } + } else { + /* terminate reading log */ + fclose(t->fp); + free(t->path); + *list = t->next; + free(t); + break; + } + } +} + +/* re-sorting by list->millisec asc with moving a head item. */ +static void +sort_log_file_list(suggest_log_file **list) +{ + suggest_log_file *p, *target; + target = *list; + if (!target || !target->next || target->millisec < target->next->millisec) { + return; + } + *list = target->next; + for (p = *list; p; p = p->next) { + if (!p->next || target->millisec > p->next->millisec) { + target->next = p->next; + p->next = target; + return; + } + } +} + +#define PATH_SEPARATOR '/' + +static suggest_log_file * +gather_log_file(const char *dir_path, unsigned int dir_path_len) +{ + DIR *dir; + struct dirent *dirent; + char path[PATH_MAX + 1]; + suggest_log_file *list = NULL; + if (!(dir = opendir(dir_path))) { + print_error("cannot open log directory."); + return NULL; + } + memcpy(path, dir_path, dir_path_len); + path[dir_path_len] = PATH_SEPARATOR; + while ((dirent = readdir(dir))) { + struct stat fstat; + unsigned int d_namlen, path_len; + if (*(dirent->d_name) == '.' && ( + dirent->d_name[1] == '\0' || + (dirent->d_name[1] == '.' && dirent->d_name[2] == '\0'))) { + continue; + } + d_namlen = strlen(dirent->d_name); + path_len = dir_path_len + 1 + d_namlen; + if (dir_path_len + d_namlen >= PATH_MAX) { continue; } + memcpy(path + dir_path_len + 1, dirent->d_name, d_namlen); + path[path_len] = '\0'; + lstat(path, &fstat); + if (S_ISDIR(fstat.st_mode)) { + gather_log_file(path, path_len); + } else { + suggest_log_file *p = calloc(1, sizeof(suggest_log_file)); + if (!(p->fp = fopen(path, "r"))) { + free(p); + } else { + if (list) { + p->next = list; + } + p->path = strdup(path); + list = p; + read_log_line(&list); + sort_log_file_list(&list); + } + } + /* print_log_file_list(list); */ + } + return list; +} + +static void +load_log(grn_ctx *ctx, const char *log_dir_name) +{ + grn_obj buf; + suggest_log_file *list; + GRN_TEXT_INIT(&buf, 0); + list = gather_log_file(log_dir_name, strlen(log_dir_name)); + while (list) { + /* + printf("file:%s line:%" PRIu64 " query:%s millisec:%" PRIu64 "\n", + list->path, list->line, list->query, list->millisec); + */ + load_to_multi_targets(ctx, &buf, + list->query, strlen(list->query), + list->client_id, strlen(list->client_id), + list->learn_target_name, strlen(list->learn_target_name), + list->millisec, + list->submit); + read_log_line(&list); + sort_log_file_list(&list); + } + grn_obj_close(ctx, &buf); +} + +static void +usage(FILE *output) +{ + fprintf(output, + "Usage: groonga-suggest-learner [options...] db_path\n" + "options:\n" + " -r <recv endpoint>: recv endpoint (default: %s)\n" + " --receive-endpoint <recv endpoint>\n" + "\n" + " -s <send endpoint>: send endpoint (default: %s)\n" + " --send-endpoint <send endpoint>\n" + "\n" + " -l <log directory>: load from log files made on webserver.\n" + " --log-base-path <log directory>\n" + "\n" + " --log-path <path> : output logs to <path>\n" + " --log-level <level> : set log level to <level> (default: %d)\n" + " -d, --daemon : daemonize\n", + DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT, + GRN_LOG_DEFAULT_LEVEL); +} + +static void +signal_handler(int sig) +{ + loop = 0; +} + +int +main(int argc, char **argv) +{ + run_mode mode = RUN_MODE_NONE; + int n_processed_args; + const char *recv_endpoint = DEFAULT_RECV_ENDPOINT; + const char *send_endpoint = DEFAULT_SEND_ENDPOINT; + const char *log_base_path = NULL; + const char *db_path = NULL; + + /* parse options */ + { + int flags = mode; + const char *log_path = NULL; + const char *log_level = NULL; + static grn_str_getopt_opt opts[] = { + {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE}, + {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE}, + {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE}, + {'\0', "log-path", NULL, 0, GETOPT_OP_NONE}, + {'\0', "log-level", NULL, 0, GETOPT_OP_NONE}, + {'d', "daemon", NULL, RUN_MODE_DAEMON, GETOPT_OP_UPDATE}, + {'h', "help", NULL, RUN_MODE_USAGE, GETOPT_OP_UPDATE}, + {'\0', NULL, NULL, 0, 0} + }; + opts[0].arg = &recv_endpoint; + opts[1].arg = &send_endpoint; + opts[2].arg = &log_base_path; + opts[3].arg = &log_path; + opts[4].arg = &log_level; + + n_processed_args = grn_str_getopt(argc, argv, opts, &flags); + + if (log_path) { + grn_default_logger_set_path(log_path); + } + + if (log_level) { + const char * const end = log_level + strlen(log_level); + const char *rest = NULL; + const int value = grn_atoi(log_level, end, &rest); + if (end != rest || value < 0 || value > 9) { + fprintf(stderr, "invalid log level: <%s>\n", log_level); + return EXIT_FAILURE; + } + grn_default_logger_set_max_level(value); + } + + mode = (flags & RUN_MODE_MASK); + + if (mode & RUN_MODE_USAGE) { + usage(stdout); + return EXIT_SUCCESS; + } + + if ((n_processed_args < 0) || + (argc - n_processed_args) != 1) { + usage(stderr); + } + + db_path = argv[n_processed_args]; + } + + /* main */ + { + grn_ctx *ctx; + msgpack_zone *mempool; + + if (mode == RUN_MODE_DAEMON) { + daemonize(); + } + + grn_init(); + + ctx = grn_ctx_open(0); + if (!(grn_db_open(ctx, db_path))) { + print_error("cannot open database."); + } else { + if (log_base_path) { + /* loading log mode */ + load_log(ctx, log_base_path); + } else { + /* zeromq/msgpack recv mode */ + if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) { + print_error("cannot create msgpack zone."); + } else { + void *zmq_ctx, *zmq_recv_sock; + if (!(zmq_ctx = zmq_init(1))) { + print_error("cannot create zmq context."); + } else { + if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) { + print_error("cannot create zmq_socket."); + } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) { + print_error("cannot bind zmq_socket."); + } else { + send_thd_data thd; + + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); + signal(SIGQUIT, signal_handler); + + zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0); + thd.db_path = db_path; + thd.send_endpoint = send_endpoint; + thd.zmq_ctx = zmq_ctx; + + if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) { + print_error("error in pthread_create() for sending datas."); + } + recv_event_loop(mempool, zmq_recv_sock, ctx); + if (pthread_join(thd.thd, NULL)) { + print_error("error in pthread_join() for waiting completion of sending data."); + } + } + zmq_term(zmq_ctx); + } + msgpack_zone_free(mempool); + } + } + } + grn_obj_close(ctx, grn_ctx_db(ctx)); + grn_ctx_fin(ctx); + grn_fin(); + } + return 0; +} diff --git a/storage/mroonga/vendor/groonga/src/suggest/httpd_sources.am b/storage/mroonga/vendor/groonga/src/suggest/httpd_sources.am new file mode 100644 index 00000000..a09328d5 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/httpd_sources.am @@ -0,0 +1,3 @@ +groonga_suggest_httpd_SOURCES = \ + groonga_suggest_httpd.c \ + zmq_compatible.h diff --git a/storage/mroonga/vendor/groonga/src/suggest/learner_sources.am b/storage/mroonga/vendor/groonga/src/suggest/learner_sources.am new file mode 100644 index 00000000..03b49087 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/learner_sources.am @@ -0,0 +1,3 @@ +groonga_suggest_learner_SOURCES = \ + groonga_suggest_learner.c \ + zmq_compatible.h diff --git a/storage/mroonga/vendor/groonga/src/suggest/util.c b/storage/mroonga/vendor/groonga/src/suggest/util.c new file mode 100644 index 00000000..e455a257 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/util.c @@ -0,0 +1,215 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2010- Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdint.h> +#include <stdarg.h> +#include <unistd.h> +#include <sys/wait.h> +#include <sys/queue.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "util.h" + +#define DEFAULT_FREQUENCY_THRESHOLD 100 +#define DEFAULT_CONDITIONAL_PROBABILITY_THRESHOLD 0.2 + +int +print_error(const char *format, ...) +{ + int r; + va_list l; + + va_start(l, format); + vfprintf(stderr, format, l); + r = fprintf(stderr, "\n"); + fflush(stderr); + va_end(l); + + return r; +} + +int +daemonize(void) +{ + pid_t pid; + + switch (fork()) { + case 0: + break; + case -1: + print_error("fork failed."); + return -1; + default: + wait(NULL); + _exit(0); + } + switch ((pid = fork())) { + case 0: + break; + case -1: + perror("fork"); + return -1; + default: + fprintf(stderr, "%d\n", pid); + _exit(0); + } + { + int null_fd = open("/dev/null", O_RDWR, 0); + if (null_fd != -1) { + dup2(null_fd, 0); + dup2(null_fd, 1); + dup2(null_fd, 2); + if (null_fd > 2) { close(null_fd); } + } + } + return 1; +} + +static uint64_t +atouint64_t(const char *s) +{ + uint64_t r; + for (r = 0; *s; s++) { + r *= 10; + r += (*s - '0'); + } + return r; +} + +void +parse_keyval(grn_ctx *ctx, + struct evkeyvalq *get_args, + const char **query, const char **types, + const char **client_id, const char **target_name, + const char **learn_target_name, + const char **callback, + uint64_t *millisec, + int *frequency_threshold, + double *conditional_probability_threshold, + int *limit, + grn_obj *pass_through_parameters) +{ + struct evkeyval *get; + + if (query) { *query = NULL; } + if (types) { *types = NULL; } + if (client_id) { *client_id = NULL; } + if (target_name) { *target_name = NULL; } + if (learn_target_name) { *learn_target_name = NULL; } + if (callback) { *callback = NULL; } + if (millisec) { *millisec = 0; } + if (frequency_threshold) { + *frequency_threshold = DEFAULT_FREQUENCY_THRESHOLD; + } + if (conditional_probability_threshold) { + *conditional_probability_threshold = DEFAULT_CONDITIONAL_PROBABILITY_THRESHOLD; + } + if (limit) { *limit = -1; } + + TAILQ_FOREACH(get, get_args, next) { + grn_bool is_pass_through_parameter = GRN_FALSE; + size_t key_length; + + key_length = strlen(get->key); + switch (key_length) { + case 0: + break; + case 1: + switch(get->key[0]) { + case 'q': + if (query) { + *query = get->value; + } + break; + case 't': + /* TODO: check types */ + if (types) { + *types = get->value; + } + break; + case 'i': + if (client_id) { + *client_id = get->value; + } + break; + case 's': + if (millisec) { + *millisec = atouint64_t(get->value); + } + break; + case 'n': + /* TODO: check target_name */ + if (target_name) { + *target_name = get->value; + } + break; + case 'l': + if (learn_target_name) { + *learn_target_name = get->value; + } + break; + case 'h': + if (frequency_threshold) { + *frequency_threshold = atoi(get->value); + } + break; + case 'p': + if (conditional_probability_threshold) { + *conditional_probability_threshold = strtod(get->value, NULL); + } + break; + case 'm': + if (limit) { + *limit = atoi(get->value); + } + break; + default: + is_pass_through_parameter = GRN_TRUE; + break; + } + break; + default: + switch (get->key[0]) { + case 'c': + if (!strcmp(get->key, "callback")) { + if (callback) { + *callback = get->value; + } + } else { + is_pass_through_parameter = GRN_TRUE; + } + break; + default: + is_pass_through_parameter = GRN_TRUE; + } + } + + if (is_pass_through_parameter && pass_through_parameters) { + if (GRN_TEXT_LEN(pass_through_parameters) > 0) { + GRN_TEXT_PUTS(ctx, pass_through_parameters, "&"); + } + grn_text_urlenc(ctx, pass_through_parameters, get->key, strlen(get->key)); + GRN_TEXT_PUTS(ctx, pass_through_parameters, "="); + grn_text_urlenc(ctx, pass_through_parameters, + get->value, strlen(get->value)); + } + } +} diff --git a/storage/mroonga/vendor/groonga/src/suggest/util.h b/storage/mroonga/vendor/groonga/src/suggest/util.h new file mode 100644 index 00000000..eb36edfd --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/util.h @@ -0,0 +1,40 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2010- Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ +#ifndef GRN_SUGGEST_UTIL_H +#define GRN_SUGGEST_UTIL_H + +#include <sys/queue.h> +#include <event.h> +#include <stdint.h> + +#include <groonga.h> + +int print_error(const char *format, ...); +int daemonize(void); +void parse_keyval(grn_ctx *ctx, + struct evkeyvalq *get_args, + const char **query, const char **types, + const char **client_id, const char **target_name, + const char **learn_target_name, + const char **callback, + uint64_t *millisec, + int *frequency_threshold, + double *conditional_probability_threshold, + int *limit, + grn_obj *pass_through_parameters); + +#endif /* GRN_SUGGEST_UTIL_H */ diff --git a/storage/mroonga/vendor/groonga/src/suggest/util_sources.am b/storage/mroonga/vendor/groonga/src/suggest/util_sources.am new file mode 100644 index 00000000..d4b74860 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/util_sources.am @@ -0,0 +1,3 @@ +libutil_la_SOURCES = \ + util.c \ + util.h diff --git a/storage/mroonga/vendor/groonga/src/suggest/zmq_compatible.h b/storage/mroonga/vendor/groonga/src/suggest/zmq_compatible.h new file mode 100644 index 00000000..e4897f68 --- /dev/null +++ b/storage/mroonga/vendor/groonga/src/suggest/zmq_compatible.h @@ -0,0 +1,33 @@ +/* -*- c-basic-offset: 2 -*- */ +/* Copyright(C) 2013 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA +*/ +#ifndef GRN_SUGGEST_ZMQ_COMPATIBLE_H +#define GRN_SUGGEST_ZMQ_COMPATIBLE_H + +#include <zmq.h> + +#ifndef ZMQ_SNDHWM +# define ZMQ_SNDHWM ZMQ_HWM +#endif + +#if ZMQ_VERSION_MAJOR == 2 +# define zmq_msg_send(message, socket, flags) \ + zmq_send((socket), (message), (flags)) +# define zmq_msg_recv(message, socket, flags) \ + zmq_recv((socket), (message), (flags)) +#endif + +#endif /* GRN_SUGGEST_ZMQ_COMPATIBLE_H */ |