From 8daa83a594a2e98f39d764422bfbdbc62c9efd44 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 19:20:00 +0200 Subject: Adding upstream version 2:4.20.0+dfsg. Signed-off-by: Daniel Baumann --- source3/rpc_server/mdssvc/mdssvc_es.c | 865 ++++++++++++++++++++++++++++++++++ 1 file changed, 865 insertions(+) create mode 100644 source3/rpc_server/mdssvc/mdssvc_es.c (limited to 'source3/rpc_server/mdssvc/mdssvc_es.c') diff --git a/source3/rpc_server/mdssvc/mdssvc_es.c b/source3/rpc_server/mdssvc/mdssvc_es.c new file mode 100644 index 0000000..8460b48 --- /dev/null +++ b/source3/rpc_server/mdssvc/mdssvc_es.c @@ -0,0 +1,865 @@ +/* + Unix SMB/CIFS implementation. + Main metadata server / Spotlight routines / ES backend + + Copyright (C) Ralph Boehme 2019 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include "includes.h" +#include "system/filesys.h" +#include "lib/util/time_basic.h" +#include "lib/tls/tls.h" +#include "lib/util/tevent_ntstatus.h" +#include "libcli/http/http.h" +#include "lib/util/tevent_unix.h" +#include "credentials.h" +#include "mdssvc.h" +#include "mdssvc_es.h" +#include "rpc_server/mdssvc/es_parser.tab.h" + +#include + +#undef DBGC_CLASS +#define DBGC_CLASS DBGC_RPC_SRV + +#define MDSSVC_ELASTIC_QUERY_TEMPLATE \ + "{" \ + " \"from\": %zu," \ + " \"size\": %zu," \ + " \"_source\": [%s]," \ + " \"query\": {" \ + " \"query_string\": {" \ + " \"query\": \"%s\"" \ + " }" \ + " }" \ + "}" + +#define MDSSVC_ELASTIC_SOURCES \ + "\"path.real\"" + +static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx) +{ + struct mdssvc_es_ctx *mdssvc_es_ctx = NULL; + json_error_t json_error; + char *default_path = NULL; + const char *path = NULL; + + mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx); + if (mdssvc_es_ctx == NULL) { + return false; + } + mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx; + + mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx); + if (mdssvc_es_ctx->creds == NULL) { + TALLOC_FREE(mdssvc_es_ctx); + return false; + } + + default_path = talloc_asprintf( + mdssvc_es_ctx, + "%s/mdssvc/elasticsearch_mappings.json", + get_dyn_SAMBA_DATADIR()); + if (default_path == NULL) { + TALLOC_FREE(mdssvc_es_ctx); + return false; + } + + path = lp_parm_const_string(GLOBAL_SECTION_SNUM, + "elasticsearch", + "mappings", + default_path); + if (path == NULL) { + TALLOC_FREE(mdssvc_es_ctx); + return false; + } + + mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error); + if (mdssvc_es_ctx->mappings == NULL) { + DBG_ERR("Opening mapping file [%s] failed: %s\n", + path, json_error.text); + TALLOC_FREE(mdssvc_es_ctx); + return false; + } + TALLOC_FREE(default_path); + + mdssvc_ctx->backend_private = mdssvc_es_ctx; + return true; +} + +static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx) +{ + return true; +} + +static struct tevent_req *mds_es_connect_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct mds_es_ctx *mds_es_ctx); +static int mds_es_connect_recv(struct tevent_req *req); +static void mds_es_connected(struct tevent_req *subreq); +static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx); +static void mds_es_search_set_pending(struct sl_es_search *s); +static void mds_es_search_unset_pending(struct sl_es_search *s); + +static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx) +{ + struct sl_es_search *s = mds_es_ctx->searches; + + /* + * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about + * to go away and has already freed all waiting searches. If there's a + * search remaining that's when the search is already active. Reset the + * mds_es_ctx pointer, so we can detect this when the search completes. + */ + + if (s == NULL) { + return 0; + } + + s->mds_es_ctx = NULL; + + return 0; +} + +static bool mds_es_connect(struct mds_ctx *mds_ctx) +{ + struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort( + mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx); + struct mds_es_ctx *mds_es_ctx = NULL; + struct tevent_req *subreq = NULL; + + mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx); + if (mds_es_ctx == NULL) { + return false; + } + *mds_es_ctx = (struct mds_es_ctx) { + .mdssvc_es_ctx = mdssvc_es_ctx, + .mds_ctx = mds_ctx, + }; + + mds_ctx->backend_private = mds_es_ctx; + talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor); + + subreq = mds_es_connect_send( + mds_es_ctx, + mdssvc_es_ctx->mdssvc_ctx->ev_ctx, + mds_es_ctx); + if (subreq == NULL) { + TALLOC_FREE(mds_es_ctx); + return false; + } + tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx); + return true; +} + +static void mds_es_connected(struct tevent_req *subreq) +{ + struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data( + subreq, struct mds_es_ctx); + int ret; + bool ok; + + ret = mds_es_connect_recv(subreq); + TALLOC_FREE(subreq); + if (ret != 0) { + DBG_ERR("HTTP connect failed\n"); + return; + } + + ok = mds_es_next_search_trigger(mds_es_ctx); + if (!ok) { + DBG_ERR("mds_es_next_search_trigger failed\n"); + } + return; +} + +struct mds_es_connect_state { + struct tevent_context *ev; + struct mds_es_ctx *mds_es_ctx; + struct tevent_queue_entry *qe; + const char *server_addr; + uint16_t server_port; + struct tstream_tls_params *tls_params; +}; + +static void mds_es_http_connect_done(struct tevent_req *subreq); +static void mds_es_http_waited(struct tevent_req *subreq); + +static struct tevent_req *mds_es_connect_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct mds_es_ctx *mds_es_ctx) +{ + struct tevent_req *req = NULL; + struct tevent_req *subreq = NULL; + struct mds_es_connect_state *state = NULL; + const char *server_addr = NULL; + bool use_tls; + NTSTATUS status; + + req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state); + if (req == NULL) { + return NULL; + } + *state = (struct mds_es_connect_state) { + .ev = ev, + .mds_es_ctx = mds_es_ctx, + }; + + server_addr = lp_parm_const_string( + mds_es_ctx->mds_ctx->snum, + "elasticsearch", + "address", + "localhost"); + state->server_addr = talloc_strdup(state, server_addr); + if (tevent_req_nomem(state->server_addr, req)) { + return tevent_req_post(req, ev); + } + + state->server_port = lp_parm_int( + mds_es_ctx->mds_ctx->snum, + "elasticsearch", + "port", + 9200); + + use_tls = lp_parm_bool( + mds_es_ctx->mds_ctx->snum, + "elasticsearch", + "use tls", + false); + + DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n", + use_tls ? "S" : "", state->server_addr, state->server_port); + + if (use_tls) { + const char *ca_file = lp__tls_cafile(); + const char *crl_file = lp__tls_crlfile(); + const char *tls_priority = lp_tls_priority(); + enum tls_verify_peer_state verify_peer = lp_tls_verify_peer(); + + status = tstream_tls_params_client(state, + ca_file, + crl_file, + tls_priority, + verify_peer, + state->server_addr, + &state->tls_params); + if (!NT_STATUS_IS_OK(status)) { + DBG_ERR("Failed tstream_tls_params_client - %s\n", + nt_errstr(status)); + tevent_req_nterror(req, status); + return tevent_req_post(req, ev); + } + } + + subreq = http_connect_send(state, + state->ev, + state->server_addr, + state->server_port, + mds_es_ctx->mdssvc_es_ctx->creds, + state->tls_params); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, mds_es_http_connect_done, req); + return req; +} + +static void mds_es_http_connect_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct mds_es_connect_state *state = tevent_req_data( + req, struct mds_es_connect_state); + int error; + + error = http_connect_recv(subreq, + state->mds_es_ctx, + &state->mds_es_ctx->http_conn); + TALLOC_FREE(subreq); + if (error != 0) { + DBG_ERR("HTTP connect failed, retrying...\n"); + + subreq = tevent_wakeup_send( + state->mds_es_ctx, + state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx, + tevent_timeval_current_ofs(10, 0)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, + mds_es_http_waited, + req); + return; + } + + DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n", + state->tls_params ? "S" : "", + state->server_addr, state->server_port); + + tevent_req_done(req); + return; +} + +static void mds_es_http_waited(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct mds_es_connect_state *state = tevent_req_data( + req, struct mds_es_connect_state); + bool ok; + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_error(req, ETIMEDOUT); + return; + } + + subreq = mds_es_connect_send( + state->mds_es_ctx, + state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx, + state->mds_es_ctx); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx); +} + +static int mds_es_connect_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static void mds_es_reconnect_on_error(struct sl_es_search *s) +{ + struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx; + struct tevent_req *subreq = NULL; + + if (s->slq != NULL) { + s->slq->state = SLQ_STATE_ERROR; + } + + DBG_WARNING("Reconnecting HTTP...\n"); + TALLOC_FREE(mds_es_ctx->http_conn); + + subreq = mds_es_connect_send( + mds_es_ctx, + mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx, + mds_es_ctx); + if (subreq == NULL) { + DBG_ERR("mds_es_connect_send failed\n"); + return; + } + tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx); +} + +static int search_destructor(struct sl_es_search *s) +{ + if (s->mds_es_ctx == NULL) { + return 0; + } + DLIST_REMOVE(s->mds_es_ctx->searches, s); + return 0; +} + +static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct sl_es_search *s); +static int mds_es_search_recv(struct tevent_req *req); +static void mds_es_search_done(struct tevent_req *subreq); + +static bool mds_es_search(struct sl_query *slq) +{ + struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort( + slq->mds_ctx->backend_private, struct mds_es_ctx); + struct sl_es_search *s = NULL; + bool ok; + + s = talloc_zero(slq, struct sl_es_search); + if (s == NULL) { + return false; + } + *s = (struct sl_es_search) { + .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx, + .mds_es_ctx = mds_es_ctx, + .slq = slq, + .size = SL_PAGESIZE, + }; + + /* 0 would mean no limit */ + s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum, + "elasticsearch", + "max results", + MAX_SL_RESULTS); + + DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string); + + ok = map_spotlight_to_es_query( + s, + mds_es_ctx->mdssvc_es_ctx->mappings, + slq->path_scope, + slq->query_string, + &s->es_query); + if (!ok) { + TALLOC_FREE(s); + return false; + } + DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query); + + slq->backend_private = s; + slq->state = SLQ_STATE_RUNNING; + DLIST_ADD_END(mds_es_ctx->searches, s); + talloc_set_destructor(s, search_destructor); + + return mds_es_next_search_trigger(mds_es_ctx); +} + +static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx) +{ + struct tevent_req *subreq = NULL; + struct sl_es_search *s = mds_es_ctx->searches; + + if (mds_es_ctx->http_conn == NULL) { + DBG_DEBUG("Waiting for HTTP connection...\n"); + return true; + } + if (s == NULL) { + DBG_DEBUG("No pending searches, idling...\n"); + return true; + } + if (s->pending) { + DBG_DEBUG("Search pending [%p]\n", s); + return true; + } + + subreq = mds_es_search_send(s, s->ev, s); + if (subreq == NULL) { + return false; + } + tevent_req_set_callback(subreq, mds_es_search_done, s); + mds_es_search_set_pending(s); + return true; +} + +static void mds_es_search_done(struct tevent_req *subreq) +{ + struct sl_es_search *s = tevent_req_callback_data( + subreq, struct sl_es_search); + struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx; + struct sl_query *slq = s->slq; + int ret; + bool ok; + + DBG_DEBUG("Search done for search [%p]\n", s); + + mds_es_search_unset_pending(s); + + if (mds_es_ctx == NULL) { + /* + * Search connection closed by the user while s was pending. + */ + TALLOC_FREE(s); + return; + } + + DLIST_REMOVE(mds_es_ctx->searches, s); + + ret = mds_es_search_recv(subreq); + TALLOC_FREE(subreq); + if (ret != 0) { + mds_es_reconnect_on_error(s); + return; + } + + if (slq == NULL) { + /* + * Closed by the user. Explicitly free "s" here because the + * talloc parent slq is already gone. + */ + TALLOC_FREE(s); + goto trigger; + } + + SLQ_DEBUG(10, slq, "search done"); + + if (s->total == 0 || s->from >= s->max) { + slq->state = SLQ_STATE_DONE; + goto trigger; + } + + if (slq->query_results->num_results >= SL_PAGESIZE) { + slq->state = SLQ_STATE_FULL; + goto trigger; + } + + /* + * Reschedule this query as there are more results waiting in the + * Elasticsearch server and the client result queue has room as + * well. But put it at the end of the list of active queries as a simple + * heuristic that should ensure all client queries are dispatched to the + * server. + */ + DLIST_ADD_END(mds_es_ctx->searches, s); + +trigger: + ok = mds_es_next_search_trigger(mds_es_ctx); + if (!ok) { + DBG_ERR("mds_es_next_search_trigger failed\n"); + } +} + +static void mds_es_search_http_send_done(struct tevent_req *subreq); +static void mds_es_search_http_read_done(struct tevent_req *subreq); + +struct mds_es_search_state { + struct tevent_context *ev; + struct sl_es_search *s; + struct tevent_queue_entry *qe; + struct http_request http_request; + struct http_request *http_response; +}; + +static int mds_es_search_pending_destructor(struct sl_es_search *s) +{ + /* + * s is a child of slq which may get freed when a user closes a + * query. To maintain the HTTP request/response sequence on the HTTP + * channel, we keep processing pending requests and free s when we + * receive the HTTP response for pending requests. + */ + DBG_DEBUG("Preserving pending search [%p]\n", s); + s->slq = NULL; + return -1; +} + +static void mds_es_search_set_pending(struct sl_es_search *s) +{ + DBG_DEBUG("Set pending [%p]\n", s); + SLQ_DEBUG(10, s->slq, "pending"); + + s->pending = true; + talloc_set_destructor(s, mds_es_search_pending_destructor); +} + +static void mds_es_search_unset_pending(struct sl_es_search *s) +{ + DBG_DEBUG("Unset pending [%p]\n", s); + if (s->slq != NULL) { + SLQ_DEBUG(10, s->slq, "unset pending"); + } + + s->pending = false; + talloc_set_destructor(s, search_destructor); +} + +static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct sl_es_search *s) +{ + struct tevent_req *req = NULL; + struct tevent_req *subreq = NULL; + struct mds_es_search_state *state = NULL; + const char *index = NULL; + char *elastic_query = NULL; + char *uri = NULL; + size_t elastic_query_len; + char *elastic_query_len_str = NULL; + char *hostname = NULL; + bool pretty = false; + + req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state); + if (req == NULL) { + return NULL; + } + *state = (struct mds_es_search_state) { + .ev = ev, + .s = s, + }; + + if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) { + return tevent_req_post(req, s->ev); + } + + index = lp_parm_const_string(s->slq->mds_ctx->snum, + "elasticsearch", + "index", + "_all"); + if (tevent_req_nomem(index, req)) { + return tevent_req_post(req, ev); + } + + if (DEBUGLVL(10)) { + pretty = true; + } + + uri = talloc_asprintf(state, + "/%s/_search%s", + index, + pretty ? "?pretty" : ""); + if (tevent_req_nomem(uri, req)) { + return tevent_req_post(req, ev); + } + + elastic_query = talloc_asprintf(state, + MDSSVC_ELASTIC_QUERY_TEMPLATE, + s->from, + s->size, + MDSSVC_ELASTIC_SOURCES, + s->es_query); + if (tevent_req_nomem(elastic_query, req)) { + return tevent_req_post(req, ev); + } + DBG_DEBUG("Elastic query: '%s'\n", elastic_query); + + elastic_query_len = strlen(elastic_query); + + state->http_request = (struct http_request) { + .type = HTTP_REQ_POST, + .uri = uri, + .body = data_blob_const(elastic_query, elastic_query_len), + .major = '1', + .minor = '1', + }; + + elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len); + if (tevent_req_nomem(elastic_query_len_str, req)) { + return tevent_req_post(req, ev); + } + + hostname = get_myname(state); + if (tevent_req_nomem(hostname, req)) { + return tevent_req_post(req, ev); + } + + http_add_header(state, &state->http_request.headers, + "Content-Type", "application/json"); + http_add_header(state, &state->http_request.headers, + "Accept", "application/json"); + http_add_header(state, &state->http_request.headers, + "User-Agent", "Samba/mdssvc"); + http_add_header(state, &state->http_request.headers, + "Host", hostname); + http_add_header(state, &state->http_request.headers, + "Content-Length", elastic_query_len_str); + + subreq = http_send_request_send(state, + ev, + s->mds_es_ctx->http_conn, + &state->http_request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, mds_es_search_http_send_done, req); + return req; +} + +static void mds_es_search_http_send_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct mds_es_search_state *state = tevent_req_data( + req, struct mds_es_search_state); + NTSTATUS status; + + DBG_DEBUG("Sent out search [%p]\n", state->s); + + status = http_send_request_recv(subreq); + TALLOC_FREE(subreq); + if (!NT_STATUS_IS_OK(status)) { + tevent_req_error(req, map_errno_from_nt_status(status)); + return; + } + + if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) { + tevent_req_done(req); + return; + } + + subreq = http_read_response_send(state, + state->ev, + state->s->mds_es_ctx->http_conn, + SL_PAGESIZE * 8192); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, mds_es_search_http_read_done, req); +} + +static void mds_es_search_http_read_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct mds_es_search_state *state = tevent_req_data( + req, struct mds_es_search_state); + struct sl_es_search *s = state->s; + struct sl_query *slq = s->slq; + json_t *root = NULL; + json_t *matches = NULL; + json_t *match = NULL; + size_t i; + json_error_t error; + size_t hits; + NTSTATUS status; + int ret; + bool ok; + + DBG_DEBUG("Got response for search [%p]\n", s); + + status = http_read_response_recv(subreq, state, &state->http_response); + TALLOC_FREE(subreq); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status)); + tevent_req_error(req, map_errno_from_nt_status(status)); + return; + } + + if (slq == NULL || s->mds_es_ctx == NULL) { + tevent_req_done(req); + return; + } + + switch (state->http_response->response_code) { + case 200: + break; + default: + DBG_ERR("HTTP server response: %u\n", + state->http_response->response_code); + goto fail; + } + + DBG_DEBUG("JSON response:\n%s\n", + talloc_strndup(talloc_tos(), + (char *)state->http_response->body.data, + state->http_response->body.length)); + + root = json_loadb((char *)state->http_response->body.data, + state->http_response->body.length, + 0, + &error); + if (root == NULL) { + DBG_ERR("json_loadb failed\n"); + goto fail; + } + + if (s->total == 0) { + /* + * Get the total number of results the first time, format + * used by Elasticsearch 7.0 or newer + */ + ret = json_unpack(root, "{s: {s: {s: i}}}", + "hits", "total", "value", &s->total); + if (ret != 0) { + /* Format used before 7.0 */ + ret = json_unpack(root, "{s: {s: i}}", + "hits", "total", &s->total); + if (ret != 0) { + DBG_ERR("json_unpack failed\n"); + goto fail; + } + } + + DBG_DEBUG("Total: %zu\n", s->total); + + if (s->total == 0) { + json_decref(root); + tevent_req_done(req); + return; + } + } + + if (s->max == 0 || s->max > s->total) { + s->max = s->total; + } + + ret = json_unpack(root, "{s: {s:o}}", + "hits", "hits", &matches); + if (ret != 0 || matches == NULL) { + DBG_ERR("json_unpack hits failed\n"); + goto fail; + } + + hits = json_array_size(matches); + if (hits == 0) { + DBG_ERR("Hu?! No results?\n"); + goto fail; + } + DBG_DEBUG("Hits: %zu\n", hits); + + for (i = 0; i < hits && s->from + i < s->max; i++) { + const char *path = NULL; + + match = json_array_get(matches, i); + if (match == NULL) { + DBG_ERR("Hu?! No value for index %zu\n", i); + goto fail; + } + ret = json_unpack(match, + "{s: {s: {s: s}}}", + "_source", + "path", + "real", + &path); + if (ret != 0) { + DBG_ERR("Missing path.real in JSON result\n"); + goto fail; + } + + ok = mds_add_result(slq, path); + if (!ok) { + DBG_ERR("error adding result for path: %s\n", path); + goto fail; + } + } + json_decref(root); + + s->from += hits; + slq->state = SLQ_STATE_RESULTS; + tevent_req_done(req); + return; + +fail: + if (root != NULL) { + json_decref(root); + } + slq->state = SLQ_STATE_ERROR; + tevent_req_error(req, EINVAL); + return; +} + +static int mds_es_search_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static bool mds_es_search_cont(struct sl_query *slq) +{ + struct sl_es_search *s = talloc_get_type_abort( + slq->backend_private, struct sl_es_search); + + SLQ_DEBUG(10, slq, "continue"); + DLIST_ADD_END(s->mds_es_ctx->searches, s); + return mds_es_next_search_trigger(s->mds_es_ctx); +} + +struct mdssvc_backend mdsscv_backend_es = { + .init = mdssvc_es_init, + .shutdown = mdssvc_es_shutdown, + .connect = mds_es_connect, + .search_start = mds_es_search, + .search_cont = mds_es_search_cont, +}; -- cgit v1.2.3