summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_notify.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_notify.h')
-rw-r--r--src/rgw/rgw_notify.h92
1 files changed, 92 insertions, 0 deletions
diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h
new file mode 100644
index 000000000..4aa8f0855
--- /dev/null
+++ b/src/rgw/rgw_notify.h
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <string>
+#include "common/ceph_time.h"
+#include "include/common_fwd.h"
+#include "rgw_notify_event_type.h"
+#include "common/async/yield_context.h"
+#include "cls/2pc_queue/cls_2pc_queue_types.h"
+#include "rgw_pubsub.h"
+
+// forward declarations
+namespace rgw::sal {
+ class RGWRadosStore;
+ class RGWObject;
+}
+
+class RGWRados;
+struct rgw_obj_key;
+
+namespace rgw::notify {
+
+// initialize the notification manager
+// notification manager is dequeing the 2-phase-commit queues
+// and send the notifications to the endpoints
+bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp);
+
+// shutdown the notification manager
+void shutdown();
+
+// create persistent delivery queue for a topic (endpoint)
+// this operation also add a topic name to the common (to all RGWs) list of all topics
+int add_persistent_topic(const std::string& topic_name, optional_yield y);
+
+// remove persistent delivery queue for a topic (endpoint)
+// this operation also remove the topic name from the common (to all RGWs) list of all topics
+int remove_persistent_topic(const std::string& topic_name, optional_yield y);
+
+// struct holding reservation information
+// populated in the publish_reserve call
+// then used to commit or abort the reservation
+struct reservation_t {
+ struct topic_t {
+ topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
+ configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+
+ const std::string configurationId;
+ const rgw_pubsub_topic cfg;
+ // res_id is reset after topic is committed/aborted
+ cls_2pc_reservation::id_t res_id;
+ };
+
+ const DoutPrefixProvider *dpp;
+ std::vector<topic_t> topics;
+ rgw::sal::RGWRadosStore* const store;
+ const req_state* const s;
+ size_t size;
+ rgw::sal::RGWObject* const object;
+ const std::string* const object_name;
+ KeyValueMap cached_metadata;
+
+ reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore* _store, const req_state* _s,
+ rgw::sal::RGWObject* _object, const std::string* _object_name=nullptr) :
+ dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
+
+ // dtor doing resource leak guarding
+ // aborting the reservation if not already committed or aborted
+ ~reservation_t();
+};
+
+// create a reservation on the 2-phase-commit queue
+int publish_reserve(const DoutPrefixProvider *dpp,
+ EventType event_type,
+ reservation_t& reservation,
+ const RGWObjTags* req_tags);
+
+// commit the reservation to the queue
+int publish_commit(rgw::sal::RGWObject* obj,
+ uint64_t size,
+ const ceph::real_time& mtime,
+ const std::string& etag,
+ EventType event_type,
+ reservation_t& reservation,
+ const DoutPrefixProvider *dpp);
+
+// cancel the reservation
+int publish_abort(const DoutPrefixProvider *dpp, reservation_t& reservation);
+
+}
+