1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#pragma once
#include <string>
#include "common/ceph_time.h"
#include "include/common_fwd.h"
#include "rgw_notify_event_type.h"
#include "common/async/yield_context.h"
#include "cls/2pc_queue/cls_2pc_queue_types.h"
#include "rgw_pubsub.h"
// forward declarations
namespace rgw::sal {
class RGWRadosStore;
class RGWObject;
}
class RGWRados;
struct rgw_obj_key;
namespace rgw::notify {
// initialize the notification manager
// notification manager is dequeing the 2-phase-commit queues
// and send the notifications to the endpoints
bool init(CephContext* cct, rgw::sal::RGWRadosStore* store, const DoutPrefixProvider *dpp);
// shutdown the notification manager
void shutdown();
// create persistent delivery queue for a topic (endpoint)
// this operation also add a topic name to the common (to all RGWs) list of all topics
int add_persistent_topic(const std::string& topic_name, optional_yield y);
// remove persistent delivery queue for a topic (endpoint)
// this operation also remove the topic name from the common (to all RGWs) list of all topics
int remove_persistent_topic(const std::string& topic_name, optional_yield y);
// struct holding reservation information
// populated in the publish_reserve call
// then used to commit or abort the reservation
struct reservation_t {
struct topic_t {
topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
const std::string configurationId;
const rgw_pubsub_topic cfg;
// res_id is reset after topic is committed/aborted
cls_2pc_reservation::id_t res_id;
};
const DoutPrefixProvider *dpp;
std::vector<topic_t> topics;
rgw::sal::RGWRadosStore* const store;
const req_state* const s;
size_t size;
rgw::sal::RGWObject* const object;
const std::string* const object_name;
KeyValueMap cached_metadata;
reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore* _store, const req_state* _s,
rgw::sal::RGWObject* _object, const std::string* _object_name=nullptr) :
dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
// dtor doing resource leak guarding
// aborting the reservation if not already committed or aborted
~reservation_t();
};
// create a reservation on the 2-phase-commit queue
int publish_reserve(const DoutPrefixProvider *dpp,
EventType event_type,
reservation_t& reservation,
const RGWObjTags* req_tags);
// commit the reservation to the queue
int publish_commit(rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
reservation_t& reservation,
const DoutPrefixProvider *dpp);
// cancel the reservation
int publish_abort(const DoutPrefixProvider *dpp, reservation_t& reservation);
}
|