diff options
Diffstat (limited to '')
-rw-r--r-- | src/osdc/ObjectCacher.h | 781 |
1 files changed, 781 insertions, 0 deletions
diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h
new file mode 100644
index 000000000..2101692e1
--- /dev/null
+++ b/src/osdc/ObjectCacher.h
@@ -0,0 +1,781 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_OBJECTCACHER_H
+#define CEPH_OBJECTCACHER_H
+
+#include "include/types.h"
+#include "include/lru.h"
+#include "include/Context.h"
+#include "include/xlist.h"
+#include "include/common_fwd.h"
+
+#include "common/Cond.h"
+#include "common/Finisher.h"
+#include "common/Thread.h"
+#include "common/zipkin_trace.h"
+
+#include "Objecter.h"
+#include "Striper.h"
+
+class WritebackHandler;
+
+enum {
+  l_objectcacher_first = 25000,
+
+  l_objectcacher_cache_ops_hit, // ops we satisfy completely from cache
+  l_objectcacher_cache_ops_miss, // ops we don't satisfy completely from cache
+
+  l_objectcacher_cache_bytes_hit, // bytes read directly from cache
+
+  l_objectcacher_cache_bytes_miss, // bytes we couldn't read directly
+                                   // from cache
+
+  l_objectcacher_data_read, // total bytes read out
+  l_objectcacher_data_written, // bytes written to cache
+  l_objectcacher_data_flushed, // bytes flushed to WritebackHandler
+  l_objectcacher_overwritten_in_flush, // bytes overwritten while
+                                       // flushing is in progress
+
+  l_objectcacher_write_ops_blocked, // total write ops we delayed due
+                                    // to dirty limits
+  l_objectcacher_write_bytes_blocked, // total number of write bytes
+                                      // we delayed due to dirty
+                                      // limits
+  l_objectcacher_write_time_blocked, // total time in seconds spent
+                                     // blocking a write due to dirty
+                                     // limits
+
+  l_objectcacher_last,
+};
+
+class ObjectCacher {
+  PerfCounters *perfcounter;
+ public:
+  CephContext *cct;
+  class Object;
+  struct ObjectSet;
+  class C_ReadFinish;
+
+  typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
+
+  // read scatter/gather
+  struct OSDRead {
+    std::vector<ObjectExtent> extents;
+    snapid_t snap;
+    ceph::buffer::list *bl;
+    int fadvise_flags;
+    OSDRead(snapid_t s, ceph::buffer::list *b, int f)
+      : snap(s), bl(b), fadvise_flags(f) {}
+  };
+
+  OSDRead *prepare_read(snapid_t snap, ceph::buffer::list *b, int f) const {
+    return new OSDRead(snap, b, f);
+  }
+
+  // write scatter/gather
+  struct OSDWrite {
+    std::vector<ObjectExtent> extents;
+    SnapContext snapc;
+    ceph::buffer::list bl;
+    ceph::real_time mtime;
+    int fadvise_flags;
+    ceph_tid_t journal_tid;
+    OSDWrite(const SnapContext& sc, const ceph::buffer::list& b, ceph::real_time mt,
+             int f, ceph_tid_t _journal_tid)
+      : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
+        journal_tid(_journal_tid) {}
+  };
+
+  OSDWrite *prepare_write(const SnapContext& sc,
+                          const ceph::buffer::list &b,
+                          ceph::real_time mt,
+                          int f,
+                          ceph_tid_t journal_tid) const {
+    return new OSDWrite(sc, b, mt, f, journal_tid);
+  }
+
+
+
+  // ******* BufferHead *********
+  class BufferHead : public LRUObject {
+  public:
+    // states
+    static const int STATE_MISSING = 0;
+    static const int STATE_CLEAN = 1;
+    static const int STATE_ZERO = 2;   // NOTE: these are *clean* zeros
+    static const int STATE_DIRTY = 3;
+    static const int STATE_RX = 4;
+    static const int STATE_TX = 5;
+    static const int STATE_ERROR = 6; // a read error occurred
+
+  private:
+    // my fields
+    int state;
+    int ref;
+    struct {
+      loff_t start, length;   // bh extent in object
+    } ex;
+    bool dontneed; //indicate bh don't need by anyone
+    bool nocache; //indicate bh don't need by this caller
+
+  public:
+    Object *ob;
+    ceph::buffer::list bl;
+    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
+    ceph_tid_t last_read_tid;   // tid of last read op (if any)
+    ceph::real_time last_write;
+    SnapContext snapc;
+    ceph_tid_t journal_tid;
+    int error; // holds return value for failed reads
+
+    std::map<loff_t, std::list<Context*> > waitfor_read;
+
+    // cons
+    explicit BufferHead(Object *o) :
+      state(STATE_MISSING),
+      ref(0),
+      dontneed(false),
+      nocache(false),
+      ob(o),
+      last_write_tid(0),
+      last_read_tid(0),
+      journal_tid(0),
+      error(0) {
+      ex.start = ex.length = 0;
+    }
+
+    // extent
+    loff_t start() const { return ex.start; }
+    void set_start(loff_t s) { ex.start = s; }
+    loff_t length() const { return ex.length; }
+    void set_length(loff_t l) { ex.length = l; }
+    loff_t end() const { return ex.start + ex.length; }
+    loff_t last() const { return end() - 1; }
+
+    // states
+    void set_state(int s) {
+      if (s == STATE_RX || s == STATE_TX) get();
+      if (state == STATE_RX || state == STATE_TX) put();
+      state = s;
+    }
+    int get_state() const { return state; }
+
+    inline int get_error() const {
+      return error;
+    }
+    inline void set_error(int _error) {
+      error = _error;
+    }
+
+    inline ceph_tid_t get_journal_tid() const {
+      return journal_tid;
+    }
+    inline void set_journal_tid(ceph_tid_t _journal_tid) {
+      journal_tid = _journal_tid;
+    }
+
+    bool is_missing() const { return state == STATE_MISSING; }
+    bool is_dirty() const { return state == STATE_DIRTY; }
+    bool is_clean() const { return state == STATE_CLEAN; }
+    bool is_zero() const { return state == STATE_ZERO; }
+    bool is_tx() const { return state == STATE_TX; }
+    bool is_rx() const { return state == STATE_RX; }
+    bool is_error() const { return state == STATE_ERROR; }
+
+    // reference counting
+    int get() {
+      ceph_assert(ref >= 0);
+      if (ref == 0) lru_pin();
+      return ++ref;
+    }
+    int put() {
+      ceph_assert(ref > 0);
+      if (ref == 1) lru_unpin();
+      --ref;
+      return ref;
+    }
+
+    void set_dontneed(bool v) {
+      dontneed = v;
+    }
+    bool get_dontneed() const {
+      return dontneed;
+    }
+
+    void set_nocache(bool v) {
+      nocache = v;
+    }
+    bool get_nocache() const {
+      return nocache;
+    }
+
+    inline bool can_merge_journal(BufferHead *bh) const {
+      return (get_journal_tid() == bh->get_journal_tid());
+    }
+
+    struct ptr_lt {
+      bool operator()(const BufferHead* l, const BufferHead* r) const {
+        const Object *lob = l->ob;
+        const Object *rob = r->ob;
+        const ObjectSet *loset = lob->oset;
+        const ObjectSet *roset = rob->oset;
+        if (loset != roset)
+          return loset < roset;
+        if (lob != rob)
+          return lob < rob;
+        if (l->start() != r->start())
+          return l->start() < r->start();
+        return l < r;
+      }
+    };
+  };
+
+  // ******* Object *********
+  class Object : public LRUObject {
+  private:
+    // ObjectCacher::Object fields
+    int ref;
+    ObjectCacher *oc;
+    sobject_t oid;
+    friend struct ObjectSet;
+
+  public:
+    uint64_t object_no;
+    ObjectSet *oset;
+    xlist<Object*>::item set_item;
+    object_locator_t oloc;
+    uint64_t truncate_size, truncate_seq;
+
+    bool complete;
+    bool exists;
+
+    std::map<loff_t, BufferHead*> data;
+
+    ceph_tid_t last_write_tid;  // version of bh (if non-zero)
+    ceph_tid_t last_commit_tid; // last update committed.
+
+    int dirty_or_tx;
+
+    std::map< ceph_tid_t, std::list<Context*> > waitfor_commit;
+    xlist<C_ReadFinish*> reads;
+
+    Object(const Object&) = delete;
+    Object& operator=(const Object&) = delete;
+
+    Object(ObjectCacher *_oc, sobject_t o, uint64_t ono, ObjectSet *os,
+           object_locator_t& l, uint64_t ts, uint64_t tq) :
+      ref(0),
+      oc(_oc),
+      oid(o), object_no(ono), oset(os), set_item(this), oloc(l),
+      truncate_size(ts), truncate_seq(tq),
+      complete(false), exists(true),
+      last_write_tid(0), last_commit_tid(0),
+      dirty_or_tx(0) {
+      // add to set
+      os->objects.push_back(&set_item);
+    }
+    ~Object() {
+      reads.clear();
+      ceph_assert(ref == 0);
+      ceph_assert(data.empty());
+      ceph_assert(dirty_or_tx == 0);
+      set_item.remove_myself();
+    }
+
+    sobject_t get_soid() const { return oid; }
+    object_t get_oid() { return oid.oid; }
+    snapid_t get_snap() { return oid.snap; }
+    ObjectSet *get_object_set() const { return oset; }
+    std::string get_namespace() { return oloc.nspace; }
+    uint64_t get_object_number() const { return object_no; }
+
+    const object_locator_t& get_oloc() const { return oloc; }
+    void set_object_locator(object_locator_t& l) { oloc = l; }
+
+    bool can_close() const {
+      if (lru_is_expireable()) {
+        ceph_assert(data.empty());
+        ceph_assert(waitfor_commit.empty());
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * Check buffers and waiters for consistency
+     * - no overlapping buffers
+     * - index in map matches BH
+     * - waiters fall within BH
+     */
+    void audit_buffers();
+
+    /**
+     * find first buffer that includes or follows an offset
+     *
+     * @param offset object byte offset
+     * @return iterator pointing to buffer, or data.end()
+     */
+    std::map<loff_t,BufferHead*>::const_iterator data_lower_bound(loff_t offset) const {
+      auto p = data.lower_bound(offset);
+      if (p != data.begin() &&
+          (p == data.end() || p->first > offset)) {
+        --p;     // might overlap!
+        if (p->first + p->second->length() <= offset)
+          ++p;   // doesn't overlap.
+      }
+      return p;
+    }
+
+    // bh
+    // add to my map
+    void add_bh(BufferHead *bh) {
+      if (data.empty())
+        get();
+      ceph_assert(data.count(bh->start()) == 0);
+      data[bh->start()] = bh;
+    }
+    void remove_bh(BufferHead *bh) {
+      ceph_assert(data.count(bh->start()));
+      data.erase(bh->start());
+      if (data.empty())
+        put();
+    }
+
+    bool is_empty() const { return data.empty(); }
+
+    // mid-level
+    BufferHead *split(BufferHead *bh, loff_t off);
+    void merge_left(BufferHead *left, BufferHead *right);
+    bool can_merge_bh(BufferHead *left, BufferHead *right);
+    void try_merge_bh(BufferHead *bh);
+    void maybe_rebuild_buffer(BufferHead *bh);
+
+    bool is_cached(loff_t off, loff_t len) const;
+    bool include_all_cached_data(loff_t off, loff_t len);
+    int map_read(ObjectExtent &ex,
+                 std::map<loff_t, BufferHead*>& hits,
+                 std::map<loff_t, BufferHead*>& missing,
+                 std::map<loff_t, BufferHead*>& rx,
+                 std::map<loff_t, BufferHead*>& errors);
+    BufferHead *map_write(ObjectExtent &ex, ceph_tid_t tid);
+
+    void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
+    void truncate(loff_t s);
+    void discard(loff_t off, loff_t len, C_GatherBuilder* commit_gather);
+
+    // reference counting
+    int get() {
+      ceph_assert(ref >= 0);
+      if (ref == 0) lru_pin();
+      return ++ref;
+    }
+    int put() {
+      ceph_assert(ref > 0);
+      if (ref == 1) lru_unpin();
+      --ref;
+      return ref;
+    }
+  };
+
+
+  struct ObjectSet {
+    void *parent;
+
+    inodeno_t ino;
+    uint64_t truncate_seq, truncate_size;
+
+    int64_t poolid;
+    xlist<Object*> objects;
+
+    int dirty_or_tx;
+    bool return_enoent;
+
+    ObjectSet(void *p, int64_t _poolid, inodeno_t i)
+      : parent(p), ino(i), truncate_seq(0),
+        truncate_size(0), poolid(_poolid), dirty_or_tx(0),
+        return_enoent(false) {}
+
+  };
+
+
+  // ******* ObjectCacher *********
+  // ObjectCacher fields
+ private:
+  WritebackHandler& writeback_handler;
+  bool scattered_write;
+
+  std::string name;
+  ceph::mutex& lock;
+
+  uint64_t max_dirty, target_dirty, max_size, max_objects;
+  ceph::timespan max_dirty_age;
+  bool block_writes_upfront;
+
+  ZTracer::Endpoint trace_endpoint;
+
+  flush_set_callback_t flush_set_callback;
+  void *flush_set_callback_arg;
+
+  // indexed by pool_id
+  std::vector<ceph::unordered_map<sobject_t, Object*> > objects;
+
+  std::list<Context*> waitfor_read;
+
+  ceph_tid_t last_read_tid;
+
+  std::set<BufferHead*, BufferHead::ptr_lt> dirty_or_tx_bh;
+  LRU bh_lru_dirty, bh_lru_rest;
+  LRU ob_lru;
+
+  ceph::condition_variable flusher_cond;
+  bool flusher_stop;
+  void flusher_entry();
+  class FlusherThread : public Thread {
+    ObjectCacher *oc;
+  public:
+    explicit FlusherThread(ObjectCacher *o) : oc(o) {}
+    void *entry() override {
+      oc->flusher_entry();
+      return 0;
+    }
+  } flusher_thread;
+
+  Finisher finisher;
+
+  // objects
+  Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
+    // have it?
+    if (((uint32_t)l.pool < objects.size()) &&
+        (objects[l.pool].count(oid)))
+      return objects[l.pool][oid];
+    return NULL;
+  }
+
+  Object *get_object(sobject_t oid, uint64_t object_no, ObjectSet *oset,
+                     object_locator_t &l, uint64_t truncate_size,
+                     uint64_t truncate_seq);
+  void close_object(Object *ob);
+
+  // bh stats
+  ceph::condition_variable stat_cond;
+
+  loff_t stat_clean;
+  loff_t stat_zero;
+  loff_t stat_dirty;
+  loff_t stat_rx;
+  loff_t stat_tx;
+  loff_t stat_missing;
+  loff_t stat_error;
+  loff_t stat_dirty_waiting;   // bytes that writers are waiting on to write
+
+  size_t stat_nr_dirty_waiters;
+
+  void verify_stats() const;
+
+  void bh_stat_add(BufferHead *bh);
+  void bh_stat_sub(BufferHead *bh);
+  loff_t get_stat_tx() const { return stat_tx; }
+  loff_t get_stat_rx() const { return stat_rx; }
+  loff_t get_stat_dirty() const { return stat_dirty; }
+  loff_t get_stat_clean() const { return stat_clean; }
+  loff_t get_stat_zero() const { return stat_zero; }
+  loff_t get_stat_dirty_waiting() const { return stat_dirty_waiting; }
+  size_t get_stat_nr_dirty_waiters() const { return stat_nr_dirty_waiters; }
+
+  void touch_bh(BufferHead *bh) {
+    if (bh->is_dirty())
+      bh_lru_dirty.lru_touch(bh);
+    else
+      bh_lru_rest.lru_touch(bh);
+
+    bh->set_dontneed(false);
+    bh->set_nocache(false);
+    touch_ob(bh->ob);
+  }
+  void touch_ob(Object *ob) {
+    ob_lru.lru_touch(ob);
+  }
+  void bottouch_ob(Object *ob) {
+    ob_lru.lru_bottouch(ob);
+  }
+
+  // bh states
+  void bh_set_state(BufferHead *bh, int s);
+  void copy_bh_state(BufferHead *bh1, BufferHead *bh2) {
+    bh_set_state(bh2, bh1->get_state());
+  }
+
+  void mark_missing(BufferHead *bh) {
+    bh_set_state(bh,BufferHead::STATE_MISSING);
+  }
+  void mark_clean(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_CLEAN);
+  }
+  void mark_zero(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_ZERO);
+  }
+  void mark_rx(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_RX);
+  }
+  void mark_tx(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_TX); }
+  void mark_error(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_ERROR);
+  }
+  void mark_dirty(BufferHead *bh) {
+    bh_set_state(bh, BufferHead::STATE_DIRTY);
+    bh_lru_dirty.lru_touch(bh);
+    //bh->set_dirty_stamp(ceph_clock_now());
+  }
+
+  void bh_add(Object *ob, BufferHead *bh);
+  void bh_remove(Object *ob, BufferHead *bh);
+
+  // io
+  void bh_read(BufferHead *bh, int op_flags,
+               const ZTracer::Trace &parent_trace);
+  void bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace);
+  void bh_write_scattered(std::list<BufferHead*>& blist);
+  void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+                            int64_t *amount, int *max_count);
+
+  void trim();
+  void flush(ZTracer::Trace *trace, loff_t amount=0);
+
+  /**
+   * flush a range of buffers
+   *
+   * Flush any buffers that intersect the specified extent.  If len==0,
+   * flush *all* buffers for the object.
+   *
+   * @param o object
+   * @param off start offset
+   * @param len extent length, or 0 for entire object
+   * @return true if object was already clean/flushed.
+   */
+  bool flush(Object *o, loff_t off, loff_t len,
+             ZTracer::Trace *trace);
+  loff_t release(Object *o);
+  void purge(Object *o);
+
+  int64_t reads_outstanding;
+  ceph::condition_variable read_cond;
+
+  int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+             bool external_call, ZTracer::Trace *trace);
+  void retry_waiting_reads();
+
+ public:
+  void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
+                      loff_t offset, uint64_t length,
+                      ceph::buffer::list &bl, int r,
+                      bool trust_enoent);
+  void bh_write_commit(int64_t poolid, sobject_t oid,
+                       std::vector<std::pair<loff_t, uint64_t> >& ranges,
+                       ceph_tid_t t, int r);
+
+  class C_WriteCommit;
+  class C_WaitForWrite;
+
+  void perf_start();
+  void perf_stop();
+
+
+
+  ObjectCacher(CephContext *cct_, std::string name, WritebackHandler& wb, ceph::mutex& l,
+               flush_set_callback_t flush_callback,
+               void *flush_callback_arg,
+               uint64_t max_bytes, uint64_t max_objects,
+               uint64_t max_dirty, uint64_t target_dirty, double max_age,
+               bool block_writes_upfront);
+  ~ObjectCacher();
+
+  void start() {
+    flusher_thread.create("flusher");
+  }
+  void stop() {
+    ceph_assert(flusher_thread.is_started());
+    lock.lock();  // hmm.. watch out for deadlock!
+    flusher_stop = true;
+    flusher_cond.notify_all();
+    lock.unlock();
+    flusher_thread.join();
+  }
+
+
+  class C_RetryRead;
+
+
+  // non-blocking.  async.
+
+  /**
+   * @note total read size must be <= INT_MAX, since
+   * the return value is total bytes read
+   */
+  int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+            ZTracer::Trace *parent_trace = nullptr);
+  int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+             ZTracer::Trace *parent_trace = nullptr);
+  bool is_cached(ObjectSet *oset, std::vector<ObjectExtent>& extents,
+                 snapid_t snapid);
+
+private:
+  // write blocking
+  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
+                      ZTracer::Trace *trace, Context *onfreespace);
+  void _maybe_wait_for_writeback(uint64_t len, ZTracer::Trace *trace);
+  bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
+
+  void _discard(ObjectSet *oset, const std::vector<ObjectExtent>& exls,
+                C_GatherBuilder* gather);
+  void _discard_finish(ObjectSet *oset, bool was_dirty, Context* on_finish);
+
+public:
+  bool set_is_empty(ObjectSet *oset);
+  bool set_is_cached(ObjectSet *oset);
+  bool set_is_dirty_or_committing(ObjectSet *oset);
+
+  bool flush_set(ObjectSet *oset, Context *onfinish=0);
+  bool flush_set(ObjectSet *oset, std::vector<ObjectExtent>& ex,
+                 ZTracer::Trace *trace, Context *onfinish = 0);
+  bool flush_all(Context *onfinish = 0);
+
+  void purge_set(ObjectSet *oset);
+
+  // returns # of bytes not released (ie non-clean)
+  loff_t release_set(ObjectSet *oset);
+  uint64_t release_all();
+
+  void discard_set(ObjectSet *oset, const std::vector<ObjectExtent>& ex);
+  void discard_writeback(ObjectSet *oset, const std::vector<ObjectExtent>& ex,
+                         Context* on_finish);
+
+  /**
+   * Retry any in-flight reads that get -ENOENT instead of marking
+   * them zero, and get rid of any cached -ENOENTs.
+   * After this is called and the cache's lock is unlocked,
+   * any new requests will treat -ENOENT normally.
+   */
+  void clear_nonexistence(ObjectSet *oset);
+
+
+  // cache sizes
+  void set_max_dirty(uint64_t v) {
+    max_dirty = v;
+  }
+  void set_target_dirty(int64_t v) {
+    target_dirty = v;
+  }
+  void set_max_size(int64_t v) {
+    max_size = v;
+  }
+  void set_max_dirty_age(double a) {
+    max_dirty_age = ceph::make_timespan(a);
+  }
+  void set_max_objects(int64_t v) {
+    max_objects = v;
+  }
+
+
+  // file functions
+
+  /*** async+caching (non-blocking) file interface ***/
+  int file_is_cached(ObjectSet *oset, file_layout_t *layout,
+                     snapid_t snapid, loff_t offset, uint64_t len) {
+    std::vector<ObjectExtent> extents;
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                             oset->truncate_size, extents);
+    return is_cached(oset, extents, snapid);
+  }
+
+  int file_read(ObjectSet *oset, file_layout_t *layout, snapid_t snapid,
+                loff_t offset, uint64_t len, ceph::buffer::list *bl, int flags,
+                Context *onfinish) {
+    OSDRead *rd = prepare_read(snapid, bl, flags);
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                             oset->truncate_size, rd->extents);
+    return readx(rd, oset, onfinish);
+  }
+
+  int file_write(ObjectSet *oset, file_layout_t *layout,
+                 const SnapContext& snapc, loff_t offset, uint64_t len,
+                 ceph::buffer::list& bl, ceph::real_time mtime, int flags) {
+    OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                             oset->truncate_size, wr->extents);
+    return writex(wr, oset, nullptr);
+  }
+
+  bool file_flush(ObjectSet *oset, file_layout_t *layout,
+                  const SnapContext& snapc, loff_t offset, uint64_t len,
+                  Context *onfinish) {
+    std::vector<ObjectExtent> extents;
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len,
+                             oset->truncate_size, extents);
+    ZTracer::Trace trace;
+    return flush_set(oset, extents, &trace, onfinish);
+  }
+};
+
+
+inline std::ostream& operator<<(std::ostream &out,
+                                const ObjectCacher::BufferHead &bh)
+{
+  out << "bh[ " << &bh << " "
+      << bh.start() << "~" << bh.length()
+      << " " << bh.ob
+      << " (" << bh.bl.length() << ")"
+      << " v " << bh.last_write_tid;
+  if (bh.get_journal_tid() != 0) {
+    out << " j " << bh.get_journal_tid();
+  }
+  if (bh.is_tx()) out << " tx";
+  if (bh.is_rx()) out << " rx";
+  if (bh.is_dirty()) out << " dirty";
+  if (bh.is_clean()) out << " clean";
+  if (bh.is_zero()) out << " zero";
+  if (bh.is_missing()) out << " missing";
+  if (bh.bl.length() > 0) out << " firstbyte=" << (int)bh.bl[0];
+  if (bh.error) out << " error=" << bh.error;
+  out << "]";
+  out << " waiters = {";
+  for (auto it = bh.waitfor_read.begin(); it != bh.waitfor_read.end(); ++it) {
+    out << " " << it->first << "->[";
+    for (auto lit = it->second.begin();
+         lit != it->second.end(); ++lit) {
+      out << *lit << ", ";
+    }
+    out << "]";
+  }
+  out << "}";
+  return out;
+}
+
+inline std::ostream& operator<<(std::ostream &out,
+                                const ObjectCacher::ObjectSet &os)
+{
+  return out << "objectset[" << os.ino
+             << " ts " << os.truncate_seq << "/" << os.truncate_size
+             << " objects " << os.objects.size()
+             << " dirty_or_tx " << os.dirty_or_tx
+             << "]";
+}
+
+inline std::ostream& operator<<(std::ostream &out,
+                                const ObjectCacher::Object &ob)
+{
+  out << "object["
+      << ob.get_soid() << " oset " << ob.oset << std::dec
+      << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
+
+  if (ob.complete)
+    out << " COMPLETE";
+  if (!ob.exists)
+    out << " !EXISTS";
+
+  out << "]";
+  return out;
+}
+
+#endif
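Editor's note (not part of the commit): the sketch below illustrates how a caller might drive the async file interface declared above. It assumes an ObjectCacher that a client such as librbd or the ceph-fuse Client has already constructed, started, and paired with an ObjectSet; the function name, parameters, and locking convention shown here are illustrative assumptions, not code from this change.

// Hypothetical caller-side sketch; only compiles inside the Ceph source tree.
#include "osdc/ObjectCacher.h"

void example_cached_read(ObjectCacher *cache, ceph::mutex &cache_lock,
                         file_layout_t *layout, ObjectCacher::ObjectSet *oset,
                         ceph::buffer::list *bl, Context *onfinish)
{
  std::lock_guard l{cache_lock};           // readx()/writex() expect the cache lock held
  int r = cache->file_read(oset, layout, CEPH_NOSNAP,
                           0 /* offset */, 4096 /* length */,
                           bl, 0 /* fadvise flags */, onfinish);
  if (r > 0 && onfinish)
    onfinish->complete(r);                 // cache hit: data returned synchronously
  // r == 0: a read was sent to the OSDs; onfinish fires when the data arrives
}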