// Retrieved from path: root/src/journal/Journaler.h
// blob: f42513c5d67fe5fe40e0214e17f5c2e41d438500 (plain)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_JOURNALER_H
#define CEPH_JOURNAL_JOURNALER_H

#include "include/int_types.h"
#include "include/buffer_fwd.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "journal/Future.h"
#include "journal/JournalMetadataListener.h"
#include "cls/journal/cls_journal_types.h"
#include "common/Timer.h"
#include <iosfwd>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include "include/ceph_assert.h"

// Forward declarations: this header only refers to these types through
// pointers (see Journaler::Threads and the Journaler constructor taking a
// ContextWQ*), so their full definitions are not required here.
class ContextWQ;
class ThreadPool;

namespace journal {

// Forward declarations of journal types that this header only uses by
// pointer or reference in the Journaler interface below.
struct CacheManagerHandler;

class JournalTrimmer;
class ReplayEntry;
class ReplayHandler;
class Settings;

class Journaler {
public:
  struct Threads {
    Threads(CephContext *cct);
    ~Threads();

    ThreadPool *thread_pool = nullptr;
    ContextWQ *work_queue = nullptr;

    SafeTimer *timer;
    ceph::mutex timer_lock = ceph::make_mutex("Journaler::timer_lock");
  };

  typedef cls::journal::Tag Tag;
  typedef std::list<cls::journal::Tag> Tags;
  typedef std::set<cls::journal::Client> RegisteredClients;

  static std::string header_oid(const std::string &journal_id);
  static std::string object_oid_prefix(int pool_id,
				       const std::string &journal_id);

  Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
	    const std::string &client_id, const Settings &settings,
            CacheManagerHandler *cache_manager_handler);
  Journaler(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
            librados::IoCtx &header_ioctx, const std::string &journal_id,
	    const std::string &client_id, const Settings &settings,
            CacheManagerHandler *cache_manager_handler);
  ~Journaler();

  void exists(Context *on_finish) const;
  void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *ctx);
  void remove(bool force, Context *on_finish);

  void init(Context *on_init);
  void shut_down();
  void shut_down(Context *on_finish);

  bool is_initialized() const;

  void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
			      int64_t *pool_id, Context *on_finish);
  void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
			    RegisteredClients *clients, Context *on_finish);

  void add_listener(JournalMetadataListener *listener);
  void remove_listener(JournalMetadataListener *listener);

  int register_client(const bufferlist &data);
  void register_client(const bufferlist &data, Context *on_finish);

  int unregister_client();
  void unregister_client(Context *on_finish);

  void update_client(const bufferlist &data, Context *on_finish);
  void get_client(const std::string &client_id, cls::journal::Client *client,
                  Context *on_finish);
  int get_cached_client(const std::string &client_id,
                        cls::journal::Client *client);

  void flush_commit_position(Context *on_safe);

  void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
                    Context *on_finish);
  void allocate_tag(uint64_t tag_class, const bufferlist &data,
                    cls::journal::Tag *tag, Context *on_finish);
  void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
  void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish);
  void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags,
                Context *on_finish);

  void start_replay(ReplayHandler* replay_handler);
  void start_live_replay(ReplayHandler* replay_handler, double interval);
  bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
  void stop_replay();
  void stop_replay(Context *on_finish);

  uint64_t get_max_append_size() const;
  void start_append(uint64_t max_in_flight_appends);
  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
                                double flush_age);
  Future append(uint64_t tag_tid, const bufferlist &bl);
  void flush_append(Context *on_safe);
  void stop_append(Context *on_safe);

  void committed(const ReplayEntry &replay_entry);
  void committed(const Future &future);

  void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id);

private:
  struct C_InitJournaler : public Context {
    Journaler *journaler;
    Context *on_safe;
    C_InitJournaler(Journaler *_journaler, Context *_on_safe)
      : journaler(_journaler), on_safe(_on_safe) {
    }
    void finish(int r) override {
      if (r == 0) {
	r = journaler->init_complete();
      }
      on_safe->complete(r);
    }
  };

  Threads *m_threads = nullptr;

  mutable librados::IoCtx m_header_ioctx;
  librados::IoCtx m_data_ioctx;
  CephContext *m_cct;
  std::string m_client_id;
  CacheManagerHandler *m_cache_manager_handler;

  std::string m_header_oid;
  std::string m_object_oid_prefix;

  bool m_initialized = false;
  ceph::ref_t<class JournalMetadata> m_metadata;
  std::unique_ptr<class JournalPlayer> m_player;
  std::unique_ptr<class JournalRecorder> m_recorder;
  JournalTrimmer *m_trimmer = nullptr;

  void set_up(ContextWQ *work_queue, SafeTimer *timer, ceph::mutex *timer_lock,
              librados::IoCtx &header_ioctx, const std::string &journal_id,
              const Settings &settings);

  int init_complete();
  void create_player(ReplayHandler* replay_handler);

  friend std::ostream &operator<<(std::ostream &os,
				  const Journaler &journaler);
};

// Stream-insertion operator for Journaler; it is also declared a friend of
// the class, so its definition may read private members.
std::ostream &operator<<(std::ostream &os, const Journaler &journaler);

} // namespace journal

#endif // CEPH_JOURNAL_JOURNALER_H