summaryrefslogtreecommitdiffstats
path: root/src/journal/FutureImpl.h
blob: 241a09709ffebaf5df9e74d666d890719db4dc66 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_JOURNAL_FUTURE_IMPL_H
#define CEPH_JOURNAL_FUTURE_IMPL_H

#include "include/int_types.h"
#include "common/RefCountedObj.h"
#include "include/Context.h"
#include "journal/Future.h"
#include <list>
#include <map>
#include <boost/noncopyable.hpp>
#include "include/ceph_assert.h"

class Context;

namespace journal {

class FutureImpl : public RefCountedObject, boost::noncopyable {
public:
  struct FlushHandler {
    using ref = std::shared_ptr<FlushHandler>;
    virtual void flush(const ceph::ref_t<FutureImpl> &future) = 0;
    virtual ~FlushHandler() = default;
  };

  void init(const ceph::ref_t<FutureImpl> &prev_future);

  inline uint64_t get_tag_tid() const {
    return m_tag_tid;
  }
  inline uint64_t get_entry_tid() const {
    return m_entry_tid;
  }
  inline uint64_t get_commit_tid() const {
    return m_commit_tid;
  }

  void flush(Context *on_safe = NULL);
  void wait(Context *on_safe);

  bool is_complete() const;
  int get_return_value() const;

  inline bool is_flush_in_progress() const {
    std::lock_guard locker{m_lock};
    return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
  }
  inline void set_flush_in_progress() {
    auto h = std::move(m_flush_handler);
    ceph_assert(h);
    std::lock_guard locker{m_lock};
    m_flush_state = FLUSH_STATE_IN_PROGRESS;
  }

  bool attach(FlushHandler::ref flush_handler);
  inline void detach() {
    m_flush_handler.reset();
  }
  inline FlushHandler::ref get_flush_handler() const {
    return m_flush_handler;
  }

  void safe(int r);

private:
  friend std::ostream &operator<<(std::ostream &, const FutureImpl &);

  typedef std::map<FlushHandler::ref, ceph::ref_t<FutureImpl>> FlushHandlers;
  typedef std::list<Context *> Contexts;

  enum FlushState {
    FLUSH_STATE_NONE,
    FLUSH_STATE_REQUESTED,
    FLUSH_STATE_IN_PROGRESS
  };

  struct C_ConsistentAck : public Context {
    ceph::ref_t<FutureImpl> future;
    C_ConsistentAck(ceph::ref_t<FutureImpl> _future) : future(std::move(_future)) {}
    void complete(int r) override {
      future->consistent(r);
      future.reset();
    }
    void finish(int r) override {}
  };

  FRIEND_MAKE_REF(FutureImpl);
  FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
  ~FutureImpl() override = default;

  uint64_t m_tag_tid;
  uint64_t m_entry_tid;
  uint64_t m_commit_tid;

  mutable ceph::mutex m_lock = ceph::make_mutex("FutureImpl::m_lock", false);
  ceph::ref_t<FutureImpl> m_prev_future;
  bool m_safe = false;
  bool m_consistent = false;
  int m_return_value = 0;

  FlushHandler::ref m_flush_handler;
  FlushState m_flush_state = FLUSH_STATE_NONE;

  C_ConsistentAck m_consistent_ack;
  Contexts m_contexts;

  ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers);
  ceph::ref_t<FutureImpl> prepare_flush(FlushHandlers *flush_handlers, ceph::mutex &lock);

  void consistent(int r);
  void finish_unlock();
};

std::ostream &operator<<(std::ostream &os, const FutureImpl &future);

} // namespace journal

using journal::operator<<;

#endif // CEPH_JOURNAL_FUTURE_IMPL_H