summaryrefslogtreecommitdiffstats
path: root/src/os/filestore/FileJournal.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/os/filestore/FileJournal.h546
1 files changed, 546 insertions, 0 deletions
diff --git a/src/os/filestore/FileJournal.h b/src/os/filestore/FileJournal.h
new file mode 100644
index 000000000..53b18c125
--- /dev/null
+++ b/src/os/filestore/FileJournal.h
@@ -0,0 +1,546 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_FILEJOURNAL_H
+#define CEPH_FILEJOURNAL_H
+
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <stdlib.h>
+using std::deque;
+
+#include "Journal.h"
+#include "common/config_fwd.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+#include "common/Throttle.h"
+#include "JournalThrottle.h"
+#include "common/zipkin_trace.h"
+
+#ifdef HAVE_LIBAIO
+# include <libaio.h>
+#endif
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/ceph_assert.h"
+
+/**
+ * Implements journaling on top of block device or file.
+ *
+ * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock)
+ */
+class FileJournal :
+ public Journal,
+ public md_config_obs_t {
+public:
+ /// Protected by finisher_lock
+ struct completion_item {
+ uint64_t seq;
+ Context *finish;
+ utime_t start;
+ TrackedOpRef tracked_op;
+ completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref)
+ : seq(o), finish(c), start(s), tracked_op(opref) {}
+ completion_item() : seq(0), finish(0), start(0) {}
+ };
+ struct write_item {
+ uint64_t seq;
+ ceph::buffer::list bl;
+ uint32_t orig_len;
+ TrackedOpRef tracked_op;
+ ZTracer::Trace trace;
+ write_item(uint64_t s, ceph::buffer::list& b, int ol, TrackedOpRef opref) :
+ seq(s), orig_len(ol), tracked_op(opref) {
+ bl = std::move(b);
+ }
+ write_item() : seq(0), orig_len(0) {}
+ };
+
+ ceph::mutex finisher_lock = ceph::make_mutex("FileJournal::finisher_lock");
+ ceph::condition_variable finisher_cond;
+ uint64_t journaled_seq;
+ bool plug_journal_completions;
+
+ ceph::mutex writeq_lock = ceph::make_mutex("FileJournal::writeq_lock");
+ ceph::condition_variable writeq_cond;
+ std::list<write_item> writeq;
+ bool writeq_empty();
+ write_item &peek_write();
+ void pop_write();
+ void batch_pop_write(std::list<write_item> &items);
+ void batch_unpop_write(std::list<write_item> &items);
+
+ ceph::mutex completions_lock =
+ ceph::make_mutex("FileJournal::completions_lock");
+ std::list<completion_item> completions;
+ bool completions_empty() {
+ std::lock_guard l{completions_lock};
+ return completions.empty();
+ }
+ void batch_pop_completions(std::list<completion_item> &items) {
+ std::lock_guard l{completions_lock};
+ completions.swap(items);
+ }
+ void batch_unpop_completions(std::list<completion_item> &items) {
+ std::lock_guard l{completions_lock};
+ completions.splice(completions.begin(), items);
+ }
+ completion_item completion_peek_front() {
+ std::lock_guard l{completions_lock};
+ ceph_assert(!completions.empty());
+ return completions.front();
+ }
+ void completion_pop_front() {
+ std::lock_guard l{completions_lock};
+ ceph_assert(!completions.empty());
+ completions.pop_front();
+ }
+
+ int prepare_entry(std::vector<ObjectStore::Transaction>& tls, ceph::buffer::list* tbl) override;
+
+ void submit_entry(uint64_t seq, ceph::buffer::list& bl, uint32_t orig_len,
+ Context *oncommit,
+ TrackedOpRef osd_op = TrackedOpRef()) override;
+ /// End protected by finisher_lock
+
+ /*
+ * journal header
+ */
+ struct header_t {
+ enum {
+ FLAG_CRC = (1<<0),
+ // NOTE: remove kludgey weirdness in read_header() next time a flag is added.
+ };
+
+ uint64_t flags;
+ uuid_d fsid;
+ __u32 block_size;
+ __u32 alignment;
+ int64_t max_size; // max size of journal ring buffer
+ int64_t start; // offset of first entry
+ uint64_t committed_up_to; // committed up to
+
+ /**
+ * start_seq
+ *
+ * entry at header.start has sequence >= start_seq
+ *
+ * Generally, the entry at header.start will have sequence
+ * start_seq if it exists. The only exception is immediately
+ * after journal creation since the first sequence number is
+ * not known.
+ *
+ * If the first read on open fails, we can assume corruption
+ * if start_seq > committed_up_to because the entry would have
+ * a sequence >= start_seq and therefore > committed_up_to.
+ */
+ uint64_t start_seq;
+
+ header_t() :
+ flags(0), block_size(0), alignment(0), max_size(0), start(0),
+ committed_up_to(0), start_seq(0) {}
+
+ void clear() {
+ start = block_size;
+ }
+
+ uint64_t get_fsid64() const {
+ return *(uint64_t*)fsid.bytes();
+ }
+
+ void encode(ceph::buffer::list& bl) const {
+ using ceph::encode;
+ __u32 v = 4;
+ encode(v, bl);
+ ceph::buffer::list em;
+ {
+ encode(flags, em);
+ encode(fsid, em);
+ encode(block_size, em);
+ encode(alignment, em);
+ encode(max_size, em);
+ encode(start, em);
+ encode(committed_up_to, em);
+ encode(start_seq, em);
+ }
+ encode(em, bl);
+ }
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ using ceph::decode;
+ __u32 v;
+ decode(v, bl);
+ if (v < 2) { // normally 0, but conceivably 1
+ // decode old header_t struct (pre v0.40).
+ bl += 4u; // skip __u32 flags (it was unused by any old code)
+ flags = 0;
+ uint64_t tfsid;
+ decode(tfsid, bl);
+ *(uint64_t*)&fsid.bytes()[0] = tfsid;
+ *(uint64_t*)&fsid.bytes()[8] = tfsid;
+ decode(block_size, bl);
+ decode(alignment, bl);
+ decode(max_size, bl);
+ decode(start, bl);
+ committed_up_to = 0;
+ start_seq = 0;
+ return;
+ }
+ ceph::buffer::list em;
+ decode(em, bl);
+ auto t = em.cbegin();
+ decode(flags, t);
+ decode(fsid, t);
+ decode(block_size, t);
+ decode(alignment, t);
+ decode(max_size, t);
+ decode(start, t);
+
+ if (v > 2)
+ decode(committed_up_to, t);
+ else
+ committed_up_to = 0;
+
+ if (v > 3)
+ decode(start_seq, t);
+ else
+ start_seq = 0;
+ }
+ } header;
+
+ struct entry_header_t {
+ uint64_t seq; // fs op seq #
+ uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer.
+ uint32_t len;
+ uint32_t pre_pad, post_pad;
+ uint64_t magic1;
+ uint64_t magic2;
+
+ static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) {
+ return (fsid ^ seq ^ len);
+ }
+ bool check_magic(off64_t pos, uint64_t fsid) {
+ return
+ magic1 == (uint64_t)pos &&
+ magic2 == (fsid ^ seq ^ len);
+ }
+ } __attribute__((__packed__, aligned(4)));
+
+ bool journalq_empty() { return journalq.empty(); }
+
+private:
+ std::string fn;
+
+ char *zero_buf;
+ off64_t max_size;
+ size_t block_size;
+ bool directio, aio, force_aio;
+ bool must_write_header;
+ off64_t write_pos; // byte where the next entry to be written will go
+ off64_t read_pos; //
+ bool discard; //for block journal whether support discard
+
+#ifdef HAVE_LIBAIO
+ /// state associated with an in-flight aio request
+ /// Protected by aio_lock
+ struct aio_info {
+ struct iocb iocb {};
+ ceph::buffer::list bl;
+ struct iovec *iov;
+ bool done;
+ uint64_t off, len; ///< these are for debug only
+ uint64_t seq; ///< seq number to complete on aio completion, if non-zero
+
+ aio_info(ceph::buffer::list& b, uint64_t o, uint64_t s)
+ : iov(NULL), done(false), off(o), len(b.length()), seq(s) {
+ bl = std::move(b);
+ }
+ ~aio_info() {
+ delete[] iov;
+ }
+ };
+ ceph::mutex aio_lock = ceph::make_mutex("FileJournal::aio_lock");
+ ceph::condition_variable aio_cond;
+ ceph::condition_variable write_finish_cond;
+ io_context_t aio_ctx = 0;
+ std::list<aio_info> aio_queue;
+ int aio_num = 0, aio_bytes = 0;
+ uint64_t aio_write_queue_ops = 0;
+ uint64_t aio_write_queue_bytes = 0;
+ /// End protected by aio_lock
+#endif
+
+ uint64_t last_committed_seq;
+ uint64_t journaled_since_start;
+
+ std::string devname;
+
+ /*
+ * full states cycle at the beginnging of each commit epoch, when commit_start()
+ * is called.
+ * FULL - we just filled up during this epoch.
+ * WAIT - we filled up last epoch; now we have to wait until everything during
+ * that epoch commits to the fs before we can start writing over it.
+ * NOTFULL - all good, journal away.
+ */
+ enum {
+ FULL_NOTFULL = 0,
+ FULL_FULL = 1,
+ FULL_WAIT = 2,
+ } full_state;
+
+ int fd;
+
+ // in journal
+ std::deque<std::pair<uint64_t, off64_t> > journalq; // track seq offsets, so we can trim later.
+ uint64_t writing_seq;
+
+
+ // throttle
+ int set_throttle_params();
+ const char** get_tracked_conf_keys() const override;
+ void handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set <std::string> &changed) override {
+ for (const char **i = get_tracked_conf_keys();
+ *i;
+ ++i) {
+ if (changed.count(std::string(*i))) {
+ set_throttle_params();
+ return;
+ }
+ }
+ }
+
+ void complete_write(uint64_t ops, uint64_t bytes);
+ JournalThrottle throttle;
+
+ // write thread
+ ceph::mutex write_lock = ceph::make_mutex("FileJournal::write_lock");
+ bool write_stop;
+ bool aio_stop;
+
+ ceph::condition_variable commit_cond;
+
+ int _open(bool wr, bool create=false);
+ int _open_block_device();
+ void _close(int fd) const;
+ int _open_file(int64_t oldsize, blksize_t blksize, bool create);
+ int _dump(std::ostream& out, bool simple);
+ void print_header(const header_t &hdr) const;
+ int read_header(header_t *hdr) const;
+ ceph::bufferptr prepare_header();
+ void start_writer();
+ void stop_writer();
+ void write_thread_entry();
+
+ void queue_completions_thru(uint64_t seq);
+
+ int check_for_full(uint64_t seq, off64_t pos, off64_t size);
+ int prepare_multi_write(ceph::buffer::list& bl, uint64_t& orig_ops, uint64_t& orig_bytee);
+ int prepare_single_write(write_item &next_write, ceph::buffer::list& bl, off64_t& queue_pos,
+ uint64_t& orig_ops, uint64_t& orig_bytes);
+ void do_write(ceph::buffer::list& bl);
+
+ void write_finish_thread_entry();
+ void check_aio_completion();
+ void do_aio_write(ceph::buffer::list& bl);
+ int write_aio_bl(off64_t& pos, ceph::buffer::list& bl, uint64_t seq);
+
+
+ void check_align(off64_t pos, ceph::buffer::list& bl);
+ int write_bl(off64_t& pos, ceph::buffer::list& bl);
+
+ /// read len from journal starting at in_pos and wrapping up to len
+ void wrap_read_bl(
+ off64_t in_pos, ///< [in] start position
+ int64_t len, ///< [in] length to read
+ ceph::buffer::list* bl, ///< [out] result
+ off64_t *out_pos ///< [out] next position to read, will be wrapped
+ ) const;
+
+ void do_discard(int64_t offset, int64_t end);
+
+ class Writer : public Thread {
+ FileJournal *journal;
+ public:
+ explicit Writer(FileJournal *fj) : journal(fj) {}
+ void *entry() override {
+ journal->write_thread_entry();
+ return 0;
+ }
+ } write_thread;
+
+ class WriteFinisher : public Thread {
+ FileJournal *journal;
+ public:
+ explicit WriteFinisher(FileJournal *fj) : journal(fj) {}
+ void *entry() override {
+ journal->write_finish_thread_entry();
+ return 0;
+ }
+ } write_finish_thread;
+
+ off64_t get_top() const {
+ return round_up_to(sizeof(header), block_size);
+ }
+
+ ZTracer::Endpoint trace_endpoint;
+
+ public:
+ FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, ceph::condition_variable *sync_cond,
+ const char *f, bool dio=false, bool ai=true, bool faio=false) :
+ Journal(cct, fsid, fin, sync_cond),
+ journaled_seq(0),
+ plug_journal_completions(false),
+ fn(f),
+ zero_buf(NULL),
+ max_size(0), block_size(0),
+ directio(dio), aio(ai), force_aio(faio),
+ must_write_header(false),
+ write_pos(0), read_pos(0),
+ discard(false),
+ last_committed_seq(0),
+ journaled_since_start(0),
+ full_state(FULL_NOTFULL),
+ fd(-1),
+ writing_seq(0),
+ throttle(cct, cct->_conf->filestore_caller_concurrency),
+ write_stop(true),
+ aio_stop(true),
+ write_thread(this),
+ write_finish_thread(this),
+ trace_endpoint("0.0.0.0", 0, "FileJournal") {
+
+ if (aio && !directio) {
+ lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl;
+ aio = false;
+ }
+#ifndef HAVE_LIBAIO
+ if (aio && ::getenv("CEPH_DEV") == NULL) {
+ lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl;
+ aio = false;
+ }
+#endif
+
+ cct->_conf.add_observer(this);
+ }
+ ~FileJournal() override {
+ ceph_assert(fd == -1);
+ delete[] zero_buf;
+ cct->_conf.remove_observer(this);
+ }
+
+ int check() override;
+ int create() override;
+ int open(uint64_t fs_op_seq) override;
+ void close() override;
+ int peek_fsid(uuid_d& fsid);
+
+ int dump(std::ostream& out) override;
+ int simple_dump(std::ostream& out);
+ int _fdump(ceph::Formatter &f, bool simple);
+
+ void flush() override;
+
+ void get_devices(std::set<std::string> *ls) override;
+ void collect_metadata(std::map<std::string,std::string> *pm) override;
+
+ void reserve_throttle_and_backoff(uint64_t count) override;
+
+ bool is_writeable() override {
+ return read_pos == 0;
+ }
+ int make_writeable() override;
+
+ // writes
+ void commit_start(uint64_t seq) override;
+ void committed_thru(uint64_t seq) override;
+ bool should_commit_now() override {
+ return full_state != FULL_NOTFULL && !write_stop;
+ }
+
+ void write_header_sync();
+
+ void set_wait_on_full(bool b) { wait_on_full = b; }
+
+ off64_t get_journal_size_estimate() override;
+
+ // reads
+
+ /// Result code for read_entry
+ enum read_entry_result {
+ SUCCESS,
+ FAILURE,
+ MAYBE_CORRUPT
+ };
+
+ /**
+ * read_entry
+ *
+ * Reads next entry starting at pos. If the entry appears
+ * clean, *bl will contain the payload, *seq will contain
+ * the sequence number, and *out_pos will reflect the next
+ * read position. If the entry is invalid *ss will contain
+ * debug text, while *seq, *out_pos, and *bl will be unchanged.
+ *
+ * If the entry suggests a corrupt log, *ss will contain debug
+ * text, *out_pos will contain the next index to check. If
+ * we find an entry in this way that returns SUCCESS, the journal
+ * is most likely corrupt.
+ */
+ read_entry_result do_read_entry(
+ off64_t pos, ///< [in] position to read
+ off64_t *next_pos, ///< [out] next position to read
+ ceph::buffer::list* bl, ///< [out] payload for successful read
+ uint64_t *seq, ///< [out] seq of successful read
+ std::ostream *ss, ///< [out] error output
+ entry_header_t *h = 0 ///< [out] header
+ ) const; ///< @return result code
+
+ bool read_entry(
+ ceph::buffer::list &bl,
+ uint64_t &last_seq,
+ bool *corrupt
+ );
+
+ bool read_entry(
+ ceph::buffer::list &bl,
+ uint64_t &last_seq) override {
+ return read_entry(bl, last_seq, 0);
+ }
+
+ // Debug/Testing
+ void get_header(
+ uint64_t wanted_seq,
+ off64_t *_pos,
+ entry_header_t *h);
+ void corrupt(
+ int wfd,
+ off64_t corrupt_at);
+ void corrupt_payload(
+ int wfd,
+ uint64_t seq);
+ void corrupt_footer_magic(
+ int wfd,
+ uint64_t seq);
+ void corrupt_header_magic(
+ int wfd,
+ uint64_t seq);
+};
+
+WRITE_CLASS_ENCODER(FileJournal::header_t)
+
+#endif