summaryrefslogtreecommitdiffstats
path: root/src/journal
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/journal
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/CMakeLists.txt14
-rw-r--r--src/journal/Entry.cc152
-rw-r--r--src/journal/Entry.h62
-rw-r--r--src/journal/Future.cc40
-rw-r--r--src/journal/Future.h58
-rw-r--r--src/journal/FutureImpl.cc174
-rw-r--r--src/journal/FutureImpl.h132
-rw-r--r--src/journal/JournalMetadata.cc1177
-rw-r--r--src/journal/JournalMetadata.h379
-rw-r--r--src/journal/JournalMetadataListener.h30
-rw-r--r--src/journal/JournalPlayer.cc803
-rw-r--r--src/journal/JournalPlayer.h157
-rw-r--r--src/journal/JournalRecorder.cc409
-rw-r--r--src/journal/JournalRecorder.h137
-rw-r--r--src/journal/JournalTrimmer.cc248
-rw-r--r--src/journal/JournalTrimmer.h94
-rw-r--r--src/journal/Journaler.cc468
-rw-r--r--src/journal/Journaler.h168
-rw-r--r--src/journal/ObjectPlayer.cc313
-rw-r--r--src/journal/ObjectPlayer.h141
-rw-r--r--src/journal/ObjectRecorder.cc361
-rw-r--r--src/journal/ObjectRecorder.h155
-rw-r--r--src/journal/ReplayEntry.h34
-rw-r--r--src/journal/ReplayHandler.h21
-rw-r--r--src/journal/Settings.h22
-rw-r--r--src/journal/Utils.cc25
-rw-r--r--src/journal/Utils.h54
27 files changed, 5828 insertions, 0 deletions
diff --git a/src/journal/CMakeLists.txt b/src/journal/CMakeLists.txt
new file mode 100644
index 00000000..3632c105
--- /dev/null
+++ b/src/journal/CMakeLists.txt
@@ -0,0 +1,14 @@
+set(journal_srcs
+ Entry.cc
+ Future.cc
+ FutureImpl.cc
+ Journaler.cc
+ JournalMetadata.cc
+ JournalPlayer.cc
+ JournalRecorder.cc
+ JournalTrimmer.cc
+ ObjectPlayer.cc
+ ObjectRecorder.cc
+ Utils.cc)
+add_library(journal STATIC ${journal_srcs})
+target_link_libraries(journal cls_journal_client)
diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc
new file mode 100644
index 00000000..48648a87
--- /dev/null
+++ b/src/journal/Entry.cc
@@ -0,0 +1,152 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "include/encoding.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <strstream>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Entry: " << this << " "
+
+namespace journal {
+
+namespace {
+
+const uint32_t HEADER_FIXED_SIZE = 25; /// preamble, version, entry tid, tag id
+const uint32_t REMAINDER_FIXED_SIZE = 8; /// data size, crc
+
+} // anonymous namespace
+
+uint32_t Entry::get_fixed_size() {
+ return HEADER_FIXED_SIZE + REMAINDER_FIXED_SIZE;
+}
+
+void Entry::encode(bufferlist &bl) const {
+ using ceph::encode;
+ bufferlist data_bl;
+ encode(preamble, data_bl);
+ encode(static_cast<uint8_t>(1), data_bl);
+ encode(m_entry_tid, data_bl);
+ encode(m_tag_tid, data_bl);
+ encode(m_data, data_bl);
+
+ uint32_t crc = data_bl.crc32c(0);
+ uint32_t bl_offset = bl.length();
+ bl.claim_append(data_bl);
+ encode(crc, bl);
+ ceph_assert(get_fixed_size() + m_data.length() + bl_offset == bl.length());
+}
+
+void Entry::decode(bufferlist::const_iterator &iter) {
+ using ceph::decode;
+ uint32_t start_offset = iter.get_off();
+ uint64_t bl_preamble;
+ decode(bl_preamble, iter);
+ if (bl_preamble != preamble) {
+ throw buffer::malformed_input("incorrect preamble: " +
+ stringify(bl_preamble));
+ }
+
+ uint8_t version;
+ decode(version, iter);
+ if (version != 1) {
+ throw buffer::malformed_input("unknown version: " + stringify(version));
+ }
+
+ decode(m_entry_tid, iter);
+ decode(m_tag_tid, iter);
+ decode(m_data, iter);
+ uint32_t end_offset = iter.get_off();
+
+ uint32_t crc;
+ decode(crc, iter);
+
+ bufferlist data_bl;
+ data_bl.substr_of(iter.get_bl(), start_offset, end_offset - start_offset);
+ uint32_t actual_crc = data_bl.crc32c(0);
+ if (crc != actual_crc) {
+ throw buffer::malformed_input("crc mismatch: " + stringify(crc) +
+ " != " + stringify(actual_crc));
+ }
+}
+
+void Entry::dump(Formatter *f) const {
+ f->dump_unsigned("tag_tid", m_tag_tid);
+ f->dump_unsigned("entry_tid", m_entry_tid);
+
+ std::stringstream data;
+ m_data.hexdump(data);
+ f->dump_string("data", data.str());
+}
+
+bool Entry::is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed) {
+ using ceph::decode;
+ uint32_t start_off = iter.get_off();
+ if (iter.get_remaining() < HEADER_FIXED_SIZE) {
+ *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+ return false;
+ }
+ uint64_t bl_preamble;
+ decode(bl_preamble, iter);
+ if (bl_preamble != preamble) {
+ *bytes_needed = 0;
+ return false;
+ }
+ iter.advance(HEADER_FIXED_SIZE - sizeof(bl_preamble));
+
+ if (iter.get_remaining() < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+ return false;
+ }
+ uint32_t data_size;
+ decode(data_size, iter);
+
+ if (iter.get_remaining() < data_size) {
+ *bytes_needed = data_size - iter.get_remaining();
+ return false;
+ }
+ iter.advance(data_size);
+ uint32_t end_off = iter.get_off();
+
+ if (iter.get_remaining() < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+ return false;
+ }
+
+ bufferlist crc_bl;
+ crc_bl.substr_of(iter.get_bl(), start_off, end_off - start_off);
+
+ *bytes_needed = 0;
+ uint32_t crc;
+ decode(crc, iter);
+ if (crc != crc_bl.crc32c(0)) {
+ return false;
+ }
+ return true;
+}
+
+void Entry::generate_test_instances(std::list<Entry *> &o) {
+ o.push_back(new Entry(1, 123, bufferlist()));
+
+ bufferlist bl;
+ bl.append("data");
+ o.push_back(new Entry(2, 123, bl));
+}
+
+bool Entry::operator==(const Entry& rhs) const {
+ return (m_tag_tid == rhs.m_tag_tid && m_entry_tid == rhs.m_entry_tid &&
+ const_cast<bufferlist&>(m_data).contents_equal(
+ const_cast<bufferlist&>(rhs.m_data)));
+}
+
+std::ostream &operator<<(std::ostream &os, const Entry &entry) {
+ os << "Entry[tag_tid=" << entry.get_tag_tid() << ", "
+ << "entry_tid=" << entry.get_entry_tid() << ", "
+ << "data size=" << entry.get_data().length() << "]";
+ return os;
+}
+
+} // namespace journal
diff --git a/src/journal/Entry.h b/src/journal/Entry.h
new file mode 100644
index 00000000..52d9e67b
--- /dev/null
+++ b/src/journal/Entry.h
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_ENTRY_H
+#define CEPH_JOURNAL_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <string>
+
+namespace ceph {
+class Formatter;
+}
+
+namespace journal {
+
+class Entry {
+public:
+ Entry() : m_tag_tid(0), m_entry_tid() {}
+ Entry(uint64_t tag_tid, uint64_t entry_tid, const bufferlist &data)
+ : m_tag_tid(tag_tid), m_entry_tid(entry_tid), m_data(data)
+ {
+ }
+
+ static uint32_t get_fixed_size();
+
+ inline uint64_t get_tag_tid() const {
+ return m_tag_tid;
+ }
+ inline uint64_t get_entry_tid() const {
+ return m_entry_tid;
+ }
+ inline const bufferlist &get_data() const {
+ return m_data;
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::const_iterator &iter);
+ void dump(ceph::Formatter *f) const;
+
+ bool operator==(const Entry& rhs) const;
+
+ static bool is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed);
+ static void generate_test_instances(std::list<Entry *> &o);
+
+private:
+ static const uint64_t preamble = 0x3141592653589793;
+
+ uint64_t m_tag_tid;
+ uint64_t m_entry_tid;
+ bufferlist m_data;
+};
+
+std::ostream &operator<<(std::ostream &os, const Entry &entry);
+WRITE_CLASS_ENCODER(journal::Entry)
+
+} // namespace journal
+
+
+#endif // CEPH_JOURNAL_ENTRY_H
diff --git a/src/journal/Future.cc b/src/journal/Future.cc
new file mode 100644
index 00000000..89f7fd32
--- /dev/null
+++ b/src/journal/Future.cc
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+#include "include/ceph_assert.h"
+
+namespace journal {
+
+void Future::flush(Context *on_safe) {
+ m_future_impl->flush(on_safe);
+}
+
+void Future::wait(Context *on_safe) {
+ ceph_assert(on_safe != NULL);
+ m_future_impl->wait(on_safe);
+}
+
+bool Future::is_complete() const {
+ return m_future_impl->is_complete();
+}
+
+int Future::get_return_value() const {
+ return m_future_impl->get_return_value();
+}
+
+void intrusive_ptr_add_ref(FutureImpl *p) {
+ p->get();
+}
+
+void intrusive_ptr_release(FutureImpl *p) {
+ p->put();
+}
+
+std::ostream &operator<<(std::ostream &os, const Future &future) {
+ return os << *future.m_future_impl.get();
+}
+
+} // namespace journal
+
diff --git a/src/journal/Future.h b/src/journal/Future.h
new file mode 100644
index 00000000..fef00156
--- /dev/null
+++ b/src/journal/Future.h
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_H
+#define CEPH_JOURNAL_FUTURE_H
+
+#include "include/int_types.h"
+#include <string>
+#include <iosfwd>
+#include <boost/intrusive_ptr.hpp>
+#include "include/ceph_assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+
+class Future {
+public:
+ typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+ Future() {}
+ Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {}
+
+ inline bool is_valid() const {
+ return m_future_impl.get() != nullptr;
+ }
+
+ void flush(Context *on_safe);
+ void wait(Context *on_safe);
+
+ bool is_complete() const;
+ int get_return_value() const;
+
+private:
+ friend class Journaler;
+ friend std::ostream& operator<<(std::ostream&, const Future&);
+
+ inline FutureImplPtr get_future_impl() const {
+ return m_future_impl;
+ }
+
+ FutureImplPtr m_future_impl;
+};
+
+void intrusive_ptr_add_ref(FutureImpl *p);
+void intrusive_ptr_release(FutureImpl *p);
+
+std::ostream &operator<<(std::ostream &os, const Future &future);
+
+} // namespace journal
+
+using journal::intrusive_ptr_add_ref;
+using journal::intrusive_ptr_release;
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_H
diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc
new file mode 100644
index 00000000..eac3fc39
--- /dev/null
+++ b/src/journal/FutureImpl.cc
@@ -0,0 +1,174 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "journal/Utils.h"
+
+namespace journal {
+
+FutureImpl::FutureImpl(uint64_t tag_tid, uint64_t entry_tid,
+ uint64_t commit_tid)
+ : RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
+ m_commit_tid(commit_tid),
+ m_lock("FutureImpl::m_lock", false, false), m_safe(false),
+ m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
+ m_consistent_ack(this) {
+}
+
+void FutureImpl::init(const FutureImplPtr &prev_future) {
+ // chain ourself to the prior future (if any) to that we known when the
+ // journal is consistent
+ if (prev_future) {
+ m_prev_future = prev_future;
+ m_prev_future->wait(&m_consistent_ack);
+ } else {
+ m_consistent_ack.complete(0);
+ }
+}
+
+void FutureImpl::flush(Context *on_safe) {
+
+ bool complete;
+ FlushHandlers flush_handlers;
+ FutureImplPtr prev_future;
+ {
+ Mutex::Locker locker(m_lock);
+ complete = (m_safe && m_consistent);
+ if (!complete) {
+ if (on_safe != nullptr) {
+ m_contexts.push_back(on_safe);
+ }
+
+ prev_future = prepare_flush(&flush_handlers, m_lock);
+ }
+ }
+
+ // instruct prior futures to flush as well
+ while (prev_future) {
+ prev_future = prev_future->prepare_flush(&flush_handlers);
+ }
+
+ if (complete && on_safe != NULL) {
+ on_safe->complete(m_return_value);
+ } else if (!flush_handlers.empty()) {
+ // attached to journal object -- instruct it to flush all entries through
+ // this one. possible to become detached while lock is released, so flush
+ // will be re-requested by the object if it doesn't own the future
+ for (auto &pair : flush_handlers) {
+ pair.first->flush(pair.second);
+ }
+ }
+}
+
+FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
+ Mutex::Locker locker(m_lock);
+ return prepare_flush(flush_handlers, m_lock);
+}
+
+FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers,
+ Mutex &lock) {
+ ceph_assert(m_lock.is_locked());
+
+ if (m_flush_state == FLUSH_STATE_NONE) {
+ m_flush_state = FLUSH_STATE_REQUESTED;
+
+ if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
+ flush_handlers->insert({m_flush_handler, this});
+ }
+ }
+ return m_prev_future;
+}
+
+void FutureImpl::wait(Context *on_safe) {
+ ceph_assert(on_safe != NULL);
+ {
+ Mutex::Locker locker(m_lock);
+ if (!m_safe || !m_consistent) {
+ m_contexts.push_back(on_safe);
+ return;
+ }
+ }
+
+ on_safe->complete(m_return_value);
+}
+
+bool FutureImpl::is_complete() const {
+ Mutex::Locker locker(m_lock);
+ return m_safe && m_consistent;
+}
+
+int FutureImpl::get_return_value() const {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_safe && m_consistent);
+ return m_return_value;
+}
+
+bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_flush_handler);
+ m_flush_handler = flush_handler;
+ return m_flush_state != FLUSH_STATE_NONE;
+}
+
+void FutureImpl::safe(int r) {
+ m_lock.Lock();
+ ceph_assert(!m_safe);
+ m_safe = true;
+ if (m_return_value == 0) {
+ m_return_value = r;
+ }
+
+ m_flush_handler.reset();
+ if (m_consistent) {
+ finish_unlock();
+ } else {
+ m_lock.Unlock();
+ }
+}
+
+void FutureImpl::consistent(int r) {
+ m_lock.Lock();
+ ceph_assert(!m_consistent);
+ m_consistent = true;
+ m_prev_future.reset();
+ if (m_return_value == 0) {
+ m_return_value = r;
+ }
+
+ if (m_safe) {
+ finish_unlock();
+ } else {
+ m_lock.Unlock();
+ }
+}
+
+void FutureImpl::finish_unlock() {
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_safe && m_consistent);
+
+ Contexts contexts;
+ contexts.swap(m_contexts);
+
+ m_lock.Unlock();
+ for (Contexts::iterator it = contexts.begin();
+ it != contexts.end(); ++it) {
+ (*it)->complete(m_return_value);
+ }
+}
+
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
+ os << "Future[tag_tid=" << future.m_tag_tid << ", "
+ << "entry_tid=" << future.m_entry_tid << ", "
+ << "commit_tid=" << future.m_commit_tid << "]";
+ return os;
+}
+
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
+ p->get();
+}
+
+void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
+ p->put();
+}
+
+} // namespace journal
diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h
new file mode 100644
index 00000000..2be3eb25
--- /dev/null
+++ b/src/journal/FutureImpl.h
@@ -0,0 +1,132 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_IMPL_H
+#define CEPH_JOURNAL_FUTURE_IMPL_H
+
+#include "include/int_types.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "include/Context.h"
+#include "journal/Future.h"
+#include <list>
+#include <map>
+#include <boost/noncopyable.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "include/ceph_assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+class FutureImpl : public RefCountedObject, boost::noncopyable {
+public:
+ struct FlushHandler {
+ virtual ~FlushHandler() {}
+ virtual void flush(const FutureImplPtr &future) = 0;
+ virtual void get() = 0;
+ virtual void put() = 0;
+ };
+ typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
+
+ FutureImpl(uint64_t tag_tid, uint64_t entry_tid, uint64_t commit_tid);
+
+ void init(const FutureImplPtr &prev_future);
+
+ inline uint64_t get_tag_tid() const {
+ return m_tag_tid;
+ }
+ inline uint64_t get_entry_tid() const {
+ return m_entry_tid;
+ }
+ inline uint64_t get_commit_tid() const {
+ return m_commit_tid;
+ }
+
+ void flush(Context *on_safe = NULL);
+ void wait(Context *on_safe);
+
+ bool is_complete() const;
+ int get_return_value() const;
+
+ inline bool is_flush_in_progress() const {
+ Mutex::Locker locker(m_lock);
+ return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
+ }
+ inline void set_flush_in_progress() {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_flush_handler);
+ m_flush_handler.reset();
+ m_flush_state = FLUSH_STATE_IN_PROGRESS;
+ }
+
+ bool attach(const FlushHandlerPtr &flush_handler);
+ inline void detach() {
+ Mutex::Locker locker(m_lock);
+ m_flush_handler.reset();
+ }
+ inline FlushHandlerPtr get_flush_handler() const {
+ Mutex::Locker locker(m_lock);
+ return m_flush_handler;
+ }
+
+ void safe(int r);
+
+private:
+ friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
+
+ typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers;
+ typedef std::list<Context *> Contexts;
+
+ enum FlushState {
+ FLUSH_STATE_NONE,
+ FLUSH_STATE_REQUESTED,
+ FLUSH_STATE_IN_PROGRESS
+ };
+
+ struct C_ConsistentAck : public Context {
+ FutureImplPtr future;
+ C_ConsistentAck(FutureImpl *_future) : future(_future) {}
+ void complete(int r) override {
+ future->consistent(r);
+ future.reset();
+ }
+ void finish(int r) override {}
+ };
+
+ uint64_t m_tag_tid;
+ uint64_t m_entry_tid;
+ uint64_t m_commit_tid;
+
+ mutable Mutex m_lock;
+ FutureImplPtr m_prev_future;
+ bool m_safe;
+ bool m_consistent;
+ int m_return_value;
+
+ FlushHandlerPtr m_flush_handler;
+ FlushState m_flush_state;
+
+ C_ConsistentAck m_consistent_ack;
+ Contexts m_contexts;
+
+ FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
+ FutureImplPtr prepare_flush(FlushHandlers *flush_handlers, Mutex &lock);
+
+ void consistent(int r);
+ void finish_unlock();
+};
+
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
+void intrusive_ptr_release(FutureImpl::FlushHandler *p);
+
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future);
+
+} // namespace journal
+
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_IMPL_H
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
new file mode 100644
index 00000000..f35eccb1
--- /dev/null
+++ b/src/journal/JournalMetadata.cc
@@ -0,0 +1,1177 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "journal/Utils.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+#include <functional>
+#include <set>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalMetadata: " << this << " "
+
+namespace journal {
+
+using namespace cls::journal;
+
+namespace {
+
+struct C_GetClient : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ std::string client_id;
+ cls::journal::Client *client;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_GetClient(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ AsyncOpTracker &async_op_tracker, const std::string &client_id,
+ cls::journal::Client *client, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ client_id(client_id), client(client), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetClient() override {
+ async_op_tracker.finish_op();
+ }
+
+ virtual void send() {
+ send_get_client();
+ }
+
+ void send_get_client() {
+ ldout(cct, 20) << "C_GetClient: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_client_start(&op, client_id);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_GetClient, &C_GetClient::handle_get_client>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_client(int r) {
+ ldout(cct, 20) << "C_GetClient: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto it = out_bl.cbegin();
+ r = client::get_client_finish(&it, client);
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_AllocateTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t tag_class;
+ Tag *tag;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AllocateTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ uint64_t tag_class, const bufferlist &data, Tag *tag,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ tag_class(tag_class), tag(tag), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ tag->data = data;
+ }
+ ~C_AllocateTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_get_next_tag_tid();
+ }
+
+ void send_get_next_tag_tid() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_next_tag_tid_start(&op);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_next_tag_tid>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_next_tag_tid(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+ r = client::get_next_tag_tid_finish(&iter, &tag->tid);
+ }
+ if (r < 0) {
+ complete(r);
+ return;
+ }
+ send_tag_create();
+ }
+
+ void send_tag_create() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::tag_create(&op, tag->tid, tag_class, tag->data);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_tag_create>);
+
+ int r = ioctx.aio_operate(oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_create(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == -ESTALE) {
+ send_get_next_tag_tid();
+ return;
+ } else if (r < 0) {
+ complete(r);
+ return;
+ }
+
+ send_get_tag();
+ }
+
+ void send_get_tag() {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::get_tag_start(&op, tag->tid);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AllocateTag, &C_AllocateTag::handle_get_tag>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_tag(int r) {
+ ldout(cct, 20) << "C_AllocateTag: " << __func__ << ": r=" << r << dendl;
+
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+
+ cls::journal::Tag journal_tag;
+ r = client::get_tag_finish(&iter, &journal_tag);
+ if (r == 0) {
+ *tag = journal_tag;
+ }
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_GetTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t tag_tid;
+ JournalMetadata::Tag *tag;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_GetTag(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ AsyncOpTracker &async_op_tracker, uint64_t tag_tid,
+ JournalMetadata::Tag *tag, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ tag_tid(tag_tid), tag(tag), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_get_tag();
+ }
+
+ void send_get_tag() {
+ librados::ObjectReadOperation op;
+ client::get_tag_start(&op, tag_tid);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_GetTag, &C_GetTag::handle_get_tag>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_get_tag(int r) {
+ if (r == 0) {
+ auto iter = out_bl.cbegin();
+ r = client::get_tag_finish(&iter, tag);
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_GetTags : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ const std::string &client_id;
+ AsyncOpTracker &async_op_tracker;
+ uint64_t start_after_tag_tid;
+ boost::optional<uint64_t> tag_class;
+ JournalMetadata::Tags *tags;
+ Context *on_finish;
+
+ const uint64_t MAX_RETURN = 64;
+ bufferlist out_bl;
+
+ C_GetTags(CephContext *cct, librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, AsyncOpTracker &async_op_tracker,
+ uint64_t start_after_tag_tid,
+ const boost::optional<uint64_t> &tag_class,
+ JournalMetadata::Tags *tags, Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), client_id(client_id),
+ async_op_tracker(async_op_tracker),
+ start_after_tag_tid(start_after_tag_tid), tag_class(tag_class),
+ tags(tags), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_GetTags() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ send_tag_list();
+ }
+
+ void send_tag_list() {
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, start_after_tag_tid, MAX_RETURN, client_id,
+ tag_class);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_GetTags, &C_GetTags::handle_tag_list>);
+
+ out_bl.clear();
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_tag_list(int r) {
+ if (r == 0) {
+ std::set<cls::journal::Tag> journal_tags;
+ auto iter = out_bl.cbegin();
+ r = client::tag_list_finish(&iter, &journal_tags);
+ if (r == 0) {
+ for (auto &journal_tag : journal_tags) {
+ tags->push_back(journal_tag);
+ start_after_tag_tid = journal_tag.tid;
+ }
+
+ if (journal_tags.size() == MAX_RETURN) {
+ send_tag_list();
+ return;
+ }
+ }
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+struct C_FlushCommitPosition : public Context {
+ Context *commit_position_ctx;
+ Context *on_finish;
+
+ C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish)
+ : commit_position_ctx(commit_position_ctx), on_finish(on_finish) {
+ }
+ void finish(int r) override {
+ if (commit_position_ctx != nullptr) {
+ commit_position_ctx->complete(r);
+ }
+ on_finish->complete(r);
+ }
+};
+
+struct C_AssertActiveTag : public Context {
+ CephContext *cct;
+ librados::IoCtx &ioctx;
+ const std::string &oid;
+ AsyncOpTracker &async_op_tracker;
+ std::string client_id;
+ uint64_t tag_tid;
+ Context *on_finish;
+
+ bufferlist out_bl;
+
+ C_AssertActiveTag(CephContext *cct, librados::IoCtx &ioctx,
+ const std::string &oid, AsyncOpTracker &async_op_tracker,
+ const std::string &client_id, uint64_t tag_tid,
+ Context *on_finish)
+ : cct(cct), ioctx(ioctx), oid(oid), async_op_tracker(async_op_tracker),
+ client_id(client_id), tag_tid(tag_tid), on_finish(on_finish) {
+ async_op_tracker.start_op();
+ }
+ ~C_AssertActiveTag() override {
+ async_op_tracker.finish_op();
+ }
+
+ void send() {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << dendl;
+
+ librados::ObjectReadOperation op;
+ client::tag_list_start(&op, tag_tid, 2, client_id, boost::none);
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ this, nullptr, &utils::rados_state_callback<
+ C_AssertActiveTag, &C_AssertActiveTag::handle_send>);
+
+ int r = ioctx.aio_operate(oid, comp, &op, &out_bl);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+
+ void handle_send(int r) {
+ ldout(cct, 20) << "C_AssertActiveTag: " << __func__ << ": r=" << r << dendl;
+
+ std::set<cls::journal::Tag> tags;
+ if (r == 0) {
+ auto it = out_bl.cbegin();
+ r = client::tag_list_finish(&it, &tags);
+ }
+
+ // NOTE: since 0 is treated as an uninitialized list filter, we need to
+ // load to entries and look at the last tid
+ if (r == 0 && !tags.empty() && tags.rbegin()->tid > tag_tid) {
+ r = -ESTALE;
+ }
+ complete(r);
+ }
+
+ void finish(int r) override {
+ on_finish->complete(r);
+ }
+};
+
+} // anonymous namespace
+
+JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &ioctx,
+ const std::string &oid,
+ const std::string &client_id,
+ const Settings &settings)
+ : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
+ m_client_id(client_id), m_settings(settings), m_order(0),
+ m_splay_width(0), m_pool_id(-1), m_initialized(false),
+ m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
+ m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+ m_watch_handle(0), m_minimum_set(0), m_active_set(0),
+ m_update_notifications(0), m_commit_position_ctx(NULL),
+ m_commit_position_task_ctx(NULL) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+JournalMetadata::~JournalMetadata() {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_initialized);
+}
+
+void JournalMetadata::init(Context *on_finish) {
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_initialized);
+ m_initialized = true;
+ }
+
+ // chain the init sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new C_ImmutableMetadata(this, on_finish);
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to watch journal"
+ << cpp_strerror(r) << dendl;
+ Mutex::Locker locker(m_lock);
+ m_watch_handle = 0;
+ on_finish->complete(r);
+ return;
+ }
+
+ get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, on_finish);
+ });
+
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_watch(m_oid, comp, &m_watch_handle, &m_watch_ctx);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::shut_down(Context *on_finish) {
+
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ uint64_t watch_handle = 0;
+ {
+ Mutex::Locker locker(m_lock);
+ m_initialized = false;
+ std::swap(watch_handle, m_watch_handle);
+ }
+
+ // chain the shut down sequence (reverse order)
+ on_finish = utils::create_async_context_callback(
+ this, on_finish);
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
+ m_async_op_tracker.wait_for_ops(on_finish);
+ });
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
+ librados::Rados rados(m_ioctx);
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ r = rados.aio_watch_flush(comp);
+ ceph_assert(r == 0);
+ comp->release();
+ });
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ flush_commit_position(on_finish);
+ });
+ if (watch_handle != 0) {
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_unwatch(watch_handle, comp);
+ ceph_assert(r == 0);
+ comp->release();
+ } else {
+ on_finish->complete(0);
+ }
+}
+
+void JournalMetadata::get_immutable_metadata(uint8_t *order,
+ uint8_t *splay_width,
+ int64_t *pool_id,
+ Context *on_finish) {
+ client::get_immutable_metadata(m_ioctx, m_oid, order, splay_width, pool_id,
+ on_finish);
+}
+
+void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
+ uint64_t *active_set,
+ RegisteredClients *clients,
+ Context *on_finish) {
+ client::get_mutable_metadata(m_ioctx, m_oid, minimum_set, active_set, clients,
+ on_finish);
+}
+
+void JournalMetadata::register_client(const bufferlist &data,
+ Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_register(&op, m_client_id, data);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::update_client(const bufferlist &data,
+ Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_update_data(&op, m_client_id, data);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::unregister_client(Context *on_finish) {
+ ceph_assert(!m_client_id.empty());
+
+ ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
+ librados::ObjectWriteOperation op;
+ client::client_unregister(&op, m_client_id);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
+ Tag *tag, Context *on_finish) {
+ on_finish = new C_NotifyUpdate(this, on_finish);
+ C_AllocateTag *ctx = new C_AllocateTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker, tag_class,
+ data, tag, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_client(const std::string &client_id,
+ cls::journal::Client *client,
+ Context *on_finish) {
+ C_GetClient *ctx = new C_GetClient(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+ client_id, client, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
+ C_GetTag *ctx = new C_GetTag(m_cct, m_ioctx, m_oid, m_async_op_tracker,
+ tag_tid, tag, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::get_tags(uint64_t start_after_tag_tid,
+ const boost::optional<uint64_t> &tag_class,
+ Tags *tags, Context *on_finish) {
+ C_GetTags *ctx = new C_GetTags(m_cct, m_ioctx, m_oid, m_client_id,
+ m_async_op_tracker, start_after_tag_tid,
+ tag_class, tags, on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::add_listener(JournalMetadataListener *listener) {
+ Mutex::Locker locker(m_lock);
+ while (m_update_notifications > 0) {
+ m_update_cond.Wait(m_lock);
+ }
+ m_listeners.push_back(listener);
+}
+
+void JournalMetadata::remove_listener(JournalMetadataListener *listener) {
+ Mutex::Locker locker(m_lock);
+ while (m_update_notifications > 0) {
+ m_update_cond.Wait(m_lock);
+ }
+ m_listeners.remove(listener);
+}
+
+void JournalMetadata::set_minimum_set(uint64_t object_set) {
+ Mutex::Locker locker(m_lock);
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
+ << ", new=" << object_set << dendl;
+ if (m_minimum_set >= object_set) {
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_minimum_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+
+ m_minimum_set = object_set;
+}
+
+int JournalMetadata::set_active_set(uint64_t object_set) {
+ C_SaferCond ctx;
+ set_active_set(object_set, &ctx);
+ return ctx.wait();
+}
+
+void JournalMetadata::set_active_set(uint64_t object_set, Context *on_finish) {
+ Mutex::Locker locker(m_lock);
+
+ ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
+ << ", new=" << object_set << dendl;
+ if (m_active_set >= object_set) {
+ m_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_active_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+
+ m_active_set = object_set;
+}
+
+void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) {
+ Mutex::Locker locker(m_lock);
+
+ C_AssertActiveTag *ctx = new C_AssertActiveTag(m_cct, m_ioctx, m_oid,
+ m_async_op_tracker,
+ m_client_id, tag_tid,
+ on_finish);
+ ctx->send();
+}
+
+void JournalMetadata::flush_commit_position() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ C_SaferCond ctx;
+ flush_commit_position(&ctx);
+ ctx.wait();
+}
+
+void JournalMetadata::flush_commit_position(Context *on_safe) {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ Mutex::Locker timer_locker(*m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) {
+ // nothing to flush
+ if (on_safe != nullptr) {
+ m_work_queue->queue(on_safe, 0);
+ }
+ return;
+ }
+
+ if (on_safe != nullptr) {
+ m_flush_commit_position_ctxs.push_back(on_safe);
+ }
+ if (m_commit_position_ctx == nullptr) {
+ return;
+ }
+
+ cancel_commit_task();
+ handle_commit_position_task();
+}
+
+void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
+ Mutex::Locker locker(m_lock);
+ uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
+ if (allocated_entry_tid <= entry_tid) {
+ allocated_entry_tid = entry_tid + 1;
+ }
+}
+
+bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
+ uint64_t *entry_tid) const {
+ Mutex::Locker locker(m_lock);
+
+ AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
+ if (it == m_allocated_entry_tids.end()) {
+ return false;
+ }
+
+ ceph_assert(it->second > 0);
+ *entry_tid = it->second - 1;
+ return true;
+}
+
+void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) {
+ if (r < 0) {
+ lderr(m_cct) << "failed to initialize immutable metadata: "
+ << cpp_strerror(r) << dendl;
+ on_init->complete(r);
+ return;
+ }
+
+ ldout(m_cct, 10) << "initialized immutable metadata" << dendl;
+ refresh(on_init);
+}
+
+void JournalMetadata::refresh(Context *on_complete) {
+ ldout(m_cct, 10) << "refreshing mutable metadata" << dendl;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (on_complete != nullptr) {
+ m_refresh_ctxs.push_back(on_complete);
+ }
+ ++m_refreshes_in_progress;
+ }
+
+ auto refresh = new C_Refresh(this);
+ get_mutable_metadata(&refresh->minimum_set, &refresh->active_set,
+ &refresh->registered_clients, refresh);
+}
+
+void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
+ ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl;
+
+ m_lock.Lock();
+ if (r == 0) {
+ Client client(m_client_id, bufferlist());
+ RegisteredClients::iterator it = refresh->registered_clients.find(client);
+ if (it != refresh->registered_clients.end()) {
+ if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+ << dendl;
+ }
+ m_minimum_set = std::max(m_minimum_set, refresh->minimum_set);
+ m_active_set = std::max(m_active_set, refresh->active_set);
+ m_registered_clients = refresh->registered_clients;
+ m_client = *it;
+
+ ++m_update_notifications;
+ m_lock.Unlock();
+ for (Listeners::iterator it = m_listeners.begin();
+ it != m_listeners.end(); ++it) {
+ (*it)->handle_update(this);
+ }
+ m_lock.Lock();
+ if (--m_update_notifications == 0) {
+ m_update_cond.Signal();
+ }
+ } else {
+ lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
+ r = -ENOENT;
+ }
+ }
+
+ Contexts refresh_ctxs;
+ ceph_assert(m_refreshes_in_progress > 0);
+ --m_refreshes_in_progress;
+ if (m_refreshes_in_progress == 0) {
+ std::swap(refresh_ctxs, m_refresh_ctxs);
+ }
+ m_lock.Unlock();
+
+ for (auto ctx : refresh_ctxs) {
+ ctx->complete(r);
+ }
+}
+
+void JournalMetadata::cancel_commit_task() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_commit_position_ctx != nullptr);
+ ceph_assert(m_commit_position_task_ctx != nullptr);
+ m_timer->cancel_event(m_commit_position_task_ctx);
+ m_commit_position_task_ctx = NULL;
+}
+
+void JournalMetadata::schedule_commit_task() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_commit_position_ctx != nullptr);
+ if (m_commit_position_task_ctx == nullptr) {
+ m_commit_position_task_ctx =
+ m_timer->add_event_after(m_settings.commit_interval,
+ new C_CommitPositionTask(this));
+ }
+}
+
+void JournalMetadata::handle_commit_position_task() {
+ ceph_assert(m_timer_lock->is_locked());
+ ceph_assert(m_lock.is_locked());
+ ldout(m_cct, 20) << __func__ << ": "
+ << "client_id=" << m_client_id << ", "
+ << "commit_position=" << m_commit_position << dendl;
+
+ m_commit_position_task_ctx = nullptr;
+ Context* commit_position_ctx = nullptr;
+ std::swap(commit_position_ctx, m_commit_position_ctx);
+
+ m_async_op_tracker.start_op();
+ ++m_flush_commits_in_progress;
+
+ Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
+ Contexts flush_commit_position_ctxs;
+ m_lock.Lock();
+ ceph_assert(m_flush_commits_in_progress > 0);
+ --m_flush_commits_in_progress;
+ if (m_flush_commits_in_progress == 0) {
+ std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs);
+ }
+ m_lock.Unlock();
+
+ commit_position_ctx->complete(0);
+ for (auto ctx : flush_commit_position_ctxs) {
+ ctx->complete(0);
+ }
+ m_async_op_tracker.finish_op();
+ });
+ ctx = new C_NotifyUpdate(this, ctx);
+ ctx = new FunctionContext([this, ctx](int r) {
+ // manually kick of a refresh in case the notification is missed
+ // and ignore the next notification that we are about to send
+ m_lock.Lock();
+ ++m_ignore_watch_notifies;
+ m_lock.Unlock();
+
+ refresh(ctx);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ schedule_laggy_clients_disconnect(ctx);
+ });
+
+ librados::ObjectWriteOperation op;
+ client::client_commit(&op, m_client_id, m_commit_position);
+
+ auto comp = librados::Rados::aio_create_completion(ctx, nullptr,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::schedule_watch_reset() {
+ ceph_assert(m_timer_lock->is_locked());
+ m_timer->add_event_after(1, new C_WatchReset(this));
+}
+
+void JournalMetadata::handle_watch_reset() {
+ ceph_assert(m_timer_lock->is_locked());
+ if (!m_initialized) {
+ return;
+ }
+
+ int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
+ } else if (r == -EBLACKLISTED) {
+ ldout(m_cct, 5) << __func__ << ": client blacklisted" << dendl;
+ } else {
+ lderr(m_cct) << __func__ << ": failed to watch journal: "
+ << cpp_strerror(r) << dendl;
+ }
+ schedule_watch_reset();
+ } else {
+ ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
+ refresh(NULL);
+ }
+}
+
+void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
+ ldout(m_cct, 10) << "journal header updated" << dendl;
+
+ bufferlist bl;
+ m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_ignore_watch_notifies > 0) {
+ --m_ignore_watch_notifies;
+ return;
+ }
+ }
+
+ refresh(NULL);
+}
+
+void JournalMetadata::handle_watch_error(int err) {
+ if (err == -ENOTCONN) {
+ ldout(m_cct, 5) << "journal watch error: header removed" << dendl;
+ } else if (err == -EBLACKLISTED) {
+ lderr(m_cct) << "journal watch error: client blacklisted" << dendl;
+ } else {
+ lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
+ }
+
+ Mutex::Locker timer_locker(*m_timer_lock);
+ Mutex::Locker locker(m_lock);
+
+ // release old watch on error
+ if (m_watch_handle != 0) {
+ m_ioctx.unwatch2(m_watch_handle);
+ m_watch_handle = 0;
+ }
+
+ if (m_initialized && err != -ENOENT) {
+ schedule_watch_reset();
+ }
+}
+
+uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
+ uint64_t tag_tid,
+ uint64_t entry_tid) {
+ Mutex::Locker locker(m_lock);
+ uint64_t commit_tid = ++m_commit_tid;
+ m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
+ entry_tid);
+
+ ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
+ << "object_num=" << object_num << ", "
+ << "tag_tid=" << tag_tid << ", "
+ << "entry_tid=" << entry_tid << "]"
+ << dendl;
+ return commit_tid;
+}
+
+void JournalMetadata::overflow_commit_tid(uint64_t commit_tid,
+ uint64_t object_num) {
+ Mutex::Locker locker(m_lock);
+
+ auto it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+ ceph_assert(it->second.object_num < object_num);
+
+ ldout(m_cct, 20) << __func__ << ": "
+ << "commit_tid=" << commit_tid << ", "
+ << "old_object_num=" << it->second.object_num << ", "
+ << "new_object_num=" << object_num << dendl;
+ it->second.object_num = object_num;
+}
+
+void JournalMetadata::get_commit_entry(uint64_t commit_tid,
+ uint64_t *object_num,
+ uint64_t *tag_tid, uint64_t *entry_tid) {
+ Mutex::Locker locker(m_lock);
+
+ auto it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+
+ *object_num = it->second.object_num;
+ *tag_tid = it->second.tag_tid;
+ *entry_tid = it->second.entry_tid;
+}
+
+void JournalMetadata::committed(uint64_t commit_tid,
+ const CreateContext &create_context) {
+ ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
+
+ ObjectSetPosition commit_position;
+ Context *stale_ctx = nullptr;
+ {
+ Mutex::Locker timer_locker(*m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ ceph_assert(commit_tid > m_commit_position_tid);
+
+ if (!m_commit_position.object_positions.empty()) {
+ // in-flight commit position update
+ commit_position = m_commit_position;
+ } else {
+ // safe commit position
+ commit_position = m_client.commit_position;
+ }
+
+ CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
+ ceph_assert(it != m_pending_commit_tids.end());
+
+ CommitEntry &commit_entry = it->second;
+ commit_entry.committed = true;
+
+ bool update_commit_position = false;
+ while (!m_pending_commit_tids.empty()) {
+ CommitTids::iterator it = m_pending_commit_tids.begin();
+ CommitEntry &commit_entry = it->second;
+ if (!commit_entry.committed) {
+ break;
+ }
+
+ commit_position.object_positions.emplace_front(
+ commit_entry.object_num, commit_entry.tag_tid,
+ commit_entry.entry_tid);
+ m_pending_commit_tids.erase(it);
+ update_commit_position = true;
+ }
+
+ if (!update_commit_position) {
+ return;
+ }
+
+ // prune the position to have one position per splay offset
+ std::set<uint8_t> in_use_splay_offsets;
+ ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
+ while (ob_it != commit_position.object_positions.end()) {
+ uint8_t splay_offset = ob_it->object_number % m_splay_width;
+ if (!in_use_splay_offsets.insert(splay_offset).second) {
+ ob_it = commit_position.object_positions.erase(ob_it);
+ } else {
+ ++ob_it;
+ }
+ }
+
+ stale_ctx = m_commit_position_ctx;
+ m_commit_position_ctx = create_context();
+ m_commit_position = commit_position;
+ m_commit_position_tid = commit_tid;
+
+ ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
+ << "on_safe=" << m_commit_position_ctx << dendl;
+ schedule_commit_task();
+ }
+
+
+ if (stale_ctx != nullptr) {
+ ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
+ << dendl;
+ stale_ctx->complete(-ESTALE);
+ }
+}
+
+void JournalMetadata::notify_update() {
+ ldout(m_cct, 10) << "notifying journal header update" << dendl;
+
+ bufferlist bl;
+ m_ioctx.notify2(m_oid, bl, 5000, NULL);
+}
+
+void JournalMetadata::async_notify_update(Context *on_safe) {
+ ldout(m_cct, 10) << "async notifying journal header update" << dendl;
+
+ C_AioNotify *ctx = new C_AioNotify(this, on_safe);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+
+ bufferlist bl;
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
+ ceph_assert(r == 0);
+
+ comp->release();
+}
+
+void JournalMetadata::wait_for_ops() {
+ C_SaferCond ctx;
+ m_async_op_tracker.wait_for_ops(&ctx);
+ ctx.wait();
+}
+
+void JournalMetadata::handle_notified(int r) {
+ ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
+}
+
+void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+ ldout(m_cct, 20) << __func__ << dendl;
+ if (m_settings.max_concurrent_object_sets <= 0) {
+ on_finish->complete(0);
+ return;
+ }
+
+ Context *ctx = on_finish;
+ {
+ Mutex::Locker locker(m_lock);
+ for (auto &c : m_registered_clients) {
+ if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+ c.id == m_client_id ||
+ m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+ continue;
+ }
+ const std::string &client_id = c.id;
+ uint64_t object_set = 0;
+ if (!c.commit_position.object_positions.empty()) {
+ auto &position = *(c.commit_position.object_positions.begin());
+ object_set = position.object_number / m_splay_width;
+ }
+
+ if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+ ldout(m_cct, 1) << __func__ << ": " << client_id
+ << ": scheduling disconnect" << dendl;
+
+ ctx = new FunctionContext([this, client_id, ctx](int r1) {
+ ldout(m_cct, 10) << __func__ << ": " << client_id
+ << ": flagging disconnected" << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::client_update_state(
+ &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED);
+
+ auto comp = librados::Rados::aio_create_completion(
+ ctx, nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+ });
+ }
+ }
+ }
+
+ if (ctx == on_finish) {
+ ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
+ }
+ ctx->complete(0);
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata::RegisteredClients &clients) {
+ os << "[";
+ for (JournalMetadata::RegisteredClients::const_iterator c = clients.begin();
+ c != clients.end(); ++c) {
+ os << (c == clients.begin() ? "" : ", " ) << *c;
+ }
+ os << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata &jm) {
+ Mutex::Locker locker(jm.m_lock);
+ os << "[oid=" << jm.m_oid << ", "
+ << "initialized=" << jm.m_initialized << ", "
+ << "order=" << (int)jm.m_order << ", "
+ << "splay_width=" << (int)jm.m_splay_width << ", "
+ << "pool_id=" << jm.m_pool_id << ", "
+ << "minimum_set=" << jm.m_minimum_set << ", "
+ << "active_set=" << jm.m_active_set << ", "
+ << "client_id=" << jm.m_client_id << ", "
+ << "commit_tid=" << jm.m_commit_tid << ", "
+ << "commit_interval=" << jm.m_settings.commit_interval << ", "
+ << "commit_position=" << jm.m_commit_position << ", "
+ << "registered_clients=" << jm.m_registered_clients << "]";
+ return os;
+}
+
+} // namespace journal
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
new file mode 100644
index 00000000..393d2029
--- /dev/null
+++ b/src/journal/JournalMetadata.h
@@ -0,0 +1,379 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
+#define CEPH_JOURNAL_JOURNAL_METADATA_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/AsyncOpTracker.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
+#include "cls/journal/cls_journal_types.h"
+#include "journal/JournalMetadataListener.h"
+#include "journal/Settings.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/optional.hpp>
+#include <functional>
+#include <list>
+#include <map>
+#include <string>
+#include "include/ceph_assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalMetadata;
+typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
+
+class JournalMetadata : public RefCountedObject, boost::noncopyable {
+public:
+ typedef std::function<Context*()> CreateContext;
+ typedef cls::journal::ObjectPosition ObjectPosition;
+ typedef cls::journal::ObjectPositions ObjectPositions;
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+ typedef cls::journal::Client Client;
+ typedef cls::journal::Tag Tag;
+
+ typedef std::set<Client> RegisteredClients;
+ typedef std::list<Tag> Tags;
+
+ JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, const Settings &settings);
+ ~JournalMetadata() override;
+
+ void init(Context *on_init);
+ void shut_down(Context *on_finish);
+
+ bool is_initialized() const { return m_initialized; }
+
+ void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id, Context *on_finish);
+
+ void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
+ RegisteredClients *clients, Context *on_finish);
+
+ void add_listener(JournalMetadataListener *listener);
+ void remove_listener(JournalMetadataListener *listener);
+
+ void register_client(const bufferlist &data, Context *on_finish);
+ void update_client(const bufferlist &data, Context *on_finish);
+ void unregister_client(Context *on_finish);
+ void get_client(const std::string &client_id, cls::journal::Client *client,
+ Context *on_finish);
+
+ void allocate_tag(uint64_t tag_class, const bufferlist &data,
+ Tag *tag, Context *on_finish);
+ void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
+ void get_tags(uint64_t start_after_tag_tid,
+ const boost::optional<uint64_t> &tag_class, Tags *tags,
+ Context *on_finish);
+
+ inline const Settings &get_settings() const {
+ return m_settings;
+ }
+ inline const std::string &get_client_id() const {
+ return m_client_id;
+ }
+ inline uint8_t get_order() const {
+ return m_order;
+ }
+ inline uint64_t get_object_size() const {
+ return 1 << m_order;
+ }
+ inline uint8_t get_splay_width() const {
+ return m_splay_width;
+ }
+ inline int64_t get_pool_id() const {
+ return m_pool_id;
+ }
+
+ inline void queue(Context *on_finish, int r) {
+ m_work_queue->queue(on_finish, r);
+ }
+
+ inline ContextWQ *get_work_queue() {
+ return m_work_queue;
+ }
+
+ inline SafeTimer &get_timer() {
+ return *m_timer;
+ }
+ inline Mutex &get_timer_lock() {
+ return *m_timer_lock;
+ }
+
+ void set_minimum_set(uint64_t object_set);
+ inline uint64_t get_minimum_set() const {
+ Mutex::Locker locker(m_lock);
+ return m_minimum_set;
+ }
+
+ int set_active_set(uint64_t object_set);
+ void set_active_set(uint64_t object_set, Context *on_finish);
+ inline uint64_t get_active_set() const {
+ Mutex::Locker locker(m_lock);
+ return m_active_set;
+ }
+
+ void assert_active_tag(uint64_t tag_tid, Context *on_finish);
+
+ void flush_commit_position();
+ void flush_commit_position(Context *on_safe);
+ void get_commit_position(ObjectSetPosition *commit_position) const {
+ Mutex::Locker locker(m_lock);
+ *commit_position = m_client.commit_position;
+ }
+
+ void get_registered_clients(RegisteredClients *registered_clients) {
+ Mutex::Locker locker(m_lock);
+ *registered_clients = m_registered_clients;
+ }
+
+ inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
+ Mutex::Locker locker(m_lock);
+ return m_allocated_entry_tids[tag_tid]++;
+ }
+ void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
+ bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const;
+
+ uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
+ uint64_t entry_tid);
+ void overflow_commit_tid(uint64_t commit_tid, uint64_t object_num);
+ void get_commit_entry(uint64_t commit_tid, uint64_t *object_num,
+ uint64_t *tag_tid, uint64_t *entry_tid);
+ void committed(uint64_t commit_tid, const CreateContext &create_context);
+
+ void notify_update();
+ void async_notify_update(Context *on_safe);
+
+ void wait_for_ops();
+
+private:
+ typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
+ typedef std::list<JournalMetadataListener*> Listeners;
+ typedef std::list<Context*> Contexts;
+
+ struct CommitEntry {
+ uint64_t object_num;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
+ bool committed;
+
+ CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) {
+ }
+ CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid)
+ : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid),
+ committed(false) {
+ }
+ };
+ typedef std::map<uint64_t, CommitEntry> CommitTids;
+
+ struct C_WatchCtx : public librados::WatchCtx2 {
+ JournalMetadata *journal_metadata;
+
+ C_WatchCtx(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {}
+
+ void handle_notify(uint64_t notify_id, uint64_t cookie,
+ uint64_t notifier_id, bufferlist& bl) override {
+ journal_metadata->handle_watch_notify(notify_id, cookie);
+ }
+ void handle_error(uint64_t cookie, int err) override {
+ journal_metadata->handle_watch_error(err);
+ }
+ };
+
+ struct C_WatchReset : public Context {
+ JournalMetadata *journal_metadata;
+
+ C_WatchReset(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_WatchReset() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ journal_metadata->handle_watch_reset();
+ }
+ };
+
+ struct C_CommitPositionTask : public Context {
+ JournalMetadata *journal_metadata;
+
+ C_CommitPositionTask(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_CommitPositionTask() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ Mutex::Locker locker(journal_metadata->m_lock);
+ journal_metadata->handle_commit_position_task();
+ };
+ };
+
+ struct C_AioNotify : public Context {
+ JournalMetadata* journal_metadata;
+ Context *on_safe;
+
+ C_AioNotify(JournalMetadata *_journal_metadata, Context *_on_safe)
+ : journal_metadata(_journal_metadata), on_safe(_on_safe) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_AioNotify() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ journal_metadata->handle_notified(r);
+ if (on_safe != nullptr) {
+ on_safe->complete(0);
+ }
+ }
+ };
+
+ struct C_NotifyUpdate : public Context {
+ JournalMetadata* journal_metadata;
+ Context *on_safe;
+
+ C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL)
+ : journal_metadata(_journal_metadata), on_safe(_on_safe) {
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_NotifyUpdate() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ if (r == 0) {
+ journal_metadata->async_notify_update(on_safe);
+ return;
+ }
+ if (on_safe != NULL) {
+ on_safe->complete(r);
+ }
+ }
+ };
+
+ struct C_ImmutableMetadata : public Context {
+ JournalMetadata* journal_metadata;
+ Context *on_finish;
+
+ C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
+ : journal_metadata(_journal_metadata), on_finish(_on_finish) {
+ Mutex::Locker locker(journal_metadata->m_lock);
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_ImmutableMetadata() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ journal_metadata->handle_immutable_metadata(r, on_finish);
+ }
+ };
+
+ struct C_Refresh : public Context {
+ JournalMetadata* journal_metadata;
+ uint64_t minimum_set;
+ uint64_t active_set;
+ RegisteredClients registered_clients;
+
+ C_Refresh(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata), minimum_set(0), active_set(0) {
+ Mutex::Locker locker(journal_metadata->m_lock);
+ journal_metadata->m_async_op_tracker.start_op();
+ }
+ ~C_Refresh() override {
+ journal_metadata->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ journal_metadata->handle_refresh_complete(this, r);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_oid;
+ std::string m_client_id;
+ Settings m_settings;
+
+ uint8_t m_order;
+ uint8_t m_splay_width;
+ int64_t m_pool_id;
+ bool m_initialized;
+
+ ContextWQ *m_work_queue;
+ SafeTimer *m_timer;
+ Mutex *m_timer_lock;
+
+ mutable Mutex m_lock;
+
+ uint64_t m_commit_tid;
+ CommitTids m_pending_commit_tids;
+
+ Listeners m_listeners;
+
+ C_WatchCtx m_watch_ctx;
+ uint64_t m_watch_handle;
+
+ uint64_t m_minimum_set;
+ uint64_t m_active_set;
+ RegisteredClients m_registered_clients;
+ Client m_client;
+
+ AllocatedEntryTids m_allocated_entry_tids;
+
+ size_t m_update_notifications;
+ Cond m_update_cond;
+
+ size_t m_ignore_watch_notifies = 0;
+ size_t m_refreshes_in_progress = 0;
+ Contexts m_refresh_ctxs;
+
+ uint64_t m_commit_position_tid = 0;
+ ObjectSetPosition m_commit_position;
+ Context *m_commit_position_ctx;
+ Context *m_commit_position_task_ctx;
+
+ size_t m_flush_commits_in_progress = 0;
+ Contexts m_flush_commit_position_ctxs;
+
+ AsyncOpTracker m_async_op_tracker;
+
+ void handle_immutable_metadata(int r, Context *on_init);
+
+ void refresh(Context *on_finish);
+ void handle_refresh_complete(C_Refresh *refresh, int r);
+
+ void cancel_commit_task();
+ void schedule_commit_task();
+ void handle_commit_position_task();
+
+ void schedule_watch_reset();
+ void handle_watch_reset();
+ void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
+ void handle_watch_error(int err);
+ void handle_notified(int r);
+
+ void schedule_laggy_clients_disconnect(Context *on_finish);
+
+ friend std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata &journal_metadata);
+};
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata::RegisteredClients &clients);
+
+std::ostream &operator<<(std::ostream &os,
+ const JournalMetadata &journal_metadata);
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_METADATA_H
diff --git a/src/journal/JournalMetadataListener.h b/src/journal/JournalMetadataListener.h
new file mode 100644
index 00000000..121fe685
--- /dev/null
+++ b/src/journal/JournalMetadataListener.h
@@ -0,0 +1,30 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H
+#define CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H
+
+namespace journal {
+
+class JournalMetadata;
+
+struct JournalMetadataListener {
+ virtual ~JournalMetadataListener() {};
+ virtual void handle_update(JournalMetadata *) = 0;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_METADATA_LISTENER_H
+
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
new file mode 100644
index 00000000..89952e63
--- /dev/null
+++ b/src/journal/JournalPlayer.cc
@@ -0,0 +1,803 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/Entry.h"
+#include "journal/ReplayHandler.h"
+#include "journal/Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalPlayer: " << this << " "
+
+namespace journal {
+
+namespace {
+
+struct C_HandleComplete : public Context {
+ ReplayHandler *replay_handler;
+
+ explicit C_HandleComplete(ReplayHandler *_replay_handler)
+ : replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ ~C_HandleComplete() override {
+ replay_handler->put();
+ }
+ void finish(int r) override {
+ replay_handler->handle_complete(r);
+ }
+};
+
+struct C_HandleEntriesAvailable : public Context {
+ ReplayHandler *replay_handler;
+
+ explicit C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
+ : replay_handler(_replay_handler) {
+ replay_handler->get();
+ }
+ ~C_HandleEntriesAvailable() override {
+ replay_handler->put();
+ }
+ void finish(int r) override {
+ replay_handler->handle_entries_available();
+ }
+};
+
+} // anonymous namespace
+
+JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ ReplayHandler *replay_handler)
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
+ m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
+ m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0) {
+ m_replay_handler->get();
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+ ObjectSetPosition commit_position;
+ m_journal_metadata->get_commit_position(&commit_position);
+
+ if (!commit_position.object_positions.empty()) {
+ ldout(m_cct, 5) << "commit position: " << commit_position << dendl;
+
+ // start replay after the last committed entry's object
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ auto &active_position = commit_position.object_positions.front();
+ m_active_tag_tid = active_position.tag_tid;
+ m_commit_position_valid = true;
+ m_commit_position = active_position;
+ m_splay_offset = active_position.object_number % splay_width;
+ for (auto &position : commit_position.object_positions) {
+ uint8_t splay_offset = position.object_number % splay_width;
+ m_commit_positions[splay_offset] = position;
+ }
+ }
+}
+
+JournalPlayer::~JournalPlayer() {
+ ceph_assert(m_async_op_tracker.empty());
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_shut_down);
+ ceph_assert(m_fetch_object_numbers.empty());
+ ceph_assert(!m_watch_scheduled);
+ }
+ m_replay_handler->put();
+}
+
+void JournalPlayer::prefetch() {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_state == STATE_INIT);
+ m_state = STATE_PREFETCH;
+
+ m_active_set = m_journal_metadata->get_active_set();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ m_prefetch_splay_offsets.insert(splay_offset);
+ }
+
+ // compute active object for each splay offset (might be before
+ // active set)
+ std::map<uint8_t, uint64_t> splay_offset_to_objects;
+ for (auto &position : m_commit_positions) {
+ ceph_assert(splay_offset_to_objects.count(position.first) == 0);
+ splay_offset_to_objects[position.first] = position.second.object_number;
+ }
+
+ // prefetch the active object for each splay offset
+ std::set<uint64_t> prefetch_object_numbers;
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ uint64_t object_number = splay_offset;
+ if (splay_offset_to_objects.count(splay_offset) != 0) {
+ object_number = splay_offset_to_objects[splay_offset];
+ }
+
+ prefetch_object_numbers.insert(object_number);
+ }
+
+ ldout(m_cct, 10) << __func__ << ": prefetching "
+ << prefetch_object_numbers.size() << " " << "objects"
+ << dendl;
+ for (auto object_number : prefetch_object_numbers) {
+ fetch(object_number);
+ }
+}
+
+void JournalPlayer::prefetch_and_watch(double interval) {
+ {
+ Mutex::Locker locker(m_lock);
+ m_watch_enabled = true;
+ m_watch_interval = interval;
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ }
+ prefetch();
+}
+
+void JournalPlayer::shut_down(Context *on_finish) {
+ ldout(m_cct, 20) << __func__ << dendl;
+ Mutex::Locker locker(m_lock);
+
+ ceph_assert(!m_shut_down);
+ m_shut_down = true;
+ m_watch_enabled = false;
+
+ on_finish = utils::create_async_context_callback(
+ m_journal_metadata, on_finish);
+
+ if (m_watch_scheduled) {
+ ObjectPlayerPtr object_player = get_object_player();
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second;
+ // fallthrough
+ case WATCH_STEP_FETCH_CURRENT:
+ object_player->unwatch();
+ break;
+ case WATCH_STEP_ASSERT_ACTIVE:
+ break;
+ }
+ }
+
+ m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
+ ldout(m_cct, 20) << __func__ << dendl;
+ Mutex::Locker locker(m_lock);
+
+ if (m_state != STATE_PLAYBACK) {
+ m_handler_notified = false;
+ return false;
+ }
+
+ if (!verify_playback_ready()) {
+ if (!is_object_set_ready()) {
+ m_handler_notified = false;
+ } else {
+ refetch(true);
+ }
+ return false;
+ }
+
+ ObjectPlayerPtr object_player = get_object_player();
+ ceph_assert(object_player && !object_player->empty());
+
+ object_player->front(entry);
+ object_player->pop_front();
+
+ uint64_t last_entry_tid;
+ if (m_journal_metadata->get_last_allocated_entry_tid(
+ entry->get_tag_tid(), &last_entry_tid) &&
+ entry->get_entry_tid() != last_entry_tid + 1) {
+ lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
+
+ m_state = STATE_ERROR;
+ notify_complete(-ENOMSG);
+ return false;
+ }
+
+ advance_splay_object();
+ remove_empty_object_player(object_player);
+
+ m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
+ entry->get_entry_tid());
+ *commit_tid = m_journal_metadata->allocate_commit_tid(
+ object_player->get_object_number(), entry->get_tag_tid(),
+ entry->get_entry_tid());
+ return true;
+}
+
+void JournalPlayer::process_state(uint64_t object_number, int r) {
+ ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", "
+ << "r=" << r << dendl;
+
+ ceph_assert(m_lock.is_locked());
+ if (r >= 0) {
+ switch (m_state) {
+ case STATE_PREFETCH:
+ ldout(m_cct, 10) << "PREFETCH" << dendl;
+ r = process_prefetch(object_number);
+ break;
+ case STATE_PLAYBACK:
+ ldout(m_cct, 10) << "PLAYBACK" << dendl;
+ r = process_playback(object_number);
+ break;
+ case STATE_ERROR:
+ ldout(m_cct, 10) << "ERROR" << dendl;
+ break;
+ default:
+ lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl;
+ ceph_abort();
+ break;
+ }
+ }
+
+ if (r < 0) {
+ m_state = STATE_ERROR;
+ notify_complete(r);
+ }
+}
+
+int JournalPlayer::process_prefetch(uint64_t object_number) {
+ ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
+ ceph_assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+
+ PrefetchSplayOffsets::iterator it = m_prefetch_splay_offsets.find(
+ splay_offset);
+ if (it == m_prefetch_splay_offsets.end()) {
+ return 0;
+ }
+
+ bool prefetch_complete = false;
+ ceph_assert(m_object_players.count(splay_offset) == 1);
+ ObjectPlayerPtr object_player = m_object_players[splay_offset];
+
+ // prefetch in-order since a newer splay object could prefetch first
+ if (m_fetch_object_numbers.count(object_player->get_object_number()) == 0) {
+ // skip past known committed records
+ if (m_commit_positions.count(splay_offset) != 0 &&
+ !object_player->empty()) {
+ ObjectPosition &position = m_commit_positions[splay_offset];
+
+ ldout(m_cct, 15) << "seeking known commit position " << position << " in "
+ << object_player->get_oid() << dendl;
+
+ bool found_commit = false;
+ Entry entry;
+ while (!object_player->empty()) {
+ object_player->front(&entry);
+
+ if (entry.get_tag_tid() == position.tag_tid &&
+ entry.get_entry_tid() == position.entry_tid) {
+ found_commit = true;
+ } else if (found_commit) {
+ ldout(m_cct, 10) << "located next uncommitted entry: " << entry
+ << dendl;
+ break;
+ }
+
+ ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
+ m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(),
+ entry.get_entry_tid());
+ object_player->pop_front();
+ }
+
+ // do not search for commit position for this object
+ // if we've already seen it
+ if (found_commit) {
+ m_commit_positions.erase(splay_offset);
+ }
+ }
+
+ // if the object is empty, pre-fetch the next splay object
+ if (object_player->empty() && object_player->refetch_required()) {
+ ldout(m_cct, 10) << "refetching potentially partially decoded object"
+ << dendl;
+ object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
+ fetch(object_player);
+ } else if (!remove_empty_object_player(object_player)) {
+ ldout(m_cct, 10) << "prefetch of object complete" << dendl;
+ prefetch_complete = true;
+ }
+ }
+
+ if (!prefetch_complete) {
+ return 0;
+ }
+
+ m_prefetch_splay_offsets.erase(it);
+ if (!m_prefetch_splay_offsets.empty()) {
+ return 0;
+ }
+
+ ldout(m_cct, 10) << "switching to playback mode" << dendl;
+ m_state = STATE_PLAYBACK;
+
+ // if we have a valid commit position, our read should start with
+ // the next consistent journal entry in the sequence
+ if (m_commit_position_valid) {
+ splay_offset = m_commit_position.object_number % splay_width;
+ object_player = m_object_players[splay_offset];
+
+ if (object_player->empty()) {
+ if (!object_player->refetch_required()) {
+ advance_splay_object();
+ }
+ } else {
+ Entry entry;
+ object_player->front(&entry);
+ if (entry.get_tag_tid() == m_commit_position.tag_tid) {
+ advance_splay_object();
+ }
+ }
+ }
+
+ if (verify_playback_ready()) {
+ notify_entries_available();
+ } else if (is_object_set_ready()) {
+ refetch(false);
+ }
+ return 0;
+}
+
+int JournalPlayer::process_playback(uint64_t object_number) {
+ ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl;
+ ceph_assert(m_lock.is_locked());
+
+ if (verify_playback_ready()) {
+ notify_entries_available();
+ } else if (is_object_set_ready()) {
+ refetch(false);
+ }
+ return 0;
+}
+
+bool JournalPlayer::is_object_set_ready() const {
+ ceph_assert(m_lock.is_locked());
+ if (m_watch_scheduled || !m_fetch_object_numbers.empty()) {
+ ldout(m_cct, 20) << __func__ << ": waiting for in-flight fetch" << dendl;
+ return false;
+ }
+
+ return true;
+}
+
+bool JournalPlayer::verify_playback_ready() {
+ ceph_assert(m_lock.is_locked());
+
+ while (true) {
+ if (!is_object_set_ready()) {
+ ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl;
+ return false;
+ }
+
+ ObjectPlayerPtr object_player = get_object_player();
+ ceph_assert(object_player);
+ uint64_t object_num = object_player->get_object_number();
+
+ // Verify is the active object player has another entry available
+ // in the sequence
+ // NOTE: replay currently does not check tag class to playback multiple tags
+ // from different classes (issue #14909). When a new tag is discovered, it
+ // is assumed that the previous tag was closed at the last replayable entry.
+ Entry entry;
+ if (!object_player->empty()) {
+ m_watch_prune_active_tag = false;
+ object_player->front(&entry);
+
+ if (!m_active_tag_tid) {
+ ldout(m_cct, 10) << __func__ << ": "
+ << "object_num=" << object_num << ", "
+ << "initial tag=" << entry.get_tag_tid()
+ << dendl;
+ m_active_tag_tid = entry.get_tag_tid();
+ return true;
+ } else if (entry.get_tag_tid() < *m_active_tag_tid ||
+ (m_prune_tag_tid && entry.get_tag_tid() <= *m_prune_tag_tid)) {
+ // entry occurred before the current active tag
+ ldout(m_cct, 10) << __func__ << ": detected stale entry: "
+ << "object_num=" << object_num << ", "
+ << "entry=" << entry << dendl;
+ prune_tag(entry.get_tag_tid());
+ continue;
+ } else if (entry.get_tag_tid() > *m_active_tag_tid) {
+ // new tag at current playback position -- implies that previous
+ // tag ended abruptly without flushing out all records
+ // search for the start record for the next tag
+ ldout(m_cct, 10) << __func__ << ": new tag detected: "
+ << "object_num=" << object_num << ", "
+ << "active_tag=" << *m_active_tag_tid << ", "
+ << "new_tag=" << entry.get_tag_tid() << dendl;
+ if (entry.get_entry_tid() == 0) {
+ // first entry in new tag -- can promote to active
+ prune_active_tag(entry.get_tag_tid());
+ return true;
+ } else {
+ // prune current active and wait for initial entry for new tag
+ prune_active_tag(boost::none);
+ continue;
+ }
+ } else {
+ ldout(m_cct, 20) << __func__ << ": "
+ << "object_num=" << object_num << ", "
+ << "entry: " << entry << dendl;
+ ceph_assert(entry.get_tag_tid() == *m_active_tag_tid);
+ return true;
+ }
+ } else {
+ if (!m_active_tag_tid) {
+ // waiting for our first entry
+ ldout(m_cct, 10) << __func__ << ": waiting for first entry: "
+ << "object_num=" << object_num << dendl;
+ return false;
+ } else if (m_prune_tag_tid && *m_prune_tag_tid == *m_active_tag_tid) {
+ ldout(m_cct, 10) << __func__ << ": no more entries" << dendl;
+ return false;
+ } else if (m_watch_enabled && m_watch_prune_active_tag) {
+ // detected current tag is now longer active and we have re-read the
+ // current object but it's still empty, so this tag is done
+ ldout(m_cct, 10) << __func__ << ": assuming no more in-sequence entries: "
+ << "object_num=" << object_num << ", "
+ << "active_tag " << *m_active_tag_tid << dendl;
+ prune_active_tag(boost::none);
+ continue;
+ } else if (object_player->refetch_required()) {
+ // if the active object requires a refetch, don't proceed looking for a
+ // new tag before this process completes
+ ldout(m_cct, 10) << __func__ << ": refetch required: "
+ << "object_num=" << object_num << dendl;
+ return false;
+ } else if (!m_watch_enabled) {
+ // current playback position is empty so this tag is done
+ ldout(m_cct, 10) << __func__ << ": no more in-sequence entries: "
+ << "object_num=" << object_num << ", "
+ << "active_tag=" << *m_active_tag_tid << dendl;
+ prune_active_tag(boost::none);
+ continue;
+ } else if (!m_watch_scheduled) {
+ // no more entries and we don't have an active watch in-progress
+ ldout(m_cct, 10) << __func__ << ": no more entries -- watch required"
+ << dendl;
+ return false;
+ }
+ }
+ }
+ return false;
+}
+
+void JournalPlayer::prune_tag(uint64_t tag_tid) {
+ ceph_assert(m_lock.is_locked());
+ ldout(m_cct, 10) << __func__ << ": pruning remaining entries for tag "
+ << tag_tid << dendl;
+
+ // prune records that are at or below the largest prune tag tid
+ if (!m_prune_tag_tid || *m_prune_tag_tid < tag_tid) {
+ m_prune_tag_tid = tag_tid;
+ }
+
+ bool pruned = false;
+ for (auto &player_pair : m_object_players) {
+ ObjectPlayerPtr object_player(player_pair.second);
+ ldout(m_cct, 15) << __func__ << ": checking " << object_player->get_oid()
+ << dendl;
+ while (!object_player->empty()) {
+ Entry entry;
+ object_player->front(&entry);
+ if (entry.get_tag_tid() == tag_tid) {
+ ldout(m_cct, 20) << __func__ << ": pruned " << entry << dendl;
+ object_player->pop_front();
+ pruned = true;
+ } else {
+ break;
+ }
+ }
+ }
+
+ // avoid watch delay when pruning stale tags from journal objects
+ if (pruned) {
+ ldout(m_cct, 15) << __func__ << ": resetting refetch state to immediate"
+ << dendl;
+ for (auto &player_pair : m_object_players) {
+ ObjectPlayerPtr object_player(player_pair.second);
+ object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
+ }
+ }
+
+ // trim empty player to prefetch the next available object
+ for (auto &player_pair : m_object_players) {
+ remove_empty_object_player(player_pair.second);
+ }
+}
+
+void JournalPlayer::prune_active_tag(const boost::optional<uint64_t>& tag_tid) {
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(m_active_tag_tid);
+
+ uint64_t active_tag_tid = *m_active_tag_tid;
+ if (tag_tid) {
+ m_active_tag_tid = tag_tid;
+ }
+ m_splay_offset = 0;
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+
+ prune_tag(active_tag_tid);
+}
+
+ObjectPlayerPtr JournalPlayer::get_object_player() const {
+ ceph_assert(m_lock.is_locked());
+
+ SplayedObjectPlayers::const_iterator it = m_object_players.find(
+ m_splay_offset);
+ ceph_assert(it != m_object_players.end());
+ return it->second;
+}
+
+ObjectPlayerPtr JournalPlayer::get_object_player(uint64_t object_number) const {
+ ceph_assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ auto splay_it = m_object_players.find(splay_offset);
+ ceph_assert(splay_it != m_object_players.end());
+
+ ObjectPlayerPtr object_player = splay_it->second;
+ ceph_assert(object_player->get_object_number() == object_number);
+ return object_player;
+}
+
+void JournalPlayer::advance_splay_object() {
+ ceph_assert(m_lock.is_locked());
+ ++m_splay_offset;
+ m_splay_offset %= m_journal_metadata->get_splay_width();
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ ldout(m_cct, 20) << __func__ << ": new offset "
+ << static_cast<uint32_t>(m_splay_offset) << dendl;
+}
+
+bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) {
+ ceph_assert(m_lock.is_locked());
+ ceph_assert(!m_watch_scheduled);
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t object_set = player->get_object_number() / splay_width;
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ if (!player->empty() || object_set == active_set) {
+ return false;
+ } else if (player->refetch_required()) {
+ ldout(m_cct, 20) << __func__ << ": " << player->get_oid() << " requires "
+ << "a refetch" << dendl;
+ return false;
+ } else if (m_active_set != active_set) {
+ ldout(m_cct, 20) << __func__ << ": new active set detected, all players "
+ << "require refetch" << dendl;
+ m_active_set = active_set;
+ for (auto &pair : m_object_players) {
+ pair.second->set_refetch_state(ObjectPlayer::REFETCH_STATE_IMMEDIATE);
+ }
+ return false;
+ }
+
+ ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty"
+ << dendl;
+
+ m_watch_prune_active_tag = false;
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+
+ uint64_t next_object_num = player->get_object_number() + splay_width;
+ fetch(next_object_num);
+ return true;
+}
+
+void JournalPlayer::fetch(uint64_t object_num) {
+ ceph_assert(m_lock.is_locked());
+
+ ObjectPlayerPtr object_player(new ObjectPlayer(
+ m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
+ m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order(),
+ m_journal_metadata->get_settings().max_fetch_bytes));
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ m_object_players[object_num % splay_width] = object_player;
+ fetch(object_player);
+}
+
+void JournalPlayer::fetch(const ObjectPlayerPtr &object_player) {
+ ceph_assert(m_lock.is_locked());
+
+ uint64_t object_num = object_player->get_object_number();
+ std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+ ceph_assert(m_fetch_object_numbers.count(object_num) == 0);
+ m_fetch_object_numbers.insert(object_num);
+
+ ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
+ C_Fetch *fetch_ctx = new C_Fetch(this, object_num);
+
+ object_player->fetch(fetch_ctx);
+}
+
+void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
+ ldout(m_cct, 10) << __func__ << ": "
+ << utils::get_object_name(m_object_oid_prefix, object_num)
+ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_fetch_object_numbers.count(object_num) == 1);
+ m_fetch_object_numbers.erase(object_num);
+
+ if (m_shut_down) {
+ return;
+ }
+
+ if (r == 0) {
+ ObjectPlayerPtr object_player = get_object_player(object_num);
+ remove_empty_object_player(object_player);
+ }
+ process_state(object_num, r);
+}
+
+void JournalPlayer::refetch(bool immediate) {
+ ldout(m_cct, 10) << __func__ << dendl;
+ ceph_assert(m_lock.is_locked());
+ m_handler_notified = false;
+
+ // if watching the object, handle the periodic re-fetch
+ if (m_watch_enabled) {
+ schedule_watch(immediate);
+ return;
+ }
+
+ ObjectPlayerPtr object_player = get_object_player();
+ if (object_player->refetch_required()) {
+ object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
+ fetch(object_player);
+ return;
+ }
+
+ notify_complete(0);
+}
+
+void JournalPlayer::schedule_watch(bool immediate) {
+ ldout(m_cct, 10) << __func__ << dendl;
+ ceph_assert(m_lock.is_locked());
+ if (m_watch_scheduled) {
+ return;
+ }
+
+ m_watch_scheduled = true;
+
+ if (m_watch_step == WATCH_STEP_ASSERT_ACTIVE) {
+ // detect if a new tag has been created in case we are blocked
+ // by an incomplete tag sequence
+ ldout(m_cct, 20) << __func__ << ": asserting active tag="
+ << *m_active_tag_tid << dendl;
+
+ m_async_op_tracker.start_op();
+ FunctionContext *ctx = new FunctionContext([this](int r) {
+ handle_watch_assert_active(r);
+ });
+ m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
+ return;
+ }
+
+ ObjectPlayerPtr object_player;
+ double watch_interval = m_watch_interval;
+
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_CURRENT:
+ {
+ object_player = get_object_player();
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t object_set = object_player->get_object_number() / splay_width;
+ if (immediate ||
+ (object_player->get_refetch_state() ==
+ ObjectPlayer::REFETCH_STATE_IMMEDIATE) ||
+ (object_set < active_set && object_player->refetch_required())) {
+ ldout(m_cct, 20) << __func__ << ": immediately refetching "
+ << object_player->get_oid()
+ << dendl;
+ object_player->set_refetch_state(ObjectPlayer::REFETCH_STATE_NONE);
+ watch_interval = 0;
+ }
+ }
+ break;
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second;
+ watch_interval = 0;
+ break;
+ default:
+ ceph_abort();
+ }
+
+ ldout(m_cct, 20) << __func__ << ": scheduling watch on "
+ << object_player->get_oid() << dendl;
+ Context *ctx = utils::create_async_context_callback(
+ m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
+ object_player->watch(ctx, watch_interval);
+}
+
+void JournalPlayer::handle_watch(uint64_t object_num, int r) {
+ ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_watch_scheduled);
+ m_watch_scheduled = false;
+
+ if (m_shut_down || r == -ECANCELED) {
+ // unwatch of object player(s)
+ return;
+ }
+
+ ObjectPlayerPtr object_player = get_object_player(object_num);
+ if (r == 0 && object_player->empty()) {
+ // possibly need to prune this empty object player if we've
+ // already fetched it after the active set was advanced with no
+ // new records
+ remove_empty_object_player(object_player);
+ }
+
+ // determine what object to query on next watch schedule tick
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ if (m_watch_step == WATCH_STEP_FETCH_CURRENT &&
+ object_player->get_object_number() % splay_width != 0) {
+ m_watch_step = WATCH_STEP_FETCH_FIRST;
+ } else if (m_active_tag_tid) {
+ m_watch_step = WATCH_STEP_ASSERT_ACTIVE;
+ } else {
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ }
+
+ process_state(object_num, r);
+}
+
+void JournalPlayer::handle_watch_assert_active(int r) {
+ ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_watch_scheduled);
+ m_watch_scheduled = false;
+
+ if (r == -ESTALE) {
+ // newer tag exists -- since we are at this step in the watch sequence,
+ // we know we can prune the active tag if watch fails again
+ ldout(m_cct, 10) << __func__ << ": tag " << *m_active_tag_tid << " "
+ << "no longer active" << dendl;
+ m_watch_prune_active_tag = true;
+ }
+
+ m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ if (!m_shut_down && m_watch_enabled) {
+ schedule_watch(false);
+ }
+ m_async_op_tracker.finish_op();
+}
+
+void JournalPlayer::notify_entries_available() {
+ ceph_assert(m_lock.is_locked());
+ if (m_handler_notified) {
+ return;
+ }
+ m_handler_notified = true;
+
+ ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+ m_journal_metadata->queue(new C_HandleEntriesAvailable(
+ m_replay_handler), 0);
+}
+
+void JournalPlayer::notify_complete(int r) {
+ ceph_assert(m_lock.is_locked());
+ m_handler_notified = true;
+
+ ldout(m_cct, 10) << __func__ << ": replay complete: r=" << r << dendl;
+ m_journal_metadata->queue(new C_HandleComplete(
+ m_replay_handler), r);
+}
+
+} // namespace journal
diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h
new file mode 100644
index 00000000..09029061
--- /dev/null
+++ b/src/journal/JournalPlayer.h
@@ -0,0 +1,157 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H
+#define CEPH_JOURNAL_JOURNAL_PLAYER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/AsyncOpTracker.h"
+#include "common/Mutex.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectPlayer.h"
+#include "cls/journal/cls_journal_types.h"
+#include <boost/none.hpp>
+#include <boost/optional.hpp>
+#include <map>
+
+class SafeTimer;
+
+namespace journal {
+
+class Entry;
+class ReplayHandler;
+
+class JournalPlayer {
+public:
+ typedef cls::journal::ObjectPosition ObjectPosition;
+ typedef cls::journal::ObjectPositions ObjectPositions;
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+ JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ ReplayHandler *replay_handler);
+ ~JournalPlayer();
+
+ void prefetch();
+ void prefetch_and_watch(double interval);
+ void shut_down(Context *on_finish);
+
+ bool try_pop_front(Entry *entry, uint64_t *commit_tid);
+
+private:
+ typedef std::set<uint8_t> PrefetchSplayOffsets;
+ typedef std::map<uint8_t, ObjectPlayerPtr> SplayedObjectPlayers;
+ typedef std::map<uint8_t, ObjectPosition> SplayedObjectPositions;
+ typedef std::set<uint64_t> ObjectNumbers;
+
+ enum State {
+ STATE_INIT,
+ STATE_PREFETCH,
+ STATE_PLAYBACK,
+ STATE_ERROR
+ };
+
+ enum WatchStep {
+ WATCH_STEP_FETCH_CURRENT,
+ WATCH_STEP_FETCH_FIRST,
+ WATCH_STEP_ASSERT_ACTIVE
+ };
+
+ struct C_Fetch : public Context {
+ JournalPlayer *player;
+ uint64_t object_num;
+ C_Fetch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) {
+ player->m_async_op_tracker.start_op();
+ }
+ ~C_Fetch() override {
+ player->m_async_op_tracker.finish_op();
+ }
+ void finish(int r) override {
+ player->handle_fetched(object_num, r);
+ }
+ };
+
+ struct C_Watch : public Context {
+ JournalPlayer *player;
+ uint64_t object_num;
+ C_Watch(JournalPlayer *player, uint64_t object_num)
+ : player(player), object_num(object_num) {
+ player->m_async_op_tracker.start_op();
+ }
+ ~C_Watch() override {
+ player->m_async_op_tracker.finish_op();
+ }
+
+ void finish(int r) override {
+ player->handle_watch(object_num, r);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+ JournalMetadataPtr m_journal_metadata;
+
+ ReplayHandler *m_replay_handler;
+
+ AsyncOpTracker m_async_op_tracker;
+
+ mutable Mutex m_lock;
+ State m_state;
+ uint8_t m_splay_offset;
+
+ bool m_watch_enabled;
+ bool m_watch_scheduled;
+ double m_watch_interval;
+ WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
+ bool m_watch_prune_active_tag = false;
+
+ bool m_shut_down = false;
+ bool m_handler_notified = false;
+
+ ObjectNumbers m_fetch_object_numbers;
+
+ PrefetchSplayOffsets m_prefetch_splay_offsets;
+ SplayedObjectPlayers m_object_players;
+
+ bool m_commit_position_valid = false;
+ ObjectPosition m_commit_position;
+ SplayedObjectPositions m_commit_positions;
+ uint64_t m_active_set = 0;
+
+ boost::optional<uint64_t> m_active_tag_tid = boost::none;
+ boost::optional<uint64_t> m_prune_tag_tid = boost::none;
+
+ void advance_splay_object();
+
+ bool is_object_set_ready() const;
+ bool verify_playback_ready();
+ void prune_tag(uint64_t tag_tid);
+ void prune_active_tag(const boost::optional<uint64_t>& tag_tid);
+
+ ObjectPlayerPtr get_object_player() const;
+ ObjectPlayerPtr get_object_player(uint64_t object_number) const;
+ bool remove_empty_object_player(const ObjectPlayerPtr &object_player);
+
+ void process_state(uint64_t object_number, int r);
+ int process_prefetch(uint64_t object_number);
+ int process_playback(uint64_t object_number);
+
+ void fetch(uint64_t object_num);
+ void fetch(const ObjectPlayerPtr &object_player);
+ void handle_fetched(uint64_t object_num, int r);
+ void refetch(bool immediate);
+
+ void schedule_watch(bool immediate);
+ void handle_watch(uint64_t object_num, int r);
+ void handle_watch_assert_active(int r);
+
+ void notify_entries_available();
+ void notify_complete(int r);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
new file mode 100644
index 00000000..aa90660a
--- /dev/null
+++ b/src/journal/JournalRecorder.cc
@@ -0,0 +1,409 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "common/errno.h"
+#include "journal/Entry.h"
+#include "journal/Utils.h"
+
+#include <atomic>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \
+ << ": "
+
+using std::shared_ptr;
+
+namespace journal {
+
+namespace {
+
+struct C_Flush : public Context {
+ JournalMetadataPtr journal_metadata;
+ Context *on_finish;
+ std::atomic<int64_t> pending_flushes = { 0 };
+ int ret_val;
+
+ C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
+ size_t _pending_flushes)
+ : journal_metadata(_journal_metadata), on_finish(_on_finish),
+ pending_flushes(_pending_flushes), ret_val(0) {
+ }
+
+ void complete(int r) override {
+ if (r < 0 && ret_val == 0) {
+ ret_val = r;
+ }
+ if (--pending_flushes == 0) {
+ // ensure all prior callback have been flushed as well
+ journal_metadata->queue(on_finish, ret_val);
+ delete this;
+ }
+ }
+ void finish(int r) override {
+ }
+};
+
+} // anonymous namespace
+
+JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ uint64_t max_in_flight_appends)
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata),
+ m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
+ m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
+ m_current_set(m_journal_metadata->get_active_set()) {
+
+ Mutex::Locker locker(m_lock);
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ shared_ptr<Mutex> object_lock(new Mutex(
+ "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
+ m_object_locks.push_back(object_lock);
+
+ uint64_t object_number = splay_offset + (m_current_set * splay_width);
+ Mutex::Locker locker(*object_lock);
+ m_object_ptrs[splay_offset] = create_object_recorder(
+ object_number, m_object_locks[splay_offset]);
+ }
+
+ m_journal_metadata->add_listener(&m_listener);
+}
+
+JournalRecorder::~JournalRecorder() {
+ m_journal_metadata->remove_listener(&m_listener);
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_in_flight_advance_sets == 0);
+ ceph_assert(m_in_flight_object_closes == 0);
+}
+
+void JournalRecorder::shut_down(Context *on_safe) {
+ on_safe = new FunctionContext(
+ [this, on_safe](int r) {
+ Context *ctx = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_in_flight_advance_sets != 0) {
+ ceph_assert(m_on_object_set_advanced == nullptr);
+ m_on_object_set_advanced = new FunctionContext(
+ [on_safe, r](int) {
+ on_safe->complete(r);
+ });
+ } else {
+ ctx = on_safe;
+ }
+ }
+ if (ctx != nullptr) {
+ ctx->complete(r);
+ }
+ });
+ flush(on_safe);
+}
+
+void JournalRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+ auto object_recorder = get_object(splay_offset);
+ object_recorder->set_append_batch_options(flush_interval, flush_bytes,
+ flush_age);
+ }
+}
+
+Future JournalRecorder::append(uint64_t tag_tid,
+ const bufferlist &payload_bl) {
+ ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
+
+ m_lock.Lock();
+
+ uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = entry_tid % splay_width;
+
+ ObjectRecorderPtr object_ptr = get_object(splay_offset);
+ uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
+ object_ptr->get_object_number(), tag_tid, entry_tid);
+ FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
+ future->init(m_prev_future);
+ m_prev_future = future;
+
+ m_object_locks[splay_offset]->Lock();
+ m_lock.Unlock();
+
+ bufferlist entry_bl;
+ encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
+ entry_bl);
+ ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
+
+ bool object_full = object_ptr->append({{future, entry_bl}});
+ m_object_locks[splay_offset]->Unlock();
+
+ if (object_full) {
+ ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
+ << dendl;
+ Mutex::Locker l(m_lock);
+ close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
+ }
+ return Future(future);
+}
+
+void JournalRecorder::flush(Context *on_safe) {
+ ldout(m_cct, 20) << dendl;
+
+ C_Flush *ctx;
+ {
+ Mutex::Locker locker(m_lock);
+
+ ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ it->second->flush(ctx);
+ }
+
+ }
+
+ // avoid holding the lock in case there is nothing to flush
+ ctx->complete(0);
+}
+
+ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
+ ceph_assert(m_lock.is_locked());
+
+ ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
+ ceph_assert(object_recoder != NULL);
+ return object_recoder;
+}
+
+void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
+ ceph_assert(m_lock.is_locked());
+
+ // entry overflow from open object
+ if (m_current_set != object_set) {
+ ldout(m_cct, 20) << "close already in-progress" << dendl;
+ return;
+ }
+
+ // we shouldn't overflow upon append if already closed and we
+ // shouldn't receive an overflowed callback if already closed
+ ceph_assert(m_in_flight_advance_sets == 0);
+ ceph_assert(m_in_flight_object_closes == 0);
+
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ ceph_assert(m_current_set == active_set);
+ ++m_current_set;
+ ++m_in_flight_advance_sets;
+
+ ldout(m_cct, 10) << "closing active object set " << object_set << dendl;
+ if (close_object_set(m_current_set)) {
+ advance_object_set();
+ }
+}
+
+void JournalRecorder::advance_object_set() {
+ ceph_assert(m_lock.is_locked());
+
+ ceph_assert(m_in_flight_object_closes == 0);
+ ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
+ m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
+ this));
+}
+
+void JournalRecorder::handle_advance_object_set(int r) {
+ Context *on_object_set_advanced = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+
+ ceph_assert(m_in_flight_advance_sets > 0);
+ --m_in_flight_advance_sets;
+
+ if (r < 0 && r != -ESTALE) {
+ lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r)
+ << dendl;
+ }
+
+ if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+ open_object_set();
+ std::swap(on_object_set_advanced, m_on_object_set_advanced);
+ }
+ }
+ if (on_object_set_advanced != nullptr) {
+ on_object_set_advanced->complete(0);
+ }
+}
+
+void JournalRecorder::open_object_set() {
+ ceph_assert(m_lock.is_locked());
+
+ ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+
+ lock_object_recorders();
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ ObjectRecorderPtr object_recorder = it->second;
+ uint64_t object_number = object_recorder->get_object_number();
+ if (object_number / splay_width != m_current_set) {
+ ceph_assert(object_recorder->is_closed());
+
+ // ready to close object and open object in active set
+ create_next_object_recorder(object_recorder);
+ }
+ }
+ unlock_object_recorders();
+}
+
+bool JournalRecorder::close_object_set(uint64_t active_set) {
+ ldout(m_cct, 10) << "active_set=" << active_set << dendl;
+ ceph_assert(m_lock.is_locked());
+
+ // object recorders will invoke overflow handler as they complete
+ // closing the object to ensure correct order of future appends
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ lock_object_recorders();
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ ObjectRecorderPtr object_recorder = it->second;
+ if (object_recorder->get_object_number() / splay_width != active_set) {
+ ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
+ << dendl;
+ // flush out all queued appends and hold future appends
+ if (!object_recorder->close()) {
+ ++m_in_flight_object_closes;
+ } else {
+ ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
+ << dendl;
+ }
+ }
+ }
+ unlock_object_recorders();
+ return (m_in_flight_object_closes == 0);
+}
+
+ObjectRecorderPtr JournalRecorder::create_object_recorder(
+ uint64_t object_number, shared_ptr<Mutex> lock) {
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
+ ObjectRecorderPtr object_recorder(new ObjectRecorder(
+ m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
+ object_number, lock, m_journal_metadata->get_work_queue(),
+ &m_object_handler, m_journal_metadata->get_order(),
+ m_max_in_flight_appends));
+ object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
+ return object_recorder;
+}
+
+void JournalRecorder::create_next_object_recorder(
+ ObjectRecorderPtr object_recorder) {
+ ceph_assert(m_lock.is_locked());
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ ldout(m_cct, 10) << "object_number=" << object_number << dendl;
+
+ ceph_assert(m_object_locks[splay_offset]->is_locked());
+
+ ObjectRecorderPtr new_object_recorder = create_object_recorder(
+ (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
+
+ ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
+ << "new oid=" << new_object_recorder->get_oid() << dendl;
+ AppendBuffers append_buffers;
+ object_recorder->claim_append_buffers(&append_buffers);
+
+ // update the commit record to point to the correct object number
+ for (auto &append_buffer : append_buffers) {
+ m_journal_metadata->overflow_commit_tid(
+ append_buffer.first->get_commit_tid(),
+ new_object_recorder->get_object_number());
+ }
+
+ new_object_recorder->append(std::move(append_buffers));
+ m_object_ptrs[splay_offset] = new_object_recorder;
+}
+
+void JournalRecorder::handle_update() {
+ Mutex::Locker locker(m_lock);
+
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ if (m_current_set < active_set) {
+ // peer journal client advanced the active set
+ ldout(m_cct, 10) << "current_set=" << m_current_set << ", "
+ << "active_set=" << active_set << dendl;
+
+ uint64_t current_set = m_current_set;
+ m_current_set = active_set;
+ if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+ ldout(m_cct, 10) << "closing current object set " << current_set << dendl;
+ if (close_object_set(active_set)) {
+ open_object_set();
+ }
+ }
+ }
+}
+
+void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ ceph_assert(active_object_recorder->get_object_number() == object_number);
+
+ ceph_assert(m_in_flight_object_closes > 0);
+ --m_in_flight_object_closes;
+
+ // object closed after advance active set committed
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " closed" << dendl;
+ if (m_in_flight_object_closes == 0) {
+ if (m_in_flight_advance_sets == 0) {
+ // peer forced closing of object set
+ open_object_set();
+ } else {
+ // local overflow advanced object set
+ advance_object_set();
+ }
+ }
+}
+
+void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
+ ldout(m_cct, 10) << object_recorder->get_oid() << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ ceph_assert(active_object_recorder->get_object_number() == object_number);
+
+ ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
+ << " overflowed" << dendl;
+ close_and_advance_object_set(object_number / splay_width);
+}
+
+} // namespace journal
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
new file mode 100644
index 00000000..382f75ac
--- /dev/null
+++ b/src/journal/JournalRecorder.h
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
+#define CEPH_JOURNAL_JOURNAL_RECORDER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectRecorder.h"
+#include <map>
+#include <string>
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalRecorder {
+public:
+ JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata,
+ uint64_t max_in_flight_appends);
+ ~JournalRecorder();
+
+ void shut_down(Context *on_safe);
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
+ Future append(uint64_t tag_tid, const bufferlist &bl);
+ void flush(Context *on_safe);
+
+ ObjectRecorderPtr get_object(uint8_t splay_offset);
+
+private:
+ typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
+
+ struct Listener : public JournalMetadataListener {
+ JournalRecorder *journal_recorder;
+
+ Listener(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {}
+
+ void handle_update(JournalMetadata *) override {
+ journal_recorder->handle_update();
+ }
+ };
+
+ struct ObjectHandler : public ObjectRecorder::Handler {
+ JournalRecorder *journal_recorder;
+
+ ObjectHandler(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {
+ }
+
+ void closed(ObjectRecorder *object_recorder) override {
+ journal_recorder->handle_closed(object_recorder);
+ }
+ void overflow(ObjectRecorder *object_recorder) override {
+ journal_recorder->handle_overflow(object_recorder);
+ }
+ };
+
+ struct C_AdvanceObjectSet : public Context {
+ JournalRecorder *journal_recorder;
+
+ C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {
+ }
+ void finish(int r) override {
+ journal_recorder->handle_advance_object_set(r);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+
+ JournalMetadataPtr m_journal_metadata;
+
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
+ uint64_t m_max_in_flight_appends;
+
+ Listener m_listener;
+ ObjectHandler m_object_handler;
+
+ Mutex m_lock;
+
+ uint32_t m_in_flight_advance_sets = 0;
+ uint32_t m_in_flight_object_closes = 0;
+ uint64_t m_current_set;
+ ObjectRecorderPtrs m_object_ptrs;
+ std::vector<std::shared_ptr<Mutex>> m_object_locks;
+
+ FutureImplPtr m_prev_future;
+
+ Context *m_on_object_set_advanced = nullptr;
+
+ void open_object_set();
+ bool close_object_set(uint64_t active_set);
+
+ void advance_object_set();
+ void handle_advance_object_set(int r);
+
+ void close_and_advance_object_set(uint64_t object_set);
+
+ ObjectRecorderPtr create_object_recorder(uint64_t object_number,
+ std::shared_ptr<Mutex> lock);
+ void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+
+ void handle_update();
+
+ void handle_closed(ObjectRecorder *object_recorder);
+ void handle_overflow(ObjectRecorder *object_recorder);
+
+ void lock_object_recorders() {
+ for (auto& lock : m_object_locks) {
+ lock->Lock();
+ }
+ }
+
+ void unlock_object_recorders() {
+ for (auto& lock : m_object_locks) {
+ lock->Unlock();
+ }
+ }
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H
diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc
new file mode 100644
index 00000000..645a6230
--- /dev/null
+++ b/src/journal/JournalTrimmer.cc
@@ -0,0 +1,248 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalTrimmer.h"
+#include "journal/Utils.h"
+#include "common/Cond.h"
+#include "common/errno.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalTrimmer: " << this << " "
+
+namespace journal {
+
+struct JournalTrimmer::C_RemoveSet : public Context {
+ JournalTrimmer *journal_trimmer;
+ uint64_t object_set;
+ Mutex lock;
+ uint32_t refs;
+ int return_value;
+
+ C_RemoveSet(JournalTrimmer *_journal_trimmer, uint64_t _object_set,
+ uint8_t _splay_width);
+ void complete(int r) override;
+ void finish(int r) override {
+ journal_trimmer->handle_set_removed(r, object_set);
+ journal_trimmer->m_async_op_tracker.finish_op();
+ }
+};
+
+JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata)
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata), m_metadata_listener(this),
+ m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
+ m_remove_set(0), m_remove_set_ctx(NULL) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+ m_journal_metadata->add_listener(&m_metadata_listener);
+}
+
+JournalTrimmer::~JournalTrimmer() {
+ ceph_assert(m_shutdown);
+}
+
+void JournalTrimmer::shut_down(Context *on_finish) {
+ ldout(m_cct, 20) << __func__ << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_shutdown);
+ m_shutdown = true;
+ }
+
+ m_journal_metadata->remove_listener(&m_metadata_listener);
+
+ // chain the shut down sequence (reverse order)
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ m_async_op_tracker.wait_for_ops(on_finish);
+ });
+ m_journal_metadata->flush_commit_position(on_finish);
+}
+
+void JournalTrimmer::remove_objects(bool force, Context *on_finish) {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ on_finish = new FunctionContext([this, force, on_finish](int r) {
+ Mutex::Locker locker(m_lock);
+
+ if (m_remove_set_pending) {
+ on_finish->complete(-EBUSY);
+ }
+
+ if (!force) {
+ JournalMetadata::RegisteredClients registered_clients;
+ m_journal_metadata->get_registered_clients(&registered_clients);
+
+ if (registered_clients.size() == 0) {
+ on_finish->complete(-EINVAL);
+ return;
+ } else if (registered_clients.size() > 1) {
+ on_finish->complete(-EBUSY);
+ return;
+ }
+ }
+
+ m_remove_set = std::numeric_limits<uint64_t>::max();
+ m_remove_set_pending = true;
+ m_remove_set_ctx = on_finish;
+
+ remove_set(m_journal_metadata->get_minimum_set());
+ });
+
+ m_async_op_tracker.wait_for_ops(on_finish);
+}
+
+void JournalTrimmer::committed(uint64_t commit_tid) {
+ ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl;
+ m_journal_metadata->committed(commit_tid,
+ m_create_commit_position_safe_context);
+}
+
+void JournalTrimmer::trim_objects(uint64_t minimum_set) {
+ ceph_assert(m_lock.is_locked());
+
+ ldout(m_cct, 20) << __func__ << ": min_set=" << minimum_set << dendl;
+ if (minimum_set <= m_journal_metadata->get_minimum_set()) {
+ return;
+ }
+
+ if (m_remove_set_pending) {
+ m_remove_set = std::max(m_remove_set, minimum_set);
+ return;
+ }
+
+ m_remove_set = minimum_set;
+ m_remove_set_pending = true;
+ remove_set(m_journal_metadata->get_minimum_set());
+}
+
+void JournalTrimmer::remove_set(uint64_t object_set) {
+ ceph_assert(m_lock.is_locked());
+
+ m_async_op_tracker.start_op();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width);
+
+ ldout(m_cct, 20) << __func__ << ": removing object set " << object_set
+ << dendl;
+ for (uint64_t object_number = object_set * splay_width;
+ object_number < (object_set + 1) * splay_width;
+ ++object_number) {
+ std::string oid = utils::get_object_name(m_object_oid_prefix,
+ object_number);
+
+ ldout(m_cct, 20) << "removing journal object " << oid << dendl;
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_remove(oid, comp,
+ CEPH_OSD_FLAG_FULL_FORCE | CEPH_OSD_FLAG_FULL_TRY);
+ ceph_assert(r == 0);
+ comp->release();
+ }
+}
+
+void JournalTrimmer::handle_metadata_updated() {
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ JournalMetadata::RegisteredClients registered_clients;
+ m_journal_metadata->get_registered_clients(&registered_clients);
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint64_t minimum_set = m_journal_metadata->get_minimum_set();
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t minimum_commit_set = active_set;
+ std::string minimum_client_id;
+
+ for (auto &client : registered_clients) {
+ if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ continue;
+ }
+
+ if (client.commit_position.object_positions.empty()) {
+ // client hasn't recorded any commits
+ minimum_commit_set = minimum_set;
+ minimum_client_id = client.id;
+ break;
+ }
+
+ for (auto &position : client.commit_position.object_positions) {
+ uint64_t object_set = position.object_number / splay_width;
+ if (object_set < minimum_commit_set) {
+ minimum_client_id = client.id;
+ minimum_commit_set = object_set;
+ }
+ }
+ }
+
+ if (minimum_commit_set > minimum_set) {
+ trim_objects(minimum_commit_set);
+ } else {
+ ldout(m_cct, 20) << "object set " << minimum_commit_set << " still "
+ << "in-use by client " << minimum_client_id << dendl;
+ }
+}
+
+void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
+ ldout(m_cct, 20) << __func__ << ": r=" << r << ", set=" << object_set << ", "
+ << "trim=" << m_remove_set << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_remove_set_pending = false;
+
+ if (r == -ENOENT) {
+ // no objects within the set existed
+ r = 0;
+ }
+ if (r == 0) {
+ // advance the minimum set to the next set
+ m_journal_metadata->set_minimum_set(object_set + 1);
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ uint64_t minimum_set = m_journal_metadata->get_minimum_set();
+
+ if (m_remove_set > minimum_set && minimum_set <= active_set) {
+ m_remove_set_pending = true;
+ remove_set(minimum_set);
+ }
+ }
+
+ if (m_remove_set_ctx != nullptr && !m_remove_set_pending) {
+ ldout(m_cct, 20) << "completing remove set context" << dendl;
+ m_remove_set_ctx->complete(r);
+ m_remove_set_ctx = nullptr;
+ }
+}
+
+JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
+ uint64_t _object_set,
+ uint8_t _splay_width)
+ : journal_trimmer(_journal_trimmer), object_set(_object_set),
+ lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
+ refs(_splay_width), return_value(-ENOENT) {
+}
+
+void JournalTrimmer::C_RemoveSet::complete(int r) {
+ lock.Lock();
+ if (r < 0 && r != -ENOENT &&
+ (return_value == -ENOENT || return_value == 0)) {
+ return_value = r;
+ } else if (r == 0 && return_value == -ENOENT) {
+ return_value = 0;
+ }
+
+ if (--refs == 0) {
+ finish(return_value);
+ lock.Unlock();
+ delete this;
+ } else {
+ lock.Unlock();
+ }
+}
+
+} // namespace journal
diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h
new file mode 100644
index 00000000..0b279239
--- /dev/null
+++ b/src/journal/JournalTrimmer.h
@@ -0,0 +1,94 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_TRIMMER_H
+#define CEPH_JOURNAL_JOURNAL_TRIMMER_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "include/Context.h"
+#include "common/AsyncOpTracker.h"
+#include "common/Mutex.h"
+#include "journal/JournalMetadata.h"
+#include "cls/journal/cls_journal_types.h"
+#include <functional>
+
+struct Context;
+
+namespace journal {
+
+class JournalTrimmer {
+public:
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+ JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata);
+ ~JournalTrimmer();
+
+ void shut_down(Context *on_finish);
+
+ void remove_objects(bool force, Context *on_finish);
+ void committed(uint64_t commit_tid);
+
+private:
+ typedef std::function<Context*()> CreateContext;
+
+ struct MetadataListener : public JournalMetadataListener {
+ JournalTrimmer *journal_trimmer;
+
+ MetadataListener(JournalTrimmer *journal_trimmer)
+ : journal_trimmer(journal_trimmer) {
+ }
+ void handle_update(JournalMetadata *) override {
+ journal_trimmer->handle_metadata_updated();
+ }
+ };
+
+ struct C_CommitPositionSafe : public Context {
+ JournalTrimmer *journal_trimmer;
+
+ C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
+ : journal_trimmer(_journal_trimmer) {
+ journal_trimmer->m_async_op_tracker.start_op();
+ }
+ ~C_CommitPositionSafe() override {
+ journal_trimmer->m_async_op_tracker.finish_op();
+ }
+
+ void finish(int r) override {
+ }
+ };
+
+ struct C_RemoveSet;
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+
+ JournalMetadataPtr m_journal_metadata;
+ MetadataListener m_metadata_listener;
+
+ AsyncOpTracker m_async_op_tracker;
+
+ Mutex m_lock;
+
+ bool m_remove_set_pending;
+ uint64_t m_remove_set;
+ Context *m_remove_set_ctx;
+
+ bool m_shutdown = false;
+
+ CreateContext m_create_commit_position_safe_context = [this]() {
+ return new C_CommitPositionSafe(this);
+ };
+
+ void trim_objects(uint64_t minimum_set);
+ void remove_set(uint64_t object_set);
+
+ void handle_metadata_updated();
+ void handle_set_removed(int r, uint64_t object_set);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_TRIMMER_H
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
new file mode 100644
index 00000000..65435ae9
--- /dev/null
+++ b/src/journal/Journaler.cc
@@ -0,0 +1,468 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Journaler.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "journal/Entry.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/JournalPlayer.h"
+#include "journal/JournalRecorder.h"
+#include "journal/JournalTrimmer.h"
+#include "journal/ReplayEntry.h"
+#include "journal/ReplayHandler.h"
+#include "cls/journal/cls_journal_client.h"
+#include "cls/journal/cls_journal_types.h"
+#include "Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Journaler: " << this << " "
+
+namespace journal {
+
+namespace {
+
+static const std::string JOURNAL_HEADER_PREFIX = "journal.";
+static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
+
+} // anonymous namespace
+
+using namespace cls::journal;
+using utils::rados_ctx_callback;
+
+std::string Journaler::header_oid(const std::string &journal_id) {
+ return JOURNAL_HEADER_PREFIX + journal_id;
+}
+
+std::string Journaler::object_oid_prefix(int pool_id,
+ const std::string &journal_id) {
+ return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
+}
+
+Journaler::Threads::Threads(CephContext *cct)
+ : timer_lock("Journaler::timer_lock") {
+ thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
+ thread_pool->start();
+
+ work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+
+ timer = new SafeTimer(cct, timer_lock, true);
+ timer->init();
+}
+
+Journaler::Threads::~Threads() {
+ {
+ Mutex::Locker timer_locker(timer_lock);
+ timer->shutdown();
+ }
+ delete timer;
+ timer = nullptr;
+
+ work_queue->drain();
+ delete work_queue;
+ work_queue = nullptr;
+
+ thread_pool->stop();
+ delete thread_pool;
+ thread_pool = nullptr;
+}
+
+Journaler::Journaler(librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const std::string &client_id, const Settings &settings)
+ : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))),
+ m_client_id(client_id) {
+ set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
+ header_ioctx, journal_id, settings);
+}
+
+Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const std::string &client_id, const Settings &settings)
+ : m_client_id(client_id) {
+ set_up(work_queue, timer, timer_lock, header_ioctx, journal_id,
+ settings);
+}
+
+void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock, librados::IoCtx &header_ioctx,
+ const std::string &journal_id,
+ const Settings &settings) {
+ m_header_ioctx.dup(header_ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
+
+ m_header_oid = header_oid(journal_id);
+ m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id);
+
+ m_metadata = new JournalMetadata(work_queue, timer, timer_lock,
+ m_header_ioctx, m_header_oid, m_client_id,
+ settings);
+ m_metadata->get();
+}
+
+Journaler::~Journaler() {
+ if (m_metadata != nullptr) {
+ ceph_assert(!m_metadata->is_initialized());
+ if (!m_initialized) {
+ // never initialized -- ensure any in-flight ops are complete
+ // since we wouldn't expect shut_down to be invoked
+ m_metadata->wait_for_ops();
+ }
+ m_metadata->put();
+ m_metadata = nullptr;
+ }
+ ceph_assert(m_trimmer == nullptr);
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
+
+ delete m_threads;
+ m_threads = nullptr;
+}
+
+void Journaler::exists(Context *on_finish) const {
+ librados::ObjectReadOperation op;
+ op.stat(nullptr, nullptr, nullptr);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
+ int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op, nullptr);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void Journaler::init(Context *on_init) {
+ m_initialized = true;
+ m_metadata->init(new C_InitJournaler(this, on_init));
+}
+
+int Journaler::init_complete() {
+ int64_t pool_id = m_metadata->get_pool_id();
+
+ if (pool_id < 0 || pool_id == m_header_ioctx.get_id()) {
+ ldout(m_cct, 20) << "using image pool for journal data" << dendl;
+ m_data_ioctx.dup(m_header_ioctx);
+ } else {
+ ldout(m_cct, 20) << "using pool id=" << pool_id << " for journal data"
+ << dendl;
+ librados::Rados rados(m_header_ioctx);
+ int r = rados.ioctx_create2(pool_id, m_data_ioctx);
+ if (r < 0) {
+ if (r == -ENOENT) {
+ ldout(m_cct, 1) << "pool id=" << pool_id << " no longer exists"
+ << dendl;
+ }
+ return r;
+ }
+ }
+ m_trimmer = new JournalTrimmer(m_data_ioctx, m_object_oid_prefix,
+ m_metadata);
+ return 0;
+}
+
+void Journaler::shut_down() {
+ C_SaferCond ctx;
+ shut_down(&ctx);
+ ctx.wait();
+}
+
+void Journaler::shut_down(Context *on_finish) {
+ ceph_assert(m_player == nullptr);
+ ceph_assert(m_recorder == nullptr);
+
+ JournalMetadata *metadata = nullptr;
+ ceph_assert(m_metadata != nullptr);
+ std::swap(metadata, m_metadata);
+ ceph_assert(metadata != nullptr);
+
+ on_finish = new FunctionContext([metadata, on_finish](int r) {
+ metadata->put();
+ on_finish->complete(0);
+ });
+
+ JournalTrimmer *trimmer = nullptr;
+ std::swap(trimmer, m_trimmer);
+ if (!trimmer) {
+ metadata->shut_down(on_finish);
+ return;
+ }
+
+ on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) {
+ delete trimmer;
+ metadata->shut_down(on_finish);
+ });
+ trimmer->shut_down(on_finish);
+}
+
+bool Journaler::is_initialized() const {
+ return m_metadata->is_initialized();
+}
+
+void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id, Context *on_finish) {
+ m_metadata->get_immutable_metadata(order, splay_width, pool_id, on_finish);
+}
+
+void Journaler::get_mutable_metadata(uint64_t *minimum_set,
+ uint64_t *active_set,
+ RegisteredClients *clients,
+ Context *on_finish) {
+ m_metadata->get_mutable_metadata(minimum_set, active_set, clients, on_finish);
+}
+
+void Journaler::create(uint8_t order, uint8_t splay_width,
+ int64_t pool_id, Context *on_finish) {
+ if (order > 26 || order < 12) {
+ lderr(m_cct) << "order must be in the range [12, 26]" << dendl;
+ on_finish->complete(-EDOM);
+ return;
+ }
+ if (splay_width == 0) {
+ on_finish->complete(-EINVAL);
+ return;
+ }
+
+ ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::create(&op, order, splay_width, pool_id);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(on_finish, nullptr, rados_ctx_callback);
+ int r = m_header_ioctx.aio_operate(m_header_oid, comp, &op);
+ ceph_assert(r == 0);
+ comp->release();
+}
+
+void Journaler::remove(bool force, Context *on_finish) {
+ // chain journal removal (reverse order)
+ on_finish = new FunctionContext([this, on_finish](int r) {
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, nullptr, utils::rados_ctx_callback);
+ r = m_header_ioctx.aio_remove(m_header_oid, comp);
+ ceph_assert(r == 0);
+ comp->release();
+ });
+
+ on_finish = new FunctionContext([this, force, on_finish](int r) {
+ m_trimmer->remove_objects(force, on_finish);
+ });
+
+ m_metadata->shut_down(on_finish);
+}
+
+void Journaler::flush_commit_position(Context *on_safe) {
+ m_metadata->flush_commit_position(on_safe);
+}
+
+void Journaler::add_listener(JournalMetadataListener *listener) {
+ m_metadata->add_listener(listener);
+}
+
+void Journaler::remove_listener(JournalMetadataListener *listener) {
+ m_metadata->remove_listener(listener);
+}
+
+int Journaler::register_client(const bufferlist &data) {
+ C_SaferCond cond;
+ register_client(data, &cond);
+ return cond.wait();
+}
+
+int Journaler::unregister_client() {
+ C_SaferCond cond;
+ unregister_client(&cond);
+ return cond.wait();
+}
+
+void Journaler::register_client(const bufferlist &data, Context *on_finish) {
+ return m_metadata->register_client(data, on_finish);
+}
+
+void Journaler::update_client(const bufferlist &data, Context *on_finish) {
+ return m_metadata->update_client(data, on_finish);
+}
+
+void Journaler::unregister_client(Context *on_finish) {
+ return m_metadata->unregister_client(on_finish);
+}
+
+void Journaler::get_client(const std::string &client_id,
+ cls::journal::Client *client,
+ Context *on_finish) {
+ m_metadata->get_client(client_id, client, on_finish);
+}
+
+int Journaler::get_cached_client(const std::string &client_id,
+ cls::journal::Client *client) {
+ RegisteredClients clients;
+ m_metadata->get_registered_clients(&clients);
+
+ auto it = clients.find({client_id, {}});
+ if (it == clients.end()) {
+ return -ENOENT;
+ }
+
+ *client = *it;
+ return 0;
+}
+
+void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
+ Context *on_finish) {
+ m_metadata->allocate_tag(cls::journal::Tag::TAG_CLASS_NEW, data, tag,
+ on_finish);
+}
+
+void Journaler::allocate_tag(uint64_t tag_class, const bufferlist &data,
+ cls::journal::Tag *tag, Context *on_finish) {
+ m_metadata->allocate_tag(tag_class, data, tag, on_finish);
+}
+
+void Journaler::get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish) {
+ m_metadata->get_tag(tag_tid, tag, on_finish);
+}
+
+void Journaler::get_tags(uint64_t tag_class, Tags *tags, Context *on_finish) {
+ m_metadata->get_tags(0, tag_class, tags, on_finish);
+}
+
+void Journaler::get_tags(uint64_t start_after_tag_tid, uint64_t tag_class,
+ Tags *tags, Context *on_finish) {
+ m_metadata->get_tags(start_after_tag_tid, tag_class, tags, on_finish);
+}
+
+void Journaler::start_replay(ReplayHandler *replay_handler) {
+ create_player(replay_handler);
+ m_player->prefetch();
+}
+
+void Journaler::start_live_replay(ReplayHandler *replay_handler,
+ double interval) {
+ create_player(replay_handler);
+ m_player->prefetch_and_watch(interval);
+}
+
+bool Journaler::try_pop_front(ReplayEntry *replay_entry,
+ uint64_t *tag_tid) {
+ ceph_assert(m_player != nullptr);
+
+ Entry entry;
+ uint64_t commit_tid;
+ if (!m_player->try_pop_front(&entry, &commit_tid)) {
+ return false;
+ }
+
+ *replay_entry = ReplayEntry(entry.get_data(), commit_tid);
+ if (tag_tid != nullptr) {
+ *tag_tid = entry.get_tag_tid();
+ }
+ return true;
+}
+
+void Journaler::stop_replay() {
+ C_SaferCond ctx;
+ stop_replay(&ctx);
+ ctx.wait();
+}
+
+void Journaler::stop_replay(Context *on_finish) {
+ JournalPlayer *player = nullptr;
+ ceph_assert(m_player != nullptr);
+ std::swap(player, m_player);
+ ceph_assert(player != nullptr);
+
+ on_finish = new FunctionContext([player, on_finish](int r) {
+ delete player;
+ on_finish->complete(r);
+ });
+ player->shut_down(on_finish);
+}
+
+void Journaler::committed(const ReplayEntry &replay_entry) {
+ m_trimmer->committed(replay_entry.get_commit_tid());
+}
+
+void Journaler::committed(const Future &future) {
+ FutureImplPtr future_impl = future.get_future_impl();
+ m_trimmer->committed(future_impl->get_commit_tid());
+}
+
+void Journaler::start_append(uint64_t max_in_flight_appends) {
+ ceph_assert(m_recorder == nullptr);
+
+ // TODO verify active object set >= current replay object set
+
+ m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
+ m_metadata, max_in_flight_appends);
+}
+
+void Journaler::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ceph_assert(m_recorder != nullptr);
+ m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age);
+}
+
+void Journaler::stop_append(Context *on_safe) {
+ JournalRecorder *recorder = nullptr;
+ ceph_assert(m_recorder != nullptr);
+ std::swap(recorder, m_recorder);
+ ceph_assert(recorder != nullptr);
+
+ on_safe = new FunctionContext([recorder, on_safe](int r) {
+ delete recorder;
+ on_safe->complete(r);
+ });
+ recorder->shut_down(on_safe);
+}
+
+uint64_t Journaler::get_max_append_size() const {
+ uint64_t max_payload_size = m_metadata->get_object_size() -
+ Entry::get_fixed_size();
+ if (m_metadata->get_settings().max_payload_bytes > 0) {
+ max_payload_size = std::min(max_payload_size,
+ m_metadata->get_settings().max_payload_bytes);
+ }
+ return max_payload_size;
+}
+
+Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) {
+ return m_recorder->append(tag_tid, payload_bl);
+}
+
+void Journaler::flush_append(Context *on_safe) {
+ m_recorder->flush(on_safe);
+}
+
+void Journaler::create_player(ReplayHandler *replay_handler) {
+ ceph_assert(m_player == nullptr);
+ m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
+ replay_handler);
+}
+
+void Journaler::get_metadata(uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id) {
+ ceph_assert(m_metadata != nullptr);
+
+ *order = m_metadata->get_order();
+ *splay_width = m_metadata->get_splay_width();
+ *pool_id = m_metadata->get_pool_id();
+}
+
+std::ostream &operator<<(std::ostream &os,
+ const Journaler &journaler) {
+ os << "[metadata=";
+ if (journaler.m_metadata) {
+ os << *journaler.m_metadata;
+ } else {
+ os << "NULL";
+ }
+ os << "]";
+ return os;
+}
+
+} // namespace journal
diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h
new file mode 100644
index 00000000..5a6e0c7c
--- /dev/null
+++ b/src/journal/Journaler.h
@@ -0,0 +1,168 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNALER_H
+#define CEPH_JOURNAL_JOURNALER_H
+
+#include "include/int_types.h"
+#include "include/buffer_fwd.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "journal/Future.h"
+#include "journal/JournalMetadataListener.h"
+#include "cls/journal/cls_journal_types.h"
+#include <list>
+#include <map>
+#include <string>
+#include "include/ceph_assert.h"
+
+class ContextWQ;
+class SafeTimer;
+class ThreadPool;
+
+namespace journal {
+
+class JournalMetadata;
+class JournalPlayer;
+class JournalRecorder;
+class JournalTrimmer;
+class ReplayEntry;
+class ReplayHandler;
+class Settings;
+
+class Journaler {
+public:
+ struct Threads {
+ Threads(CephContext *cct);
+ ~Threads();
+
+ ThreadPool *thread_pool = nullptr;
+ ContextWQ *work_queue = nullptr;
+
+ SafeTimer *timer = nullptr;
+ Mutex timer_lock;
+ };
+
+ typedef cls::journal::Tag Tag;
+ typedef std::list<cls::journal::Tag> Tags;
+ typedef std::set<cls::journal::Client> RegisteredClients;
+
+ static std::string header_oid(const std::string &journal_id);
+ static std::string object_oid_prefix(int pool_id,
+ const std::string &journal_id);
+
+ Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const std::string &client_id, const Settings &settings);
+ Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const std::string &client_id, const Settings &settings);
+ ~Journaler();
+
+ void exists(Context *on_finish) const;
+ void create(uint8_t order, uint8_t splay_width, int64_t pool_id, Context *ctx);
+ void remove(bool force, Context *on_finish);
+
+ void init(Context *on_init);
+ void shut_down();
+ void shut_down(Context *on_finish);
+
+ bool is_initialized() const;
+
+ void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
+ int64_t *pool_id, Context *on_finish);
+ void get_mutable_metadata(uint64_t *minimum_set, uint64_t *active_set,
+ RegisteredClients *clients, Context *on_finish);
+
+ void add_listener(JournalMetadataListener *listener);
+ void remove_listener(JournalMetadataListener *listener);
+
+ int register_client(const bufferlist &data);
+ void register_client(const bufferlist &data, Context *on_finish);
+
+ int unregister_client();
+ void unregister_client(Context *on_finish);
+
+ void update_client(const bufferlist &data, Context *on_finish);
+ void get_client(const std::string &client_id, cls::journal::Client *client,
+ Context *on_finish);
+ int get_cached_client(const std::string &client_id,
+ cls::journal::Client *client);
+
+ void flush_commit_position(Context *on_safe);
+
+ void allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
+ Context *on_finish);
+ void allocate_tag(uint64_t tag_class, const bufferlist &data,
+ cls::journal::Tag *tag, Context *on_finish);
+ void get_tag(uint64_t tag_tid, Tag *tag, Context *on_finish);
+ void get_tags(uint64_t tag_class, Tags *tags, Context *on_finish);
+ void get_tags(uint64_t start_after_tag_tid, uint64_t tag_class, Tags *tags,
+ Context *on_finish);
+
+ void start_replay(ReplayHandler *replay_handler);
+ void start_live_replay(ReplayHandler *replay_handler, double interval);
+ bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
+ void stop_replay();
+ void stop_replay(Context *on_finish);
+
+ uint64_t get_max_append_size() const;
+ void start_append(uint64_t max_in_flight_appends);
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+ Future append(uint64_t tag_tid, const bufferlist &bl);
+ void flush_append(Context *on_safe);
+ void stop_append(Context *on_safe);
+
+ void committed(const ReplayEntry &replay_entry);
+ void committed(const Future &future);
+
+ void get_metadata(uint8_t *order, uint8_t *splay_width, int64_t *pool_id);
+
+private:
+ struct C_InitJournaler : public Context {
+ Journaler *journaler;
+ Context *on_safe;
+ C_InitJournaler(Journaler *_journaler, Context *_on_safe)
+ : journaler(_journaler), on_safe(_on_safe) {
+ }
+ void finish(int r) override {
+ if (r == 0) {
+ r = journaler->init_complete();
+ }
+ on_safe->complete(r);
+ }
+ };
+
+ Threads *m_threads = nullptr;
+
+ mutable librados::IoCtx m_header_ioctx;
+ librados::IoCtx m_data_ioctx;
+ CephContext *m_cct;
+ std::string m_client_id;
+
+ std::string m_header_oid;
+ std::string m_object_oid_prefix;
+
+ bool m_initialized = false;
+ JournalMetadata *m_metadata = nullptr;
+ JournalPlayer *m_player = nullptr;
+ JournalRecorder *m_recorder = nullptr;
+ JournalTrimmer *m_trimmer = nullptr;
+
+ void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+ librados::IoCtx &header_ioctx, const std::string &journal_id,
+ const Settings &settings);
+
+ int init_complete();
+ void create_player(ReplayHandler *replay_handler);
+
+ friend std::ostream &operator<<(std::ostream &os,
+ const Journaler &journaler);
+};
+
+std::ostream &operator<<(std::ostream &os,
+ const Journaler &journaler);
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNALER_H
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
new file mode 100644
index 00000000..d4d9fb75
--- /dev/null
+++ b/src/journal/ObjectPlayer.cc
@@ -0,0 +1,313 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Utils.h"
+#include "common/Timer.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectPlayer: " << this << " "
+
+namespace journal {
+
+ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer,
+ Mutex &timer_lock, uint8_t order,
+ uint64_t max_fetch_bytes)
+ : RefCountedObject(NULL, 0), m_object_num(object_num),
+ m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
+ m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
+ m_watch_interval(0), m_watch_task(NULL),
+ m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
+ m_fetch_in_progress(false) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+ObjectPlayer::~ObjectPlayer() {
+ {
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_fetch_in_progress);
+ ceph_assert(m_watch_ctx == nullptr);
+ }
+}
+
+void ObjectPlayer::fetch(Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_fetch_in_progress);
+ m_fetch_in_progress = true;
+
+ C_Fetch *context = new C_Fetch(this, on_finish);
+ librados::ObjectReadOperation op;
+ op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
+ NULL);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
+ ceph_assert(r == 0);
+ rados_completion->release();
+}
+
+void ObjectPlayer::watch(Context *on_fetch, double interval) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
+
+ Mutex::Locker timer_locker(m_timer_lock);
+ m_watch_interval = interval;
+
+ ceph_assert(m_watch_ctx == nullptr);
+ m_watch_ctx = on_fetch;
+
+ schedule_watch();
+}
+
+void ObjectPlayer::unwatch() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
+ Context *watch_ctx = nullptr;
+ {
+ Mutex::Locker timer_locker(m_timer_lock);
+ ceph_assert(!m_unwatched);
+ m_unwatched = true;
+
+ if (!cancel_watch()) {
+ return;
+ }
+
+ std::swap(watch_ctx, m_watch_ctx);
+ }
+
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(-ECANCELED);
+ }
+}
+
+void ObjectPlayer::front(Entry *entry) const {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_entries.empty());
+ *entry = m_entries.front();
+}
+
+void ObjectPlayer::pop_front() {
+ Mutex::Locker locker(m_lock);
+ ceph_assert(!m_entries.empty());
+
+ auto &entry = m_entries.front();
+ m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
+ m_entries.pop_front();
+}
+
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
+ bool *refetch) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
+ << bl.length() << dendl;
+
+ *refetch = false;
+ if (r == -ENOENT) {
+ return 0;
+ } else if (r < 0) {
+ return r;
+ } else if (bl.length() == 0) {
+ return 0;
+ }
+
+ Mutex::Locker locker(m_lock);
+ ceph_assert(m_fetch_in_progress);
+ m_read_off += bl.length();
+ m_read_bl.append(bl);
+ m_refetch_state = REFETCH_STATE_REQUIRED;
+
+ bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
+ bool partial_entry = false;
+ bool invalid = false;
+ uint32_t invalid_start_off = 0;
+
+ clear_invalid_range(m_read_bl_off, m_read_bl.length());
+ bufferlist::const_iterator iter{&m_read_bl, 0};
+ while (!iter.end()) {
+ uint32_t bytes_needed;
+ uint32_t bl_off = iter.get_off();
+ if (!Entry::is_readable(iter, &bytes_needed)) {
+ if (bytes_needed != 0) {
+ invalid_start_off = m_read_bl_off + bl_off;
+ invalid = true;
+ partial_entry = true;
+ if (full_fetch) {
+ lderr(m_cct) << ": partial record at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial record detected, will re-fetch"
+ << dendl;
+ }
+ break;
+ }
+
+ if (!invalid) {
+ invalid_start_off = m_read_bl_off + bl_off;
+ invalid = true;
+ lderr(m_cct) << ": detected corrupt journal entry at offset "
+ << invalid_start_off << dendl;
+ }
+ ++iter;
+ continue;
+ }
+
+ Entry entry;
+ decode(entry, iter);
+ ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+ uint32_t entry_len = iter.get_off() - bl_off;
+ if (invalid) {
+ // new corrupt region detected
+ uint32_t invalid_end_off = m_read_bl_off + bl_off;
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
+ invalid = false;
+
+ m_read_bl_off = invalid_end_off;
+ }
+
+ EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
+ entry.get_entry_tid()));
+ if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
+ m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
+ } else {
+ ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
+ *m_entry_keys[entry_key] = entry;
+ }
+
+ // prune decoded / corrupted journal entries from front of bl
+ bufferlist sub_bl;
+ sub_bl.substr_of(m_read_bl, iter.get_off(),
+ m_read_bl.length() - iter.get_off());
+ sub_bl.swap(m_read_bl);
+ iter = bufferlist::iterator(&m_read_bl, 0);
+
+ // advance the decoded entry offset
+ m_read_bl_off += entry_len;
+ }
+
+ if (invalid) {
+ uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
+ if (!partial_entry) {
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ }
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
+ }
+
+ if (!m_invalid_ranges.empty() && !partial_entry) {
+ return -EBADMSG;
+ } else if (partial_entry && (full_fetch || m_entries.empty())) {
+ *refetch = true;
+ return -EAGAIN;
+ }
+
+ return 0;
+}
+
+void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
+ // possibly remove previously partial record region
+ InvalidRanges decode_range;
+ decode_range.insert(off, len);
+ InvalidRanges intersect_range;
+ intersect_range.intersection_of(m_invalid_ranges, decode_range);
+ if (!intersect_range.empty()) {
+ ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
+ << dendl;
+ m_invalid_ranges.subtract(intersect_range);
+ }
+}
+
+void ObjectPlayer::schedule_watch() {
+ ceph_assert(m_timer_lock.is_locked());
+ if (m_watch_ctx == NULL) {
+ return;
+ }
+
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
+ ceph_assert(m_watch_task == nullptr);
+ m_watch_task = m_timer.add_event_after(
+ m_watch_interval,
+ new FunctionContext([this](int) {
+ handle_watch_task();
+ }));
+}
+
+bool ObjectPlayer::cancel_watch() {
+ ceph_assert(m_timer_lock.is_locked());
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
+ if (m_watch_task != nullptr) {
+ bool canceled = m_timer.cancel_event(m_watch_task);
+ ceph_assert(canceled);
+
+ m_watch_task = nullptr;
+ return true;
+ }
+ return false;
+}
+
+void ObjectPlayer::handle_watch_task() {
+ ceph_assert(m_timer_lock.is_locked());
+
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
+ ceph_assert(m_watch_ctx != nullptr);
+ ceph_assert(m_watch_task != nullptr);
+
+ m_watch_task = nullptr;
+ fetch(new C_WatchFetch(this));
+}
+
+void ObjectPlayer::handle_watch_fetched(int r) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
+ << dendl;
+
+ Context *watch_ctx = nullptr;
+ {
+ Mutex::Locker timer_locker(m_timer_lock);
+ std::swap(watch_ctx, m_watch_ctx);
+
+ if (m_unwatched) {
+ m_unwatched = false;
+ r = -ECANCELED;
+ }
+ }
+
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(r);
+ }
+}
+
+void ObjectPlayer::C_Fetch::finish(int r) {
+ bool refetch = false;
+ r = object_player->handle_fetch_complete(r, read_bl, &refetch);
+
+ {
+ Mutex::Locker locker(object_player->m_lock);
+ object_player->m_fetch_in_progress = false;
+ }
+
+ if (refetch) {
+ object_player->fetch(on_finish);
+ return;
+ }
+
+ object_player.reset();
+ on_finish->complete(r);
+}
+
+void ObjectPlayer::C_WatchFetch::finish(int r) {
+ object_player->handle_watch_fetched(r);
+}
+
+} // namespace journal
diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h
new file mode 100644
index 00000000..b9062b83
--- /dev/null
+++ b/src/journal/ObjectPlayer.h
@@ -0,0 +1,141 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
+#define CEPH_JOURNAL_OBJECT_PLAYER_H
+
+#include "include/Context.h"
+#include "include/interval_set.h"
+#include "include/rados/librados.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/Entry.h"
+#include <list>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/unordered_map.hpp>
+#include "include/ceph_assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectPlayer;
+typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
+
+class ObjectPlayer : public RefCountedObject {
+public:
+ typedef std::list<Entry> Entries;
+ typedef interval_set<uint64_t> InvalidRanges;
+
+ enum RefetchState {
+ REFETCH_STATE_NONE,
+ REFETCH_STATE_REQUIRED,
+ REFETCH_STATE_IMMEDIATE
+ };
+
+ ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
+ uint8_t order, uint64_t max_fetch_bytes);
+ ~ObjectPlayer() override;
+
+ inline const std::string &get_oid() const {
+ return m_oid;
+ }
+ inline uint64_t get_object_number() const {
+ return m_object_num;
+ }
+
+ void fetch(Context *on_finish);
+ void watch(Context *on_fetch, double interval);
+ void unwatch();
+
+ void front(Entry *entry) const;
+ void pop_front();
+ inline bool empty() const {
+ Mutex::Locker locker(m_lock);
+ return m_entries.empty();
+ }
+
+ inline void get_entries(Entries *entries) {
+ Mutex::Locker locker(m_lock);
+ *entries = m_entries;
+ }
+ inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
+ Mutex::Locker locker(m_lock);
+ *invalid_ranges = m_invalid_ranges;
+ }
+
+ inline bool refetch_required() const {
+ return (get_refetch_state() != REFETCH_STATE_NONE);
+ }
+ inline RefetchState get_refetch_state() const {
+ return m_refetch_state;
+ }
+ inline void set_refetch_state(RefetchState refetch_state) {
+ m_refetch_state = refetch_state;
+ }
+
+private:
+ typedef std::pair<uint64_t, uint64_t> EntryKey;
+ typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
+
+ struct C_Fetch : public Context {
+ ObjectPlayerPtr object_player;
+ Context *on_finish;
+ bufferlist read_bl;
+ C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
+ }
+ void finish(int r) override;
+ };
+ struct C_WatchFetch : public Context {
+ ObjectPlayerPtr object_player;
+ C_WatchFetch(ObjectPlayer *o) : object_player(o) {
+ }
+ void finish(int r) override;
+ };
+
+ librados::IoCtx m_ioctx;
+ uint64_t m_object_num;
+ std::string m_oid;
+ CephContext *m_cct;
+
+ SafeTimer &m_timer;
+ Mutex &m_timer_lock;
+
+ uint8_t m_order;
+ uint64_t m_max_fetch_bytes;
+
+ double m_watch_interval;
+ Context *m_watch_task;
+
+ mutable Mutex m_lock;
+ bool m_fetch_in_progress;
+ bufferlist m_read_bl;
+ uint32_t m_read_off = 0;
+ uint32_t m_read_bl_off = 0;
+
+ Entries m_entries;
+ EntryKeys m_entry_keys;
+ InvalidRanges m_invalid_ranges;
+
+ Context *m_watch_ctx = nullptr;
+
+ bool m_unwatched = false;
+ RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE;
+
+ int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch);
+
+ void clear_invalid_range(uint32_t off, uint32_t len);
+
+ void schedule_watch();
+ bool cancel_watch();
+ void handle_watch_task();
+ void handle_watch_fetched(int r);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_PLAYER_H
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
new file mode 100644
index 00000000..127731e9
--- /dev/null
+++ b/src/journal/ObjectRecorder.cc
@@ -0,0 +1,361 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "journal/Future.h"
+#include "journal/Utils.h"
+#include "include/ceph_assert.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
+ << __func__ << " (" << m_oid << "): "
+
+using namespace cls::journal;
+using std::shared_ptr;
+
+namespace journal {
+
+ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t object_number, shared_ptr<Mutex> lock,
+ ContextWQ *work_queue, Handler *handler,
+ uint8_t order, int32_t max_in_flight_appends)
+ : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
+ m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
+ m_order(order), m_soft_max_size(1 << m_order),
+ m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
+ m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
+ m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+ ceph_assert(m_handler != NULL);
+ ldout(m_cct, 20) << dendl;
+}
+
+ObjectRecorder::~ObjectRecorder() {
+ ldout(m_cct, 20) << dendl;
+ ceph_assert(m_pending_buffers.empty());
+ ceph_assert(m_in_flight_tids.empty());
+ ceph_assert(m_in_flight_appends.empty());
+}
+
+void ObjectRecorder::set_append_batch_options(int flush_interval,
+ uint64_t flush_bytes,
+ double flush_age) {
+ ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+ << "flush_bytes=" << flush_bytes << ", "
+ << "flush_age=" << flush_age << dendl;
+
+ ceph_assert(m_lock->is_locked());
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+}
+
+bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
+ ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
+
+ ceph_assert(m_lock->is_locked());
+
+ FutureImplPtr last_flushed_future;
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << ", "
+ << "size=" << append_buffer.second.length() << dendl;
+ bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+ if (flush_requested) {
+ last_flushed_future = append_buffer.first;
+ }
+
+ m_pending_buffers.push_back(append_buffer);
+ m_pending_bytes += append_buffer.second.length();
+ }
+
+ return send_appends(!!last_flushed_future, last_flushed_future);
+}
+
+void ObjectRecorder::flush(Context *on_safe) {
+ ldout(m_cct, 20) << dendl;
+
+ Future future;
+ {
+ Mutex::Locker locker(*m_lock);
+
+ // if currently handling flush notifications, wait so that
+ // we notify in the correct order (since lock is dropped on
+ // callback)
+ if (m_in_flight_flushes) {
+ m_in_flight_flushes_cond.Wait(*(m_lock.get()));
+ }
+
+ // attach the flush to the most recent append
+ if (!m_pending_buffers.empty()) {
+ future = Future(m_pending_buffers.rbegin()->first);
+ } else if (!m_in_flight_appends.empty()) {
+ AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
+ ceph_assert(!append_buffers.empty());
+ future = Future(append_buffers.rbegin()->first);
+ }
+ }
+
+ if (future.is_valid()) {
+ // cannot be invoked while the same lock context
+ m_op_work_queue->queue(new FunctionContext(
+ [future, on_safe] (int r) mutable {
+ future.flush(on_safe);
+ }));
+ } else {
+ on_safe->complete(0);
+ }
+}
+
+void ObjectRecorder::flush(const FutureImplPtr &future) {
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
+
+ m_lock->Lock();
+ if (future->get_flush_handler().get() != &m_flush_handler) {
+ // if we don't own this future, re-issue the flush so that it hits the
+ // correct journal object owner
+ future->flush();
+ m_lock->Unlock();
+ return;
+ } else if (future->is_flush_in_progress()) {
+ m_lock->Unlock();
+ return;
+ }
+
+ bool overflowed = send_appends(true, future);
+ if (overflowed) {
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
+ }
+}
+
+void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
+ ldout(m_cct, 20) << dendl;
+
+ ceph_assert(m_lock->is_locked());
+ ceph_assert(m_in_flight_tids.empty());
+ ceph_assert(m_in_flight_appends.empty());
+ ceph_assert(m_object_closed || m_overflowed);
+
+ for (auto& append_buffer : m_pending_buffers) {
+ ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
+ append_buffer.first->detach();
+ }
+ append_buffers->splice(append_buffers->end(), m_pending_buffers,
+ m_pending_buffers.begin(), m_pending_buffers.end());
+}
+
+bool ObjectRecorder::close() {
+ ceph_assert(m_lock->is_locked());
+
+ ldout(m_cct, 20) << dendl;
+ send_appends(true, {});
+
+ ceph_assert(!m_object_closed);
+ m_object_closed = true;
+ return (m_in_flight_tids.empty() && !m_in_flight_flushes);
+}
+
+void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
+ ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
+
+ AppendBuffers append_buffers;
+ {
+ m_lock->Lock();
+ auto tid_iter = m_in_flight_tids.find(tid);
+ ceph_assert(tid_iter != m_in_flight_tids.end());
+ m_in_flight_tids.erase(tid_iter);
+
+ InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+ ceph_assert(iter != m_in_flight_appends.end());
+
+ if (r == -EOVERFLOW) {
+ ldout(m_cct, 10) << "append overflowed" << dendl;
+ m_overflowed = true;
+
+ // notify of overflow once all in-flight ops are complete
+ if (m_in_flight_tids.empty()) {
+ append_overflowed();
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
+ }
+ return;
+ }
+
+ append_buffers.swap(iter->second);
+ ceph_assert(!append_buffers.empty());
+
+ for (auto& append_buffer : append_buffers) {
+ m_object_bytes += append_buffer.second.length();
+ }
+ ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
+
+ m_in_flight_appends.erase(iter);
+ m_in_flight_flushes = true;
+ m_lock->Unlock();
+ }
+
+ // Flag the associated futures as complete.
+ for (auto& append_buffer : append_buffers) {
+ ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
+ append_buffer.first->safe(r);
+ }
+
+ // wake up any flush requests that raced with a RADOS callback
+ m_lock->Lock();
+ m_in_flight_flushes = false;
+ m_in_flight_flushes_cond.Signal();
+
+ if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
+ // all remaining unsent appends should be redirected to new object
+ notify_handler_unlock();
+ } else {
+ bool overflowed = send_appends(false, {});
+ if (overflowed) {
+ notify_handler_unlock();
+ } else {
+ m_lock->Unlock();
+ }
+ }
+}
+
+void ObjectRecorder::append_overflowed() {
+ ldout(m_cct, 10) << dendl;
+
+ ceph_assert(m_lock->is_locked());
+ ceph_assert(!m_in_flight_appends.empty());
+
+ InFlightAppends in_flight_appends;
+ in_flight_appends.swap(m_in_flight_appends);
+
+ AppendBuffers restart_append_buffers;
+ for (InFlightAppends::iterator it = in_flight_appends.begin();
+ it != in_flight_appends.end(); ++it) {
+ restart_append_buffers.insert(restart_append_buffers.end(),
+ it->second.begin(), it->second.end());
+ }
+
+ restart_append_buffers.splice(restart_append_buffers.end(),
+ m_pending_buffers,
+ m_pending_buffers.begin(),
+ m_pending_buffers.end());
+ restart_append_buffers.swap(m_pending_buffers);
+}
+
+bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
+ ldout(m_cct, 20) << dendl;
+
+ ceph_assert(m_lock->is_locked());
+ if (m_object_closed || m_overflowed) {
+ ldout(m_cct, 20) << "already closed or overflowed" << dendl;
+ return false;
+ }
+
+ if (m_pending_buffers.empty()) {
+ ldout(m_cct, 20) << "append buffers empty" << dendl;
+ return false;
+ }
+
+ if (!force &&
+ ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
+ (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
+ (m_flush_age > 0 &&
+ m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+ ldout(m_cct, 20) << "forcing batch flush" << dendl;
+ force = true;
+ }
+
+ auto max_in_flight_appends = m_max_in_flight_appends;
+ if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
+ if (!force && max_in_flight_appends == 0) {
+ ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
+ max_in_flight_appends = 1;
+ }
+ } else if (max_in_flight_appends < 0) {
+ max_in_flight_appends = 0;
+ }
+
+ if (!force && max_in_flight_appends != 0 &&
+ static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
+ ldout(m_cct, 10) << "max in flight appends reached" << dendl;
+ return false;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
+
+ size_t append_bytes = 0;
+ AppendBuffers append_buffers;
+ for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
+ auto& future = it->first;
+ auto& bl = it->second;
+ auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
+ if (size == m_soft_max_size) {
+ ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+ m_overflowed = true;
+ } else if (size > m_soft_max_size) {
+ ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+ m_overflowed = true;
+ break;
+ }
+
+ bool flush_break = (force && flush_future && flush_future == future);
+ ldout(m_cct, 20) << "flushing " << *future << dendl;
+ future->set_flush_in_progress();
+
+ op.append(bl);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+ append_bytes += bl.length();
+ append_buffers.push_back(*it);
+ it = m_pending_buffers.erase(it);
+
+ if (flush_break) {
+ ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
+ break;
+ }
+ }
+
+ if (append_bytes > 0) {
+ m_last_flush_time = ceph_clock_now();
+
+ uint64_t append_tid = m_append_tid++;
+ m_in_flight_tids.insert(append_tid);
+ m_in_flight_appends[append_tid].swap(append_buffers);
+ m_in_flight_bytes += append_bytes;
+
+ ceph_assert(m_pending_bytes >= append_bytes);
+ m_pending_bytes -= append_bytes;
+
+ auto rados_completion = librados::Rados::aio_create_completion(
+ new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
+ ceph_assert(r == 0);
+ rados_completion->release();
+ ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
+ << "append_bytes=" << append_bytes << ", "
+ << "in_flight_bytes=" << m_in_flight_bytes << ", "
+ << "pending_bytes=" << m_pending_bytes << dendl;
+ }
+
+ return m_overflowed;
+}
+
+void ObjectRecorder::notify_handler_unlock() {
+ ceph_assert(m_lock->is_locked());
+ if (m_object_closed) {
+ m_lock->Unlock();
+ m_handler->closed(this);
+ } else {
+ // TODO need to delay completion until after aio_notify completes
+ m_lock->Unlock();
+ m_handler->overflow(this);
+ }
+}
+
+} // namespace journal
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
new file mode 100644
index 00000000..ff00e0a0
--- /dev/null
+++ b/src/journal/ObjectRecorder.h
@@ -0,0 +1,155 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
+#define CEPH_JOURNAL_OBJECT_RECORDER_H
+
+#include "include/utime.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
+#include "journal/FutureImpl.h"
+#include <list>
+#include <map>
+#include <set>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include "include/ceph_assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectRecorder;
+typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
+
+typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
+typedef std::list<AppendBuffer> AppendBuffers;
+
+class ObjectRecorder : public RefCountedObject, boost::noncopyable {
+public:
+ struct Handler {
+ virtual ~Handler() {
+ }
+ virtual void closed(ObjectRecorder *object_recorder) = 0;
+ virtual void overflow(ObjectRecorder *object_recorder) = 0;
+ };
+
+ ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t object_number, std::shared_ptr<Mutex> lock,
+ ContextWQ *work_queue, Handler *handler, uint8_t order,
+ int32_t max_in_flight_appends);
+ ~ObjectRecorder() override;
+
+ void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+ double flush_age);
+
+ inline uint64_t get_object_number() const {
+ return m_object_number;
+ }
+ inline const std::string &get_oid() const {
+ return m_oid;
+ }
+
+ bool append(AppendBuffers &&append_buffers);
+ void flush(Context *on_safe);
+ void flush(const FutureImplPtr &future);
+
+ void claim_append_buffers(AppendBuffers *append_buffers);
+
+ bool is_closed() const {
+ ceph_assert(m_lock->is_locked());
+ return (m_object_closed && m_in_flight_appends.empty());
+ }
+ bool close();
+
+ inline CephContext *cct() const {
+ return m_cct;
+ }
+
+ inline size_t get_pending_appends() const {
+ Mutex::Locker locker(*m_lock);
+ return m_pending_buffers.size();
+ }
+
+private:
+ typedef std::set<uint64_t> InFlightTids;
+ typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
+
+ struct FlushHandler : public FutureImpl::FlushHandler {
+ ObjectRecorder *object_recorder;
+ FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
+ void get() override {
+ object_recorder->get();
+ }
+ void put() override {
+ object_recorder->put();
+ }
+ void flush(const FutureImplPtr &future) override {
+ object_recorder->flush(future);
+ }
+ };
+ struct C_AppendFlush : public Context {
+ ObjectRecorder *object_recorder;
+ uint64_t tid;
+ C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
+ : object_recorder(o), tid(_tid) {
+ object_recorder->get();
+ }
+ void finish(int r) override {
+ object_recorder->handle_append_flushed(tid, r);
+ object_recorder->put();
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ std::string m_oid;
+ uint64_t m_object_number;
+ CephContext *m_cct;
+
+ ContextWQ *m_op_work_queue;
+
+ Handler *m_handler;
+
+ uint8_t m_order;
+ uint64_t m_soft_max_size;
+
+ uint32_t m_flush_interval = 0;
+ uint64_t m_flush_bytes = 0;
+ double m_flush_age = 0;
+ int32_t m_max_in_flight_appends;
+
+ FlushHandler m_flush_handler;
+
+ mutable std::shared_ptr<Mutex> m_lock;
+ AppendBuffers m_pending_buffers;
+ uint64_t m_pending_bytes = 0;
+ utime_t m_last_flush_time;
+
+ uint64_t m_append_tid;
+
+ InFlightTids m_in_flight_tids;
+ InFlightAppends m_in_flight_appends;
+ uint64_t m_object_bytes = 0;
+ bool m_overflowed;
+ bool m_object_closed;
+
+ bufferlist m_prefetch_bl;
+
+ bool m_in_flight_flushes;
+ Cond m_in_flight_flushes_cond;
+ uint64_t m_in_flight_bytes = 0;
+
+ bool send_appends(bool force, FutureImplPtr flush_sentinal);
+ void handle_append_flushed(uint64_t tid, int r);
+ void append_overflowed();
+
+ void notify_handler_unlock();
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_RECORDER_H
diff --git a/src/journal/ReplayEntry.h b/src/journal/ReplayEntry.h
new file mode 100644
index 00000000..4dd3ba47
--- /dev/null
+++ b/src/journal/ReplayEntry.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_ENTRY_H
+#define CEPH_JOURNAL_REPLAY_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace journal {
+
+class ReplayEntry {
+public:
+ ReplayEntry() : m_commit_tid(0) {
+ }
+ ReplayEntry(const bufferlist &data, uint64_t commit_tid)
+ : m_data(data), m_commit_tid(commit_tid) {
+ }
+
+ inline const bufferlist &get_data() const {
+ return m_data;
+ }
+ inline uint64_t get_commit_tid() const {
+ return m_commit_tid;
+ }
+
+private:
+ bufferlist m_data;
+ uint64_t m_commit_tid;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_ENTRY_H
diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h
new file mode 100644
index 00000000..e61240d8
--- /dev/null
+++ b/src/journal/ReplayHandler.h
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_HANDLER_H
+#define CEPH_JOURNAL_REPLAY_HANDLER_H
+
+namespace journal {
+
+struct ReplayHandler {
+ virtual ~ReplayHandler() {}
+
+ virtual void get() = 0;
+ virtual void put() = 0;
+
+ virtual void handle_entries_available() = 0;
+ virtual void handle_complete(int r) = 0;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_HANDLER_H
diff --git a/src/journal/Settings.h b/src/journal/Settings.h
new file mode 100644
index 00000000..ca57125a
--- /dev/null
+++ b/src/journal/Settings.h
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_SETTINGS_H
+#define CEPH_JOURNAL_SETTINGS_H
+
+#include "include/int_types.h"
+
+namespace journal {
+
+struct Settings {
+ double commit_interval = 5; ///< commit position throttle (in secs)
+ uint64_t max_fetch_bytes = 0; ///< 0 implies no limit
+ uint64_t max_payload_bytes = 0; ///< 0 implies object size limit
+ int max_concurrent_object_sets = 0; ///< 0 implies no limit
+ std::set<std::string> whitelisted_laggy_clients;
+ ///< clients that mustn't be disconnected
+};
+
+} // namespace journal
+
+#endif // # CEPH_JOURNAL_SETTINGS_H
diff --git a/src/journal/Utils.cc b/src/journal/Utils.cc
new file mode 100644
index 00000000..2a8d945a
--- /dev/null
+++ b/src/journal/Utils.cc
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Utils.h"
+#include "include/Context.h"
+#include "include/stringify.h"
+
+namespace journal {
+namespace utils {
+
+std::string get_object_name(const std::string &prefix, uint64_t number) {
+ return prefix + stringify(number);
+}
+
+std::string unique_lock_name(const std::string &name, void *address) {
+ return name + " (" + stringify(address) + ")";
+}
+
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+ Context *comp = reinterpret_cast<Context *>(arg);
+ comp->complete(rados_aio_get_return_value(c));
+}
+
+} // namespace utils
+} // namespace journal
diff --git a/src/journal/Utils.h b/src/journal/Utils.h
new file mode 100644
index 00000000..c5695e58
--- /dev/null
+++ b/src/journal/Utils.h
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_UTILS_H
+#define CEPH_JOURNAL_UTILS_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include <string>
+
+namespace journal {
+namespace utils {
+
+namespace detail {
+
+template <typename M>
+struct C_AsyncCallback : public Context {
+ M journal_metadata;
+ Context *on_finish;
+
+ C_AsyncCallback(M journal_metadata, Context *on_finish)
+ : journal_metadata(journal_metadata), on_finish(on_finish) {
+ }
+ void finish(int r) override {
+ journal_metadata->queue(on_finish, r);
+ }
+};
+
+} // namespace detail
+
+template <typename T, void(T::*MF)(int)>
+void rados_state_callback(rados_completion_t c, void *arg) {
+ T *obj = reinterpret_cast<T*>(arg);
+ int r = rados_aio_get_return_value(c);
+ (obj->*MF)(r);
+}
+
+std::string get_object_name(const std::string &prefix, uint64_t number);
+
+std::string unique_lock_name(const std::string &name, void *address);
+
+void rados_ctx_callback(rados_completion_t c, void *arg);
+
+template <typename M>
+Context *create_async_context_callback(M journal_metadata, Context *on_finish) {
+ // use async callback to acquire a clean lock context
+ return new detail::C_AsyncCallback<M>(journal_metadata, on_finish);
+}
+
+} // namespace utils
+} // namespace journal
+
+#endif // CEPH_JOURNAL_UTILS_H