// path: root/src/journal/JournalRecorder.h
// blob: 9d8ea6c10f3d924a73dc78a72b6d7b82ace440b7
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
#define CEPH_JOURNAL_JOURNAL_RECORDER_H

#include "include/int_types.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/ceph_mutex.h"
#include "common/containers.h"
#include "common/Timer.h"
#include "journal/Future.h"
#include "journal/FutureImpl.h"
#include "journal/JournalMetadata.h"
#include "journal/ObjectRecorder.h"
#include <map>
#include <mutex>
#include <string>
#include <string_view>
#include <vector>

namespace journal {

class JournalRecorder {
public:
  JournalRecorder(librados::IoCtx &ioctx, std::string_view object_oid_prefix,
                  ceph::ref_t<JournalMetadata> journal_metadata,
                  uint64_t max_in_flight_appends);
  ~JournalRecorder();

  void shut_down(Context *on_safe);

  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
                                double flush_age);

  Future append(uint64_t tag_tid, const bufferlist &bl);
  void flush(Context *on_safe);

  ceph::ref_t<ObjectRecorder> get_object(uint8_t splay_offset);

private:
  typedef std::map<uint8_t, ceph::ref_t<ObjectRecorder>> ObjectRecorderPtrs;
  typedef std::vector<std::unique_lock<ceph::mutex>> Lockers;

  struct Listener : public JournalMetadataListener {
    JournalRecorder *journal_recorder;

    Listener(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {}

    void handle_update(JournalMetadata *) override {
      journal_recorder->handle_update();
    }
  };

  struct ObjectHandler : public ObjectRecorder::Handler {
    JournalRecorder *journal_recorder;

    ObjectHandler(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {
    }

    void closed(ObjectRecorder *object_recorder) override {
      journal_recorder->handle_closed(object_recorder);
    }
    void overflow(ObjectRecorder *object_recorder) override {
      journal_recorder->handle_overflow(object_recorder);
    }
  };

  struct C_AdvanceObjectSet : public Context {
    JournalRecorder *journal_recorder;

    C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
      : journal_recorder(_journal_recorder) {
    }
    void finish(int r) override {
      journal_recorder->handle_advance_object_set(r);
    }
  };

  librados::IoCtx m_ioctx;
  CephContext *m_cct = nullptr;
  std::string m_object_oid_prefix;

  ceph::ref_t<JournalMetadata> m_journal_metadata;

  uint32_t m_flush_interval = 0;
  uint64_t m_flush_bytes = 0;
  double m_flush_age = 0;
  uint64_t m_max_in_flight_appends;

  Listener m_listener;
  ObjectHandler m_object_handler;

  ceph::mutex m_lock = ceph::make_mutex("JournalerRecorder::m_lock");

  uint32_t m_in_flight_advance_sets = 0;
  uint32_t m_in_flight_object_closes = 0;
  uint64_t m_current_set;
  ObjectRecorderPtrs m_object_ptrs;
  ceph::containers::tiny_vector<ceph::mutex> m_object_locks;

  ceph::ref_t<FutureImpl> m_prev_future;

  Context *m_on_object_set_advanced = nullptr;

  void open_object_set();
  bool close_object_set(uint64_t active_set);

  void advance_object_set();
  void handle_advance_object_set(int r);

  void close_and_advance_object_set(uint64_t object_set);

  ceph::ref_t<ObjectRecorder> create_object_recorder(uint64_t object_number,
                                           ceph::mutex* lock);
  bool create_next_object_recorder(ceph::ref_t<ObjectRecorder> object_recorder);

  void handle_update();

  void handle_closed(ObjectRecorder *object_recorder);
  void handle_overflow(ObjectRecorder *object_recorder);

  Lockers lock_object_recorders();
};

} // namespace journal

#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H