diff options
Diffstat (limited to 'src/osdc/Journaler.h')
-rw-r--r-- | src/osdc/Journaler.h | 540 |
1 files changed, 540 insertions, 0 deletions
diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h new file mode 100644 index 00000000..e3cd9e6c --- /dev/null +++ b/src/osdc/Journaler.h @@ -0,0 +1,540 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +/* Journaler + * + * This class stripes a serial log over objects on the store. Four + * logical pointers: + * + * write_pos - where we're writing new entries + * unused_field - where we're reading old entires + * expire_pos - what is deemed "old" by user + * trimmed_pos - where we're expiring old items + * + * trimmed_pos <= expire_pos <= unused_field <= write_pos. + * + * Often, unused_field <= write_pos (as with MDS log). During + * recovery, write_pos is undefined until the end of the log is + * discovered. + * + * A "head" struct at the beginning of the log is used to store + * metadata at regular intervals. The basic invariants include: + * + * head.unused_field <= unused_field -- the head may "lag", since + * it's updated lazily. + * head.write_pos <= write_pos + * head.expire_pos <= expire_pos + * head.trimmed_pos <= trimmed_pos + * + * More significantly, + * + * head.expire_pos >= trimmed_pos -- this ensures we can find the + * "beginning" of the log as last + * recorded, before it is trimmed. + * trimming will block until a + * sufficiently current expire_pos + * is committed. + * + * To recover log state, we simply start at the last write_pos in the + * head, and probe the object sequence sizes until we read the end. + * + * Head struct is stored in the first object. Actual journal starts + * after layout.period() bytes. + * + */ + +#ifndef CEPH_JOURNALER_H +#define CEPH_JOURNALER_H + +#include <list> +#include <map> + +#include "Objecter.h" +#include "Filer.h" + +#include "common/Timer.h" +#include "common/Throttle.h" + +class CephContext; +class Context; +class PerfCounters; +class Finisher; +class C_OnFinisher; + +typedef __u8 stream_format_t; + +// Legacy envelope is leading uint32_t size +enum StreamFormat { + JOURNAL_FORMAT_LEGACY = 0, + JOURNAL_FORMAT_RESILIENT = 1, + // Insert new formats here, before COUNT + JOURNAL_FORMAT_COUNT +}; + +// Highest journal format version that we support +#define JOURNAL_FORMAT_MAX (JOURNAL_FORMAT_COUNT - 1) + +// Legacy envelope is leading uint32_t size +#define JOURNAL_ENVELOPE_LEGACY (sizeof(uint32_t)) + +// Resilient envelope is leading uint64_t sentinel, uint32_t size, +// trailing uint64_t start_ptr +#define JOURNAL_ENVELOPE_RESILIENT (sizeof(uint32_t) + sizeof(uint64_t) + \ + sizeof(uint64_t)) + +/** + * Represents a collection of entries serialized in a byte stream. + * + * Each entry consists of: + * - a blob (used by the next level up as a serialized LogEvent) + * - a uint64_t (used by the next level up as a pointer to the start + * of the entry in the collection bytestream) + */ +class JournalStream +{ + stream_format_t format; + + public: + JournalStream(stream_format_t format_) : format(format_) {} + + void set_format(stream_format_t format_) {format = format_;} + + bool readable(bufferlist &bl, uint64_t *need) const; + size_t read(bufferlist &from, bufferlist *to, uint64_t *start_ptr); + size_t write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr); + size_t get_envelope_size() const { + if (format >= JOURNAL_FORMAT_RESILIENT) { + return JOURNAL_ENVELOPE_RESILIENT; + } else { + return JOURNAL_ENVELOPE_LEGACY; + } + } + + // A magic number for the start of journal entries, so that we can + // identify them in damaged journals. + static const uint64_t sentinel = 0x3141592653589793; +}; + + +class Journaler { +public: + // this goes at the head of the log "file". + class Header { + public: + uint64_t trimmed_pos; + uint64_t expire_pos; + uint64_t unused_field; + uint64_t write_pos; + string magic; + file_layout_t layout; //< The mapping from byte stream offsets + // to RADOS objects + stream_format_t stream_format; //< The encoding of LogEvents + // within the journal byte stream + + Header(const char *m="") : + trimmed_pos(0), expire_pos(0), unused_field(0), write_pos(0), magic(m), + stream_format(-1) { + } + + void encode(bufferlist &bl) const { + ENCODE_START(2, 2, bl); + encode(magic, bl); + encode(trimmed_pos, bl); + encode(expire_pos, bl); + encode(unused_field, bl); + encode(write_pos, bl); + encode(layout, bl, 0); // encode in legacy format + encode(stream_format, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::const_iterator &bl) { + DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + decode(magic, bl); + decode(trimmed_pos, bl); + decode(expire_pos, bl); + decode(unused_field, bl); + decode(write_pos, bl); + decode(layout, bl); + if (struct_v > 1) { + decode(stream_format, bl); + } else { + stream_format = JOURNAL_FORMAT_LEGACY; + } + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const { + f->open_object_section("journal_header"); + { + f->dump_string("magic", magic); + f->dump_unsigned("write_pos", write_pos); + f->dump_unsigned("expire_pos", expire_pos); + f->dump_unsigned("trimmed_pos", trimmed_pos); + f->dump_unsigned("stream_format", stream_format); + f->dump_object("layout", layout); + } + f->close_section(); // journal_header + } + + static void generate_test_instances(list<Header*> &ls) + { + ls.push_back(new Header()); + + ls.push_back(new Header()); + ls.back()->trimmed_pos = 1; + ls.back()->expire_pos = 2; + ls.back()->unused_field = 3; + ls.back()->write_pos = 4; + ls.back()->magic = "magique"; + + ls.push_back(new Header()); + ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT; + } + }; + WRITE_CLASS_ENCODER(Header) + + uint32_t get_stream_format() const { + return stream_format; + } + + Header last_committed; + +private: + // me + CephContext *cct; + std::mutex lock; + const std::string name; + typedef std::lock_guard<std::mutex> lock_guard; + typedef std::unique_lock<std::mutex> unique_lock; + Finisher *finisher; + Header last_written; + inodeno_t ino; + int64_t pg_pool; + bool readonly; + file_layout_t layout; + uint32_t stream_format; + JournalStream journal_stream; + + const char *magic; + Objecter *objecter; + Filer filer; + + PerfCounters *logger; + int logger_key_lat; + + class C_DelayFlush; + C_DelayFlush *delay_flush_event; + /* + * Do a flush as a result of a C_DelayFlush context. + */ + void _do_delayed_flush() + { + ceph_assert(delay_flush_event != NULL); + lock_guard l(lock); + delay_flush_event = NULL; + _do_flush(); + } + + // my state + static const int STATE_UNDEF = 0; + static const int STATE_READHEAD = 1; + static const int STATE_PROBING = 2; + static const int STATE_ACTIVE = 3; + static const int STATE_REREADHEAD = 4; + static const int STATE_REPROBING = 5; + static const int STATE_STOPPING = 6; + + int state; + int error; + + void _write_head(Context *oncommit=NULL); + void _wait_for_flush(Context *onsafe); + void _trim(); + + // header + ceph::real_time last_wrote_head; + void _finish_write_head(int r, Header &wrote, C_OnFinisher *oncommit); + class C_WriteHead; + friend class C_WriteHead; + + void _reread_head(Context *onfinish); + void _set_layout(file_layout_t const *l); + list<Context*> waitfor_recover; + void _read_head(Context *on_finish, bufferlist *bl); + void _finish_read_head(int r, bufferlist& bl); + void _finish_reread_head(int r, bufferlist& bl, Context *finish); + void _probe(Context *finish, uint64_t *end); + void _finish_probe_end(int r, uint64_t end); + void _reprobe(C_OnFinisher *onfinish); + void _finish_reprobe(int r, uint64_t end, C_OnFinisher *onfinish); + void _finish_reread_head_and_probe(int r, C_OnFinisher *onfinish); + class C_ReadHead; + friend class C_ReadHead; + class C_ProbeEnd; + friend class C_ProbeEnd; + class C_RereadHead; + friend class C_RereadHead; + class C_ReProbe; + friend class C_ReProbe; + class C_RereadHeadProbe; + friend class C_RereadHeadProbe; + + // writer + uint64_t prezeroing_pos; + uint64_t prezero_pos; ///< we zero journal space ahead of write_pos to + // avoid problems with tail probing + uint64_t write_pos; ///< logical write position, where next entry + // will go + uint64_t flush_pos; ///< where we will flush. if + /// write_pos>flush_pos, we're buffering writes. + uint64_t safe_pos; ///< what has been committed safely to disk. + + uint64_t next_safe_pos; /// start position of the first entry that isn't + /// being fully flushed. If we don't flush any + // partial entry, it's equal to flush_pos. + + bufferlist write_buf; ///< write buffer. flush_pos + + /// write_buf.length() == write_pos. + + // protect write_buf from bufferlist _len overflow + Throttle write_buf_throttle; + + uint64_t waiting_for_zero_pos; + interval_set<uint64_t> pending_zero; // non-contig bits we've zeroed + list<Context*> waitfor_prezero; + + std::map<uint64_t, uint64_t> pending_safe; // flush_pos -> safe_pos + // when safe through given offset + std::map<uint64_t, std::list<Context*> > waitfor_safe; + + void _flush(C_OnFinisher *onsafe); + void _do_flush(unsigned amount=0); + void _finish_flush(int r, uint64_t start, ceph::real_time stamp); + class C_Flush; + friend class C_Flush; + + // reader + uint64_t read_pos; // logical read position, where next entry starts. + uint64_t requested_pos; // what we've requested from OSD. + uint64_t received_pos; // what we've received from OSD. + // read buffer. unused_field + read_buf.length() == prefetch_pos. + bufferlist read_buf; + + map<uint64_t,bufferlist> prefetch_buf; + + uint64_t fetch_len; // how much to read at a time + uint64_t temp_fetch_len; + + // for wait_for_readable() + C_OnFinisher *on_readable; + C_OnFinisher *on_write_error; + bool called_write_error; + + // read completion callback + void _finish_read(int r, uint64_t offset, uint64_t length, bufferlist &bl); + void _finish_retry_read(int r); + void _assimilate_prefetch(); + void _issue_read(uint64_t len); // read some more + void _prefetch(); // maybe read ahead + class C_Read; + friend class C_Read; + class C_RetryRead; + friend class C_RetryRead; + + // trimmer + uint64_t expire_pos; // what we're allowed to trim to + uint64_t trimming_pos; // what we've requested to trim through + uint64_t trimmed_pos; // what has been trimmed + + bool readable; + + void _finish_trim(int r, uint64_t to); + class C_Trim; + friend class C_Trim; + + void _issue_prezero(); + void _finish_prezero(int r, uint64_t from, uint64_t len); + friend struct C_Journaler_Prezero; + + // only init_headers when following or first reading off-disk + void init_headers(Header& h) { + ceph_assert(readonly || + state == STATE_READHEAD || + state == STATE_REREADHEAD); + last_written = last_committed = h; + } + + /** + * handle a write error + * + * called when we get an objecter error on a write. + * + * @param r error code + */ + void handle_write_error(int r); + + bool _is_readable(); + + void _finish_erase(int data_result, C_OnFinisher *completion); + class C_EraseFinish; + friend class C_EraseFinish; + + C_OnFinisher *wrap_finisher(Context *c); + + uint32_t write_iohint; // the fadvise flags for write op, see + // CEPH_OSD_OP_FADIVSE_* + +public: + Journaler(const std::string &name_, inodeno_t ino_, int64_t pool, + const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) : + last_committed(mag), + cct(obj->cct), name(name_), finisher(f), last_written(mag), + ino(ino_), pg_pool(pool), readonly(true), + stream_format(-1), journal_stream(-1), + magic(mag), + objecter(obj), filer(objecter, f), logger(l), logger_key_lat(lkey), + delay_flush_event(0), + state(STATE_UNDEF), error(0), + prezeroing_pos(0), prezero_pos(0), write_pos(0), flush_pos(0), + safe_pos(0), next_safe_pos(0), + write_buf_throttle(cct, "write_buf_throttle", UINT_MAX - (UINT_MAX >> 3)), + waiting_for_zero_pos(0), + read_pos(0), requested_pos(0), received_pos(0), + fetch_len(0), temp_fetch_len(0), + on_readable(0), on_write_error(NULL), called_write_error(false), + expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false), + write_iohint(0) + { + } + + /* reset + * + * NOTE: we assume the caller knows/has ensured that any objects in + * our sequence do not exist.. e.g. after a MKFS. this is _not_ an + * "erase" method. + */ + void reset() { + lock_guard l(lock); + ceph_assert(state == STATE_ACTIVE); + + readonly = true; + delay_flush_event = NULL; + state = STATE_UNDEF; + error = 0; + prezeroing_pos = 0; + prezero_pos = 0; + write_pos = 0; + flush_pos = 0; + safe_pos = 0; + next_safe_pos = 0; + read_pos = 0; + requested_pos = 0; + received_pos = 0; + fetch_len = 0; + ceph_assert(!on_readable); + expire_pos = 0; + trimming_pos = 0; + trimmed_pos = 0; + waiting_for_zero_pos = 0; + } + + // Asynchronous operations + // ======================= + void erase(Context *completion); + void create(file_layout_t *layout, stream_format_t const sf); + void recover(Context *onfinish); + void reread_head(Context *onfinish); + void reread_head_and_probe(Context *onfinish); + void write_head(Context *onsave=0); + void wait_for_flush(Context *onsafe = 0); + void flush(Context *onsafe = 0); + void wait_for_readable(Context *onfinish); + bool have_waiter() const; + void wait_for_prezero(Context *onfinish); + + // Synchronous setters + // =================== + void set_layout(file_layout_t const *l); + void set_readonly(); + void set_writeable(); + void set_write_pos(uint64_t p) { + lock_guard l(lock); + prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = p; + } + void set_read_pos(uint64_t p) { + lock_guard l(lock); + // we can't cope w/ in-progress read right now. + ceph_assert(requested_pos == received_pos); + read_pos = requested_pos = received_pos = p; + read_buf.clear(); + } + uint64_t append_entry(bufferlist& bl); + void set_expire_pos(uint64_t ep) { + lock_guard l(lock); + expire_pos = ep; + } + void set_trimmed_pos(uint64_t p) { + lock_guard l(lock); + trimming_pos = trimmed_pos = p; + } + + bool _write_head_needed(); + bool write_head_needed() { + lock_guard l(lock); + return _write_head_needed(); + } + + + void trim(); + void trim_tail() { + lock_guard l(lock); + + ceph_assert(!readonly); + _issue_prezero(); + } + + void set_write_error_handler(Context *c); + + void set_write_iohint(uint32_t iohint_flags) { + write_iohint = iohint_flags; + } + /** + * Cause any ongoing waits to error out with -EAGAIN, set error + * to -EAGAIN. + */ + void shutdown(); +public: + + // Synchronous getters + // =================== + // TODO: need some locks on reads for true safety + uint64_t get_layout_period() const { + return layout.get_period(); + } + file_layout_t& get_layout() { return layout; } + bool is_active() { return state == STATE_ACTIVE; } + bool is_stopping() { return state == STATE_STOPPING; } + int get_error() { return error; } + bool is_readonly() { return readonly; } + bool is_readable(); + bool try_read_entry(bufferlist& bl); + uint64_t get_write_pos() const { return write_pos; } + uint64_t get_write_safe_pos() const { return safe_pos; } + uint64_t get_read_pos() const { return read_pos; } + uint64_t get_expire_pos() const { return expire_pos; } + uint64_t get_trimmed_pos() const { return trimmed_pos; } +}; +WRITE_CLASS_ENCODER(Journaler::Header) + +#endif |