summaryrefslogtreecommitdiffstats
path: root/src/osdc/Journaler.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/osdc/Journaler.h
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/osdc/Journaler.h')
-rw-r--r--src/osdc/Journaler.h542
1 files changed, 542 insertions, 0 deletions
diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h
new file mode 100644
index 000000000..3e8f0f665
--- /dev/null
+++ b/src/osdc/Journaler.h
@@ -0,0 +1,542 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+/* Journaler
+ *
+ * This class stripes a serial log over objects on the store. Four
+ * logical pointers:
+ *
+ * write_pos - where we're writing new entries
+ * unused_field - where we're reading old entires
+ * expire_pos - what is deemed "old" by user
+ * trimmed_pos - where we're expiring old items
+ *
+ * trimmed_pos <= expire_pos <= unused_field <= write_pos.
+ *
+ * Often, unused_field <= write_pos (as with MDS log). During
+ * recovery, write_pos is undefined until the end of the log is
+ * discovered.
+ *
+ * A "head" struct at the beginning of the log is used to store
+ * metadata at regular intervals. The basic invariants include:
+ *
+ * head.unused_field <= unused_field -- the head may "lag", since
+ * it's updated lazily.
+ * head.write_pos <= write_pos
+ * head.expire_pos <= expire_pos
+ * head.trimmed_pos <= trimmed_pos
+ *
+ * More significantly,
+ *
+ * head.expire_pos >= trimmed_pos -- this ensures we can find the
+ * "beginning" of the log as last
+ * recorded, before it is trimmed.
+ * trimming will block until a
+ * sufficiently current expire_pos
+ * is committed.
+ *
+ * To recover log state, we simply start at the last write_pos in the
+ * head, and probe the object sequence sizes until we read the end.
+ *
+ * Head struct is stored in the first object. Actual journal starts
+ * after layout.period() bytes.
+ *
+ */
+
+#ifndef CEPH_JOURNALER_H
+#define CEPH_JOURNALER_H
+
+#include <list>
+#include <map>
+
+#include "Objecter.h"
+#include "Filer.h"
+
+#include "common/Timer.h"
+#include "common/Throttle.h"
+#include "include/common_fwd.h"
+
+class Context;
+class Finisher;
+class C_OnFinisher;
+
+typedef __u8 stream_format_t;
+
+// Legacy envelope is leading uint32_t size
+enum StreamFormat {
+ JOURNAL_FORMAT_LEGACY = 0,
+ JOURNAL_FORMAT_RESILIENT = 1,
+ // Insert new formats here, before COUNT
+ JOURNAL_FORMAT_COUNT
+};
+
+// Highest journal format version that we support
+#define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1)
+
+// Legacy envelope is leading uint32_t size
+#define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t))
+
+// Resilient envelope is leading uint64_t sentinel, uint32_t size,
+// trailing uint64_t start_ptr
+#define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \
+ sizeof(uint64_t))
+
+/**
+ * Represents a collection of entries serialized in a byte stream.
+ *
+ * Each entry consists of:
+ * - a blob (used by the next level up as a serialized LogEvent)
+ * - a uint64_t (used by the next level up as a pointer to the start
+ * of the entry in the collection bytestream)
+ */
+class JournalStream
+{
+ stream_format_t format;
+
+ public:
+ JournalStream(stream_format_t format_) : format(format_) {}
+
+ void set_format(stream_format_t format_) {format = format_;}
+
+ bool readable(bufferlist &bl, uint64_t *need) const;
+ size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr);
+ size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr);
+ size_t get_envelope_size() const {
+ if (format >= JOURNAL_FORMAT_RESILIENT) {
+ return JOURNAL_ENVELOPE_RESILIENT;
+ } else {
+ return JOURNAL_ENVELOPE_LEGACY;
+ }
+ }
+
+ // A magic number for the start of journal entries, so that we can
+ // identify them in damaged journals.
+ static const uint64_t sentinel = 0x3141592653589793;
+};
+
+
+class Journaler {
+public:
+ // this goes at the head of the log "file".
+ class Header {
+ public:
+ uint64_t trimmed_pos;
+ uint64_t expire_pos;
+ uint64_t unused_field;
+ uint64_t write_pos;
+ string magic;
+ file_layout_t layout; //< The mapping from byte stream offsets
+ // to RADOS objects
+ stream_format_t stream_format; //< The encoding of LogEvents
+ // within the journal byte stream
+
+ Header(const char *m="") :
+ trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m),
+ stream_format(-1) {
+ }
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(2, 2, bl);
+ encode(magic, bl);
+ encode(trimmed_pos, bl);
+ encode(expire_pos, bl);
+ encode(unused_field, bl);
+ encode(write_pos, bl);
+ encode(layout, bl, 0); // encode in legacy format
+ encode(stream_format, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::const_iterator &bl) {
+ DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
+ decode(magic, bl);
+ decode(trimmed_pos, bl);
+ decode(expire_pos, bl);
+ decode(unused_field, bl);
+ decode(write_pos, bl);
+ decode(layout, bl);
+ if (struct_v > 1) {
+ decode(stream_format, bl);
+ } else {
+ stream_format = JOURNAL_FORMAT_LEGACY;
+ }
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const {
+ f->open_object_section("journal_header");
+ {
+ f->dump_string("magic", magic);
+ f->dump_unsigned("write_pos", write_pos);
+ f->dump_unsigned("expire_pos", expire_pos);
+ f->dump_unsigned("trimmed_pos", trimmed_pos);
+ f->dump_unsigned("stream_format", stream_format);
+ f->dump_object("layout", layout);
+ }
+ f->close_section(); // journal_header
+ }
+
+ static void generate_test_instances(list<Header*> &ls)
+ {
+ ls.push_back(new Header());
+
+ ls.push_back(new Header());
+ ls.back()->trimmed_pos = 1;
+ ls.back()->expire_pos = 2;
+ ls.back()->unused_field = 3;
+ ls.back()->write_pos = 4;
+ ls.back()->magic = "magique";
+
+ ls.push_back(new Header());
+ ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
+ }
+ };
+ WRITE_CLASS_ENCODER(Header)
+
+ uint32_t get_stream_format() const {
+ return stream_format;
+ }
+
+ Header last_committed;
+
+private:
+ // me
+ CephContext *cct;
+ std::mutex lock;
+ const std::string name;
+ typedef std::lock_guard<std::mutex> lock_guard;
+ typedef std::unique_lock<std::mutex> unique_lock;
+ Finisher *finisher;
+ Header last_written;
+ inodeno_t ino;
+ int64_t pg_pool;
+ bool readonly;
+ file_layout_t layout;
+ uint32_t stream_format;
+ JournalStream journal_stream;
+
+ const char *magic;
+ Objecter *objecter;
+ Filer filer;
+
+ PerfCounters *logger;
+ int logger_key_lat;
+
+ class C_DelayFlush;
+ C_DelayFlush *delay_flush_event;
+ /*
+ * Do a flush as a result of a C_DelayFlush context.
+ */
+ void _do_delayed_flush()
+ {
+ ceph_assert(delay_flush_event != NULL);
+ lock_guard l(lock);
+ delay_flush_event = NULL;
+ _do_flush();
+ }
+
+ // my state
+ static const int STATE_UNDEF = 0;
+ static const int STATE_READHEAD = 1;
+ static const int STATE_PROBING = 2;
+ static const int STATE_ACTIVE = 3;
+ static const int STATE_REREADHEAD = 4;
+ static const int STATE_REPROBING = 5;
+ static const int STATE_STOPPING = 6;
+
+ int state;
+ int error;
+
+ void _write_head(Context *oncommit=NULL);
+ void _wait_for_flush(Context *onsafe);
+ void _trim();
+
+ // header
+ ceph::real_time last_wrote_head;
+ void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit);
+ class C_WriteHead;
+ friend class C_WriteHead;
+
+ void _reread_head(Context *onfinish);
+ void _set_layout(file_layout_t const *l);
+ list<Context*> waitfor_recover;
+ void _read_head(Context *on_finish, bufferlist *bl);
+ void _finish_read_head(int r, bufferlist& bl);
+ void _finish_reread_head(int r, bufferlist& bl, Context *finish);
+ void _probe(Context *finish, uint64_t *end);
+ void _finish_probe_end(int r, uint64_t end);
+ void _reprobe(C_OnFinisher *onfinish);
+ void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish);
+ void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish);
+ class C_ReadHead;
+ friend class C_ReadHead;
+ class C_ProbeEnd;
+ friend class C_ProbeEnd;
+ class C_RereadHead;
+ friend class C_RereadHead;
+ class C_ReProbe;
+ friend class C_ReProbe;
+ class C_RereadHeadProbe;
+ friend class C_RereadHeadProbe;
+
+ // writer
+ uint64_t prezeroing_pos;
+ uint64_t prezero_pos; ///< we zero journal space ahead of write_pos to
+ // avoid problems with tail probing
+ uint64_t write_pos; ///< logical write position, where next entry
+ // will go
+ uint64_t flush_pos; ///< where we will flush. if
+ /// write_pos>flush_pos, we're buffering writes.
+ uint64_t safe_pos; ///< what has been committed safely to disk.
+
+ uint64_t next_safe_pos; /// start position of the first entry that isn't
+ /// being fully flushed. If we don't flush any
+ // partial entry, it's equal to flush_pos.
+
+ bufferlist write_buf; ///< write buffer. flush_pos +
+ /// write_buf.length() == write_pos.
+
+ // protect write_buf from bufferlist _len overflow
+ Throttle write_buf_throttle;
+
+ uint64_t waiting_for_zero_pos;
+ interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed
+ list<Context*> waitfor_prezero;
+
+ std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos
+ // when safe through given offset
+ std::map<uint64_t, std::list<Context*> > waitfor_safe;
+
+ void _flush(C_OnFinisher *onsafe);
+ void _do_flush(unsigned amount=0);
+ void _finish_flush(int r, uint64_t start, ceph::real_time stamp);
+ class C_Flush;
+ friend class C_Flush;
+
+ // reader
+ uint64_t read_pos; // logical read position, where next entry starts.
+ uint64_t requested_pos; // what we've requested from OSD.
+ uint64_t received_pos; // what we've received from OSD.
+ // read buffer. unused_field + read_buf.length() == prefetch_pos.
+ bufferlist read_buf;
+
+ map<uint64_t,bufferlist> prefetch_buf;
+
+ uint64_t fetch_len; // how much to read at a time
+ uint64_t temp_fetch_len;
+
+ // for wait_for_readable()
+ C_OnFinisher *on_readable;
+ C_OnFinisher *on_write_error;
+ bool called_write_error;
+
+ // read completion callback
+ void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl);
+ void _finish_retry_read(int r);
+ void _assimilate_prefetch();
+ void _issue_read(uint64_t len); // read some more
+ void _prefetch(); // maybe read ahead
+ class C_Read;
+ friend class C_Read;
+ class C_RetryRead;
+ friend class C_RetryRead;
+
+ // trimmer
+ uint64_t expire_pos; // what we're allowed to trim to
+ uint64_t trimming_pos; // what we've requested to trim through
+ uint64_t trimmed_pos; // what has been trimmed
+
+ bool readable;
+
+ void _finish_trim(int r, uint64_t to);
+ class C_Trim;
+ friend class C_Trim;
+
+ void _issue_prezero();
+ void _finish_prezero(int r, uint64_t from, uint64_t len);
+ friend struct C_Journaler_Prezero;
+
+ // only init_headers when following or first reading off-disk
+ void init_headers(Header& h) {
+ ceph_assert(readonly ||
+ state == STATE_READHEAD ||
+ state == STATE_REREADHEAD);
+ last_written = last_committed = h;
+ }
+
+ /**
+ * handle a write error
+ *
+ * called when we get an objecter error on a write.
+ *
+ * @param r error code
+ */
+ void handle_write_error(int r);
+
+ bool _is_readable();
+
+ void _finish_erase(int data_result, C_OnFinisher *completion);
+ class C_EraseFinish;
+ friend class C_EraseFinish;
+
+ C_OnFinisher *wrap_finisher(Context *c);
+
+ uint32_t write_iohint; // the fadvise flags for write op, see
+ // CEPH_OSD_OP_FADIVSE_*
+
+public:
+ Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
+ const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
+ last_committed(mag),
+ cct(obj->cct), name(name_), finisher(f), last_written(mag),
+ ino(ino_), pg_pool(pool), readonly(true),
+ stream_format(-1), journal_stream(-1),
+ magic(mag),
+ objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey),
+ delay_flush_event(0),
+ state(STATE_UNDEF), error(0),
+ prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0),
+ safe_pos(0), next_safe_pos(0),
+ write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)),
+ waiting_for_zero_pos(0),
+ read_pos(0), requested_pos(0), received_pos(0),
+ fetch_len(0), temp_fetch_len(0),
+ on_readable(0), on_write_error(NULL), called_write_error(false),
+ expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
+ write_iohint(0)
+ {
+ }
+
+ /* reset
+ *
+ * NOTE: we assume the caller knows/has ensured that any objects in
+ * our sequence do not exist.. e.g. after a MKFS. this is _not_ an
+ * "erase" method.
+ */
+ void reset() {
+ lock_guard l(lock);
+ ceph_assert(state == STATE_ACTIVE);
+
+ readonly = true;
+ delay_flush_event = NULL;
+ state = STATE_UNDEF;
+ error = 0;
+ prezeroing_pos = 0;
+ prezero_pos = 0;
+ write_pos = 0;
+ flush_pos = 0;
+ safe_pos = 0;
+ next_safe_pos = 0;
+ read_pos = 0;
+ requested_pos = 0;
+ received_pos = 0;
+ fetch_len = 0;
+ ceph_assert(!on_readable);
+ expire_pos = 0;
+ trimming_pos = 0;
+ trimmed_pos = 0;
+ waiting_for_zero_pos = 0;
+ }
+
+ // Asynchronous operations
+ // =======================
+ void erase(Context *completion);
+ void create(file_layout_t *layout, stream_format_t const sf);
+ void recover(Context *onfinish);
+ void reread_head(Context *onfinish);
+ void reread_head_and_probe(Context *onfinish);
+ void write_head(Context *onsave=0);
+ void wait_for_flush(Context *onsafe = 0);
+ void flush(Context *onsafe = 0);
+ void wait_for_readable(Context *onfinish);
+ bool have_waiter() const;
+ void wait_for_prezero(Context *onfinish);
+
+ // Synchronous setters
+ // ===================
+ void set_layout(file_layout_t const *l);
+ void set_readonly();
+ void set_writeable();
+ void set_write_pos(uint64_t p) {
+ lock_guard l(lock);
+ prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p;
+ }
+ void set_read_pos(uint64_t p) {
+ lock_guard l(lock);
+ // we can't cope w/ in-progress read right now.
+ ceph_assert(requested_pos == received_pos);
+ read_pos = requested_pos = received_pos = p;
+ read_buf.clear();
+ }
+ uint64_t append_entry(bufferlist& bl);
+ void set_expire_pos(uint64_t ep) {
+ lock_guard l(lock);
+ expire_pos = ep;
+ }
+ void set_trimmed_pos(uint64_t p) {
+ lock_guard l(lock);
+ trimming_pos = trimmed_pos = p;
+ }
+
+ bool _write_head_needed();
+ bool write_head_needed() {
+ lock_guard l(lock);
+ return _write_head_needed();
+ }
+
+
+ void trim();
+ void trim_tail() {
+ lock_guard l(lock);
+
+ ceph_assert(!readonly);
+ _issue_prezero();
+ }
+
+ void set_write_error_handler(Context *c);
+
+ void set_write_iohint(uint32_t iohint_flags) {
+ write_iohint = iohint_flags;
+ }
+ /**
+ * Cause any ongoing waits to error out with -EAGAIN, set error
+ * to -EAGAIN.
+ */
+ void shutdown();
+public:
+
+ // Synchronous getters
+ // ===================
+ // TODO: need some locks on reads for true safety
+ uint64_t get_layout_period() const {
+ return layout.get_period();
+ }
+ file_layout_t& get_layout() { return layout; }
+ bool is_active() { return state == STATE_ACTIVE; }
+ bool is_stopping() { return state == STATE_STOPPING; }
+ int get_error() { return error; }
+ bool is_readonly() { return readonly; }
+ bool is_readable();
+ bool try_read_entry(bufferlist& bl);
+ uint64_t get_write_pos() const { return write_pos; }
+ uint64_t get_write_safe_pos() const { return safe_pos; }
+ uint64_t get_read_pos() const { return read_pos; }
+ uint64_t get_expire_pos() const { return expire_pos; }
+ uint64_t get_trimmed_pos() const { return trimmed_pos; }
+ size_t get_journal_envelope_size() const {
+ return journal_stream.get_envelope_size();
+ }
+};
+WRITE_CLASS_ENCODER(Journaler::Header)
+
+#endif