From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/rgw/rgw_sync_module_es_rest.cc | 426 +++++++++++++++++++++++++++++++++++++ 1 file changed, 426 insertions(+) create mode 100644 src/rgw/rgw_sync_module_es_rest.cc (limited to 'src/rgw/rgw_sync_module_es_rest.cc') diff --git a/src/rgw/rgw_sync_module_es_rest.cc b/src/rgw/rgw_sync_module_es_rest.cc new file mode 100644 index 000000000..f040a166b --- /dev/null +++ b/src/rgw/rgw_sync_module_es_rest.cc @@ -0,0 +1,426 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_sync_module_es.h" +#include "rgw_sync_module_es_rest.h" +#include "rgw_es_query.h" +#include "rgw_op.h" +#include "rgw_rest.h" +#include "rgw_rest_s3.h" +#include "rgw_sal_rados.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw + +struct es_index_obj_response { + string bucket; + rgw_obj_key key; + uint64_t versioned_epoch{0}; + ACLOwner owner; + set read_permissions; + + struct { + uint64_t size{0}; + ceph::real_time mtime; + string etag; + string content_type; + string storage_class; + map custom_str; + map custom_int; + map custom_date; + + template + struct _custom_entry { + string name; + T value; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("name", name, obj); + JSONDecoder::decode_json("value", value, obj); + } + }; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("size", size, obj); + string mtime_str; + JSONDecoder::decode_json("mtime", mtime_str, obj); + parse_time(mtime_str.c_str(), &mtime); + JSONDecoder::decode_json("etag", etag, obj); + JSONDecoder::decode_json("content_type", content_type, obj); + JSONDecoder::decode_json("storage_class", storage_class, obj); + list<_custom_entry > str_entries; + JSONDecoder::decode_json("custom-string", str_entries, obj); + for (auto& e : str_entries) { + custom_str[e.name] = e.value; + } + list<_custom_entry > int_entries; + JSONDecoder::decode_json("custom-int", int_entries, obj); + for (auto& e : int_entries) { + custom_int[e.name] = e.value; + } + list<_custom_entry > date_entries; + JSONDecoder::decode_json("custom-date", date_entries, obj); + for (auto& e : date_entries) { + custom_date[e.name] = e.value; + } + } + } meta; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket", bucket, obj); + JSONDecoder::decode_json("name", key.name, obj); + JSONDecoder::decode_json("instance", key.instance, obj); + JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj); + JSONDecoder::decode_json("permissions", read_permissions, obj); + JSONDecoder::decode_json("owner", owner, obj); + JSONDecoder::decode_json("meta", meta, obj); + } +}; + +struct es_search_response { + uint32_t took; + bool timed_out; + struct { + uint32_t total; + uint32_t successful; + uint32_t failed; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + JSONDecoder::decode_json("successful", successful, obj); + JSONDecoder::decode_json("failed", failed, obj); + } + } shards; + struct obj_hit { + string index; + string type; + string id; + // double score + es_index_obj_response source; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("_index", index, obj); + JSONDecoder::decode_json("_type", type, obj); + JSONDecoder::decode_json("_id", id, obj); + JSONDecoder::decode_json("_source", source, obj); + } + }; + struct { + uint32_t total; + // double max_score; + list hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("total", total, obj); + // JSONDecoder::decode_json("max_score", max_score, obj); + JSONDecoder::decode_json("hits", hits, obj); + } + } hits; + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("took", took, obj); + JSONDecoder::decode_json("timed_out", timed_out, obj); + JSONDecoder::decode_json("_shards", shards, obj); + JSONDecoder::decode_json("hits", hits, obj); + } +}; + +class RGWMetadataSearchOp : public RGWOp { + RGWSyncModuleInstanceRef sync_module_ref; + RGWElasticSyncModuleInstance *es_module; +protected: + string expression; + string custom_prefix; +#define MAX_KEYS_DEFAULT 100 + uint64_t max_keys{MAX_KEYS_DEFAULT}; + string marker_str; + uint64_t marker{0}; + string next_marker; + bool is_truncated{false}; + string err; + + es_search_response response; + +public: + RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) { + es_module = static_cast(sync_module_ref.get()); + } + + int verify_permission(optional_yield) override { + return 0; + } + virtual int get_params() = 0; + void pre_exec() override; + void execute(optional_yield y) override; + + const char* name() const override { return "metadata_search"; } + virtual RGWOpType get_type() override { return RGW_OP_METADATA_SEARCH; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; } +}; + +void RGWMetadataSearchOp::pre_exec() +{ + rgw_bucket_object_pre_exec(s); +} + +void RGWMetadataSearchOp::execute(optional_yield y) +{ + op_ret = get_params(); + if (op_ret < 0) + return; + + list > conds; + + if (!s->user->get_info().system) { + conds.push_back(make_pair("permissions", s->user->get_id().to_str())); + } + + if (!s->bucket_name.empty()) { + conds.push_back(make_pair("bucket", s->bucket_name)); + } + + ESQueryCompiler es_query(expression, &conds, custom_prefix); + + static map aliases = { + { "bucket", "bucket" }, /* forces lowercase */ + { "name", "name" }, + { "key", "name" }, + { "instance", "instance" }, + { "etag", "meta.etag" }, + { "size", "meta.size" }, + { "mtime", "meta.mtime" }, + { "lastmodified", "meta.mtime" }, + { "last_modified", "meta.mtime" }, + { "contenttype", "meta.content_type" }, + { "content_type", "meta.content_type" }, + { "storageclass", "meta.storage_class" }, + { "storage_class", "meta.storage_class" }, + }; + es_query.set_field_aliases(&aliases); + + static map generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR}, + {"name", ESEntityTypeMap::ES_ENTITY_STR}, + {"instance", ESEntityTypeMap::ES_ENTITY_STR}, + {"permissions", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.content_type", ESEntityTypeMap::ES_ENTITY_STR}, + {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE}, + {"meta.size", ESEntityTypeMap::ES_ENTITY_INT}, + {"meta.storage_class", ESEntityTypeMap::ES_ENTITY_STR} }; + ESEntityTypeMap gm(generic_map); + es_query.set_generic_type_map(&gm); + + static set restricted_fields = { {"permissions"} }; + es_query.set_restricted_fields(&restricted_fields); + + map custom_map; + for (auto& i : s->bucket->get_info().mdsearch_config) { + custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second; + } + + ESEntityTypeMap em(custom_map); + es_query.set_custom_type_map(&em); + + bool valid = es_query.compile(&err); + if (!valid) { + ldpp_dout(this, 10) << "invalid query, failed generating request json" << dendl; + op_ret = -EINVAL; + return; + } + + JSONFormatter f; + encode_json("root", es_query, &f); + + RGWRESTConn *conn = es_module->get_rest_conn(); + + bufferlist in; + bufferlist out; + + stringstream ss; + + f.flush(ss); + in.append(ss.str()); + + string resource = es_module->get_index_path() + "/_search"; + param_vec_t params; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)max_keys); + params.push_back(param_pair_t("size", buf)); + if (marker > 0) { + params.push_back(param_pair_t("from", marker_str.c_str())); + } + ldpp_dout(this, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl; + auto& extra_headers = es_module->get_request_headers(); + op_ret = conn->get_resource(s, resource, ¶ms, &extra_headers, + out, &in, nullptr, y); + if (op_ret < 0) { + ldpp_dout(this, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl; + return; + } + + ldpp_dout(this, 20) << "response: " << string(out.c_str(), out.length()) << dendl; + + JSONParser jparser; + if (!jparser.parse(out.c_str(), out.length())) { + ldpp_dout(this, 0) << "ERROR: failed to parse elasticsearch response" << dendl; + op_ret = -EINVAL; + return; + } + + try { + decode_json_obj(response, &jparser); + } catch (const JSONDecoder::err& e) { + ldpp_dout(this, 0) << "ERROR: failed to decode JSON input: " << e.what() << dendl; + op_ret = -EINVAL; + return; + } + +} + +class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp { +public: + explicit RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) { + custom_prefix = "x-amz-meta-"; + } + + int get_params() override { + expression = s->info.args.get("query"); + bool exists; + string max_keys_str = s->info.args.get("max-keys", &exists); +#define MAX_KEYS_MAX 10000 + if (exists) { + string err; + max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + if (max_keys > MAX_KEYS_MAX) { + max_keys = MAX_KEYS_MAX; + } + } + marker_str = s->info.args.get("marker", &exists); + if (exists) { + string err; + marker = strict_strtoll(marker_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + } + uint64_t nm = marker + max_keys; + static constexpr int BUFSIZE = 32; + char buf[BUFSIZE]; + snprintf(buf, sizeof(buf), "%lld", (long long)nm); + next_marker = buf; + return 0; + } + void send_response() override { + if (op_ret) { + s->err.message = err; + set_req_state_err(s, op_ret); + } + dump_errno(s); + end_header(s, this, "application/xml"); + + if (op_ret < 0) { + return; + } + + is_truncated = (response.hits.hits.size() >= max_keys); + + s->formatter->open_object_section("SearchMetadataResponse"); + s->formatter->dump_string("Marker", marker_str); + s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false")); + if (is_truncated) { + s->formatter->dump_string("NextMarker", next_marker); + } + if (s->format == RGW_FORMAT_JSON) { + s->formatter->open_array_section("Objects"); + } + for (auto& i : response.hits.hits) { + s->formatter->open_object_section("Contents"); + es_index_obj_response& e = i.source; + s->formatter->dump_string("Bucket", e.bucket); + s->formatter->dump_string("Key", e.key.name); + string instance = (!e.key.instance.empty() ? e.key.instance : "null"); + s->formatter->dump_string("Instance", instance.c_str()); + s->formatter->dump_int("VersionedEpoch", e.versioned_epoch); + dump_time(s, "LastModified", &e.meta.mtime); + s->formatter->dump_int("Size", e.meta.size); + s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str()); + s->formatter->dump_string("ContentType", e.meta.content_type.c_str()); + s->formatter->dump_string("StorageClass", e.meta.storage_class.c_str()); + dump_owner(s, e.owner.get_id(), e.owner.get_display_name()); + s->formatter->open_array_section("CustomMetadata"); + for (auto& m : e.meta.custom_str) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_int) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_int("Value", m.second); + s->formatter->close_section(); + } + for (auto& m : e.meta.custom_date) { + s->formatter->open_object_section("Entry"); + s->formatter->dump_string("Name", m.first.c_str()); + s->formatter->dump_string("Value", m.second); + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter(s, s->formatter); + s->formatter->close_section(); + }; + if (s->format == RGW_FORMAT_JSON) { + s->formatter->close_section(); + } + s->formatter->close_section(); + rgw_flush_formatter_and_reset(s, s->formatter); + } +}; + +class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 { +protected: + RGWOp *op_get() override { + if (s->info.args.exists("query")) { + return new RGWMetadataSearch_ObjStore_S3(store->getRados()->get_sync_module()); + } + if (!s->init_state.url_bucket.empty() && + s->info.args.exists("mdsearch")) { + return new RGWGetBucketMetaSearch_ObjStore_S3; + } + return nullptr; + } + RGWOp *op_head() override { + return nullptr; + } + RGWOp *op_post() override { + return nullptr; + } +public: + explicit RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {} + virtual ~RGWHandler_REST_MDSearch_S3() {} +}; + + +RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(rgw::sal::RGWRadosStore *store, + struct req_state* const s, + const rgw::auth::StrategyRegistry& auth_registry, + const std::string& frontend_prefix) +{ + int ret = + RGWHandler_REST_S3::init_from_header(store, s, + RGW_FORMAT_XML, true); + if (ret < 0) { + return nullptr; + } + + if (!s->object->empty()) { + return nullptr; + } + + RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry); + + ldpp_dout(s, 20) << __func__ << " handler=" << typeid(*handler).name() + << dendl; + return handler; +} + -- cgit v1.2.3