summaryrefslogtreecommitdiffstats
path: root/src/rgw/driver/rados/rgw_notify.h
blob: 9269611e4a6f3a1ae539ad6c2c54a2d72044bdf0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
// vim: ts=8 sw=2 smarttab ft=cpp

#pragma once

#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

#include "common/ceph_time.h"
#include "include/common_fwd.h"
#include "rgw_notify_event_type.h"
#include "common/async/yield_context.h"
#include "cls/2pc_queue/cls_2pc_queue_types.h"
#include "rgw_pubsub.h"

// forward declarations
namespace rgw::sal {
    class RadosStore;
    // Object and Bucket are referenced (through pointers) by reservation_t
    // and publish_commit() further down in this header
    class Object;
    class Bucket;
    // NOTE(review): RGWObject is not referenced anywhere in this header;
    // it is kept only so that existing includers are unaffected --
    // confirm whether it can be dropped
    class RGWObject;
}

class RGWRados;
struct rgw_obj_key;

namespace rgw::notify {

// Initialize the notification manager.
// The notification manager dequeues the 2-phase-commit queues
// and sends the notifications to the endpoints.
// NOTE(review): the meaning of the bool result is not visible in this
// header (presumably true on success) -- confirm against the implementation.
bool init(CephContext* cct,
          rgw::sal::RadosStore* store,
          const DoutPrefixProvider* dpp);

// Shut the notification manager down.
void shutdown();

// Create the persistent delivery queue for a topic (endpoint).
// This operation also adds the topic name to the list of all topics,
// a list that is common to all RGWs.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise -- confirm against the implementation.
int add_persistent_topic(const std::string& topic_name,
                         optional_yield y);

// Remove the persistent delivery queue for a topic (endpoint).
// This operation also removes the topic name from the list of all topics,
// a list that is common to all RGWs.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise -- confirm against the implementation.
int remove_persistent_topic(const std::string& topic_name,
                            optional_yield y);

// Variant of remove_persistent_topic() for callers that provide the IoCtx
// themselves; the variant without an IoCtx parameter uses
// rgw::notify::Manager::rados_ioctx.
int remove_persistent_topic(const DoutPrefixProvider* dpp,
                            librados::IoCtx& rados_ioctx,
                            const std::string& topic_name,
                            optional_yield y);

// struct holding reservation information
// populated in the publish_reserve call
// then used to commit or abort the reservation
//
// NOTE(review): the destructor aborts a pending reservation, while the
// copy constructor is still implicitly generated; whether copies are ever
// made (and would abort twice) cannot be told from this header -- confirm
// before relying on copying this type.
struct reservation_t {
  // one entry per topic taking part in the reservation
  struct topic_t {
    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
            cls_2pc_reservation::id_t _res_id) :
      configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}

    const std::string configurationId;
    const rgw_pubsub_topic cfg;
    // res_id is reset after topic is committed/aborted
    cls_2pc_reservation::id_t res_id;
  };

  const DoutPrefixProvider* const dpp;
  std::vector<topic_t> topics;
  rgw::sal::RadosStore* const store;
  // NOTE(review): the non-request ctor takes no req_state, so this is
  // presumably null for such callers -- confirm against the implementation
  const req_state* const s;
  // default member initializer: the value is never indeterminate,
  // even if a constructor does not set it explicitly
  size_t size = 0;
  rgw::sal::Object* const object;
  rgw::sal::Object* const src_object; // may differ from object
  rgw::sal::Bucket* const bucket;
  const std::string* const object_name;
  boost::optional<const RGWObjTags&> tagset;
  meta_map_t x_meta_map; // metadata cached by value
  // default member initializer, same reasoning as for `size`
  bool metadata_fetched_from_attributes = false;
  const std::string user_id;
  const std::string user_tenant;
  const std::string req_id;
  optional_yield yield;

  /* ctor for rgw_op callers */
  reservation_t(const DoutPrefixProvider* _dpp,
                rgw::sal::RadosStore* _store,
                const req_state* _s,
                rgw::sal::Object* _object,
                rgw::sal::Object* _src_object,
                const std::string* _object_name,
                optional_yield y);

  /* ctor for non-request caller (e.g., lifecycle) */
  reservation_t(const DoutPrefixProvider* _dpp,
                rgw::sal::RadosStore* _store,
                rgw::sal::Object* _object,
                rgw::sal::Object* _src_object,
                rgw::sal::Bucket* _bucket,
                const std::string& _user_id,
                const std::string& _user_tenant,
                const std::string& _req_id,
                optional_yield y);

  // dtor doing resource leak guarding
  // aborting the reservation if not already committed or aborted
  ~reservation_t();
};

// Create a reservation on the 2-phase-commit queue.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise -- confirm against the implementation.
int publish_reserve(const DoutPrefixProvider* dpp,
                    EventType event_type,
                    reservation_t& reservation,
                    const RGWObjTags* req_tags);

// Commit the reservation to the queue.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise -- confirm against the implementation.
int publish_commit(rgw::sal::Object* obj,
                   uint64_t size,
                   const ceph::real_time& mtime,
                   const std::string& etag,
                   const std::string& version,
                   EventType event_type,
                   reservation_t& reservation,
                   const DoutPrefixProvider* dpp);

// Cancel (abort) the reservation.
// NOTE(review): presumably returns 0 on success and a negative error code
// otherwise -- confirm against the implementation.
int publish_abort(reservation_t& reservation);

}