summaryrefslogtreecommitdiffstats
path: root/src/journal/ObjectRecorder.h
blob: ff00e0a0a1f031da668e03994d6e7e7b7315db68 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
#define CEPH_JOURNAL_OBJECT_RECORDER_H

#include "include/utime.h"
#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "journal/FutureImpl.h"
#include <list>
#include <map>
#include <set>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include "include/ceph_assert.h"

class SafeTimer;

namespace journal {

class ObjectRecorder;
typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;

typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
typedef std::list<AppendBuffer> AppendBuffers;

class ObjectRecorder : public RefCountedObject, boost::noncopyable {
public:
  struct Handler {
    virtual ~Handler() {
    }
    virtual void closed(ObjectRecorder *object_recorder) = 0;
    virtual void overflow(ObjectRecorder *object_recorder) = 0;
  };

  ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                 uint64_t object_number, std::shared_ptr<Mutex> lock,
                 ContextWQ *work_queue, Handler *handler, uint8_t order,
                 int32_t max_in_flight_appends);
  ~ObjectRecorder() override;

  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
                                double flush_age);

  inline uint64_t get_object_number() const {
    return m_object_number;
  }
  inline const std::string &get_oid() const {
    return m_oid;
  }

  bool append(AppendBuffers &&append_buffers);
  void flush(Context *on_safe);
  void flush(const FutureImplPtr &future);

  void claim_append_buffers(AppendBuffers *append_buffers);

  bool is_closed() const {
    ceph_assert(m_lock->is_locked());
    return (m_object_closed && m_in_flight_appends.empty());
  }
  bool close();

  inline CephContext *cct() const {
    return m_cct;
  }

  inline size_t get_pending_appends() const {
    Mutex::Locker locker(*m_lock);
    return m_pending_buffers.size();
  }

private:
  typedef std::set<uint64_t> InFlightTids;
  typedef std::map<uint64_t, AppendBuffers> InFlightAppends;

  struct FlushHandler : public FutureImpl::FlushHandler {
    ObjectRecorder *object_recorder;
    FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
    void get() override {
      object_recorder->get();
    }
    void put() override {
      object_recorder->put();
    }
    void flush(const FutureImplPtr &future) override {
      object_recorder->flush(future);
    }
  };
  struct C_AppendFlush : public Context {
    ObjectRecorder *object_recorder;
    uint64_t tid;
    C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
        : object_recorder(o), tid(_tid) {
      object_recorder->get();
    }
    void finish(int r) override {
      object_recorder->handle_append_flushed(tid, r);
      object_recorder->put();
    }
  };

  librados::IoCtx m_ioctx;
  std::string m_oid;
  uint64_t m_object_number;
  CephContext *m_cct;

  ContextWQ *m_op_work_queue;

  Handler *m_handler;

  uint8_t m_order;
  uint64_t m_soft_max_size;

  uint32_t m_flush_interval = 0;
  uint64_t m_flush_bytes = 0;
  double m_flush_age = 0;
  int32_t m_max_in_flight_appends;

  FlushHandler m_flush_handler;

  mutable std::shared_ptr<Mutex> m_lock;
  AppendBuffers m_pending_buffers;
  uint64_t m_pending_bytes = 0;
  utime_t m_last_flush_time;

  uint64_t m_append_tid;

  InFlightTids m_in_flight_tids;
  InFlightAppends m_in_flight_appends;
  uint64_t m_object_bytes = 0;
  bool m_overflowed;
  bool m_object_closed;

  bufferlist m_prefetch_bl;

  bool m_in_flight_flushes;
  Cond m_in_flight_flushes_cond;
  uint64_t m_in_flight_bytes = 0;

  bool send_appends(bool force, FutureImplPtr flush_sentinal);
  void handle_append_flushed(uint64_t tid, int r);
  void append_overflowed();

  void notify_handler_unlock();
};

} // namespace journal

#endif // CEPH_JOURNAL_OBJECT_RECORDER_H