From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Sun, 21 Apr 2024 13:54:28 +0200
Subject: Adding upstream version 18.2.2.

Signed-off-by: Daniel Baumann
---
 src/rgw/driver/rados/rgw_notify.cc | 1023 ++++++++++++++++++++++++++++++++++++
 1 file changed, 1023 insertions(+)
 create mode 100644 src/rgw/driver/rados/rgw_notify.cc

diff --git a/src/rgw/driver/rados/rgw_notify.cc b/src/rgw/driver/rados/rgw_notify.cc
new file mode 100644
index 000000000..b1835016e
--- /dev/null
+++ b/src/rgw/driver/rados/rgw_notify.cc
@@ -0,0 +1,1023 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_notify.h"
+#include "cls/2pc_queue/cls_2pc_queue_client.h"
+#include "cls/lock/cls_lock_client.h"
+#include <boost/algorithm/hex.hpp>
+#include <boost/context/protected_fixedsize_stack.hpp>
+#include <spawn/spawn.hpp>
+#include "rgw_sal_rados.h"
+#include "rgw_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_perf_counters.h"
+#include "common/dout.h"
+#include <chrono>
+
+#define dout_subsys ceph_subsys_rgw
+
+namespace rgw::notify {
+
+struct event_entry_t {
+  rgw_pubsub_s3_event event;
+  std::string push_endpoint;
+  std::string push_endpoint_args;
+  std::string arn_topic;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(event, bl);
+    encode(push_endpoint, bl);
+    encode(push_endpoint_args, bl);
+    encode(arn_topic, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(event, bl);
+    decode(push_endpoint, bl);
+    decode(push_endpoint_args, bl);
+    decode(arn_topic, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(event_entry_t)
+
+using queues_t = std::set<std::string>;
+
+// use mmap/mprotect to allocate 128k coroutine stacks
+auto make_stack_allocator() {
+  return boost::context::protected_fixedsize_stack{128*1024};
+}
+
+const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+
+class Manager : public DoutPrefixProvider {
+  const size_t max_queue_size;
+  const uint32_t queues_update_period_ms;
+  const uint32_t queues_update_retry_ms;
+  const uint32_t queue_idle_sleep_us;
+  const utime_t failover_time;
+  CephContext* const cct;
+  static constexpr auto COOKIE_LEN = 16;
+  const std::string lock_cookie;
+  boost::asio::io_context io_context;
+  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
+  const uint32_t worker_count;
+  std::vector<std::thread> workers;
+  const uint32_t stale_reservations_period_s;
+  const uint32_t reservations_cleanup_period_s;
+public:
+  librados::IoCtx& rados_ioctx;
+private:
+
+  CephContext *get_cct() const override { return cct; }
+  unsigned get_subsys() const override { return dout_subsys; }
+  std::ostream& gen_prefix(std::ostream& out) const override { return out << "rgw notify: "; }
+
+  // read the list of queues from the queue list object
+  int read_queue_list(queues_t& queues, optional_yield y) {
+    constexpr auto max_chunk = 1024U;
+    std::string start_after;
+    bool more = true;
+    int rval;
+    while (more) {
+      librados::ObjectReadOperation op;
+      queues_t queues_chunk;
+      op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
+      const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+      if (ret == -ENOENT) {
+        // queue list object was not created - nothing to do
+        return 0;
+      }
+      if (ret < 0) {
+        // TODO: do we need to check on rval as well as ret?
+        ldpp_dout(this, 1) << "ERROR: failed to read queue list. error: " << ret << dendl;
+        return ret;
+      }
+      queues.merge(queues_chunk);
+    }
+    return 0;
+  }
+
+  // set m1 to be the minimum between m1 and m2
+  static int set_min_marker(std::string& m1, const std::string m2) {
+    cls_queue_marker mr1;
+    cls_queue_marker mr2;
+    if (mr1.from_str(m1.c_str()) < 0 || mr2.from_str(m2.c_str()) < 0) {
+      return -EINVAL;
+    }
+    if (mr2.gen <= mr1.gen && mr2.offset < mr1.offset) {
+      m1 = m2;
+    }
+    return 0;
+  }
+
+  using Clock = ceph::coarse_mono_clock;
+  using Executor = boost::asio::io_context::executor_type;
+  using Timer = boost::asio::basic_waitable_timer<Clock, boost::asio::wait_traits<Clock>, Executor>;
+
+  class tokens_waiter {
+    const std::chrono::hours infinite_duration;
+    size_t pending_tokens;
+    Timer timer;
+
+    struct token {
+      tokens_waiter& waiter;
+      token(tokens_waiter& _waiter) : waiter(_waiter) {
+        ++waiter.pending_tokens;
+      }
+
+      ~token() {
+        --waiter.pending_tokens;
+        if (waiter.pending_tokens == 0) {
+          waiter.timer.cancel();
+        }
+      }
+    };
+
+  public:
+
+    tokens_waiter(boost::asio::io_context& io_context) :
+      infinite_duration(1000),
+      pending_tokens(0),
+      timer(io_context) {}
+
+    void async_wait(yield_context yield) {
+      if (pending_tokens == 0) {
+        return;
+      }
+      timer.expires_from_now(infinite_duration);
+      boost::system::error_code ec;
+      timer.async_wait(yield[ec]);
+      ceph_assert(ec == boost::system::errc::operation_canceled);
+    }
+
+    token make_token() {
+      return token(*this);
+    }
+  };
+
+  // processing of a specific entry
+  // return whether processing was successful (true) or not (false)
+  bool process_entry(const cls_queue_entry& entry, yield_context yield) {
+    event_entry_t event_entry;
+    auto iter = entry.data.cbegin();
+    try {
+      decode(event_entry, iter);
+    } catch (buffer::error& err) {
+      ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+      return false;
+    }
+    try {
+      // TODO move endpoint creation to queue level
+      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
+          RGWHTTPArgs(event_entry.push_endpoint_args, this),
+          cct);
+      ldpp_dout(this, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
+        " for entry: " << entry.marker << dendl;
+      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
+      if (ret < 0) {
+        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
+          << " failed. error: " << ret << " (will retry)" << dendl;
+        return false;
+      } else {
+        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
+          << " ok" << dendl;
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+        return true;
+      }
+    } catch (const RGWPubSubEndpoint::configuration_error& e) {
+      ldpp_dout(this, 5) << "WARNING: failed to create push endpoint: "
+        << event_entry.push_endpoint << " for entry: " << entry.marker << ".
error: " << e.what() << " (will retry) " << dendl; + return false; + } + } + + // clean stale reservation from queue + void cleanup_queue(const std::string& queue_name, yield_context yield) { + while (true) { + ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl; + const auto now = ceph::coarse_real_time::clock::now(); + const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s); + librados::ObjectWriteOperation op; + op.assert_exists(); + rados::cls::lock::assert_locked(&op, queue_name+"_lock", + ClsLockType::EXCLUSIVE, + lock_cookie, + "" /*no tag*/); + cls_2pc_queue_expire_reservations(op, stale_time); + // check ownership and do reservation cleanup in one batch + auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + if (ret == -ENOENT) { + // queue was deleted + ldpp_dout(this, 5) << "INFO: queue: " + << queue_name << ". was removed. cleanup will stop" << dendl; + return; + } + if (ret == -EBUSY) { + ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl; + return; + } + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name + << ". error: " << ret << dendl; + } + Timer timer(io_context); + timer.expires_from_now(std::chrono::seconds(reservations_cleanup_period_s)); + boost::system::error_code ec; + timer.async_wait(yield[ec]); + } + } + + // processing of a specific queue + void process_queue(const std::string& queue_name, yield_context yield) { + constexpr auto max_elements = 1024; + auto is_idle = false; + const std::string start_marker; + + // start a the cleanup coroutine for the queue + spawn::spawn(io_context, [this, queue_name](yield_context yield) { + cleanup_queue(queue_name, yield); + }, make_stack_allocator()); + + while (true) { + // if queue was empty the last time, sleep for idle timeout + if (is_idle) { + Timer timer(io_context); + timer.expires_from_now(std::chrono::microseconds(queue_idle_sleep_us)); + boost::system::error_code ec; + timer.async_wait(yield[ec]); + } + + // get list of entries in the queue + is_idle = true; + bool truncated = false; + std::string end_marker; + std::vector entries; + auto total_entries = 0U; + { + librados::ObjectReadOperation op; + op.assert_exists(); + bufferlist obl; + int rval; + rados::cls::lock::assert_locked(&op, queue_name+"_lock", + ClsLockType::EXCLUSIVE, + lock_cookie, + "" /*no tag*/); + cls_2pc_queue_list_entries(op, start_marker, max_elements, &obl, &rval); + // check ownership and list entries in one batch + auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, nullptr, optional_yield(io_context, yield)); + if (ret == -ENOENT) { + // queue was deleted + ldpp_dout(this, 5) << "INFO: queue: " + << queue_name << ". was removed. processing will stop" << dendl; + return; + } + if (ret == -EBUSY) { + ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl; + return; + } + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: failed to get list of entries in queue and/or lock queue: " + << queue_name << ". error: " << ret << " (will retry)" << dendl; + continue; + } + ret = cls_2pc_queue_list_entries_result(obl, entries, &truncated, end_marker); + if (ret < 0) { + ldpp_dout(this, 5) << "WARNING: failed to parse list of entries in queue: " + << queue_name << ". 
error: " << ret << " (will retry)" << dendl; + continue; + } + } + total_entries = entries.size(); + if (total_entries == 0) { + // nothing in the queue + continue; + } + // log when queue is not idle + ldpp_dout(this, 20) << "INFO: found: " << total_entries << " entries in: " << queue_name << + ". end marker is: " << end_marker << dendl; + + is_idle = false; + auto has_error = false; + auto remove_entries = false; + auto entry_idx = 1U; + tokens_waiter waiter(io_context); + for (auto& entry : entries) { + if (has_error) { + // bail out on first error + break; + } + // TODO pass entry pointer instead of by-value + spawn::spawn(yield, [this, &queue_name, entry_idx, total_entries, &end_marker, &remove_entries, &has_error, &waiter, entry](yield_context yield) { + const auto token = waiter.make_token(); + if (process_entry(entry, yield)) { + ldpp_dout(this, 20) << "INFO: processing of entry: " << + entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " ok" << dendl; + remove_entries = true; + } else { + if (set_min_marker(end_marker, entry.marker) < 0) { + ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl; + } else { + ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl; + } + has_error = true; + ldpp_dout(this, 20) << "INFO: processing of entry: " << + entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl; + } + }, make_stack_allocator()); + ++entry_idx; + } + + // wait for all pending work to finish + waiter.async_wait(yield); + + // delete all published entries from queue + if (remove_entries) { + librados::ObjectWriteOperation op; + op.assert_exists(); + rados::cls::lock::assert_locked(&op, queue_name+"_lock", + ClsLockType::EXCLUSIVE, + lock_cookie, + "" /*no tag*/); + cls_2pc_queue_remove_entries(op, end_marker); + // check ownership and deleted entries in one batch + const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + if (ret == -ENOENT) { + // queue was deleted + ldpp_dout(this, 5) << "INFO: queue: " + << queue_name << ". was removed. processing will stop" << dendl; + return; + } + if (ret == -EBUSY) { + ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl; + return; + } + if (ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: " + << queue_name << ". error: " << ret << dendl; + } else { + ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: " + << queue_name << dendl; + } + } + } + } + + // lits of owned queues + using owned_queues_t = std::unordered_set; + + // process all queues + // find which of the queues is owned by this daemon and process it + void process_queues(yield_context yield) { + auto has_error = false; + owned_queues_t owned_queues; + + // add randomness to the duration between queue checking + // to make sure that different daemons are not synced + std::random_device seed; + std::mt19937 rnd_gen(seed()); + const auto min_jitter = 100; // ms + const auto max_jitter = 500; // ms + std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter); + + std::vector queue_gc; + std::mutex queue_gc_lock; + while (true) { + Timer timer(io_context); + const auto duration = (has_error ? 
+ std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) + + std::chrono::milliseconds(duration_jitter(rnd_gen)); + timer.expires_from_now(duration); + const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration); + ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl; + boost::system::error_code ec; + timer.async_wait(yield[ec]); + + queues_t queues; + auto ret = read_queue_list(queues, optional_yield(io_context, yield)); + if (ret < 0) { + has_error = true; + continue; + } + + for (const auto& queue_name : queues) { + // try to lock the queue to check if it is owned by this rgw + // or if ownershif needs to be taken + librados::ObjectWriteOperation op; + op.assert_exists(); + rados::cls::lock::lock(&op, queue_name+"_lock", + ClsLockType::EXCLUSIVE, + lock_cookie, + "" /*no tag*/, + "" /*no description*/, + failover_time, + LOCK_FLAG_MAY_RENEW); + + ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield)); + if (ret == -EBUSY) { + // lock is already taken by another RGW + ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl; + // if queue was owned by this RGW, processing should be stopped, queue would be deleted from list afterwards + continue; + } + if (ret == -ENOENT) { + // queue is deleted - processing will stop the next time we try to read from the queue + ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " should not be locked - already deleted" << dendl; + continue; + } + if (ret < 0) { + // failed to lock for another reason, continue to process other queues + ldpp_dout(this, 1) << "ERROR: failed to lock queue: " << queue_name << ". 
error: " << ret << dendl; + has_error = true; + continue; + } + // add queue to list of owned queues + if (owned_queues.insert(queue_name).second) { + ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " now owned (locked) by this daemon" << dendl; + // start processing this queue + spawn::spawn(io_context, [this, &queue_gc, &queue_gc_lock, queue_name](yield_context yield) { + process_queue(queue_name, yield); + // if queue processing ended, it measn that the queue was removed or not owned anymore + // mark it for deletion + std::lock_guard lock_guard(queue_gc_lock); + queue_gc.push_back(queue_name); + ldpp_dout(this, 10) << "INFO: queue: " << queue_name << " marked for removal" << dendl; + }, make_stack_allocator()); + } else { + ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " ownership (lock) renewed" << dendl; + } + } + // erase all queue that were deleted + { + std::lock_guard lock_guard(queue_gc_lock); + std::for_each(queue_gc.begin(), queue_gc.end(), [this, &owned_queues](const std::string& queue_name) { + owned_queues.erase(queue_name); + ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " removed" << dendl; + }); + queue_gc.clear(); + } + } + } + +public: + + ~Manager() { + work_guard.reset(); + io_context.stop(); + std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); }); + } + + // ctor: start all threads + Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms, + uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms, + uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s, + uint32_t _worker_count, rgw::sal::RadosStore* store) : + max_queue_size(_max_queue_size), + queues_update_period_ms(_queues_update_period_ms), + queues_update_retry_ms(_queues_update_retry_ms), + queue_idle_sleep_us(_queue_idle_sleep_us), + failover_time(std::chrono::milliseconds(failover_time_ms)), + cct(_cct), + lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)), + work_guard(boost::asio::make_work_guard(io_context)), + worker_count(_worker_count), + stale_reservations_period_s(_stale_reservations_period_s), + reservations_cleanup_period_s(_reservations_cleanup_period_s), + rados_ioctx(store->getRados()->get_notif_pool_ctx()) + { + spawn::spawn(io_context, [this] (yield_context yield) { + process_queues(yield); + }, make_stack_allocator()); + + // start the worker threads to do the actual queue processing + const std::string WORKER_THREAD_NAME = "notif-worker"; + for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) { + workers.emplace_back([this]() { + try { + io_context.run(); + } catch (const std::exception& err) { + ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl; + throw(err); + } + }); + const auto rc = ceph_pthread_setname(workers.back().native_handle(), + (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str()); + ceph_assert(rc == 0); + } + ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl; + } + + int add_persistent_topic(const std::string& topic_name, optional_yield y) { + if (topic_name == Q_LIST_OBJECT_NAME) { + ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl; + return -EINVAL; + } + librados::ObjectWriteOperation op; + op.create(true); + cls_2pc_queue_init(op, topic_name, max_queue_size); + auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y); + if (ret == 
-EEXIST) {
+      // queue already exists - nothing to do
+      ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
+      return 0;
+    }
+    if (ret < 0) {
+      // failed to create queue
+      ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
+      return ret;
+    }
+
+    bufferlist empty_bl;
+    std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
+    op.omap_set(new_topic);
+    ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
+      return ret;
+    }
+    ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
+    return 0;
+  }
+};
+
+// singleton manager
+// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
+static Manager* s_manager = nullptr;
+
+constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
+constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
+constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
+constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
+constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
+constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
+constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
+constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
+
+bool init(CephContext* cct, rgw::sal::RadosStore* store, const DoutPrefixProvider *dpp) {
+  if (s_manager) {
+    return false;
+  }
+  // TODO: take conf from CephContext
+  s_manager = new Manager(cct, MAX_QUEUE_SIZE,
+      Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
+      IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
+      STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
+      WORKER_COUNT,
+      store);
+  return true;
+}
+
+void shutdown() {
+  delete s_manager;
+  s_manager = nullptr;
+}
+
+int add_persistent_topic(const std::string& topic_name, optional_yield y) {
+  if (!s_manager) {
+    return -EAGAIN;
+  }
+  return s_manager->add_persistent_topic(topic_name, y);
+}
+
+int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
+  librados::ObjectWriteOperation op;
+  op.remove();
+  auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
+  if (ret == -ENOENT) {
+    // queue already removed - nothing to do
+    ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+    return 0;
+  }
+  if (ret < 0) {
+    // failed to remove queue
+    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+    return ret;
+  }
+
+  std::set<std::string> topic_to_remove{{topic_name}};
+  op.omap_rm_keys(topic_to_remove);
+  ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list.
error: " << ret << dendl; + return ret; + } + ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl; + return 0; +} + +int remove_persistent_topic(const std::string& topic_name, optional_yield y) { + if (!s_manager) { + return -EAGAIN; + } + return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y); +} + +rgw::sal::Object* get_object_with_atttributes( + const reservation_t& res, rgw::sal::Object* obj) { + // in case of copy obj, the tags and metadata are taken from source + const auto src_obj = res.src_object ? res.src_object : obj; + if (src_obj->get_attrs().empty()) { + if (!src_obj->get_bucket()) { + src_obj->set_bucket(res.bucket); + } + const auto ret = src_obj->get_obj_attrs(res.yield, res.dpp); + if (ret < 0) { + ldpp_dout(res.dpp, 20) << "failed to get attributes from object: " << + src_obj->get_key() << ". ret = " << ret << dendl; + return nullptr; + } + } + return src_obj; +} + +static inline void filter_amz_meta(meta_map_t& dest, const meta_map_t& src) { + std::copy_if(src.cbegin(), src.cend(), + std::inserter(dest, dest.end()), + [](const auto& m) { + return (boost::algorithm::starts_with(m.first, RGW_AMZ_META_PREFIX)); + }); +} + + +static inline void metadata_from_attributes( + reservation_t& res, rgw::sal::Object* obj) { + auto& metadata = res.x_meta_map; + const auto src_obj = get_object_with_atttributes(res, obj); + if (!src_obj) { + return; + } + res.metadata_fetched_from_attributes = true; + for (auto& attr : src_obj->get_attrs()) { + if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) { + std::string_view key(attr.first); + key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1); + // we want to pass a null terminated version + // of the bufferlist, hence "to_str().c_str()" + metadata.emplace(key, attr.second.to_str().c_str()); + } + } +} + +static inline void tags_from_attributes( + const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) { + const auto src_obj = get_object_with_atttributes(res, obj); + if (!src_obj) { + return; + } + const auto& attrs = src_obj->get_attrs(); + const auto attr_iter = attrs.find(RGW_ATTR_TAGS); + if (attr_iter != attrs.end()) { + auto bliter = attr_iter->second.cbegin(); + RGWObjTags obj_tags; + try { + ::decode(obj_tags, bliter); + } catch(buffer::error&) { + // not able to decode tags + return; + } + tags = std::move(obj_tags.get_tags()); + } +} + +// populate event from request +static inline void populate_event(reservation_t& res, + rgw::sal::Object* obj, + uint64_t size, + const ceph::real_time& mtime, + const std::string& etag, + const std::string& version, + EventType event_type, + rgw_pubsub_s3_event& event) { + event.eventTime = mtime; + event.eventName = to_event_string(event_type); + event.userIdentity = res.user_id; // user that triggered the change + event.x_amz_request_id = res.req_id; // request ID of the original change + event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made + // configurationId is filled from notification configuration + event.bucket_name = res.bucket->get_name(); + event.bucket_ownerIdentity = res.bucket->get_owner() ? + res.bucket->get_owner()->get_id().id : res.bucket->get_info().owner.id; + const auto region = res.store->get_zone()->get_zonegroup().get_api_name(); + rgw::ARN bucket_arn(res.bucket->get_key()); + bucket_arn.region = region; + event.bucket_arn = to_string(bucket_arn); + event.object_key = res.object_name ? 
*res.object_name : obj->get_name(); + event.object_size = size; + event.object_etag = etag; + event.object_versionId = version; + event.awsRegion = region; + // use timestamp as per key sequence id (hex encoded) + const utime_t ts(real_clock::now()); + boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), + std::back_inserter(event.object_sequencer)); + set_event_id(event.id, etag, ts); + event.bucket_id = res.bucket->get_bucket_id(); + // pass meta data + if (!res.metadata_fetched_from_attributes) { + // either no metadata exist or no metadata filter was used + metadata_from_attributes(res, obj); + } + event.x_meta_map = res.x_meta_map; + // pass tags + if (!res.tagset || + (*res.tagset).get_tags().empty()) { + // try to fetch the tags from the attributes + tags_from_attributes(res, obj, event.tags); + } else { + event.tags = (*res.tagset).get_tags(); + } + // opaque data will be filled from topic configuration +} + +static inline bool notification_match(reservation_t& res, + const rgw_pubsub_topic_filter& filter, + EventType event, + const RGWObjTags* req_tags) { + if (!match(filter.events, event)) { + return false; + } + const auto obj = res.object; + if (!match(filter.s3_filter.key_filter, + res.object_name ? *res.object_name : obj->get_name())) { + return false; + } + + if (!filter.s3_filter.metadata_filter.kv.empty()) { + // metadata filter exists + if (res.s) { + filter_amz_meta(res.x_meta_map, res.s->info.x_meta_map); + } + metadata_from_attributes(res, obj); + if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) { + return false; + } + } + + if (!filter.s3_filter.tag_filter.kv.empty()) { + // tag filter exists + if (req_tags) { + // tags in the request + if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) { + return false; + } + } else if (res.tagset && !(*res.tagset).get_tags().empty()) { + // tags were cached in req_state + if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) { + return false; + } + } else { + // try to fetch tags from the attributes + KeyMultiValueMap tags; + tags_from_attributes(res, obj, tags); + if (!match(filter.s3_filter.tag_filter, tags)) { + return false; + } + } + } + + return true; +} + + int publish_reserve(const DoutPrefixProvider* dpp, + EventType event_type, + reservation_t& res, + const RGWObjTags* req_tags) +{ + const RGWPubSub ps(res.store, res.user_tenant); + const RGWPubSub::Bucket ps_bucket(ps, res.bucket); + rgw_pubsub_bucket_topics bucket_topics; + auto rc = ps_bucket.get_topics(res.dpp, bucket_topics, res.yield); + if (rc < 0) { + // failed to fetch bucket topics + return rc; + } + for (const auto& bucket_topic : bucket_topics.topics) { + const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; + const rgw_pubsub_topic& topic_cfg = topic_filter.topic; + if (!notification_match(res, topic_filter, event_type, req_tags)) { + // notification does not apply to req_state + continue; + } + ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << + "' on topic: '" << topic_cfg.dest.arn_topic << + "' and bucket: '" << res.bucket->get_name() << + "' (unique topic: '" << topic_cfg.name << + "') apply to event of type: '" << to_string(event_type) << "'" << dendl; + + cls_2pc_reservation::id_t res_id; + if (topic_cfg.dest.persistent) { + // TODO: take default reservation size from conf + constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K + res.size = DEFAULT_RESERVATION; + librados::ObjectWriteOperation op; + bufferlist obl; + int rval; + const auto& queue_name = 
topic_cfg.dest.arn_topic; + cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval); + auto ret = rgw_rados_operate( + res.dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); + if (ret < 0) { + ldpp_dout(res.dpp, 1) << + "ERROR: failed to reserve notification on queue: " + << queue_name << ". error: " << ret << dendl; + // if no space is left in queue we ask client to slow down + return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; + } + ret = cls_2pc_queue_reserve_result(obl, res_id); + if (ret < 0) { + ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl; + return ret; + } + } + res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id); + } + return 0; +} + +int publish_commit(rgw::sal::Object* obj, + uint64_t size, + const ceph::real_time& mtime, + const std::string& etag, + const std::string& version, + EventType event_type, + reservation_t& res, + const DoutPrefixProvider* dpp) +{ + for (auto& topic : res.topics) { + if (topic.cfg.dest.persistent && + topic.res_id == cls_2pc_reservation::NO_ID) { + // nothing to commit or already committed/aborted + continue; + } + event_entry_t event_entry; + populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event); + event_entry.event.configurationId = topic.configurationId; + event_entry.event.opaque_data = topic.cfg.opaque_data; + if (topic.cfg.dest.persistent) { + event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint); + event_entry.push_endpoint_args = + std::move(topic.cfg.dest.push_endpoint_args); + event_entry.arn_topic = topic.cfg.dest.arn_topic; + bufferlist bl; + encode(event_entry, bl); + const auto& queue_name = topic.cfg.dest.arn_topic; + if (bl.length() > res.size) { + // try to make a larger reservation, fail only if this is not possible + ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() + << " exceeded reserved size: " << res.size + << + " . trying to make a larger reservation on queue:" << queue_name + << dendl; + // first cancel the existing reservation + librados::ObjectWriteOperation op; + cls_2pc_queue_abort(op, topic.res_id); + auto ret = rgw_rados_operate( + dpp, res.store->getRados()->get_notif_pool_ctx(), + topic.cfg.dest.arn_topic, &op, + res.yield); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " + << topic.res_id << + " when trying to make a larger reservation on queue: " << queue_name + << ". error: " << ret << dendl; + return ret; + } + // now try to make a bigger one + buffer::list obl; + int rval; + cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval); + ret = rgw_rados_operate( + dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " + << queue_name + << ". error: " << ret << dendl; + return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; + } + ret = cls_2pc_queue_reserve_result(obl, topic.res_id); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for " + "extra space. 
error: " << ret << dendl;
+          return ret;
+        }
+      }
+      std::vector<bufferlist> bl_data_vec{std::move(bl)};
+      librados::ObjectWriteOperation op;
+      cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
+      const auto ret = rgw_rados_operate(
+        dpp, res.store->getRados()->get_notif_pool_ctx(),
+        queue_name, &op, res.yield);
+      topic.res_id = cls_2pc_reservation::NO_ID;
+      if (ret < 0) {
+        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
+          << queue_name << ". error: " << ret
+          << dendl;
+        return ret;
+      }
+    } else {
+      try {
+        // TODO add endpoint LRU cache
+        const auto push_endpoint = RGWPubSubEndpoint::create(
+          topic.cfg.dest.push_endpoint,
+          topic.cfg.dest.arn_topic,
+          RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
+          dpp->get_cct());
+        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
+          << topic.cfg.dest.push_endpoint << dendl;
+        const auto ret = push_endpoint->send_to_completion_async(
+          dpp->get_cct(), event_entry.event, res.yield);
+        if (ret < 0) {
+          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
+            << topic.cfg.dest.push_endpoint
+            << " failed. error: " << ret << dendl;
+          if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+          return ret;
+        }
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+      } catch (const RGWPubSubEndpoint::configuration_error& e) {
+        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
+          << topic.cfg.dest.push_endpoint << ". error: " << e.what() << dendl;
+        if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+        return -EINVAL;
+      }
+    }
+  }
+  return 0;
+}
+
+int publish_abort(reservation_t& res) {
+  for (auto& topic : res.topics) {
+    if (!topic.cfg.dest.persistent ||
+        topic.res_id == cls_2pc_reservation::NO_ID) {
+      // nothing to abort or already committed/aborted
+      continue;
+    }
+    const auto& queue_name = topic.cfg.dest.arn_topic;
+    librados::ObjectWriteOperation op;
+    cls_2pc_queue_abort(op, topic.res_id);
+    const auto ret = rgw_rados_operate(
+      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
+      queue_name, &op, res.yield);
+    if (ret < 0) {
+      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
+        << topic.res_id <<
+        " from queue: " << queue_name << ". error: " << ret << dendl;
+      return ret;
+    }
+    topic.res_id = cls_2pc_reservation::NO_ID;
+  }
+  return 0;
+}
+
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+    rgw::sal::RadosStore* _store,
+    const req_state* _s,
+    rgw::sal::Object* _object,
+    rgw::sal::Object* _src_object,
+    const std::string* _object_name,
+    optional_yield y) :
+  dpp(_s), store(_store), s(_s), size(0) /* XXX */,
+  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
+  object_name(_object_name),
+  tagset(_s->tagset),
+  metadata_fetched_from_attributes(false),
+  user_id(_s->user->get_id().id),
+  user_tenant(_s->user->get_id().tenant),
+  req_id(_s->req_id),
+  yield(y)
+{
+  filter_amz_meta(x_meta_map, _s->info.x_meta_map);
+}
+
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+    rgw::sal::RadosStore* _store,
+    rgw::sal::Object* _object,
+    rgw::sal::Object* _src_object,
+    rgw::sal::Bucket* _bucket,
+    const std::string& _user_id,
+    const std::string& _user_tenant,
+    const std::string& _req_id,
+    optional_yield y) :
+  dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
+  object(_object), src_object(_src_object), bucket(_bucket),
+  object_name(nullptr),
+  metadata_fetched_from_attributes(false),
+  user_id(_user_id),
+  user_tenant(_user_tenant),
+  req_id(_req_id),
+  yield(y)
+{}
+
+reservation_t::~reservation_t() {
+  publish_abort(*this);
+}
+
+} // namespace rgw::notify
--
cgit v1.2.3
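
As a usage illustration (not part of the upstream patch), the sketch below shows how request-handling code might drive the two-phase publish API introduced above: reserve entries on all matching persistent topic queues, perform the object write, then commit; anything left uncommitted is aborted by the reservation destructor. The helper name notify_object_created and its parameters are hypothetical, and the event type ObjectCreatedPut is assumed to be one of the rgw::notify event types.

#include "rgw_notify.h"

// hypothetical helper: all arguments are assumed to come from the RGW request context
int notify_object_created(const DoutPrefixProvider* dpp,
                          rgw::sal::RadosStore* store,
                          rgw::sal::Object* obj,
                          rgw::sal::Bucket* bucket,
                          const std::string& user_id,
                          const std::string& user_tenant,
                          const std::string& req_id,
                          uint64_t size,
                          const ceph::real_time& mtime,
                          const std::string& etag,
                          const std::string& version,
                          optional_yield y)
{
  // phase 1: reserve space in the queues of all topics whose filters match the event
  rgw::notify::reservation_t res(dpp, store, obj, nullptr /*no copy source*/,
                                 bucket, user_id, user_tenant, req_id, y);
  int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectCreatedPut,
                                         res, nullptr /*no request tags*/);
  if (ret < 0) {
    // e.g. -ERR_RATE_LIMITED when a persistent queue has no free space
    return ret;
  }
  // ... the actual object write happens here ...
  // phase 2: commit the reserved entries; anything still reserved when `res`
  // goes out of scope is aborted by ~reservation_t() via publish_abort()
  return rgw::notify::publish_commit(obj, size, mtime, etag, version,
                                     rgw::notify::ObjectCreatedPut, res, dpp);
}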