From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rgw/driver/rados/rgw_cr_rados.h | 1647 +++++++++++++++++++++++++++++++++++ 1 file changed, 1647 insertions(+) create mode 100644 src/rgw/driver/rados/rgw_cr_rados.h (limited to 'src/rgw/driver/rados/rgw_cr_rados.h') diff --git a/src/rgw/driver/rados/rgw_cr_rados.h b/src/rgw/driver/rados/rgw_cr_rados.h new file mode 100644 index 000000000..7bda18878 --- /dev/null +++ b/src/rgw/driver/rados/rgw_cr_rados.h @@ -0,0 +1,1647 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include "include/ceph_assert.h" +#include "rgw_coroutine.h" +#include "rgw_sal.h" +#include "rgw_sal_rados.h" +#include "common/WorkQueue.h" +#include "common/Throttle.h" + +#include +#include "common/ceph_time.h" + +#include "services/svc_sys_obj.h" +#include "services/svc_bucket.h" + +struct rgw_http_param_pair; +class RGWRESTConn; + +class RGWAsyncRadosRequest : public RefCountedObject { + RGWCoroutine *caller; + RGWAioCompletionNotifier *notifier; + + int retcode; + + ceph::mutex lock = ceph::make_mutex("RGWAsyncRadosRequest::lock"); + +protected: + virtual int _send_request(const DoutPrefixProvider *dpp) = 0; +public: + RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) + : caller(_caller), notifier(_cn), retcode(0) { + } + ~RGWAsyncRadosRequest() override { + if (notifier) { + notifier->put(); + } + } + + void send_request(const DoutPrefixProvider *dpp) { + get(); + retcode = _send_request(dpp); + { + std::lock_guard l{lock}; + if (notifier) { + notifier->cb(); // drops its own ref + notifier = nullptr; + } + } + put(); + } + + int get_ret_status() { return retcode; } + + void finish() { + { + std::lock_guard l{lock}; + if (notifier) { + // we won't call notifier->cb() to drop its ref, so drop it here + notifier->put(); + notifier = nullptr; + } + } + put(); + } +}; + + +class RGWAsyncRadosProcessor { + std::deque m_req_queue; + std::atomic going_down = { false }; +protected: + CephContext *cct; + ThreadPool m_tp; + Throttle req_throttle; + + struct RGWWQ : public DoutPrefixProvider, public ThreadPool::WorkQueue { + RGWAsyncRadosProcessor *processor; + RGWWQ(RGWAsyncRadosProcessor *p, + ceph::timespan timeout, ceph::timespan suicide_timeout, + ThreadPool *tp) + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), processor(p) {} + + bool _enqueue(RGWAsyncRadosRequest *req) override; + void _dequeue(RGWAsyncRadosRequest *req) override { + ceph_abort(); + } + bool _empty() override; + RGWAsyncRadosRequest *_dequeue() override; + using ThreadPool::WorkQueue::_process; + void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override; + void _dump_queue(); + void _clear() override { + ceph_assert(processor->m_req_queue.empty()); + } + + CephContext *get_cct() const { return processor->cct; } + unsigned get_subsys() const { return ceph_subsys_rgw; } + std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw async rados processor: ";} + + } req_wq; + +public: + RGWAsyncRadosProcessor(CephContext *_cct, int num_threads); + ~RGWAsyncRadosProcessor() {} + void start(); + void stop(); + void handle_request(const DoutPrefixProvider *dpp, RGWAsyncRadosRequest *req); + void queue(RGWAsyncRadosRequest *req); + + bool is_going_down() { + return going_down; + } + +}; + +template +class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + + P params; + const DoutPrefixProvider *dpp; + + class Request : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + P params; + const DoutPrefixProvider *dpp; + protected: + int _send_request(const DoutPrefixProvider *dpp) override; + public: + Request(RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, + rgw::sal::RadosStore* store, + const P& _params, + const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn), + store(store), + params(_params), + dpp(dpp) {} + } *req{nullptr}; + + public: + RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados, + rgw::sal::RadosStore* _store, + const P& _params, + const DoutPrefixProvider *_dpp) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + params(_params), + dpp(_dpp) {} + + ~RGWSimpleWriteOnlyAsyncCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new Request(this, + stack->create_completion_notifier(), + store, + params, + dpp); + + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } +}; + + +template +class RGWSimpleAsyncCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + + P params; + std::shared_ptr result; + const DoutPrefixProvider *dpp; + + class Request : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + P params; + std::shared_ptr result; + const DoutPrefixProvider *dpp; + protected: + int _send_request(const DoutPrefixProvider *dpp) override; + public: + Request(const DoutPrefixProvider *dpp, + RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, + rgw::sal::RadosStore* _store, + const P& _params, + std::shared_ptr& _result, + const DoutPrefixProvider *_dpp) : RGWAsyncRadosRequest(caller, cn), + store(_store), + params(_params), + result(_result), + dpp(_dpp) {} + } *req{nullptr}; + + public: + RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados, + rgw::sal::RadosStore* _store, + const P& _params, + std::shared_ptr& _result, + const DoutPrefixProvider *_dpp) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + params(_params), + result(_result), + dpp(_dpp) {} + + ~RGWSimpleAsyncCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new Request(dpp, + this, + stack->create_completion_notifier(), + store, + params, + result, + dpp); + + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWGenericAsyncCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + + +public: + class Action { + public: + virtual ~Action() {} + virtual int operate() = 0; + }; + +private: + std::shared_ptr action; + + class Request : public RGWAsyncRadosRequest { + std::shared_ptr action; + protected: + int _send_request(const DoutPrefixProvider *dpp) override { + if (!action) { + return 0; + } + return action->operate(); + } + public: + Request(const DoutPrefixProvider *dpp, + RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, + std::shared_ptr& _action) : RGWAsyncRadosRequest(caller, cn), + action(_action) {} + } *req{nullptr}; + + public: + RGWGenericAsyncCR(CephContext *_cct, + RGWAsyncRadosProcessor *_async_rados, + std::shared_ptr& _action) : RGWSimpleCoroutine(_cct), + async_rados(_async_rados), + action(_action) {} + template + RGWGenericAsyncCR(CephContext *_cct, + RGWAsyncRadosProcessor *_async_rados, + std::shared_ptr& _action) : RGWSimpleCoroutine(_cct), + async_rados(_async_rados), + action(std::static_pointer_cast(_action)) {} + + ~RGWGenericAsyncCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new Request(dpp, this, + stack->create_completion_notifier(), + action); + + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } +}; + + +class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { + const DoutPrefixProvider *dpp; + RGWSI_SysObj* svc_sysobj; + rgw_raw_obj obj; + const bool want_attrs; + const bool raw_attrs; +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncGetSystemObj(const DoutPrefixProvider *dpp, + RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + bool want_attrs, bool raw_attrs); + + bufferlist bl; + std::map attrs; + RGWObjVersionTracker objv_tracker; +}; + +class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { + const DoutPrefixProvider *dpp; + RGWSI_SysObj *svc; + rgw_raw_obj obj; + bool exclusive; + bufferlist bl; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncPutSystemObj(const DoutPrefixProvider *dpp, RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + bool _exclusive, bufferlist _bl); + + RGWObjVersionTracker objv_tracker; +}; + +class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest { + const DoutPrefixProvider *dpp; + RGWSI_SysObj *svc; + rgw_raw_obj obj; + std::map attrs; + bool exclusive; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + std::map _attrs, bool exclusive); + + RGWObjVersionTracker objv_tracker; +}; + +class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + rgw_raw_obj obj; + std::string lock_name; + std::string cookie; + uint32_t duration_secs; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + const std::string& _name, const std::string& _cookie, uint32_t _duration_secs); +}; + +class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + rgw_raw_obj obj; + std::string lock_name; + std::string cookie; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store, + RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, + const std::string& _name, const std::string& _cookie); +}; + +template +class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* store; + rgw_raw_obj obj; + T* result; + /// on ENOENT, call handle_data() with an empty object instead of failing + const bool empty_on_enoent; + RGWObjVersionTracker* objv_tracker; + + T val; + rgw_rados_ref ref; + ceph::buffer::list bl; + boost::intrusive_ptr cn; + +public: + RGWSimpleRadosReadCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + const rgw_raw_obj& obj, + T* result, bool empty_on_enoent = true, + RGWObjVersionTracker* objv_tracker = nullptr) + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), + obj(obj), result(result), empty_on_enoent(empty_on_enoent), + objv_tracker(objv_tracker) { + if (!result) { + result = &val; + } + } + + int send_request(const DoutPrefixProvider *dpp) { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" + << r << dendl; + return r; + } + + set_status() << "sending request"; + + librados::ObjectReadOperation op; + if (objv_tracker) { + objv_tracker->prepare_op_for_read(&op); + } + + op.read(0, -1, &bl, nullptr); + + cn = stack->create_completion_notifier(); + return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op, + nullptr); + } + + int request_complete() { + int ret = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << ret; + + if (ret == -ENOENT && empty_on_enoent) { + *result = T(); + } else { + if (ret < 0) { + return ret; + } + try { + auto iter = bl.cbegin(); + if (iter.end()) { + // allow successful reads with empty buffers. ReadSyncStatus coroutines + // depend on this to be able to read without locking, because the + // cls lock from InitSyncStatus will create an empty object if it didn't + // exist + *result = T(); + } else { + decode(*result, iter); + } + } catch (buffer::error& err) { + return -EIO; + } + } + + return handle_data(*result); + } + + virtual int handle_data(T& data) { + return 0; + } +}; + +class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine { + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* const store; + + const rgw_raw_obj obj; + std::map* const pattrs; + const bool raw_attrs; + RGWObjVersionTracker* const objv_tracker; + + rgw_rados_ref ref; + std::map unfiltered_attrs; + boost::intrusive_ptr cn; + +public: + RGWSimpleRadosReadAttrsCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* store, + rgw_raw_obj obj, + std::map* pattrs, + bool raw_attrs, + RGWObjVersionTracker* objv_tracker = nullptr) + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), + obj(std::move(obj)), pattrs(pattrs), raw_attrs(raw_attrs), + objv_tracker(objv_tracker) {} + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +template +class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* const store; + rgw_raw_obj obj; + RGWObjVersionTracker* objv_tracker; + bool exclusive; + + bufferlist bl; + rgw_rados_ref ref; + std::map unfiltered_attrs; + boost::intrusive_ptr cn; + + +public: + RGWSimpleRadosWriteCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* const store, + rgw_raw_obj obj, const T& data, + RGWObjVersionTracker* objv_tracker = nullptr, + bool exclusive = false) + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), store(store), + obj(std::move(obj)), objv_tracker(objv_tracker), exclusive(exclusive) { + encode(data, bl); + } + + int send_request(const DoutPrefixProvider *dpp) override { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" + << r << dendl; + return r; + } + + set_status() << "sending request"; + + librados::ObjectWriteOperation op; + if (exclusive) { + op.create(true); + } + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + op.write_full(bl); + + cn = stack->create_completion_notifier(); + return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op); + } + + int request_complete() override { + int ret = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << ret; + if (ret >= 0 && objv_tracker) { + objv_tracker->apply_write(); + } + return ret; + } +}; + +class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine { + const DoutPrefixProvider* dpp; + rgw::sal::RadosStore* const store; + RGWObjVersionTracker* objv_tracker; + rgw_raw_obj obj; + std::map attrs; + bool exclusive; + + rgw_rados_ref ref; + boost::intrusive_ptr cn; + + +public: + RGWSimpleRadosWriteAttrsCR(const DoutPrefixProvider* dpp, + rgw::sal::RadosStore* const store, + rgw_raw_obj obj, + std::map attrs, + RGWObjVersionTracker* objv_tracker = nullptr, + bool exclusive = false) + : RGWSimpleCoroutine(store->ctx()), dpp(dpp), + store(store), objv_tracker(objv_tracker), + obj(std::move(obj)), attrs(std::move(attrs)), + exclusive(exclusive) {} + + int send_request(const DoutPrefixProvider *dpp) override { + int r = store->getRados()->get_raw_obj_ref(dpp, obj, &ref); + if (r < 0) { + ldpp_dout(dpp, -1) << "ERROR: failed to get ref for (" << obj << ") ret=" + << r << dendl; + return r; + } + + set_status() << "sending request"; + + librados::ObjectWriteOperation op; + if (exclusive) { + op.create(true); + } + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + + for (const auto& [name, bl] : attrs) { + if (!bl.length()) + continue; + op.setxattr(name.c_str(), bl); + } + + cn = stack->create_completion_notifier(); + if (!op.size()) { + cn->cb(); + return 0; + } + + return ref.pool.ioctx().aio_operate(ref.obj.oid, cn->completion(), &op); + } + + int request_complete() override { + int ret = cn->completion()->get_return_value(); + set_status() << "request complete; ret=" << ret; + if (ret >= 0 && objv_tracker) { + objv_tracker->apply_write(); + } + return ret; + } +}; + +class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { + rgw::sal::RadosStore* store; + std::map entries; + + rgw_rados_ref ref; + + rgw_raw_obj obj; + + boost::intrusive_ptr cn; + +public: + RGWRadosSetOmapKeysCR(rgw::sal::RadosStore* _store, + const rgw_raw_obj& _obj, + std::map& _entries); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { + public: + struct Result { + rgw_rados_ref ref; + std::set entries; + bool more = false; + }; + using ResultPtr = std::shared_ptr; + + RGWRadosGetOmapKeysCR(rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj, + const std::string& _marker, int _max_entries, + ResultPtr result); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; + + private: + rgw::sal::RadosStore* store; + rgw_raw_obj obj; + std::string marker; + int max_entries; + ResultPtr result; + boost::intrusive_ptr cn; +}; + +class RGWRadosGetOmapValsCR : public RGWSimpleCoroutine { + public: + struct Result { + rgw_rados_ref ref; + std::map entries; + bool more = false; + }; + using ResultPtr = std::shared_ptr; + + RGWRadosGetOmapValsCR(rgw::sal::RadosStore* _store, const rgw_raw_obj& _obj, + const std::string& _marker, int _max_entries, + ResultPtr result); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; + + private: + rgw::sal::RadosStore* store; + rgw_raw_obj obj; + std::string marker; + int max_entries; + ResultPtr result; + boost::intrusive_ptr cn; +}; + +class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine { + rgw::sal::RadosStore* store; + + rgw_rados_ref ref; + + std::set keys; + + rgw_raw_obj obj; + + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore* _store, + const rgw_raw_obj& _obj, + const std::set& _keys); + + int send_request(const DoutPrefixProvider *dpp) override; + + int request_complete() override; +}; + +class RGWRadosRemoveCR : public RGWSimpleCoroutine { + rgw::sal::RadosStore* store; + librados::IoCtx ioctx; + const rgw_raw_obj obj; + RGWObjVersionTracker* objv_tracker; + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj, + RGWObjVersionTracker* objv_tracker = nullptr); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWRadosRemoveOidCR : public RGWSimpleCoroutine { + librados::IoCtx ioctx; + const std::string oid; + RGWObjVersionTracker* objv_tracker; + boost::intrusive_ptr cn; + +public: + RGWRadosRemoveOidCR(rgw::sal::RadosStore* store, + librados::IoCtx&& ioctx, std::string_view oid, + RGWObjVersionTracker* objv_tracker = nullptr); + + RGWRadosRemoveOidCR(rgw::sal::RadosStore* store, + RGWSI_RADOS::Obj& obj, + RGWObjVersionTracker* objv_tracker = nullptr); + + RGWRadosRemoveOidCR(rgw::sal::RadosStore* store, + RGWSI_RADOS::Obj&& obj, + RGWObjVersionTracker* objv_tracker = nullptr); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + std::string lock_name; + std::string cookie; + uint32_t duration; + + rgw_raw_obj obj; + + RGWAsyncLockSystemObj *req; + +public: + RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_raw_obj& _obj, + const std::string& _lock_name, + const std::string& _cookie, + uint32_t _duration); + ~RGWSimpleRadosLockCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; + + static std::string gen_random_cookie(CephContext* cct) { + static constexpr std::size_t COOKIE_LEN = 16; + char buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); + return buf; + } +}; + +class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + std::string lock_name; + std::string cookie; + + rgw_raw_obj obj; + + RGWAsyncUnlockSystemObj *req; + +public: + RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_raw_obj& _obj, + const std::string& _lock_name, + const std::string& _cookie); + ~RGWSimpleRadosUnlockCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +#define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100 + +class RGWOmapAppend : public RGWConsumerCR { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + + rgw_raw_obj obj; + + bool going_down; + + int num_pending_entries; + std::list pending_entries; + + std::map entries; + + uint64_t window_size; + uint64_t total_entries; +public: + RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_raw_obj& _obj, + uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT); + int operate(const DoutPrefixProvider *dpp) override; + void flush_pending(); + bool append(const std::string& s); + bool finish(); + + uint64_t get_total_entries() { + return total_entries; + } + + const rgw_raw_obj& get_obj() { + return obj; + } +}; + +class RGWShardedOmapCRManager { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + RGWCoroutine *op; + + int num_shards; + + std::vector shards; +public: + RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const std::string& oid_prefix) + : async_rados(_async_rados), + store(_store), op(_op), num_shards(_num_shards) { + shards.reserve(num_shards); + for (int i = 0; i < num_shards; ++i) { + char buf[oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); + RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf)); + shard->get(); + shards.push_back(shard); + op->spawn(shard, false); + } + } + + ~RGWShardedOmapCRManager() { + for (auto shard : shards) { + shard->put(); + } + } + + bool append(const std::string& entry, int shard_id) { + return shards[shard_id]->append(entry); + } + bool finish() { + bool success = true; + for (auto& append_op : shards) { + success &= (append_op->finish() && (!append_op->is_error())); + } + return success; + } + + uint64_t get_total_entries(int shard_id) { + return shards[shard_id]->get_total_entries(); + } +}; + +class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + rgw_bucket bucket; + const DoutPrefixProvider *dpp; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, + rgw::sal::RadosStore* _store, const rgw_bucket& bucket, + const DoutPrefixProvider *dpp) + : RGWAsyncRadosRequest(caller, cn), store(_store), bucket(bucket), dpp(dpp) {} + + RGWBucketInfo bucket_info; + std::map attrs; +}; + +class RGWAsyncPutBucketInstanceInfo : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + RGWBucketInfo& bucket_info; + bool exclusive; + real_time mtime; + std::map* attrs; + const DoutPrefixProvider *dpp; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncPutBucketInstanceInfo(RGWCoroutine* caller, + RGWAioCompletionNotifier* cn, + rgw::sal::RadosStore* store, + RGWBucketInfo& bucket_info, + bool exclusive, + real_time mtime, + std::map* attrs, + const DoutPrefixProvider* dpp) + : RGWAsyncRadosRequest(caller, cn), store(store), bucket_info(bucket_info), + exclusive(exclusive), mtime(mtime), attrs(attrs), dpp(dpp) {} +}; + +class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + rgw_bucket bucket; + RGWBucketInfo *bucket_info; + std::map *pattrs; + const DoutPrefixProvider *dpp; + + RGWAsyncGetBucketInstanceInfo *req{nullptr}; + +public: + // rgw_bucket constructor + RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_bucket& _bucket, RGWBucketInfo *_bucket_info, + std::map *_pattrs, const DoutPrefixProvider *dpp) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + bucket(_bucket), bucket_info(_bucket_info), pattrs(_pattrs), dpp(dpp) {} + ~RGWGetBucketInstanceInfoCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, bucket, dpp); + async_rados->queue(req); + return 0; + } + int request_complete() override { + if (bucket_info) { + *bucket_info = std::move(req->bucket_info); + } + if (pattrs) { + *pattrs = std::move(req->attrs); + } + return req->get_ret_status(); + } +}; + +class RGWPutBucketInstanceInfoCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + RGWBucketInfo& bucket_info; + bool exclusive; + real_time mtime; + std::map* attrs; + const DoutPrefixProvider *dpp; + + RGWAsyncPutBucketInstanceInfo* req = nullptr; + +public: + // rgw_bucket constructor + RGWPutBucketInstanceInfoCR(RGWAsyncRadosProcessor *async_rados, + rgw::sal::RadosStore* store, + RGWBucketInfo& bucket_info, + bool exclusive, + real_time mtime, + std::map* attrs, + const DoutPrefixProvider *dpp) + : RGWSimpleCoroutine(store->ctx()), async_rados(async_rados), store(store), + bucket_info(bucket_info), exclusive(exclusive), + mtime(mtime), attrs(attrs), dpp(dpp) {} + ~RGWPutBucketInstanceInfoCR() override { + request_cleanup(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = nullptr; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncPutBucketInstanceInfo(this, + stack->create_completion_notifier(), + store, bucket_info, exclusive, + mtime, attrs, dpp); + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWRadosBILogTrimCR : public RGWSimpleCoroutine { + const RGWBucketInfo& bucket_info; + int shard_id; + const rgw::bucket_index_layout_generation generation; + RGWRados::BucketShard bs; + std::string start_marker; + std::string end_marker; + boost::intrusive_ptr cn; + public: + RGWRadosBILogTrimCR(const DoutPrefixProvider *dpp, + rgw::sal::RadosStore* store, const RGWBucketInfo& bucket_info, + int shard_id, + const rgw::bucket_index_layout_generation& generation, + const std::string& start_marker, + const std::string& end_marker); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + std::optional user_id; + + rgw_bucket src_bucket; + std::optional dest_placement_rule; + RGWBucketInfo dest_bucket_info; + + rgw_obj_key key; + std::optional dest_key; + std::optional versioned_epoch; + + real_time src_mtime; + + bool copy_if_newer; + std::shared_ptr filter; + rgw_zone_set_entry source_trace_entry; + rgw_zone_set zones_trace; + PerfCounters* counters; + const DoutPrefixProvider *dpp; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + std::optional& _user_id, + const rgw_bucket& _src_bucket, + std::optional _dest_placement_rule, + const RGWBucketInfo& _dest_bucket_info, + const rgw_obj_key& _key, + const std::optional& _dest_key, + std::optional _versioned_epoch, + bool _if_newer, + std::shared_ptr _filter, + const rgw_zone_set_entry& source_trace_entry, + rgw_zone_set *_zones_trace, + PerfCounters* counters, const DoutPrefixProvider *dpp) + : RGWAsyncRadosRequest(caller, cn), store(_store), + source_zone(_source_zone), + user_id(_user_id), + src_bucket(_src_bucket), + dest_placement_rule(_dest_placement_rule), + dest_bucket_info(_dest_bucket_info), + key(_key), + dest_key(_dest_key), + versioned_epoch(_versioned_epoch), + copy_if_newer(_if_newer), + filter(_filter), + source_trace_entry(source_trace_entry), + counters(counters), + dpp(dpp) + { + if (_zones_trace) { + zones_trace = *_zones_trace; + } + } +}; + +class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + std::optional user_id; + + rgw_bucket src_bucket; + std::optional dest_placement_rule; + RGWBucketInfo dest_bucket_info; + + rgw_obj_key key; + std::optional dest_key; + std::optional versioned_epoch; + + real_time src_mtime; + + bool copy_if_newer; + + std::shared_ptr filter; + + RGWAsyncFetchRemoteObj *req; + const rgw_zone_set_entry& source_trace_entry; + rgw_zone_set *zones_trace; + PerfCounters* counters; + const DoutPrefixProvider *dpp; + +public: + RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + std::optional _user_id, + const rgw_bucket& _src_bucket, + std::optional _dest_placement_rule, + const RGWBucketInfo& _dest_bucket_info, + const rgw_obj_key& _key, + const std::optional& _dest_key, + std::optional _versioned_epoch, + bool _if_newer, + std::shared_ptr _filter, + const rgw_zone_set_entry& source_trace_entry, + rgw_zone_set *_zones_trace, + PerfCounters* counters, const DoutPrefixProvider *dpp) + : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + user_id(_user_id), + src_bucket(_src_bucket), + dest_placement_rule(_dest_placement_rule), + dest_bucket_info(_dest_bucket_info), + key(_key), + dest_key(_dest_key), + versioned_epoch(_versioned_epoch), + copy_if_newer(_if_newer), + filter(_filter), + req(NULL), + source_trace_entry(source_trace_entry), + zones_trace(_zones_trace), counters(counters), dpp(dpp) {} + + + ~RGWFetchRemoteObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, + source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info, + key, dest_key, versioned_epoch, copy_if_newer, filter, + source_trace_entry, zones_trace, counters, dpp); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest { + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + rgw_bucket src_bucket; + rgw_obj_key key; + + ceph::real_time *pmtime; + uint64_t *psize; + std::string *petag; + std::map *pattrs; + std::map *pheaders; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + rgw_bucket& _src_bucket, + const rgw_obj_key& _key, + ceph::real_time *_pmtime, + uint64_t *_psize, + std::string *_petag, + std::map *_pattrs, + std::map *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store), + source_zone(_source_zone), + src_bucket(_src_bucket), + key(_key), + pmtime(_pmtime), + psize(_psize), + petag(_petag), + pattrs(_pattrs), + pheaders(_pheaders) {} +}; + +class RGWStatRemoteObjCR : public RGWSimpleCoroutine { + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + rgw_bucket src_bucket; + rgw_obj_key key; + + ceph::real_time *pmtime; + uint64_t *psize; + std::string *petag; + std::map *pattrs; + std::map *pheaders; + + RGWAsyncStatRemoteObj *req; + +public: + RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + rgw_bucket& _src_bucket, + const rgw_obj_key& _key, + ceph::real_time *_pmtime, + uint64_t *_psize, + std::string *_petag, + std::map *_pattrs, + std::map *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + src_bucket(_src_bucket), + key(_key), + pmtime(_pmtime), + psize(_psize), + petag(_petag), + pattrs(_pattrs), + pheaders(_pheaders), + req(NULL) {} + + + ~RGWStatRemoteObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone, + src_bucket, key, pmtime, psize, petag, pattrs, pheaders); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +class RGWAsyncRemoveObj : public RGWAsyncRadosRequest { + const DoutPrefixProvider *dpp; + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + std::unique_ptr bucket; + std::unique_ptr obj; + + std::string owner; + std::string owner_display_name; + bool versioned; + uint64_t versioned_epoch; + std::string marker_version_id; + + bool del_if_older; + ceph::real_time timestamp; + rgw_zone_set zones_trace; + +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncRemoveObj(const DoutPrefixProvider *_dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, + rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + const std::string& _owner, + const std::string& _owner_display_name, + bool _versioned, + uint64_t _versioned_epoch, + bool _delete_marker, + bool _if_older, + real_time& _timestamp, + rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), dpp(_dpp), store(_store), + source_zone(_source_zone), + owner(_owner), + owner_display_name(_owner_display_name), + versioned(_versioned), + versioned_epoch(_versioned_epoch), + del_if_older(_if_older), + timestamp(_timestamp) { + if (_delete_marker) { + marker_version_id = _key.instance; + } + + if (_zones_trace) { + zones_trace = *_zones_trace; + } + store->get_bucket(nullptr, _bucket_info, &bucket); + obj = bucket->get_object(_key); + } +}; + +class RGWRemoveObjCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + CephContext *cct; + RGWAsyncRadosProcessor *async_rados; + rgw::sal::RadosStore* store; + rgw_zone_id source_zone; + + RGWBucketInfo bucket_info; + + rgw_obj_key key; + bool versioned; + uint64_t versioned_epoch; + bool delete_marker; + std::string owner; + std::string owner_display_name; + + bool del_if_older; + real_time timestamp; + + RGWAsyncRemoveObj *req; + + rgw_zone_set *zones_trace; + +public: + RGWRemoveObjCR(const DoutPrefixProvider *_dpp, RGWAsyncRadosProcessor *_async_rados, rgw::sal::RadosStore* _store, + const rgw_zone_id& _source_zone, + RGWBucketInfo& _bucket_info, + const rgw_obj_key& _key, + bool _versioned, + uint64_t _versioned_epoch, + std::string *_owner, + std::string *_owner_display_name, + bool _delete_marker, + real_time *_timestamp, + rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), dpp(_dpp), cct(_store->ctx()), + async_rados(_async_rados), store(_store), + source_zone(_source_zone), + bucket_info(_bucket_info), + key(_key), + versioned(_versioned), + versioned_epoch(_versioned_epoch), + delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) { + del_if_older = (_timestamp != NULL); + if (_timestamp) { + timestamp = *_timestamp; + } + + if (_owner) { + owner = *_owner; + } + + if (_owner_display_name) { + owner_display_name = *_owner_display_name; + } + } + ~RGWRemoveObjCR() override { + request_cleanup(); + } + + void request_cleanup() override { + if (req) { + req->finish(); + req = NULL; + } + } + + int send_request(const DoutPrefixProvider *dpp) override { + req = new RGWAsyncRemoveObj(dpp, this, stack->create_completion_notifier(), store, source_zone, bucket_info, + key, owner, owner_display_name, versioned, versioned_epoch, + delete_marker, del_if_older, timestamp, zones_trace); + async_rados->queue(req); + return 0; + } + + int request_complete() override { + return req->get_ret_status(); + } +}; + +/// \brief Collect average latency +/// +/// Used in data sync to back off on concurrency when latency of lock +/// operations rises. +/// +/// \warning This class is not thread safe. We do not use a mutex +/// because all coroutines spawned by RGWDataSyncCR share a single thread. +class LatencyMonitor { + ceph::timespan total; + std::uint64_t count = 0; + +public: + + LatencyMonitor() = default; + void add_latency(ceph::timespan latency) { + total += latency; + ++count; + } + + ceph::timespan avg_latency() { + using namespace std::literals; + return count == 0 ? 0s : total / count; + } +}; + +class RGWContinuousLeaseCR : public RGWCoroutine { + RGWAsyncRadosProcessor* async_rados; + rgw::sal::RadosStore* store; + + const rgw_raw_obj obj; + + const std::string lock_name; + const std::string cookie{RGWSimpleRadosLockCR::gen_random_cookie(cct)}; + + int interval; + bool going_down{false}; + bool locked{false}; + + const ceph::timespan interval_tolerance; + const ceph::timespan ts_interval; + + RGWCoroutine* caller; + + bool aborted{false}; + + ceph::coarse_mono_time last_renew_try_time; + ceph::coarse_mono_time current_time; + + LatencyMonitor* latency; + +public: + RGWContinuousLeaseCR(RGWAsyncRadosProcessor* async_rados, + rgw::sal::RadosStore* _store, + rgw_raw_obj obj, std::string lock_name, + int interval, RGWCoroutine* caller, + LatencyMonitor* const latency) + : RGWCoroutine(_store->ctx()), async_rados(async_rados), store(_store), + obj(std::move(obj)), lock_name(std::move(lock_name)), + interval(interval), interval_tolerance(ceph::make_timespan(9*interval/10)), + ts_interval(ceph::make_timespan(interval)), caller(caller), latency(latency) + {} + + virtual ~RGWContinuousLeaseCR() override; + + int operate(const DoutPrefixProvider *dpp) override; + + bool is_locked() const { + if (ceph::coarse_mono_clock::now() - last_renew_try_time > ts_interval) { + return false; + } + return locked; + } + + void set_locked(bool status) { + locked = status; + } + + void go_down() { + going_down = true; + wakeup(); + } + + void abort() { + aborted = true; + } +}; + +class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + rgw::sal::RadosStore* store; + std::list entries; + + std::string oid; + + boost::intrusive_ptr cn; + +public: + RGWRadosTimelogAddCR(const DoutPrefixProvider *dpp, rgw::sal::RadosStore* _store, const std::string& _oid, + const cls_log_entry& entry); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + rgw::sal::RadosStore* store; + boost::intrusive_ptr cn; + protected: + std::string oid; + real_time start_time; + real_time end_time; + std::string from_marker; + std::string to_marker; + + public: + RGWRadosTimelogTrimCR(const DoutPrefixProvider *dpp, + rgw::sal::RadosStore* store, const std::string& oid, + const real_time& start_time, const real_time& end_time, + const std::string& from_marker, + const std::string& to_marker); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +// wrapper to update last_trim_marker on success +class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR { + CephContext *cct; + std::string *last_trim_marker; + public: + static constexpr const char* max_marker = "99999999"; + + RGWSyncLogTrimCR(const DoutPrefixProvider *dpp, + rgw::sal::RadosStore* store, const std::string& oid, + const std::string& to_marker, std::string *last_trim_marker); + int request_complete() override; +}; + +class RGWAsyncStatObj : public RGWAsyncRadosRequest { + const DoutPrefixProvider *dpp; + rgw::sal::RadosStore* store; + RGWBucketInfo bucket_info; + rgw_obj obj; + uint64_t *psize; + real_time *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; +protected: + int _send_request(const DoutPrefixProvider *dpp) override; +public: + RGWAsyncStatObj(const DoutPrefixProvider *dpp, RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RadosStore* store, + const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, + real_time *pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr) + : RGWAsyncRadosRequest(caller, cn), dpp(dpp), store(store), obj(obj), psize(psize), + pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {} +}; + +class RGWStatObjCR : public RGWSimpleCoroutine { + const DoutPrefixProvider *dpp; + rgw::sal::RadosStore* store; + RGWAsyncRadosProcessor *async_rados; + RGWBucketInfo bucket_info; + rgw_obj obj; + uint64_t *psize; + real_time *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; + RGWAsyncStatObj *req = nullptr; + public: + RGWStatObjCR(const DoutPrefixProvider *dpp, RGWAsyncRadosProcessor *async_rados, rgw::sal::RadosStore* store, + const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, + real_time* pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr); + ~RGWStatObjCR() override { + request_cleanup(); + } + void request_cleanup() override; + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +/// coroutine wrapper for IoCtx::aio_notify() +class RGWRadosNotifyCR : public RGWSimpleCoroutine { + rgw::sal::RadosStore* const store; + const rgw_raw_obj obj; + bufferlist request; + const uint64_t timeout_ms; + bufferlist *response; + rgw_rados_ref ref; + boost::intrusive_ptr cn; + +public: + RGWRadosNotifyCR(rgw::sal::RadosStore* store, const rgw_raw_obj& obj, + bufferlist& request, uint64_t timeout_ms, + bufferlist *response); + + int send_request(const DoutPrefixProvider *dpp) override; + int request_complete() override; +}; + +class RGWDataPostNotifyCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager& http_manager; + bc::flat_map >& shards; + const char *source_zone; + RGWRESTConn *conn; + +public: + RGWDataPostNotifyCR(RGWRados *_store, RGWHTTPManager& _http_manager, bc::flat_map >& _shards, const char *_zone, RGWRESTConn *_conn) + : RGWCoroutine(_store->ctx()), store(_store), http_manager(_http_manager), + shards(_shards), source_zone(_zone), conn(_conn) {} + + int operate(const DoutPrefixProvider* dpp) override; +}; + -- cgit v1.2.3