summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_object_expirer_core.h
blob: a17f447c4aae32e0ae1bd47b280674adca11c9a2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#ifndef CEPH_OBJEXP_H
#define CEPH_OBJEXP_H

#include <atomic>
#include <string>
#include <cerrno>
#include <sstream>
#include <iostream>

#include "auth/Crypto.h"

#include "common/armor.h"
#include "common/ceph_json.h"
#include "common/config.h"
#include "common/ceph_argparse.h"
#include "common/Formatter.h"
#include "common/errno.h"

#include "common/ceph_mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"

#include "global/global_init.h"

#include "include/common_fwd.h"
#include "include/utime.h"
#include "include/str_list.h"

#include "rgw_sal.h"
#include "rgw_sal_rados.h"

class RGWSI_RADOS;
class RGWSI_Zone;
class RGWBucketInfo;
class cls_timeindex_entry;

class RGWObjExpStore {
  CephContext *cct;
  RGWSI_RADOS *rados_svc;
  RGWSI_Zone *zone_svc;
public:
  RGWObjExpStore(CephContext *_cct, RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc) : cct(_cct),
                                                                                      rados_svc(_rados_svc),
                                                                                      zone_svc(_zone_svc) {}

  int objexp_hint_add(const DoutPrefixProvider *dpp, 
                      const ceph::real_time& delete_at,
                      const string& tenant_name,
                      const string& bucket_name,
                      const string& bucket_id,
                      const rgw_obj_index_key& obj_key);

  int objexp_hint_list(const DoutPrefixProvider *dpp, 
                       const string& oid,
                       const ceph::real_time& start_time,
                       const ceph::real_time& end_time,
                       const int max_entries,
                       const string& marker,
                       list<cls_timeindex_entry>& entries, /* out */
                       string *out_marker,                 /* out */
                       bool *truncated);                   /* out */

  int objexp_hint_trim(const DoutPrefixProvider *dpp, 
                       const string& oid,
                       const ceph::real_time& start_time,
                       const ceph::real_time& end_time,
                       const string& from_marker,
                       const string& to_marker);
};

class RGWObjectExpirer {
protected:
  rgw::sal::RGWRadosStore *store;
  RGWObjExpStore exp_store;

  int init_bucket_info(const std::string& tenant_name,
                       const std::string& bucket_name,
                       const std::string& bucket_id,
                       RGWBucketInfo& bucket_info);

  class OEWorker : public Thread, public DoutPrefixProvider {
    CephContext *cct;
    RGWObjectExpirer *oe;
    ceph::mutex lock = ceph::make_mutex("OEWorker");
    ceph::condition_variable cond;

  public:
    OEWorker(CephContext * const cct,
             RGWObjectExpirer * const oe)
      : cct(cct),
        oe(oe) {
    }

    void *entry() override;
    void stop();

    CephContext *get_cct() const override;
    unsigned get_subsys() const;
    std::ostream& gen_prefix(std::ostream& out) const;
  };

  OEWorker *worker{nullptr};
  std::atomic<bool> down_flag = { false };

public:
  explicit RGWObjectExpirer(rgw::sal::RGWRadosStore *_store)
    : store(_store),
      exp_store(_store->getRados()->ctx(), _store->svc()->rados, _store->svc()->zone),
      worker(NULL) {
  }
  ~RGWObjectExpirer() {
    stop_processor();
  }

  int hint_add(const DoutPrefixProvider *dpp, 
               const ceph::real_time& delete_at,
               const string& tenant_name,
               const string& bucket_name,
               const string& bucket_id,
               const rgw_obj_index_key& obj_key) {
    return exp_store.objexp_hint_add(dpp, delete_at, tenant_name, bucket_name,
                                     bucket_id, obj_key);
  }

  int garbage_single_object(const DoutPrefixProvider *dpp, objexp_hint_entry& hint);

  void garbage_chunk(const DoutPrefixProvider *dpp, 
                     std::list<cls_timeindex_entry>& entries, /* in  */
                     bool& need_trim);                        /* out */

  void trim_chunk(const DoutPrefixProvider *dpp, 
                  const std::string& shard,
                  const utime_t& from,
                  const utime_t& to,
                  const string& from_marker,
                  const string& to_marker);

  bool process_single_shard(const DoutPrefixProvider *dpp, 
                            const std::string& shard,
                            const utime_t& last_run,
                            const utime_t& round_start);

  bool inspect_all_shards(const DoutPrefixProvider *dpp, 
                          const utime_t& last_run,
                          const utime_t& round_start);

  bool going_down();
  void start_processor();
  void stop_processor();
};
#endif /* CEPH_OBJEXP_H */