// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_FILEJOURNAL_H #define CEPH_FILEJOURNAL_H #include #include #include #include using std::deque; #include "Journal.h" #include "common/config_fwd.h" #include "common/Cond.h" #include "common/Thread.h" #include "common/Throttle.h" #include "JournalThrottle.h" #include "common/zipkin_trace.h" #ifdef HAVE_LIBAIO # include #endif // re-include our assert to clobber the system one; fix dout: #include "include/ceph_assert.h" /** * Implements journaling on top of block device or file. * * Lock ordering is write_lock > aio_lock > (completions_lock | finisher_lock) */ class FileJournal : public Journal, public md_config_obs_t { public: /// Protected by finisher_lock struct completion_item { uint64_t seq; Context *finish; utime_t start; TrackedOpRef tracked_op; completion_item(uint64_t o, Context *c, utime_t s, TrackedOpRef opref) : seq(o), finish(c), start(s), tracked_op(opref) {} completion_item() : seq(0), finish(0), start(0) {} }; struct write_item { uint64_t seq; ceph::buffer::list bl; uint32_t orig_len; TrackedOpRef tracked_op; ZTracer::Trace trace; write_item(uint64_t s, ceph::buffer::list& b, int ol, TrackedOpRef opref) : seq(s), orig_len(ol), tracked_op(opref) { bl = std::move(b); } write_item() : seq(0), orig_len(0) {} }; ceph::mutex finisher_lock = ceph::make_mutex("FileJournal::finisher_lock"); ceph::condition_variable finisher_cond; uint64_t journaled_seq; bool plug_journal_completions; ceph::mutex writeq_lock = ceph::make_mutex("FileJournal::writeq_lock"); ceph::condition_variable writeq_cond; std::list writeq; bool writeq_empty(); write_item &peek_write(); void pop_write(); void batch_pop_write(std::list &items); void batch_unpop_write(std::list &items); ceph::mutex completions_lock = ceph::make_mutex("FileJournal::completions_lock"); std::list completions; bool completions_empty() { std::lock_guard l{completions_lock}; return completions.empty(); } void batch_pop_completions(std::list &items) { std::lock_guard l{completions_lock}; completions.swap(items); } void batch_unpop_completions(std::list &items) { std::lock_guard l{completions_lock}; completions.splice(completions.begin(), items); } completion_item completion_peek_front() { std::lock_guard l{completions_lock}; ceph_assert(!completions.empty()); return completions.front(); } void completion_pop_front() { std::lock_guard l{completions_lock}; ceph_assert(!completions.empty()); completions.pop_front(); } int prepare_entry(std::vector& tls, ceph::buffer::list* tbl) override; void submit_entry(uint64_t seq, ceph::buffer::list& bl, uint32_t orig_len, Context *oncommit, TrackedOpRef osd_op = TrackedOpRef()) override; /// End protected by finisher_lock /* * journal header */ struct header_t { enum { FLAG_CRC = (1<<0), // NOTE: remove kludgey weirdness in read_header() next time a flag is added. }; uint64_t flags; uuid_d fsid; __u32 block_size; __u32 alignment; int64_t max_size; // max size of journal ring buffer int64_t start; // offset of first entry uint64_t committed_up_to; // committed up to /** * start_seq * * entry at header.start has sequence >= start_seq * * Generally, the entry at header.start will have sequence * start_seq if it exists. The only exception is immediately * after journal creation since the first sequence number is * not known. * * If the first read on open fails, we can assume corruption * if start_seq > committed_up_to because the entry would have * a sequence >= start_seq and therefore > committed_up_to. */ uint64_t start_seq; header_t() : flags(0), block_size(0), alignment(0), max_size(0), start(0), committed_up_to(0), start_seq(0) {} void clear() { start = block_size; } uint64_t get_fsid64() const { return *(uint64_t*)fsid.bytes(); } void encode(ceph::buffer::list& bl) const { using ceph::encode; __u32 v = 4; encode(v, bl); ceph::buffer::list em; { encode(flags, em); encode(fsid, em); encode(block_size, em); encode(alignment, em); encode(max_size, em); encode(start, em); encode(committed_up_to, em); encode(start_seq, em); } encode(em, bl); } void decode(ceph::buffer::list::const_iterator& bl) { using ceph::decode; __u32 v; decode(v, bl); if (v < 2) { // normally 0, but conceivably 1 // decode old header_t struct (pre v0.40). bl += 4u; // skip __u32 flags (it was unused by any old code) flags = 0; uint64_t tfsid; decode(tfsid, bl); *(uint64_t*)&fsid.bytes()[0] = tfsid; *(uint64_t*)&fsid.bytes()[8] = tfsid; decode(block_size, bl); decode(alignment, bl); decode(max_size, bl); decode(start, bl); committed_up_to = 0; start_seq = 0; return; } ceph::buffer::list em; decode(em, bl); auto t = em.cbegin(); decode(flags, t); decode(fsid, t); decode(block_size, t); decode(alignment, t); decode(max_size, t); decode(start, t); if (v > 2) decode(committed_up_to, t); else committed_up_to = 0; if (v > 3) decode(start_seq, t); else start_seq = 0; } } header; struct entry_header_t { uint64_t seq; // fs op seq # uint32_t crc32c; // payload only. not header, pre_pad, post_pad, or footer. uint32_t len; uint32_t pre_pad, post_pad; uint64_t magic1; uint64_t magic2; static uint64_t make_magic(uint64_t seq, uint32_t len, uint64_t fsid) { return (fsid ^ seq ^ len); } bool check_magic(off64_t pos, uint64_t fsid) { return magic1 == (uint64_t)pos && magic2 == (fsid ^ seq ^ len); } } __attribute__((__packed__, aligned(4))); bool journalq_empty() { return journalq.empty(); } private: std::string fn; char *zero_buf; off64_t max_size; size_t block_size; bool directio, aio, force_aio; bool must_write_header; off64_t write_pos; // byte where the next entry to be written will go off64_t read_pos; // bool discard; //for block journal whether support discard #ifdef HAVE_LIBAIO /// state associated with an in-flight aio request /// Protected by aio_lock struct aio_info { struct iocb iocb {}; ceph::buffer::list bl; struct iovec *iov; bool done; uint64_t off, len; ///< these are for debug only uint64_t seq; ///< seq number to complete on aio completion, if non-zero aio_info(ceph::buffer::list& b, uint64_t o, uint64_t s) : iov(NULL), done(false), off(o), len(b.length()), seq(s) { bl = std::move(b); } ~aio_info() { delete[] iov; } }; ceph::mutex aio_lock = ceph::make_mutex("FileJournal::aio_lock"); ceph::condition_variable aio_cond; ceph::condition_variable write_finish_cond; io_context_t aio_ctx = 0; std::list aio_queue; int aio_num = 0, aio_bytes = 0; uint64_t aio_write_queue_ops = 0; uint64_t aio_write_queue_bytes = 0; /// End protected by aio_lock #endif uint64_t last_committed_seq; uint64_t journaled_since_start; std::string devname; /* * full states cycle at the beginnging of each commit epoch, when commit_start() * is called. * FULL - we just filled up during this epoch. * WAIT - we filled up last epoch; now we have to wait until everything during * that epoch commits to the fs before we can start writing over it. * NOTFULL - all good, journal away. */ enum { FULL_NOTFULL = 0, FULL_FULL = 1, FULL_WAIT = 2, } full_state; int fd; // in journal std::deque > journalq; // track seq offsets, so we can trim later. uint64_t writing_seq; // throttle int set_throttle_params(); const char** get_tracked_conf_keys() const override; void handle_conf_change( const ConfigProxy& conf, const std::set &changed) override { for (const char **i = get_tracked_conf_keys(); *i; ++i) { if (changed.count(std::string(*i))) { set_throttle_params(); return; } } } void complete_write(uint64_t ops, uint64_t bytes); JournalThrottle throttle; // write thread ceph::mutex write_lock = ceph::make_mutex("FileJournal::write_lock"); bool write_stop; bool aio_stop; ceph::condition_variable commit_cond; int _open(bool wr, bool create=false); int _open_block_device(); void _close(int fd) const; int _open_file(int64_t oldsize, blksize_t blksize, bool create); int _dump(std::ostream& out, bool simple); void print_header(const header_t &hdr) const; int read_header(header_t *hdr) const; ceph::bufferptr prepare_header(); void start_writer(); void stop_writer(); void write_thread_entry(); void queue_completions_thru(uint64_t seq); int check_for_full(uint64_t seq, off64_t pos, off64_t size); int prepare_multi_write(ceph::buffer::list& bl, uint64_t& orig_ops, uint64_t& orig_bytee); int prepare_single_write(write_item &next_write, ceph::buffer::list& bl, off64_t& queue_pos, uint64_t& orig_ops, uint64_t& orig_bytes); void do_write(ceph::buffer::list& bl); void write_finish_thread_entry(); void check_aio_completion(); void do_aio_write(ceph::buffer::list& bl); int write_aio_bl(off64_t& pos, ceph::buffer::list& bl, uint64_t seq); void check_align(off64_t pos, ceph::buffer::list& bl); int write_bl(off64_t& pos, ceph::buffer::list& bl); /// read len from journal starting at in_pos and wrapping up to len void wrap_read_bl( off64_t in_pos, ///< [in] start position int64_t len, ///< [in] length to read ceph::buffer::list* bl, ///< [out] result off64_t *out_pos ///< [out] next position to read, will be wrapped ) const; void do_discard(int64_t offset, int64_t end); class Writer : public Thread { FileJournal *journal; public: explicit Writer(FileJournal *fj) : journal(fj) {} void *entry() override { journal->write_thread_entry(); return 0; } } write_thread; class WriteFinisher : public Thread { FileJournal *journal; public: explicit WriteFinisher(FileJournal *fj) : journal(fj) {} void *entry() override { journal->write_finish_thread_entry(); return 0; } } write_finish_thread; off64_t get_top() const { return round_up_to(sizeof(header), block_size); } ZTracer::Endpoint trace_endpoint; public: FileJournal(CephContext* cct, uuid_d fsid, Finisher *fin, ceph::condition_variable *sync_cond, const char *f, bool dio=false, bool ai=true, bool faio=false) : Journal(cct, fsid, fin, sync_cond), journaled_seq(0), plug_journal_completions(false), fn(f), zero_buf(NULL), max_size(0), block_size(0), directio(dio), aio(ai), force_aio(faio), must_write_header(false), write_pos(0), read_pos(0), discard(false), last_committed_seq(0), journaled_since_start(0), full_state(FULL_NOTFULL), fd(-1), writing_seq(0), throttle(cct, cct->_conf->filestore_caller_concurrency), write_stop(true), aio_stop(true), write_thread(this), write_finish_thread(this), trace_endpoint("0.0.0.0", 0, "FileJournal") { if (aio && !directio) { lderr(cct) << "FileJournal::_open_any: aio not supported without directio; disabling aio" << dendl; aio = false; } #ifndef HAVE_LIBAIO if (aio && ::getenv("CEPH_DEV") == NULL) { lderr(cct) << "FileJournal::_open_any: libaio not compiled in; disabling aio" << dendl; aio = false; } #endif cct->_conf.add_observer(this); } ~FileJournal() override { ceph_assert(fd == -1); delete[] zero_buf; cct->_conf.remove_observer(this); } int check() override; int create() override; int open(uint64_t fs_op_seq) override; void close() override; int peek_fsid(uuid_d& fsid); int dump(std::ostream& out) override; int simple_dump(std::ostream& out); int _fdump(ceph::Formatter &f, bool simple); void flush() override; void get_devices(std::set *ls) override; void collect_metadata(std::map *pm) override; void reserve_throttle_and_backoff(uint64_t count) override; bool is_writeable() override { return read_pos == 0; } int make_writeable() override; // writes void commit_start(uint64_t seq) override; void committed_thru(uint64_t seq) override; bool should_commit_now() override { return full_state != FULL_NOTFULL && !write_stop; } void write_header_sync(); void set_wait_on_full(bool b) { wait_on_full = b; } off64_t get_journal_size_estimate() override; // reads /// Result code for read_entry enum read_entry_result { SUCCESS, FAILURE, MAYBE_CORRUPT }; /** * read_entry * * Reads next entry starting at pos. If the entry appears * clean, *bl will contain the payload, *seq will contain * the sequence number, and *out_pos will reflect the next * read position. If the entry is invalid *ss will contain * debug text, while *seq, *out_pos, and *bl will be unchanged. * * If the entry suggests a corrupt log, *ss will contain debug * text, *out_pos will contain the next index to check. If * we find an entry in this way that returns SUCCESS, the journal * is most likely corrupt. */ read_entry_result do_read_entry( off64_t pos, ///< [in] position to read off64_t *next_pos, ///< [out] next position to read ceph::buffer::list* bl, ///< [out] payload for successful read uint64_t *seq, ///< [out] seq of successful read std::ostream *ss, ///< [out] error output entry_header_t *h = 0 ///< [out] header ) const; ///< @return result code bool read_entry( ceph::buffer::list &bl, uint64_t &last_seq, bool *corrupt ); bool read_entry( ceph::buffer::list &bl, uint64_t &last_seq) override { return read_entry(bl, last_seq, 0); } // Debug/Testing void get_header( uint64_t wanted_seq, off64_t *_pos, entry_header_t *h); void corrupt( int wfd, off64_t corrupt_at); void corrupt_payload( int wfd, uint64_t seq); void corrupt_footer_magic( int wfd, uint64_t seq); void corrupt_header_magic( int wfd, uint64_t seq); }; WRITE_CLASS_ENCODER(FileJournal::header_t) #endif