summaryrefslogtreecommitdiffstats
path: root/source3/rpc_server/mdssvc/mdssvc_es.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--source3/rpc_server/mdssvc/mdssvc_es.c865
1 files changed, 865 insertions, 0 deletions
diff --git a/source3/rpc_server/mdssvc/mdssvc_es.c b/source3/rpc_server/mdssvc/mdssvc_es.c
new file mode 100644
index 0000000..8460b48
--- /dev/null
+++ b/source3/rpc_server/mdssvc/mdssvc_es.c
@@ -0,0 +1,865 @@
+/*
+ Unix SMB/CIFS implementation.
+ Main metadata server / Spotlight routines / ES backend
+
+ Copyright (C) Ralph Boehme 2019
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "system/filesys.h"
+#include "lib/util/time_basic.h"
+#include "lib/tls/tls.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "libcli/http/http.h"
+#include "lib/util/tevent_unix.h"
+#include "credentials.h"
+#include "mdssvc.h"
+#include "mdssvc_es.h"
+#include "rpc_server/mdssvc/es_parser.tab.h"
+
+#include <jansson.h>
+
+#undef DBGC_CLASS
+#define DBGC_CLASS DBGC_RPC_SRV
+
+#define MDSSVC_ELASTIC_QUERY_TEMPLATE \
+ "{" \
+ " \"from\": %zu," \
+ " \"size\": %zu," \
+ " \"_source\": [%s]," \
+ " \"query\": {" \
+ " \"query_string\": {" \
+ " \"query\": \"%s\"" \
+ " }" \
+ " }" \
+ "}"
+
+#define MDSSVC_ELASTIC_SOURCES \
+ "\"path.real\""
+
+static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
+{
+ struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
+ json_error_t json_error;
+ char *default_path = NULL;
+ const char *path = NULL;
+
+ mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
+ if (mdssvc_es_ctx == NULL) {
+ return false;
+ }
+ mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
+
+ mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
+ if (mdssvc_es_ctx->creds == NULL) {
+ TALLOC_FREE(mdssvc_es_ctx);
+ return false;
+ }
+
+ default_path = talloc_asprintf(
+ mdssvc_es_ctx,
+ "%s/mdssvc/elasticsearch_mappings.json",
+ get_dyn_SAMBA_DATADIR());
+ if (default_path == NULL) {
+ TALLOC_FREE(mdssvc_es_ctx);
+ return false;
+ }
+
+ path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
+ "elasticsearch",
+ "mappings",
+ default_path);
+ if (path == NULL) {
+ TALLOC_FREE(mdssvc_es_ctx);
+ return false;
+ }
+
+ mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
+ if (mdssvc_es_ctx->mappings == NULL) {
+ DBG_ERR("Opening mapping file [%s] failed: %s\n",
+ path, json_error.text);
+ TALLOC_FREE(mdssvc_es_ctx);
+ return false;
+ }
+ TALLOC_FREE(default_path);
+
+ mdssvc_ctx->backend_private = mdssvc_es_ctx;
+ return true;
+}
+
+static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
+{
+ return true;
+}
+
+static struct tevent_req *mds_es_connect_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct mds_es_ctx *mds_es_ctx);
+static int mds_es_connect_recv(struct tevent_req *req);
+static void mds_es_connected(struct tevent_req *subreq);
+static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
+static void mds_es_search_set_pending(struct sl_es_search *s);
+static void mds_es_search_unset_pending(struct sl_es_search *s);
+
+static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
+{
+ struct sl_es_search *s = mds_es_ctx->searches;
+
+ /*
+ * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
+ * to go away and has already freed all waiting searches. If there's a
+ * search remaining that's when the search is already active. Reset the
+ * mds_es_ctx pointer, so we can detect this when the search completes.
+ */
+
+ if (s == NULL) {
+ return 0;
+ }
+
+ s->mds_es_ctx = NULL;
+
+ return 0;
+}
+
+static bool mds_es_connect(struct mds_ctx *mds_ctx)
+{
+ struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
+ mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
+ struct mds_es_ctx *mds_es_ctx = NULL;
+ struct tevent_req *subreq = NULL;
+
+ mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
+ if (mds_es_ctx == NULL) {
+ return false;
+ }
+ *mds_es_ctx = (struct mds_es_ctx) {
+ .mdssvc_es_ctx = mdssvc_es_ctx,
+ .mds_ctx = mds_ctx,
+ };
+
+ mds_ctx->backend_private = mds_es_ctx;
+ talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
+
+ subreq = mds_es_connect_send(
+ mds_es_ctx,
+ mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
+ mds_es_ctx);
+ if (subreq == NULL) {
+ TALLOC_FREE(mds_es_ctx);
+ return false;
+ }
+ tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
+ return true;
+}
+
+static void mds_es_connected(struct tevent_req *subreq)
+{
+ struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
+ subreq, struct mds_es_ctx);
+ int ret;
+ bool ok;
+
+ ret = mds_es_connect_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (ret != 0) {
+ DBG_ERR("HTTP connect failed\n");
+ return;
+ }
+
+ ok = mds_es_next_search_trigger(mds_es_ctx);
+ if (!ok) {
+ DBG_ERR("mds_es_next_search_trigger failed\n");
+ }
+ return;
+}
+
+struct mds_es_connect_state {
+ struct tevent_context *ev;
+ struct mds_es_ctx *mds_es_ctx;
+ struct tevent_queue_entry *qe;
+ const char *server_addr;
+ uint16_t server_port;
+ struct tstream_tls_params *tls_params;
+};
+
+static void mds_es_http_connect_done(struct tevent_req *subreq);
+static void mds_es_http_waited(struct tevent_req *subreq);
+
+static struct tevent_req *mds_es_connect_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct mds_es_ctx *mds_es_ctx)
+{
+ struct tevent_req *req = NULL;
+ struct tevent_req *subreq = NULL;
+ struct mds_es_connect_state *state = NULL;
+ const char *server_addr = NULL;
+ bool use_tls;
+ NTSTATUS status;
+
+ req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ *state = (struct mds_es_connect_state) {
+ .ev = ev,
+ .mds_es_ctx = mds_es_ctx,
+ };
+
+ server_addr = lp_parm_const_string(
+ mds_es_ctx->mds_ctx->snum,
+ "elasticsearch",
+ "address",
+ "localhost");
+ state->server_addr = talloc_strdup(state, server_addr);
+ if (tevent_req_nomem(state->server_addr, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ state->server_port = lp_parm_int(
+ mds_es_ctx->mds_ctx->snum,
+ "elasticsearch",
+ "port",
+ 9200);
+
+ use_tls = lp_parm_bool(
+ mds_es_ctx->mds_ctx->snum,
+ "elasticsearch",
+ "use tls",
+ false);
+
+ DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
+ use_tls ? "S" : "", state->server_addr, state->server_port);
+
+ if (use_tls) {
+ const char *ca_file = lp__tls_cafile();
+ const char *crl_file = lp__tls_crlfile();
+ const char *tls_priority = lp_tls_priority();
+ enum tls_verify_peer_state verify_peer = lp_tls_verify_peer();
+
+ status = tstream_tls_params_client(state,
+ ca_file,
+ crl_file,
+ tls_priority,
+ verify_peer,
+ state->server_addr,
+ &state->tls_params);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_ERR("Failed tstream_tls_params_client - %s\n",
+ nt_errstr(status));
+ tevent_req_nterror(req, status);
+ return tevent_req_post(req, ev);
+ }
+ }
+
+ subreq = http_connect_send(state,
+ state->ev,
+ state->server_addr,
+ state->server_port,
+ mds_es_ctx->mdssvc_es_ctx->creds,
+ state->tls_params);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
+ return req;
+}
+
+static void mds_es_http_connect_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct mds_es_connect_state *state = tevent_req_data(
+ req, struct mds_es_connect_state);
+ int error;
+
+ error = http_connect_recv(subreq,
+ state->mds_es_ctx,
+ &state->mds_es_ctx->http_conn);
+ TALLOC_FREE(subreq);
+ if (error != 0) {
+ DBG_ERR("HTTP connect failed, retrying...\n");
+
+ subreq = tevent_wakeup_send(
+ state->mds_es_ctx,
+ state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
+ tevent_timeval_current_ofs(10, 0));
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq,
+ mds_es_http_waited,
+ req);
+ return;
+ }
+
+ DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
+ state->tls_params ? "S" : "",
+ state->server_addr, state->server_port);
+
+ tevent_req_done(req);
+ return;
+}
+
+static void mds_es_http_waited(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct mds_es_connect_state *state = tevent_req_data(
+ req, struct mds_es_connect_state);
+ bool ok;
+
+ ok = tevent_wakeup_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (!ok) {
+ tevent_req_error(req, ETIMEDOUT);
+ return;
+ }
+
+ subreq = mds_es_connect_send(
+ state->mds_es_ctx,
+ state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
+ state->mds_es_ctx);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
+}
+
+static int mds_es_connect_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static void mds_es_reconnect_on_error(struct sl_es_search *s)
+{
+ struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
+ struct tevent_req *subreq = NULL;
+
+ if (s->slq != NULL) {
+ s->slq->state = SLQ_STATE_ERROR;
+ }
+
+ DBG_WARNING("Reconnecting HTTP...\n");
+ TALLOC_FREE(mds_es_ctx->http_conn);
+
+ subreq = mds_es_connect_send(
+ mds_es_ctx,
+ mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
+ mds_es_ctx);
+ if (subreq == NULL) {
+ DBG_ERR("mds_es_connect_send failed\n");
+ return;
+ }
+ tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
+}
+
+static int search_destructor(struct sl_es_search *s)
+{
+ if (s->mds_es_ctx == NULL) {
+ return 0;
+ }
+ DLIST_REMOVE(s->mds_es_ctx->searches, s);
+ return 0;
+}
+
+static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct sl_es_search *s);
+static int mds_es_search_recv(struct tevent_req *req);
+static void mds_es_search_done(struct tevent_req *subreq);
+
+static bool mds_es_search(struct sl_query *slq)
+{
+ struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
+ slq->mds_ctx->backend_private, struct mds_es_ctx);
+ struct sl_es_search *s = NULL;
+ bool ok;
+
+ s = talloc_zero(slq, struct sl_es_search);
+ if (s == NULL) {
+ return false;
+ }
+ *s = (struct sl_es_search) {
+ .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
+ .mds_es_ctx = mds_es_ctx,
+ .slq = slq,
+ .size = SL_PAGESIZE,
+ };
+
+ /* 0 would mean no limit */
+ s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
+ "elasticsearch",
+ "max results",
+ MAX_SL_RESULTS);
+
+ DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
+
+ ok = map_spotlight_to_es_query(
+ s,
+ mds_es_ctx->mdssvc_es_ctx->mappings,
+ slq->path_scope,
+ slq->query_string,
+ &s->es_query);
+ if (!ok) {
+ TALLOC_FREE(s);
+ return false;
+ }
+ DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
+
+ slq->backend_private = s;
+ slq->state = SLQ_STATE_RUNNING;
+ DLIST_ADD_END(mds_es_ctx->searches, s);
+ talloc_set_destructor(s, search_destructor);
+
+ return mds_es_next_search_trigger(mds_es_ctx);
+}
+
+static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
+{
+ struct tevent_req *subreq = NULL;
+ struct sl_es_search *s = mds_es_ctx->searches;
+
+ if (mds_es_ctx->http_conn == NULL) {
+ DBG_DEBUG("Waiting for HTTP connection...\n");
+ return true;
+ }
+ if (s == NULL) {
+ DBG_DEBUG("No pending searches, idling...\n");
+ return true;
+ }
+ if (s->pending) {
+ DBG_DEBUG("Search pending [%p]\n", s);
+ return true;
+ }
+
+ subreq = mds_es_search_send(s, s->ev, s);
+ if (subreq == NULL) {
+ return false;
+ }
+ tevent_req_set_callback(subreq, mds_es_search_done, s);
+ mds_es_search_set_pending(s);
+ return true;
+}
+
+static void mds_es_search_done(struct tevent_req *subreq)
+{
+ struct sl_es_search *s = tevent_req_callback_data(
+ subreq, struct sl_es_search);
+ struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
+ struct sl_query *slq = s->slq;
+ int ret;
+ bool ok;
+
+ DBG_DEBUG("Search done for search [%p]\n", s);
+
+ mds_es_search_unset_pending(s);
+
+ if (mds_es_ctx == NULL) {
+ /*
+ * Search connection closed by the user while s was pending.
+ */
+ TALLOC_FREE(s);
+ return;
+ }
+
+ DLIST_REMOVE(mds_es_ctx->searches, s);
+
+ ret = mds_es_search_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (ret != 0) {
+ mds_es_reconnect_on_error(s);
+ return;
+ }
+
+ if (slq == NULL) {
+ /*
+ * Closed by the user. Explicitly free "s" here because the
+ * talloc parent slq is already gone.
+ */
+ TALLOC_FREE(s);
+ goto trigger;
+ }
+
+ SLQ_DEBUG(10, slq, "search done");
+
+ if (s->total == 0 || s->from >= s->max) {
+ slq->state = SLQ_STATE_DONE;
+ goto trigger;
+ }
+
+ if (slq->query_results->num_results >= SL_PAGESIZE) {
+ slq->state = SLQ_STATE_FULL;
+ goto trigger;
+ }
+
+ /*
+ * Reschedule this query as there are more results waiting in the
+ * Elasticsearch server and the client result queue has room as
+ * well. But put it at the end of the list of active queries as a simple
+ * heuristic that should ensure all client queries are dispatched to the
+ * server.
+ */
+ DLIST_ADD_END(mds_es_ctx->searches, s);
+
+trigger:
+ ok = mds_es_next_search_trigger(mds_es_ctx);
+ if (!ok) {
+ DBG_ERR("mds_es_next_search_trigger failed\n");
+ }
+}
+
+static void mds_es_search_http_send_done(struct tevent_req *subreq);
+static void mds_es_search_http_read_done(struct tevent_req *subreq);
+
+struct mds_es_search_state {
+ struct tevent_context *ev;
+ struct sl_es_search *s;
+ struct tevent_queue_entry *qe;
+ struct http_request http_request;
+ struct http_request *http_response;
+};
+
+static int mds_es_search_pending_destructor(struct sl_es_search *s)
+{
+ /*
+ * s is a child of slq which may get freed when a user closes a
+ * query. To maintain the HTTP request/response sequence on the HTTP
+ * channel, we keep processing pending requests and free s when we
+ * receive the HTTP response for pending requests.
+ */
+ DBG_DEBUG("Preserving pending search [%p]\n", s);
+ s->slq = NULL;
+ return -1;
+}
+
+static void mds_es_search_set_pending(struct sl_es_search *s)
+{
+ DBG_DEBUG("Set pending [%p]\n", s);
+ SLQ_DEBUG(10, s->slq, "pending");
+
+ s->pending = true;
+ talloc_set_destructor(s, mds_es_search_pending_destructor);
+}
+
+static void mds_es_search_unset_pending(struct sl_es_search *s)
+{
+ DBG_DEBUG("Unset pending [%p]\n", s);
+ if (s->slq != NULL) {
+ SLQ_DEBUG(10, s->slq, "unset pending");
+ }
+
+ s->pending = false;
+ talloc_set_destructor(s, search_destructor);
+}
+
+static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct sl_es_search *s)
+{
+ struct tevent_req *req = NULL;
+ struct tevent_req *subreq = NULL;
+ struct mds_es_search_state *state = NULL;
+ const char *index = NULL;
+ char *elastic_query = NULL;
+ char *uri = NULL;
+ size_t elastic_query_len;
+ char *elastic_query_len_str = NULL;
+ char *hostname = NULL;
+ bool pretty = false;
+
+ req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ *state = (struct mds_es_search_state) {
+ .ev = ev,
+ .s = s,
+ };
+
+ if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
+ return tevent_req_post(req, s->ev);
+ }
+
+ index = lp_parm_const_string(s->slq->mds_ctx->snum,
+ "elasticsearch",
+ "index",
+ "_all");
+ if (tevent_req_nomem(index, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ if (DEBUGLVL(10)) {
+ pretty = true;
+ }
+
+ uri = talloc_asprintf(state,
+ "/%s/_search%s",
+ index,
+ pretty ? "?pretty" : "");
+ if (tevent_req_nomem(uri, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ elastic_query = talloc_asprintf(state,
+ MDSSVC_ELASTIC_QUERY_TEMPLATE,
+ s->from,
+ s->size,
+ MDSSVC_ELASTIC_SOURCES,
+ s->es_query);
+ if (tevent_req_nomem(elastic_query, req)) {
+ return tevent_req_post(req, ev);
+ }
+ DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
+
+ elastic_query_len = strlen(elastic_query);
+
+ state->http_request = (struct http_request) {
+ .type = HTTP_REQ_POST,
+ .uri = uri,
+ .body = data_blob_const(elastic_query, elastic_query_len),
+ .major = '1',
+ .minor = '1',
+ };
+
+ elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
+ if (tevent_req_nomem(elastic_query_len_str, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ hostname = get_myname(state);
+ if (tevent_req_nomem(hostname, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ http_add_header(state, &state->http_request.headers,
+ "Content-Type", "application/json");
+ http_add_header(state, &state->http_request.headers,
+ "Accept", "application/json");
+ http_add_header(state, &state->http_request.headers,
+ "User-Agent", "Samba/mdssvc");
+ http_add_header(state, &state->http_request.headers,
+ "Host", hostname);
+ http_add_header(state, &state->http_request.headers,
+ "Content-Length", elastic_query_len_str);
+
+ subreq = http_send_request_send(state,
+ ev,
+ s->mds_es_ctx->http_conn,
+ &state->http_request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
+ return req;
+}
+
+static void mds_es_search_http_send_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct mds_es_search_state *state = tevent_req_data(
+ req, struct mds_es_search_state);
+ NTSTATUS status;
+
+ DBG_DEBUG("Sent out search [%p]\n", state->s);
+
+ status = http_send_request_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (!NT_STATUS_IS_OK(status)) {
+ tevent_req_error(req, map_errno_from_nt_status(status));
+ return;
+ }
+
+ if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
+ tevent_req_done(req);
+ return;
+ }
+
+ subreq = http_read_response_send(state,
+ state->ev,
+ state->s->mds_es_ctx->http_conn,
+ SL_PAGESIZE * 8192);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
+}
+
+static void mds_es_search_http_read_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct mds_es_search_state *state = tevent_req_data(
+ req, struct mds_es_search_state);
+ struct sl_es_search *s = state->s;
+ struct sl_query *slq = s->slq;
+ json_t *root = NULL;
+ json_t *matches = NULL;
+ json_t *match = NULL;
+ size_t i;
+ json_error_t error;
+ size_t hits;
+ NTSTATUS status;
+ int ret;
+ bool ok;
+
+ DBG_DEBUG("Got response for search [%p]\n", s);
+
+ status = http_read_response_recv(subreq, state, &state->http_response);
+ TALLOC_FREE(subreq);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
+ tevent_req_error(req, map_errno_from_nt_status(status));
+ return;
+ }
+
+ if (slq == NULL || s->mds_es_ctx == NULL) {
+ tevent_req_done(req);
+ return;
+ }
+
+ switch (state->http_response->response_code) {
+ case 200:
+ break;
+ default:
+ DBG_ERR("HTTP server response: %u\n",
+ state->http_response->response_code);
+ goto fail;
+ }
+
+ DBG_DEBUG("JSON response:\n%s\n",
+ talloc_strndup(talloc_tos(),
+ (char *)state->http_response->body.data,
+ state->http_response->body.length));
+
+ root = json_loadb((char *)state->http_response->body.data,
+ state->http_response->body.length,
+ 0,
+ &error);
+ if (root == NULL) {
+ DBG_ERR("json_loadb failed\n");
+ goto fail;
+ }
+
+ if (s->total == 0) {
+ /*
+ * Get the total number of results the first time, format
+ * used by Elasticsearch 7.0 or newer
+ */
+ ret = json_unpack(root, "{s: {s: {s: i}}}",
+ "hits", "total", "value", &s->total);
+ if (ret != 0) {
+ /* Format used before 7.0 */
+ ret = json_unpack(root, "{s: {s: i}}",
+ "hits", "total", &s->total);
+ if (ret != 0) {
+ DBG_ERR("json_unpack failed\n");
+ goto fail;
+ }
+ }
+
+ DBG_DEBUG("Total: %zu\n", s->total);
+
+ if (s->total == 0) {
+ json_decref(root);
+ tevent_req_done(req);
+ return;
+ }
+ }
+
+ if (s->max == 0 || s->max > s->total) {
+ s->max = s->total;
+ }
+
+ ret = json_unpack(root, "{s: {s:o}}",
+ "hits", "hits", &matches);
+ if (ret != 0 || matches == NULL) {
+ DBG_ERR("json_unpack hits failed\n");
+ goto fail;
+ }
+
+ hits = json_array_size(matches);
+ if (hits == 0) {
+ DBG_ERR("Hu?! No results?\n");
+ goto fail;
+ }
+ DBG_DEBUG("Hits: %zu\n", hits);
+
+ for (i = 0; i < hits && s->from + i < s->max; i++) {
+ const char *path = NULL;
+
+ match = json_array_get(matches, i);
+ if (match == NULL) {
+ DBG_ERR("Hu?! No value for index %zu\n", i);
+ goto fail;
+ }
+ ret = json_unpack(match,
+ "{s: {s: {s: s}}}",
+ "_source",
+ "path",
+ "real",
+ &path);
+ if (ret != 0) {
+ DBG_ERR("Missing path.real in JSON result\n");
+ goto fail;
+ }
+
+ ok = mds_add_result(slq, path);
+ if (!ok) {
+ DBG_ERR("error adding result for path: %s\n", path);
+ goto fail;
+ }
+ }
+ json_decref(root);
+
+ s->from += hits;
+ slq->state = SLQ_STATE_RESULTS;
+ tevent_req_done(req);
+ return;
+
+fail:
+ if (root != NULL) {
+ json_decref(root);
+ }
+ slq->state = SLQ_STATE_ERROR;
+ tevent_req_error(req, EINVAL);
+ return;
+}
+
+static int mds_es_search_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static bool mds_es_search_cont(struct sl_query *slq)
+{
+ struct sl_es_search *s = talloc_get_type_abort(
+ slq->backend_private, struct sl_es_search);
+
+ SLQ_DEBUG(10, slq, "continue");
+ DLIST_ADD_END(s->mds_es_ctx->searches, s);
+ return mds_es_next_search_trigger(s->mds_es_ctx);
+}
+
+struct mdssvc_backend mdsscv_backend_es = {
+ .init = mdssvc_es_init,
+ .shutdown = mdssvc_es_shutdown,
+ .connect = mds_es_connect,
+ .search_start = mds_es_search,
+ .search_cont = mds_es_search_cont,
+};