summaryrefslogtreecommitdiffstats
path: root/src/osdc/ObjectCacher.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/osdc/ObjectCacher.cc')
-rw-r--r--src/osdc/ObjectCacher.cc2807
1 files changed, 2807 insertions, 0 deletions
diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc
new file mode 100644
index 000000000..f2b6d9736
--- /dev/null
+++ b/src/osdc/ObjectCacher.cc
@@ -0,0 +1,2807 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <limits.h>
+
+#include "msg/Messenger.h"
+#include "ObjectCacher.h"
+#include "WritebackHandler.h"
+#include "common/errno.h"
+#include "common/perf_counters.h"
+
+#include "include/ceph_assert.h"
+
+#define MAX_FLUSH_UNDER_LOCK 20 ///< max bh's we start writeback on
+#define BUFFER_MEMORY_WEIGHT CEPH_PAGE_SHIFT // memory usage of BufferHead, count in (1<<n)
+ /// while holding the lock
+
+using std::chrono::seconds;
+using std::list;
+using std::map;
+using std::make_pair;
+using std::pair;
+using std::set;
+using std::string;
+using std::vector;
+
+using ceph::bufferlist;
+
+using namespace std::literals;
+
+/*** ObjectCacher::BufferHead ***/
+
+
+/*** ObjectCacher::Object ***/
+
+#define dout_subsys ceph_subsys_objectcacher
+#undef dout_prefix
+#define dout_prefix *_dout << "objectcacher.object(" << oid << ") "
+
+
+
+class ObjectCacher::C_ReadFinish : public Context {
+ ObjectCacher *oc;
+ int64_t poolid;
+ sobject_t oid;
+ loff_t start;
+ uint64_t length;
+ xlist<C_ReadFinish*>::item set_item;
+ bool trust_enoent;
+ ceph_tid_t tid;
+ ZTracer::Trace trace;
+
+public:
+ bufferlist bl;
+ C_ReadFinish(ObjectCacher *c, Object *ob, ceph_tid_t t, loff_t s,
+ uint64_t l, const ZTracer::Trace &trace) :
+ oc(c), poolid(ob->oloc.pool), oid(ob->get_soid()), start(s), length(l),
+ set_item(this), trust_enoent(true),
+ tid(t), trace(trace) {
+ ob->reads.push_back(&set_item);
+ }
+
+ void finish(int r) override {
+ oc->bh_read_finish(poolid, oid, tid, start, length, bl, r, trust_enoent);
+ trace.event("finish");
+
+ // object destructor clears the list
+ if (set_item.is_on_list())
+ set_item.remove_myself();
+ }
+
+ void distrust_enoent() {
+ trust_enoent = false;
+ }
+};
+
+class ObjectCacher::C_RetryRead : public Context {
+ ObjectCacher *oc;
+ OSDRead *rd;
+ ObjectSet *oset;
+ Context *onfinish;
+ ZTracer::Trace trace;
+public:
+ C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c,
+ const ZTracer::Trace &trace)
+ : oc(_oc), rd(r), oset(os), onfinish(c), trace(trace) {
+ }
+ void finish(int r) override {
+ if (r >= 0) {
+ r = oc->_readx(rd, oset, onfinish, false, &trace);
+ }
+
+ if (r == 0) {
+ // read is still in-progress
+ return;
+ }
+
+ trace.event("finish");
+ if (onfinish) {
+ onfinish->complete(r);
+ }
+ }
+};
+
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left,
+ loff_t off)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ ldout(oc->cct, 20) << "split " << *left << " at " << off << dendl;
+
+ // split off right
+ ObjectCacher::BufferHead *right = new BufferHead(this);
+
+ //inherit and if later access, this auto clean.
+ right->set_dontneed(left->get_dontneed());
+ right->set_nocache(left->get_nocache());
+
+ right->last_write_tid = left->last_write_tid;
+ right->last_read_tid = left->last_read_tid;
+ right->set_state(left->get_state());
+ right->set_error(left->error);
+ right->snapc = left->snapc;
+ right->set_journal_tid(left->journal_tid);
+
+ loff_t newleftlen = off - left->start();
+ right->set_start(off);
+ right->set_length(left->length() - newleftlen);
+
+ // shorten left
+ oc->bh_stat_sub(left);
+ left->set_length(newleftlen);
+ oc->bh_stat_add(left);
+
+ // add right
+ oc->bh_add(this, right);
+
+ // split buffers too
+ bufferlist bl;
+ bl = std::move(left->bl);
+ if (bl.length()) {
+ ceph_assert(bl.length() == (left->length() + right->length()));
+ right->bl.substr_of(bl, left->length(), right->length());
+ left->bl.substr_of(bl, 0, left->length());
+ }
+
+ // move read waiters
+ if (!left->waitfor_read.empty()) {
+ auto start_remove = left->waitfor_read.begin();
+ while (start_remove != left->waitfor_read.end() &&
+ start_remove->first < right->start())
+ ++start_remove;
+ for (auto p = start_remove; p != left->waitfor_read.end(); ++p) {
+ ldout(oc->cct, 20) << "split moving waiters at byte " << p->first
+ << " to right bh" << dendl;
+ right->waitfor_read[p->first].swap( p->second );
+ ceph_assert(p->second.empty());
+ }
+ left->waitfor_read.erase(start_remove, left->waitfor_read.end());
+ }
+
+ ldout(oc->cct, 20) << "split left is " << *left << dendl;
+ ldout(oc->cct, 20) << "split right is " << *right << dendl;
+ return right;
+}
+
+
+void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+
+ ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
+ if (left->get_journal_tid() == 0) {
+ left->set_journal_tid(right->get_journal_tid());
+ }
+ right->set_journal_tid(0);
+
+ oc->bh_remove(this, right);
+ oc->bh_stat_sub(left);
+ left->set_length(left->length() + right->length());
+ oc->bh_stat_add(left);
+
+ // data
+ left->bl.claim_append(right->bl);
+
+ // version
+ // note: this is sorta busted, but should only be used for dirty buffers
+ left->last_write_tid = std::max( left->last_write_tid, right->last_write_tid );
+ left->last_write = std::max( left->last_write, right->last_write );
+
+ left->set_dontneed(right->get_dontneed() ? left->get_dontneed() : false);
+ left->set_nocache(right->get_nocache() ? left->get_nocache() : false);
+
+ // waiters
+ for (auto p = right->waitfor_read.begin();
+ p != right->waitfor_read.end();
+ ++p)
+ left->waitfor_read[p->first].splice(left->waitfor_read[p->first].begin(),
+ p->second );
+
+ // hose right
+ delete right;
+
+ ldout(oc->cct, 10) << "merge_left result " << *left << dendl;
+}
+
+bool ObjectCacher::Object::can_merge_bh(BufferHead *left, BufferHead *right)
+{
+ if (left->end() != right->start() ||
+ left->get_state() != right->get_state() ||
+ !left->can_merge_journal(right))
+ return false;
+ if (left->is_tx() && left->last_write_tid != right->last_write_tid)
+ return false;
+ return true;
+}
+
+void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ ldout(oc->cct, 10) << "try_merge_bh " << *bh << dendl;
+
+ // do not merge rx buffers; last_read_tid may not match
+ if (bh->is_rx())
+ return;
+
+ // to the left?
+ auto p = data.find(bh->start());
+ ceph_assert(p->second == bh);
+ if (p != data.begin()) {
+ --p;
+ if (can_merge_bh(p->second, bh)) {
+ merge_left(p->second, bh);
+ bh = p->second;
+ } else {
+ ++p;
+ }
+ }
+ // to the right?
+ ceph_assert(p->second == bh);
+ ++p;
+ if (p != data.end() && can_merge_bh(bh, p->second))
+ merge_left(bh, p->second);
+
+ maybe_rebuild_buffer(bh);
+}
+
+void ObjectCacher::Object::maybe_rebuild_buffer(BufferHead *bh)
+{
+ auto& bl = bh->bl;
+ if (bl.get_num_buffers() <= 1)
+ return;
+
+ auto wasted = bl.get_wasted_space();
+ if (wasted * 2 > bl.length() &&
+ wasted > (1U << BUFFER_MEMORY_WEIGHT))
+ bl.rebuild();
+}
+
+/*
+ * count bytes we have cached in given range
+ */
+bool ObjectCacher::Object::is_cached(loff_t cur, loff_t left) const
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ auto p = data_lower_bound(cur);
+ while (left > 0) {
+ if (p == data.end())
+ return false;
+
+ if (p->first <= cur) {
+ // have part of it
+ loff_t lenfromcur = std::min(p->second->end() - cur, left);
+ cur += lenfromcur;
+ left -= lenfromcur;
+ ++p;
+ continue;
+ } else if (p->first > cur) {
+ // gap
+ return false;
+ } else
+ ceph_abort();
+ }
+
+ return true;
+}
+
+/*
+ * all cached data in this range[off, off+len]
+ */
+bool ObjectCacher::Object::include_all_cached_data(loff_t off, loff_t len)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ if (data.empty())
+ return true;
+ auto first = data.begin();
+ auto last = data.rbegin();
+ if (first->second->start() >= off && last->second->end() <= (off + len))
+ return true;
+ else
+ return false;
+}
+
+/*
+ * map a range of bytes into buffer_heads.
+ * - create missing buffer_heads as necessary.
+ */
+int ObjectCacher::Object::map_read(ObjectExtent &ex,
+ map<loff_t, BufferHead*>& hits,
+ map<loff_t, BufferHead*>& missing,
+ map<loff_t, BufferHead*>& rx,
+ map<loff_t, BufferHead*>& errors)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ ldout(oc->cct, 10) << "map_read " << ex.oid << " "
+ << ex.offset << "~" << ex.length << dendl;
+
+ loff_t cur = ex.offset;
+ loff_t left = ex.length;
+
+ auto p = data_lower_bound(ex.offset);
+ while (left > 0) {
+ // at end?
+ if (p == data.end()) {
+ // rest is a miss.
+ BufferHead *n = new BufferHead(this);
+ n->set_start(cur);
+ n->set_length(left);
+ oc->bh_add(this, n);
+ if (complete) {
+ oc->mark_zero(n);
+ hits[cur] = n;
+ ldout(oc->cct, 20) << "map_read miss+complete+zero " << left << " left, " << *n << dendl;
+ } else {
+ missing[cur] = n;
+ ldout(oc->cct, 20) << "map_read miss " << left << " left, " << *n << dendl;
+ }
+ cur += left;
+ ceph_assert(cur == (loff_t)ex.offset + (loff_t)ex.length);
+ break; // no more.
+ }
+
+ if (p->first <= cur) {
+ // have it (or part of it)
+ BufferHead *e = p->second;
+
+ if (e->is_clean() ||
+ e->is_dirty() ||
+ e->is_tx() ||
+ e->is_zero()) {
+ hits[cur] = e; // readable!
+ ldout(oc->cct, 20) << "map_read hit " << *e << dendl;
+ } else if (e->is_rx()) {
+ rx[cur] = e; // missing, not readable.
+ ldout(oc->cct, 20) << "map_read rx " << *e << dendl;
+ } else if (e->is_error()) {
+ errors[cur] = e;
+ ldout(oc->cct, 20) << "map_read error " << *e << dendl;
+ } else {
+ ceph_abort();
+ }
+
+ loff_t lenfromcur = std::min(e->end() - cur, left);
+ cur += lenfromcur;
+ left -= lenfromcur;
+ ++p;
+ continue; // more?
+
+ } else if (p->first > cur) {
+ // gap.. miss
+ loff_t next = p->first;
+ BufferHead *n = new BufferHead(this);
+ loff_t len = std::min(next - cur, left);
+ n->set_start(cur);
+ n->set_length(len);
+ oc->bh_add(this,n);
+ if (complete) {
+ oc->mark_zero(n);
+ hits[cur] = n;
+ ldout(oc->cct, 20) << "map_read gap+complete+zero " << *n << dendl;
+ } else {
+ missing[cur] = n;
+ ldout(oc->cct, 20) << "map_read gap " << *n << dendl;
+ }
+ cur += std::min(left, n->length());
+ left -= std::min(left, n->length());
+ continue; // more?
+ } else {
+ ceph_abort();
+ }
+ }
+ return 0;
+}
+
+void ObjectCacher::Object::audit_buffers()
+{
+ loff_t offset = 0;
+ for (auto it = data.begin(); it != data.end(); ++it) {
+ if (it->first != it->second->start()) {
+ lderr(oc->cct) << "AUDIT FAILURE: map position " << it->first
+ << " does not match bh start position: "
+ << *it->second << dendl;
+ ceph_assert(it->first == it->second->start());
+ }
+ if (it->first < offset) {
+ lderr(oc->cct) << "AUDIT FAILURE: " << it->first << " " << *it->second
+ << " overlaps with previous bh " << *((--it)->second)
+ << dendl;
+ ceph_assert(it->first >= offset);
+ }
+ BufferHead *bh = it->second;
+ for (auto w_it = bh->waitfor_read.begin();
+ w_it != bh->waitfor_read.end(); ++w_it) {
+ if (w_it->first < bh->start() ||
+ w_it->first >= bh->start() + bh->length()) {
+ lderr(oc->cct) << "AUDIT FAILURE: waiter at " << w_it->first
+ << " is not within bh " << *bh << dendl;
+ ceph_assert(w_it->first >= bh->start());
+ ceph_assert(w_it->first < bh->start() + bh->length());
+ }
+ }
+ offset = it->first + it->second->length();
+ }
+}
+
+/*
+ * map a range of extents on an object's buffer cache.
+ * - combine any bh's we're writing into one
+ * - break up bufferheads that don't fall completely within the range
+ * //no! - return a bh that includes the write. may also include
+ * other dirty data to left and/or right.
+ */
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(ObjectExtent &ex,
+ ceph_tid_t tid)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ BufferHead *final = 0;
+
+ ldout(oc->cct, 10) << "map_write oex " << ex.oid
+ << " " << ex.offset << "~" << ex.length << dendl;
+
+ loff_t cur = ex.offset;
+ loff_t left = ex.length;
+
+ auto p = data_lower_bound(ex.offset);
+ while (left > 0) {
+ loff_t max = left;
+
+ // at end ?
+ if (p == data.end()) {
+ if (final == NULL) {
+ final = new BufferHead(this);
+ replace_journal_tid(final, tid);
+ final->set_start( cur );
+ final->set_length( max );
+ oc->bh_add(this, final);
+ ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
+ } else {
+ oc->bh_stat_sub(final);
+ final->set_length(final->length() + max);
+ oc->bh_stat_add(final);
+ }
+ left -= max;
+ cur += max;
+ continue;
+ }
+
+ ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
+ //oc->verify_stats();
+
+ if (p->first <= cur) {
+ BufferHead *bh = p->second;
+ ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
+
+ if (p->first < cur) {
+ ceph_assert(final == 0);
+ if (cur + max >= bh->end()) {
+ // we want right bit (one splice)
+ final = split(bh, cur); // just split it, take right half.
+ maybe_rebuild_buffer(bh);
+ replace_journal_tid(final, tid);
+ ++p;
+ ceph_assert(p->second == final);
+ } else {
+ // we want middle bit (two splices)
+ final = split(bh, cur);
+ maybe_rebuild_buffer(bh);
+ ++p;
+ ceph_assert(p->second == final);
+ auto right = split(final, cur+max);
+ maybe_rebuild_buffer(right);
+ replace_journal_tid(final, tid);
+ }
+ } else {
+ ceph_assert(p->first == cur);
+ if (bh->length() <= max) {
+ // whole bufferhead, piece of cake.
+ } else {
+ // we want left bit (one splice)
+ auto right = split(bh, cur + max); // just split
+ maybe_rebuild_buffer(right);
+ }
+ if (final) {
+ oc->mark_dirty(bh);
+ oc->mark_dirty(final);
+ --p; // move iterator back to final
+ ceph_assert(p->second == final);
+ replace_journal_tid(bh, tid);
+ merge_left(final, bh);
+ } else {
+ final = bh;
+ replace_journal_tid(final, tid);
+ }
+ }
+
+ // keep going.
+ loff_t lenfromcur = final->end() - cur;
+ cur += lenfromcur;
+ left -= lenfromcur;
+ ++p;
+ continue;
+ } else {
+ // gap!
+ loff_t next = p->first;
+ loff_t glen = std::min(next - cur, max);
+ ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
+ if (final) {
+ oc->bh_stat_sub(final);
+ final->set_length(final->length() + glen);
+ oc->bh_stat_add(final);
+ } else {
+ final = new BufferHead(this);
+ replace_journal_tid(final, tid);
+ final->set_start( cur );
+ final->set_length( glen );
+ oc->bh_add(this, final);
+ }
+
+ cur += glen;
+ left -= glen;
+ continue; // more?
+ }
+ }
+
+ // set version
+ ceph_assert(final);
+ ceph_assert(final->get_journal_tid() == tid);
+ ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
+
+ return final;
+}
+
+void ObjectCacher::Object::replace_journal_tid(BufferHead *bh,
+ ceph_tid_t tid) {
+ ceph_tid_t bh_tid = bh->get_journal_tid();
+
+ ceph_assert(tid == 0 || bh_tid <= tid);
+ if (bh_tid != 0 && bh_tid != tid) {
+ // inform journal that it should not expect a writeback from this extent
+ oc->writeback_handler.overwrite_extent(get_oid(), bh->start(),
+ bh->length(), bh_tid, tid);
+ }
+ bh->set_journal_tid(tid);
+}
+
+void ObjectCacher::Object::truncate(loff_t s)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ ldout(oc->cct, 10) << "truncate " << *this << " to " << s << dendl;
+
+ std::list<Context*> waiting_for_read;
+ while (!data.empty()) {
+ BufferHead *bh = data.rbegin()->second;
+ if (bh->end() <= s)
+ break;
+
+ // split bh at truncation point?
+ if (bh->start() < s) {
+ split(bh, s);
+ maybe_rebuild_buffer(bh);
+ continue;
+ }
+
+ // remove bh entirely
+ ceph_assert(bh->start() >= s);
+ for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
+ waiting_for_read.splice(waiting_for_read.end(), ctxs);
+ }
+ bh->waitfor_read.clear();
+ replace_journal_tid(bh, 0);
+ oc->bh_remove(this, bh);
+ delete bh;
+ }
+ if (!waiting_for_read.empty()) {
+ ldout(oc->cct, 10) << "restarting reads post-truncate" << dendl;
+ }
+ finish_contexts(oc->cct, waiting_for_read, 0);
+}
+
+void ObjectCacher::Object::discard(loff_t off, loff_t len,
+ C_GatherBuilder* commit_gather)
+{
+ ceph_assert(ceph_mutex_is_locked(oc->lock));
+ ldout(oc->cct, 10) << "discard " << *this << " " << off << "~" << len
+ << dendl;
+
+ if (!exists) {
+ ldout(oc->cct, 10) << " setting exists on " << *this << dendl;
+ exists = true;
+ }
+ if (complete) {
+ ldout(oc->cct, 10) << " clearing complete on " << *this << dendl;
+ complete = false;
+ }
+
+ std::list<Context*> waiting_for_read;
+ auto p = data_lower_bound(off);
+ while (p != data.end()) {
+ BufferHead *bh = p->second;
+ if (bh->start() >= off + len)
+ break;
+
+ // split bh at truncation point?
+ if (bh->start() < off) {
+ split(bh, off);
+ maybe_rebuild_buffer(bh);
+ ++p;
+ continue;
+ }
+
+ ceph_assert(bh->start() >= off);
+ if (bh->end() > off + len) {
+ auto right = split(bh, off + len);
+ maybe_rebuild_buffer(right);
+ }
+
+ ++p;
+ ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
+ replace_journal_tid(bh, 0);
+
+ if (bh->is_tx() && commit_gather != nullptr) {
+ // wait for the writeback to commit
+ waitfor_commit[bh->last_write_tid].emplace_back(commit_gather->new_sub());
+ } else if (bh->is_rx()) {
+ // cannot remove bh with in-flight read, but we can ensure the
+ // read won't overwrite the discard
+ bh->last_read_tid = ++oc->last_read_tid;
+ bh->bl.clear();
+ bh->set_nocache(true);
+ oc->mark_zero(bh);
+ // we should mark all Rx bh to zero
+ continue;
+ } else {
+ for ([[maybe_unused]] auto& [off, ctxs] : bh->waitfor_read) {
+ waiting_for_read.splice(waiting_for_read.end(), ctxs);
+ }
+ bh->waitfor_read.clear();
+ }
+
+ oc->bh_remove(this, bh);
+ delete bh;
+ }
+ if (!waiting_for_read.empty()) {
+ ldout(oc->cct, 10) << "restarting reads post-discard" << dendl;
+ }
+ finish_contexts(oc->cct, waiting_for_read, 0); /* restart reads */
+}
+
+
+
+/*** ObjectCacher ***/
+
+#undef dout_prefix
+#define dout_prefix *_dout << "objectcacher "
+
+
+ObjectCacher::ObjectCacher(CephContext *cct_, string name,
+ WritebackHandler& wb, ceph::mutex& l,
+ flush_set_callback_t flush_callback,
+ void *flush_callback_arg, uint64_t max_bytes,
+ uint64_t max_objects, uint64_t max_dirty,
+ uint64_t target_dirty, double max_dirty_age,
+ bool block_writes_upfront)
+ : perfcounter(NULL),
+ cct(cct_), writeback_handler(wb), name(name), lock(l),
+ max_dirty(max_dirty), target_dirty(target_dirty),
+ max_size(max_bytes), max_objects(max_objects),
+ max_dirty_age(ceph::make_timespan(max_dirty_age)),
+ block_writes_upfront(block_writes_upfront),
+ trace_endpoint("ObjectCacher"),
+ flush_set_callback(flush_callback),
+ flush_set_callback_arg(flush_callback_arg),
+ last_read_tid(0), flusher_stop(false), flusher_thread(this),finisher(cct),
+ stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0),
+ stat_missing(0), stat_error(0), stat_dirty_waiting(0),
+ stat_nr_dirty_waiters(0), reads_outstanding(0)
+{
+ perf_start();
+ finisher.start();
+ scattered_write = writeback_handler.can_scattered_write();
+}
+
+ObjectCacher::~ObjectCacher()
+{
+ finisher.stop();
+ perf_stop();
+ // we should be empty.
+ for (auto i = objects.begin(); i != objects.end(); ++i)
+ ceph_assert(i->empty());
+ ceph_assert(bh_lru_rest.lru_get_size() == 0);
+ ceph_assert(bh_lru_dirty.lru_get_size() == 0);
+ ceph_assert(ob_lru.lru_get_size() == 0);
+ ceph_assert(dirty_or_tx_bh.empty());
+}
+
+void ObjectCacher::perf_start()
+{
+ string n = "objectcacher-" + name;
+ PerfCountersBuilder plb(cct, n, l_objectcacher_first, l_objectcacher_last);
+
+ plb.add_u64_counter(l_objectcacher_cache_ops_hit,
+ "cache_ops_hit", "Hit operations");
+ plb.add_u64_counter(l_objectcacher_cache_ops_miss,
+ "cache_ops_miss", "Miss operations");
+ plb.add_u64_counter(l_objectcacher_cache_bytes_hit,
+ "cache_bytes_hit", "Hit data", NULL, 0, unit_t(UNIT_BYTES));
+ plb.add_u64_counter(l_objectcacher_cache_bytes_miss,
+ "cache_bytes_miss", "Miss data", NULL, 0, unit_t(UNIT_BYTES));
+ plb.add_u64_counter(l_objectcacher_data_read,
+ "data_read", "Read data");
+ plb.add_u64_counter(l_objectcacher_data_written,
+ "data_written", "Data written to cache");
+ plb.add_u64_counter(l_objectcacher_data_flushed,
+ "data_flushed", "Data flushed");
+ plb.add_u64_counter(l_objectcacher_overwritten_in_flush,
+ "data_overwritten_while_flushing",
+ "Data overwritten while flushing");
+ plb.add_u64_counter(l_objectcacher_write_ops_blocked, "write_ops_blocked",
+ "Write operations, delayed due to dirty limits");
+ plb.add_u64_counter(l_objectcacher_write_bytes_blocked,
+ "write_bytes_blocked",
+ "Write data blocked on dirty limit", NULL, 0, unit_t(UNIT_BYTES));
+ plb.add_time(l_objectcacher_write_time_blocked, "write_time_blocked",
+ "Time spent blocking a write due to dirty limits");
+
+ perfcounter = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perfcounter);
+}
+
+void ObjectCacher::perf_stop()
+{
+ ceph_assert(perfcounter);
+ cct->get_perfcounters_collection()->remove(perfcounter);
+ delete perfcounter;
+}
+
+/* private */
+ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid,
+ uint64_t object_no,
+ ObjectSet *oset,
+ object_locator_t &l,
+ uint64_t truncate_size,
+ uint64_t truncate_seq)
+{
+ // XXX: Add handling of nspace in object_locator_t in cache
+ ceph_assert(ceph_mutex_is_locked(lock));
+ // have it?
+ if ((uint32_t)l.pool < objects.size()) {
+ if (objects[l.pool].count(oid)) {
+ Object *o = objects[l.pool][oid];
+ o->object_no = object_no;
+ o->truncate_size = truncate_size;
+ o->truncate_seq = truncate_seq;
+ return o;
+ }
+ } else {
+ objects.resize(l.pool+1);
+ }
+
+ // create it.
+ Object *o = new Object(this, oid, object_no, oset, l, truncate_size,
+ truncate_seq);
+ objects[l.pool][oid] = o;
+ ob_lru.lru_insert_top(o);
+ return o;
+}
+
+void ObjectCacher::close_object(Object *ob)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "close_object " << *ob << dendl;
+ ceph_assert(ob->can_close());
+
+ // ok!
+ ob_lru.lru_remove(ob);
+ objects[ob->oloc.pool].erase(ob->get_soid());
+ ob->set_item.remove_myself();
+ delete ob;
+}
+
+void ObjectCacher::bh_read(BufferHead *bh, int op_flags,
+ const ZTracer::Trace &parent_trace)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 7) << "bh_read on " << *bh << " outstanding reads "
+ << reads_outstanding << dendl;
+
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_read " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
+ mark_rx(bh);
+ bh->last_read_tid = ++last_read_tid;
+
+ // finisher
+ C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob, bh->last_read_tid,
+ bh->start(), bh->length(), trace);
+ // go
+ writeback_handler.read(bh->ob->get_oid(), bh->ob->get_object_number(),
+ bh->ob->get_oloc(), bh->start(), bh->length(),
+ bh->ob->get_snap(), &onfinish->bl,
+ bh->ob->truncate_size, bh->ob->truncate_seq,
+ op_flags, trace, onfinish);
+
+ ++reads_outstanding;
+}
+
+void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
+ ceph_tid_t tid, loff_t start,
+ uint64_t length, bufferlist &bl, int r,
+ bool trust_enoent)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 7) << "bh_read_finish "
+ << oid
+ << " tid " << tid
+ << " " << start << "~" << length
+ << " (bl is " << bl.length() << ")"
+ << " returned " << r
+ << " outstanding reads " << reads_outstanding
+ << dendl;
+
+ if (r >= 0 && bl.length() < length) {
+ ldout(cct, 7) << "bh_read_finish " << oid << " padding " << start << "~"
+ << length << " with " << length - bl.length() << " bytes of zeroes"
+ << dendl;
+ bl.append_zero(length - bl.length());
+ }
+
+ list<Context*> ls;
+ int err = 0;
+
+ if (objects[poolid].count(oid) == 0) {
+ ldout(cct, 7) << "bh_read_finish no object cache" << dendl;
+ } else {
+ Object *ob = objects[poolid][oid];
+
+ if (r == -ENOENT && !ob->complete) {
+ // wake up *all* rx waiters, or else we risk reordering
+ // identical reads. e.g.
+ // read 1~1
+ // reply to unrelated 3~1 -> !exists
+ // read 1~1 -> immediate ENOENT
+ // reply to first 1~1 -> ooo ENOENT
+ bool allzero = true;
+ for (auto p = ob->data.begin(); p != ob->data.end(); ++p) {
+ BufferHead *bh = p->second;
+ for (auto p = bh->waitfor_read.begin();
+ p != bh->waitfor_read.end();
+ ++p)
+ ls.splice(ls.end(), p->second);
+ bh->waitfor_read.clear();
+ if (!bh->is_zero() && !bh->is_rx())
+ allzero = false;
+ }
+
+ // just pass through and retry all waiters if we don't trust
+ // -ENOENT for this read
+ if (trust_enoent) {
+ ldout(cct, 7)
+ << "bh_read_finish ENOENT, marking complete and !exists on " << *ob
+ << dendl;
+ ob->complete = true;
+ ob->exists = false;
+
+ /* If all the bhs are effectively zero, get rid of them. All
+ * the waiters will be retried and get -ENOENT immediately, so
+ * it's safe to clean up the unneeded bh's now. Since we know
+ * it's safe to remove them now, do so, so they aren't hanging
+ *around waiting for more -ENOENTs from rados while the cache
+ * is being shut down.
+ *
+ * Only do this when all the bhs are rx or clean, to match the
+ * condition in _readx(). If there are any non-rx or non-clean
+ * bhs, _readx() will wait for the final result instead of
+ * returning -ENOENT immediately.
+ */
+ if (allzero) {
+ ldout(cct, 10)
+ << "bh_read_finish ENOENT and allzero, getting rid of "
+ << "bhs for " << *ob << dendl;
+ auto p = ob->data.begin();
+ while (p != ob->data.end()) {
+ BufferHead *bh = p->second;
+ // current iterator will be invalidated by bh_remove()
+ ++p;
+ bh_remove(ob, bh);
+ delete bh;
+ }
+ }
+ }
+ }
+
+ // apply to bh's!
+ loff_t opos = start;
+ while (true) {
+ auto p = ob->data_lower_bound(opos);
+ if (p == ob->data.end())
+ break;
+ if (opos >= start+(loff_t)length) {
+ ldout(cct, 20) << "break due to opos " << opos << " >= start+length "
+ << start << "+" << length << "=" << start+(loff_t)length
+ << dendl;
+ break;
+ }
+
+ BufferHead *bh = p->second;
+ ldout(cct, 20) << "checking bh " << *bh << dendl;
+
+ // finishers?
+ for (auto it = bh->waitfor_read.begin();
+ it != bh->waitfor_read.end();
+ ++it)
+ ls.splice(ls.end(), it->second);
+ bh->waitfor_read.clear();
+
+ if (bh->start() > opos) {
+ ldout(cct, 1) << "bh_read_finish skipping gap "
+ << opos << "~" << bh->start() - opos
+ << dendl;
+ opos = bh->start();
+ continue;
+ }
+
+ if (!bh->is_rx()) {
+ ldout(cct, 10) << "bh_read_finish skipping non-rx " << *bh << dendl;
+ opos = bh->end();
+ continue;
+ }
+
+ if (bh->last_read_tid != tid) {
+ ldout(cct, 10) << "bh_read_finish bh->last_read_tid "
+ << bh->last_read_tid << " != tid " << tid
+ << ", skipping" << dendl;
+ opos = bh->end();
+ continue;
+ }
+
+ ceph_assert(opos >= bh->start());
+ ceph_assert(bh->start() == opos); // we don't merge rx bh's... yet!
+ ceph_assert(bh->length() <= start+(loff_t)length-opos);
+
+ if (bh->error < 0)
+ err = bh->error;
+
+ opos = bh->end();
+
+ if (r == -ENOENT) {
+ if (trust_enoent) {
+ ldout(cct, 10) << "bh_read_finish removing " << *bh << dendl;
+ bh_remove(ob, bh);
+ delete bh;
+ } else {
+ ldout(cct, 10) << "skipping unstrusted -ENOENT and will retry for "
+ << *bh << dendl;
+ }
+ continue;
+ }
+
+ if (r < 0) {
+ bh->error = r;
+ mark_error(bh);
+ } else {
+ bh->bl.substr_of(bl,
+ bh->start() - start,
+ bh->length());
+ mark_clean(bh);
+ }
+
+ ldout(cct, 10) << "bh_read_finish read " << *bh << dendl;
+
+ ob->try_merge_bh(bh);
+ }
+ }
+
+ // called with lock held.
+ ldout(cct, 20) << "finishing waiters " << ls << dendl;
+
+ finish_contexts(cct, ls, err);
+ retry_waiting_reads();
+
+ --reads_outstanding;
+ read_cond.notify_all();
+}
+
+void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+ int64_t *max_amount, int *max_count)
+{
+ list<BufferHead*> blist;
+
+ int count = 0;
+ int64_t total_len = 0;
+ set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
+ ceph_assert(it != dirty_or_tx_bh.end());
+ for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
+ p != dirty_or_tx_bh.end();
+ ++p) {
+ BufferHead *obh = *p;
+ if (obh->ob != bh->ob)
+ break;
+ if (obh->is_dirty() && obh->last_write <= cutoff) {
+ blist.push_back(obh);
+ ++count;
+ total_len += obh->length();
+ if ((max_count && count > *max_count) ||
+ (max_amount && total_len > *max_amount))
+ break;
+ }
+ }
+
+ while (it != dirty_or_tx_bh.begin()) {
+ --it;
+ BufferHead *obh = *it;
+ if (obh->ob != bh->ob)
+ break;
+ if (obh->is_dirty() && obh->last_write <= cutoff) {
+ blist.push_front(obh);
+ ++count;
+ total_len += obh->length();
+ if ((max_count && count > *max_count) ||
+ (max_amount && total_len > *max_amount))
+ break;
+ }
+ }
+ if (max_count)
+ *max_count -= count;
+ if (max_amount)
+ *max_amount -= total_len;
+
+ bh_write_scattered(blist);
+}
+
+class ObjectCacher::C_WriteCommit : public Context {
+ ObjectCacher *oc;
+ int64_t poolid;
+ sobject_t oid;
+ vector<pair<loff_t, uint64_t> > ranges;
+ ZTracer::Trace trace;
+public:
+ ceph_tid_t tid = 0;
+ C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
+ uint64_t l, const ZTracer::Trace &trace) :
+ oc(c), poolid(_poolid), oid(o), trace(trace) {
+ ranges.push_back(make_pair(s, l));
+ }
+ C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
+ vector<pair<loff_t, uint64_t> >& _ranges) :
+ oc(c), poolid(_poolid), oid(o), tid(0) {
+ ranges.swap(_ranges);
+ }
+ void finish(int r) override {
+ oc->bh_write_commit(poolid, oid, ranges, tid, r);
+ trace.event("finish");
+ }
+};
+void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+
+ Object *ob = blist.front()->ob;
+ ob->get();
+
+ ceph::real_time last_write;
+ SnapContext snapc;
+ vector<pair<loff_t, uint64_t> > ranges;
+ vector<pair<uint64_t, bufferlist> > io_vec;
+
+ ranges.reserve(blist.size());
+ io_vec.reserve(blist.size());
+
+ uint64_t total_len = 0;
+ for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+ BufferHead *bh = *p;
+ ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
+ ceph_assert(bh->ob == ob);
+ ceph_assert(bh->bl.length() == bh->length());
+ ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
+
+ int n = io_vec.size();
+ io_vec.resize(n + 1);
+ io_vec[n].first = bh->start();
+ io_vec[n].second = bh->bl;
+
+ total_len += bh->length();
+ if (bh->snapc.seq > snapc.seq)
+ snapc = bh->snapc;
+ if (bh->last_write > last_write)
+ last_write = bh->last_write;
+ }
+
+ C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
+
+ ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
+ io_vec, snapc, last_write,
+ ob->truncate_size, ob->truncate_seq,
+ oncommit);
+ oncommit->tid = tid;
+ ob->last_write_tid = tid;
+ for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+ BufferHead *bh = *p;
+ bh->last_write_tid = tid;
+ mark_tx(bh);
+ }
+
+ if (perfcounter)
+ perfcounter->inc(l_objectcacher_data_flushed, total_len);
+}
+
+void ObjectCacher::bh_write(BufferHead *bh, const ZTracer::Trace &parent_trace)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 7) << "bh_write " << *bh << dendl;
+
+ bh->ob->get();
+
+ ZTracer::Trace trace;
+ if (parent_trace.valid()) {
+ trace.init("", &trace_endpoint, &parent_trace);
+ trace.copy_name("bh_write " + bh->ob->get_oid().name);
+ trace.event("start");
+ }
+
+ // finishers
+ C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
+ bh->ob->get_soid(), bh->start(),
+ bh->length(), trace);
+ // go
+ ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(),
+ bh->ob->get_oloc(),
+ bh->start(), bh->length(),
+ bh->snapc, bh->bl, bh->last_write,
+ bh->ob->truncate_size,
+ bh->ob->truncate_seq,
+ bh->journal_tid, trace, oncommit);
+ ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
+
+ // set bh last_write_tid
+ oncommit->tid = tid;
+ bh->ob->last_write_tid = tid;
+ bh->last_write_tid = tid;
+
+ if (perfcounter) {
+ perfcounter->inc(l_objectcacher_data_flushed, bh->length());
+ }
+
+ mark_tx(bh);
+}
+
+void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
+ vector<pair<loff_t, uint64_t> >& ranges,
+ ceph_tid_t tid, int r)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
+ << " ranges " << ranges << " returned " << r << dendl;
+
+ if (objects[poolid].count(oid) == 0) {
+ ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
+ return;
+ }
+
+ Object *ob = objects[poolid][oid];
+ int was_dirty_or_tx = ob->oset->dirty_or_tx;
+
+ for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
+ p != ranges.end();
+ ++p) {
+ loff_t start = p->first;
+ uint64_t length = p->second;
+ if (!ob->exists) {
+ ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
+ ob->exists = true;
+
+ if (writeback_handler.may_copy_on_write(ob->get_oid(), start, length,
+ ob->get_snap())) {
+ ldout(cct, 10) << "bh_write_commit may copy on write, clearing "
+ "complete on " << *ob << dendl;
+ ob->complete = false;
+ }
+ }
+
+ vector<pair<loff_t, BufferHead*>> hit;
+ // apply to bh's!
+ for (map<loff_t, BufferHead*>::const_iterator p = ob->data_lower_bound(start);
+ p != ob->data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+
+ if (bh->start() >= start+(loff_t)length)
+ break;
+
+ // make sure bh is tx
+ if (!bh->is_tx()) {
+ ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
+ continue;
+ }
+
+ // make sure bh tid matches
+ if (bh->last_write_tid != tid) {
+ ceph_assert(bh->last_write_tid > tid);
+ ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
+ continue;
+ }
+
+ // we don't merge tx buffers. tx buffer should be within the range
+ ceph_assert(bh->start() >= start);
+ ceph_assert(bh->end() <= start+(loff_t)length);
+
+ if (r >= 0) {
+ // ok! mark bh clean and error-free
+ mark_clean(bh);
+ bh->set_journal_tid(0);
+ if (bh->get_nocache())
+ bh_lru_rest.lru_bottouch(bh);
+ hit.push_back(make_pair(bh->start(), bh));
+ ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
+ } else {
+ mark_dirty(bh);
+ ldout(cct, 10) << "bh_write_commit marking dirty again due to error "
+ << *bh << " r = " << r << " " << cpp_strerror(-r)
+ << dendl;
+ }
+ }
+
+ for (auto& p : hit) {
+ //p.second maybe merged and deleted in merge_left
+ if (ob->data.count(p.first))
+ ob->try_merge_bh(p.second);
+ }
+ }
+
+ // update last_commit.
+ ceph_assert(ob->last_commit_tid < tid);
+ ob->last_commit_tid = tid;
+
+ // waiters?
+ list<Context*> ls;
+ if (ob->waitfor_commit.count(tid)) {
+ ls.splice(ls.begin(), ob->waitfor_commit[tid]);
+ ob->waitfor_commit.erase(tid);
+ }
+
+ // is the entire object set now clean and fully committed?
+ ObjectSet *oset = ob->oset;
+ ob->put();
+
+ if (flush_set_callback &&
+ was_dirty_or_tx > 0 &&
+ oset->dirty_or_tx == 0) { // nothing dirty/tx
+ flush_set_callback(flush_set_callback_arg, oset);
+ }
+
+ if (!ls.empty())
+ finish_contexts(cct, ls, r);
+}
+
+void ObjectCacher::flush(ZTracer::Trace *trace, loff_t amount)
+{
+ ceph_assert(trace != nullptr);
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph::real_time cutoff = ceph::real_clock::now();
+
+ ldout(cct, 10) << "flush " << amount << dendl;
+
+ /*
+ * NOTE: we aren't actually pulling things off the LRU here, just
+ * looking at the tail item. Then we call bh_write, which moves it
+ * to the other LRU, so that we can call
+ * lru_dirty.lru_get_next_expire() again.
+ */
+ int64_t left = amount;
+ while (amount == 0 || left > 0) {
+ BufferHead *bh = static_cast<BufferHead*>(
+ bh_lru_dirty.lru_get_next_expire());
+ if (!bh) break;
+ if (bh->last_write > cutoff) break;
+
+ if (scattered_write) {
+ bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
+ } else {
+ left -= bh->length();
+ bh_write(bh, *trace);
+ }
+ }
+}
+
+
+void ObjectCacher::trim()
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "trim start: bytes: max " << max_size << " clean "
+ << get_stat_clean() << ", objects: max " << max_objects
+ << " current " << ob_lru.lru_get_size() << dendl;
+
+ uint64_t max_clean_bh = max_size >> BUFFER_MEMORY_WEIGHT;
+ uint64_t nr_clean_bh = bh_lru_rest.lru_get_size() - bh_lru_rest.lru_get_num_pinned();
+ while (get_stat_clean() > 0 &&
+ ((uint64_t)get_stat_clean() > max_size ||
+ nr_clean_bh > max_clean_bh)) {
+ BufferHead *bh = static_cast<BufferHead*>(bh_lru_rest.lru_expire());
+ if (!bh)
+ break;
+
+ ldout(cct, 10) << "trim trimming " << *bh << dendl;
+ ceph_assert(bh->is_clean() || bh->is_zero() || bh->is_error());
+
+ Object *ob = bh->ob;
+ bh_remove(ob, bh);
+ delete bh;
+
+ --nr_clean_bh;
+
+ if (ob->complete) {
+ ldout(cct, 10) << "trim clearing complete on " << *ob << dendl;
+ ob->complete = false;
+ }
+ }
+
+ while (ob_lru.lru_get_size() > max_objects) {
+ Object *ob = static_cast<Object*>(ob_lru.lru_expire());
+ if (!ob)
+ break;
+
+ ldout(cct, 10) << "trim trimming " << *ob << dendl;
+ close_object(ob);
+ }
+
+ ldout(cct, 10) << "trim finish: max " << max_size << " clean "
+ << get_stat_clean() << ", objects: max " << max_objects
+ << " current " << ob_lru.lru_get_size() << dendl;
+}
+
+
+
+/* public */
+
+bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents,
+ snapid_t snapid)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ for (vector<ObjectExtent>::iterator ex_it = extents.begin();
+ ex_it != extents.end();
+ ++ex_it) {
+ ldout(cct, 10) << "is_cached " << *ex_it << dendl;
+
+ // get Object cache
+ sobject_t soid(ex_it->oid, snapid);
+ Object *o = get_object_maybe(soid, ex_it->oloc);
+ if (!o)
+ return false;
+ if (!o->is_cached(ex_it->offset, ex_it->length))
+ return false;
+ }
+ return true;
+}
+
+
+/*
+ * returns # bytes read (if in cache). onfinish is untouched (caller
+ * must delete it)
+ * returns 0 if doing async read
+ */
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+ ZTracer::Trace *parent_trace)
+{
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("read", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
+ int r =_readx(rd, oset, onfinish, true, &trace);
+ if (r < 0) {
+ trace.event("finish");
+ }
+ return r;
+}
+
+int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
+ bool external_call, ZTracer::Trace *trace)
+{
+ ceph_assert(trace != nullptr);
+ ceph_assert(ceph_mutex_is_locked(lock));
+ bool success = true;
+ int error = 0;
+ uint64_t bytes_in_cache = 0;
+ uint64_t bytes_not_in_cache = 0;
+ uint64_t total_bytes_read = 0;
+ map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
+ bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
+ bool nocache = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+
+ /*
+ * WARNING: we can only meaningfully return ENOENT if the read request
+ * passed in a single ObjectExtent. Any caller who wants ENOENT instead of
+ * zeroed buffers needs to feed single extents into readx().
+ */
+ ceph_assert(!oset->return_enoent || rd->extents.size() == 1);
+
+ for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
+ ex_it != rd->extents.end();
+ ++ex_it) {
+ ldout(cct, 10) << "readx " << *ex_it << dendl;
+
+ total_bytes_read += ex_it->length;
+
+ // get Object cache
+ sobject_t soid(ex_it->oid, rd->snap);
+ Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+ ex_it->truncate_size, oset->truncate_seq);
+ if (external_call)
+ touch_ob(o);
+
+ // does not exist and no hits?
+ if (oset->return_enoent && !o->exists) {
+ ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
+
+ // should we worry about COW underneath us?
+ if (writeback_handler.may_copy_on_write(soid.oid, ex_it->offset,
+ ex_it->length, soid.snap)) {
+ ldout(cct, 20) << "readx may copy on write" << dendl;
+ bool wait = false;
+ list<BufferHead*> blist;
+ for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
+ bh_it != o->data.end();
+ ++bh_it) {
+ BufferHead *bh = bh_it->second;
+ if (bh->is_dirty() || bh->is_tx()) {
+ ldout(cct, 10) << "readx flushing " << *bh << dendl;
+ wait = true;
+ if (bh->is_dirty()) {
+ if (scattered_write)
+ blist.push_back(bh);
+ else
+ bh_write(bh, *trace);
+ }
+ }
+ }
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+ if (wait) {
+ ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
+ << " on " << *o << dendl;
+ o->waitfor_commit[o->last_write_tid].push_back(
+ new C_RetryRead(this,rd, oset, onfinish, *trace));
+ // FIXME: perfcounter!
+ return 0;
+ }
+ }
+
+ // can we return ENOENT?
+ bool allzero = true;
+ for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
+ bh_it != o->data.end();
+ ++bh_it) {
+ ldout(cct, 20) << "readx ob has bh " << *bh_it->second << dendl;
+ if (!bh_it->second->is_zero() && !bh_it->second->is_rx()) {
+ allzero = false;
+ break;
+ }
+ }
+ if (allzero) {
+ ldout(cct, 10) << "readx ob has all zero|rx, returning ENOENT"
+ << dendl;
+ delete rd;
+ if (dontneed)
+ bottouch_ob(o);
+ return -ENOENT;
+ }
+ }
+
+ // map extent into bufferheads
+ map<loff_t, BufferHead*> hits, missing, rx, errors;
+ o->map_read(*ex_it, hits, missing, rx, errors);
+ if (external_call) {
+ // retry reading error buffers
+ missing.insert(errors.begin(), errors.end());
+ } else {
+ // some reads had errors, fail later so completions
+ // are cleaned up properly
+ // TODO: make read path not call _readx for every completion
+ hits.insert(errors.begin(), errors.end());
+ }
+
+ if (!missing.empty() || !rx.empty()) {
+ // read missing
+ map<loff_t, BufferHead*>::iterator last = missing.end();
+ for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
+ bh_it != missing.end();
+ ++bh_it) {
+ uint64_t rx_bytes = static_cast<uint64_t>(
+ stat_rx + bh_it->second->length());
+ bytes_not_in_cache += bh_it->second->length();
+ if (!waitfor_read.empty() || (stat_rx > 0 && rx_bytes > max_size)) {
+ // cache is full with concurrent reads -- wait for rx's to complete
+ // to constrain memory growth (especially during copy-ups)
+ if (success) {
+ ldout(cct, 10) << "readx missed, waiting on cache to complete "
+ << waitfor_read.size() << " blocked reads, "
+ << (std::max(rx_bytes, max_size) - max_size)
+ << " read bytes" << dendl;
+ waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish,
+ *trace));
+ }
+
+ bh_remove(o, bh_it->second);
+ delete bh_it->second;
+ } else {
+ bh_it->second->set_nocache(nocache);
+ bh_read(bh_it->second, rd->fadvise_flags, *trace);
+ if ((success && onfinish) || last != missing.end())
+ last = bh_it;
+ }
+ success = false;
+ }
+
+ //add wait in last bh avoid wakeup early. Because read is order
+ if (last != missing.end()) {
+ ldout(cct, 10) << "readx missed, waiting on " << *last->second
+ << " off " << last->first << dendl;
+ last->second->waitfor_read[last->first].push_back(
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
+
+ }
+
+ // bump rx
+ for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
+ bh_it != rx.end();
+ ++bh_it) {
+ touch_bh(bh_it->second); // bump in lru, so we don't lose it.
+ if (success && onfinish) {
+ ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+ << " off " << bh_it->first << dendl;
+ bh_it->second->waitfor_read[bh_it->first].push_back(
+ new C_RetryRead(this, rd, oset, onfinish, *trace) );
+ }
+ bytes_not_in_cache += bh_it->second->length();
+ success = false;
+ }
+
+ for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+ bh_it != hits.end(); ++bh_it)
+ //bump in lru, so we don't lose it when later read
+ touch_bh(bh_it->second);
+
+ } else {
+ ceph_assert(!hits.empty());
+
+ // make a plain list
+ for (map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+ bh_it != hits.end();
+ ++bh_it) {
+ BufferHead *bh = bh_it->second;
+ ldout(cct, 10) << "readx hit bh " << *bh << dendl;
+ if (bh->is_error() && bh->error)
+ error = bh->error;
+ bytes_in_cache += bh->length();
+
+ if (bh->get_nocache() && bh->is_clean())
+ bh_lru_rest.lru_bottouch(bh);
+ else
+ touch_bh(bh);
+ //must be after touch_bh because touch_bh set dontneed false
+ if (dontneed &&
+ ((loff_t)ex_it->offset <= bh->start() &&
+ (bh->end() <=(loff_t)(ex_it->offset + ex_it->length)))) {
+ bh->set_dontneed(true); //if dirty
+ if (bh->is_clean())
+ bh_lru_rest.lru_bottouch(bh);
+ }
+ }
+
+ if (!error) {
+ // create reverse map of buffer offset -> object for the
+ // eventual result. this is over a single ObjectExtent, so we
+ // know that
+ // - the bh's are contiguous
+ // - the buffer frags need not be (and almost certainly aren't)
+ loff_t opos = ex_it->offset;
+ map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+ ceph_assert(bh_it->second->start() <= opos);
+ uint64_t bhoff = opos - bh_it->second->start();
+ vector<pair<uint64_t,uint64_t> >::iterator f_it
+ = ex_it->buffer_extents.begin();
+ uint64_t foff = 0;
+ while (1) {
+ BufferHead *bh = bh_it->second;
+ ceph_assert(opos == (loff_t)(bh->start() + bhoff));
+
+ uint64_t len = std::min(f_it->second - foff, bh->length() - bhoff);
+ ldout(cct, 10) << "readx rmap opos " << opos << ": " << *bh << " +"
+ << bhoff << " frag " << f_it->first << "~"
+ << f_it->second << " +" << foff << "~" << len
+ << dendl;
+
+ bufferlist bit;
+ // put substr here first, since substr_of clobbers, and we
+ // may get multiple bh's at this stripe_map position
+ if (bh->is_zero()) {
+ stripe_map[f_it->first].append_zero(len);
+ } else {
+ bit.substr_of(bh->bl,
+ opos - bh->start(),
+ len);
+ stripe_map[f_it->first].claim_append(bit);
+ }
+
+ opos += len;
+ bhoff += len;
+ foff += len;
+ if (opos == bh->end()) {
+ ++bh_it;
+ bhoff = 0;
+ }
+ if (foff == f_it->second) {
+ ++f_it;
+ foff = 0;
+ }
+ if (bh_it == hits.end()) break;
+ if (f_it == ex_it->buffer_extents.end())
+ break;
+ }
+ ceph_assert(f_it == ex_it->buffer_extents.end());
+ ceph_assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
+ }
+
+ if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
+ bottouch_ob(o);
+ }
+ }
+
+ if (!success) {
+ if (perfcounter && external_call) {
+ perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
+ perfcounter->inc(l_objectcacher_cache_bytes_miss, bytes_not_in_cache);
+ perfcounter->inc(l_objectcacher_cache_ops_miss);
+ }
+ if (onfinish) {
+ ldout(cct, 20) << "readx defer " << rd << dendl;
+ } else {
+ ldout(cct, 20) << "readx drop " << rd << " (no complete, but no waiter)"
+ << dendl;
+ delete rd;
+ }
+ return 0; // wait!
+ }
+ if (perfcounter && external_call) {
+ perfcounter->inc(l_objectcacher_data_read, total_bytes_read);
+ perfcounter->inc(l_objectcacher_cache_bytes_hit, bytes_in_cache);
+ perfcounter->inc(l_objectcacher_cache_ops_hit);
+ }
+
+ // no misses... success! do the read.
+ ldout(cct, 10) << "readx has all buffers" << dendl;
+
+ // ok, assemble into result buffer.
+ uint64_t pos = 0;
+ if (rd->bl && !error) {
+ rd->bl->clear();
+ for (map<uint64_t,bufferlist>::iterator i = stripe_map.begin();
+ i != stripe_map.end();
+ ++i) {
+ ceph_assert(pos == i->first);
+ ldout(cct, 10) << "readx adding buffer len " << i->second.length()
+ << " at " << pos << dendl;
+ pos += i->second.length();
+ rd->bl->claim_append(i->second);
+ ceph_assert(rd->bl->length() == pos);
+ }
+ ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
+ } else if (!error) {
+ ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
+ map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
+ pos = i->first + i->second.length();
+ }
+
+ // done with read.
+ int ret = error ? error : pos;
+ ldout(cct, 20) << "readx done " << rd << " " << ret << dendl;
+ ceph_assert(pos <= (uint64_t) INT_MAX);
+
+ delete rd;
+
+ trim();
+
+ return ret;
+}
+
+void ObjectCacher::retry_waiting_reads()
+{
+ list<Context *> ls;
+ ls.swap(waitfor_read);
+
+ while (!ls.empty() && waitfor_read.empty()) {
+ Context *ctx = ls.front();
+ ls.pop_front();
+ ctx->complete(0);
+ }
+ waitfor_read.splice(waitfor_read.end(), ls);
+}
+
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace,
+ ZTracer::Trace *parent_trace)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph::real_time now = ceph::real_clock::now();
+ uint64_t bytes_written = 0;
+ uint64_t bytes_written_in_flush = 0;
+ bool dontneed = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
+ bool nocache = wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+
+ ZTracer::Trace trace;
+ if (parent_trace != nullptr) {
+ trace.init("write", &trace_endpoint, parent_trace);
+ trace.event("start");
+ }
+
+ list<Context*> wait_for_reads;
+ for (vector<ObjectExtent>::iterator ex_it = wr->extents.begin();
+ ex_it != wr->extents.end();
+ ++ex_it) {
+ // get object cache
+ sobject_t soid(ex_it->oid, CEPH_NOSNAP);
+ Object *o = get_object(soid, ex_it->objectno, oset, ex_it->oloc,
+ ex_it->truncate_size, oset->truncate_seq);
+
+ // map it all into a single bufferhead.
+ BufferHead *bh = o->map_write(*ex_it, wr->journal_tid);
+ bool missing = bh->is_missing();
+ bh->snapc = wr->snapc;
+
+ // readers that need to be woken up due to an overwrite
+ for (auto& [_, wait_for_read] : bh->waitfor_read) {
+ wait_for_reads.splice(wait_for_reads.end(), wait_for_read);
+ }
+ bh->waitfor_read.clear();
+
+ bytes_written += ex_it->length;
+ if (bh->is_tx()) {
+ bytes_written_in_flush += ex_it->length;
+ }
+
+ // adjust buffer pointers (ie "copy" data into my cache)
+ // this is over a single ObjectExtent, so we know that
+ // - there is one contiguous bh
+ // - the buffer frags need not be (and almost certainly aren't)
+ // note: i assume striping is monotonic... no jumps backwards, ever!
+ loff_t opos = ex_it->offset;
+ for (vector<pair<uint64_t, uint64_t> >::iterator f_it
+ = ex_it->buffer_extents.begin();
+ f_it != ex_it->buffer_extents.end();
+ ++f_it) {
+ ldout(cct, 10) << "writex writing " << f_it->first << "~"
+ << f_it->second << " into " << *bh << " at " << opos
+ << dendl;
+ uint64_t bhoff = opos - bh->start();
+ ceph_assert(f_it->second <= bh->length() - bhoff);
+
+ // get the frag we're mapping in
+ bufferlist frag;
+ frag.substr_of(wr->bl, f_it->first, f_it->second);
+
+ // keep anything left of bhoff
+ if (!bhoff)
+ bh->bl.swap(frag);
+ else
+ bh->bl.claim_append(frag);
+
+ opos += f_it->second;
+ }
+
+ // ok, now bh is dirty.
+ mark_dirty(bh);
+ if (dontneed)
+ bh->set_dontneed(true);
+ else if (nocache && missing)
+ bh->set_nocache(true);
+ else
+ touch_bh(bh);
+
+ bh->last_write = now;
+
+ o->try_merge_bh(bh);
+ }
+
+ if (perfcounter) {
+ perfcounter->inc(l_objectcacher_data_written, bytes_written);
+ if (bytes_written_in_flush) {
+ perfcounter->inc(l_objectcacher_overwritten_in_flush,
+ bytes_written_in_flush);
+ }
+ }
+
+ int r = _wait_for_write(wr, bytes_written, oset, &trace, onfreespace);
+ delete wr;
+
+ finish_contexts(cct, wait_for_reads, 0);
+
+ //verify_stats();
+ trim();
+ return r;
+}
+
+class ObjectCacher::C_WaitForWrite : public Context {
+public:
+ C_WaitForWrite(ObjectCacher *oc, uint64_t len,
+ const ZTracer::Trace &trace, Context *onfinish) :
+ m_oc(oc), m_len(len), m_trace(trace), m_onfinish(onfinish) {}
+ void finish(int r) override;
+private:
+ ObjectCacher *m_oc;
+ uint64_t m_len;
+ ZTracer::Trace m_trace;
+ Context *m_onfinish;
+};
+
+void ObjectCacher::C_WaitForWrite::finish(int r)
+{
+ std::lock_guard l(m_oc->lock);
+ m_oc->_maybe_wait_for_writeback(m_len, &m_trace);
+ m_onfinish->complete(r);
+}
+
+void ObjectCacher::_maybe_wait_for_writeback(uint64_t len,
+ ZTracer::Trace *trace)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph::mono_time start = ceph::mono_clock::now();
+ int blocked = 0;
+ // wait for writeback?
+ // - wait for dirty and tx bytes (relative to the max_dirty threshold)
+ // - do not wait for bytes other waiters are waiting on. this means that
+ // threads do not wait for each other. this effectively allows the cache
+ // size to balloon proportional to the data that is in flight.
+
+ uint64_t max_dirty_bh = max_dirty >> BUFFER_MEMORY_WEIGHT;
+ while (get_stat_dirty() + get_stat_tx() > 0 &&
+ (((uint64_t)(get_stat_dirty() + get_stat_tx()) >=
+ max_dirty + get_stat_dirty_waiting()) ||
+ (dirty_or_tx_bh.size() >=
+ max_dirty_bh + get_stat_nr_dirty_waiters()))) {
+
+ if (blocked == 0) {
+ trace->event("start wait for writeback");
+ }
+ ldout(cct, 10) << __func__ << " waiting for dirty|tx "
+ << (get_stat_dirty() + get_stat_tx()) << " >= max "
+ << max_dirty << " + dirty_waiting "
+ << get_stat_dirty_waiting() << dendl;
+ flusher_cond.notify_all();
+ stat_dirty_waiting += len;
+ ++stat_nr_dirty_waiters;
+ std::unique_lock l{lock, std::adopt_lock};
+ stat_cond.wait(l);
+ l.release();
+ stat_dirty_waiting -= len;
+ --stat_nr_dirty_waiters;
+ ++blocked;
+ ldout(cct, 10) << __func__ << " woke up" << dendl;
+ }
+ if (blocked > 0) {
+ trace->event("finish wait for writeback");
+ }
+ if (blocked && perfcounter) {
+ perfcounter->inc(l_objectcacher_write_ops_blocked);
+ perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
+ ceph::timespan blocked = ceph::mono_clock::now() - start;
+ perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
+ }
+}
+
+// blocking wait for write.
+int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
+ ZTracer::Trace *trace, Context *onfreespace)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(trace != nullptr);
+ int ret = 0;
+
+ if (max_dirty > 0 && !(wr->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_FUA)) {
+ if (block_writes_upfront) {
+ _maybe_wait_for_writeback(len, trace);
+ if (onfreespace)
+ onfreespace->complete(0);
+ } else {
+ ceph_assert(onfreespace);
+ finisher.queue(new C_WaitForWrite(this, len, *trace, onfreespace));
+ }
+ } else {
+ // write-thru! flush what we just wrote.
+ ceph::condition_variable cond;
+ bool done = false;
+ Context *fin = block_writes_upfront ?
+ new C_Cond(cond, &done, &ret) : onfreespace;
+ ceph_assert(fin);
+ bool flushed = flush_set(oset, wr->extents, trace, fin);
+ ceph_assert(!flushed); // we just dirtied it, and didn't drop our lock!
+ ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len
+ << " bytes" << dendl;
+ if (block_writes_upfront) {
+ std::unique_lock l{lock, std::adopt_lock};
+ cond.wait(l, [&done] { return done; });
+ l.release();
+ ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+ if (onfreespace)
+ onfreespace->complete(ret);
+ }
+ }
+
+ // start writeback anyway?
+ if (get_stat_dirty() > 0 && (uint64_t) get_stat_dirty() > target_dirty) {
+ ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
+ << target_dirty << ", nudging flusher" << dendl;
+ flusher_cond.notify_all();
+ }
+ return ret;
+}
+
+void ObjectCacher::flusher_entry()
+{
+ ldout(cct, 10) << "flusher start" << dendl;
+ std::unique_lock l{lock};
+ while (!flusher_stop) {
+ loff_t all = get_stat_tx() + get_stat_rx() + get_stat_clean() +
+ get_stat_dirty();
+ ldout(cct, 11) << "flusher "
+ << all << " / " << max_size << ": "
+ << get_stat_tx() << " tx, "
+ << get_stat_rx() << " rx, "
+ << get_stat_clean() << " clean, "
+ << get_stat_dirty() << " dirty ("
+ << target_dirty << " target, "
+ << max_dirty << " max)"
+ << dendl;
+ loff_t actual = get_stat_dirty() + get_stat_dirty_waiting();
+
+ ZTracer::Trace trace;
+ if (cct->_conf->osdc_blkin_trace_all) {
+ trace.init("flusher", &trace_endpoint);
+ trace.event("start");
+ }
+
+ if (actual > 0 && (uint64_t) actual > target_dirty) {
+ // flush some dirty pages
+ ldout(cct, 10) << "flusher " << get_stat_dirty() << " dirty + "
+ << get_stat_dirty_waiting() << " dirty_waiting > target "
+ << target_dirty << ", flushing some dirty bhs" << dendl;
+ flush(&trace, actual - target_dirty);
+ } else {
+ // check tail of lru for old dirty items
+ ceph::real_time cutoff = ceph::real_clock::now();
+ cutoff -= max_dirty_age;
+ BufferHead *bh = 0;
+ int max = MAX_FLUSH_UNDER_LOCK;
+ while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
+ lru_get_next_expire())) != 0 &&
+ bh->last_write <= cutoff &&
+ max > 0) {
+ ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
+ if (scattered_write) {
+ bh_write_adjacencies(bh, cutoff, NULL, &max);
+ } else {
+ bh_write(bh, trace);
+ --max;
+ }
+ }
+ if (!max) {
+ // back off the lock to avoid starving other threads
+ trace.event("backoff");
+ l.unlock();
+ l.lock();
+ continue;
+ }
+ }
+
+ trace.event("finish");
+ if (flusher_stop)
+ break;
+
+ flusher_cond.wait_for(l, 1s);
+ }
+
+ /* Wait for reads to finish. This is only possible if handling
+ * -ENOENT made some read completions finish before their rados read
+ * came back. If we don't wait for them, and destroy the cache, when
+ * the rados reads do come back their callback will try to access the
+ * no-longer-valid ObjectCacher.
+ */
+ read_cond.wait(l, [this] {
+ if (reads_outstanding > 0) {
+ ldout(cct, 10) << "Waiting for all reads to complete. Number left: "
+ << reads_outstanding << dendl;
+ return false;
+ } else {
+ return true;
+ }
+ });
+ ldout(cct, 10) << "flusher finish" << dendl;
+}
+
+
+// -------------------------------------------------
+
+bool ObjectCacher::set_is_empty(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (oset->objects.empty())
+ return true;
+
+ for (xlist<Object*>::iterator p = oset->objects.begin(); !p.end(); ++p)
+ if (!(*p)->is_empty())
+ return false;
+
+ return true;
+}
+
+bool ObjectCacher::set_is_cached(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (oset->objects.empty())
+ return false;
+
+ for (xlist<Object*>::iterator p = oset->objects.begin();
+ !p.end(); ++p) {
+ Object *ob = *p;
+ for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
+ q != ob->data.end();
+ ++q) {
+ BufferHead *bh = q->second;
+ if (!bh->is_dirty() && !bh->is_tx())
+ return true;
+ }
+ }
+
+ return false;
+}
+
+bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (oset->objects.empty())
+ return false;
+
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
+ Object *ob = *i;
+
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
+ p != ob->data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+ if (bh->is_dirty() || bh->is_tx())
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+// purge. non-blocking. violently removes dirty buffers from cache.
+void ObjectCacher::purge(Object *ob)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "purge " << *ob << dendl;
+
+ ob->truncate(0);
+}
+
+
+// flush. non-blocking. no callback.
+// true if clean, already flushed.
+// false if we wrote something.
+// be sloppy about the ranges and flush any buffer it touches
+bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length,
+ ZTracer::Trace *trace)
+{
+ ceph_assert(trace != nullptr);
+ ceph_assert(ceph_mutex_is_locked(lock));
+ list<BufferHead*> blist;
+ bool clean = true;
+ ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
+ for (map<loff_t,BufferHead*>::const_iterator p = ob->data_lower_bound(offset);
+ p != ob->data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+ ldout(cct, 20) << "flush " << *bh << dendl;
+ if (length && bh->start() > offset+length) {
+ break;
+ }
+ if (bh->is_tx()) {
+ clean = false;
+ continue;
+ }
+ if (!bh->is_dirty()) {
+ continue;
+ }
+
+ if (scattered_write)
+ blist.push_back(bh);
+ else
+ bh_write(bh, *trace);
+ clean = false;
+ }
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
+ return clean;
+}
+
+bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather,
+ Context *onfinish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (gather->has_subs()) {
+ gather->set_finisher(onfinish);
+ gather->activate();
+ return false;
+ }
+
+ ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl;
+ onfinish->complete(0);
+ return true;
+}
+
+// flush. non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(onfinish != NULL);
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+ onfinish->complete(0);
+ return true;
+ }
+
+ ldout(cct, 10) << "flush_set " << oset << dendl;
+
+ // we'll need to wait for all objects to flush!
+ C_GatherBuilder gather(cct);
+ set<Object*> waitfor_commit;
+
+ list<BufferHead*> blist;
+ Object *last_ob = NULL;
+ set<BufferHead*, BufferHead::ptr_lt>::const_iterator it, p, q;
+
+ // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
+ // order. But items in oset->objects are not sorted. So the iterator can
+ // point to any buffer head in the ObjectSet
+ BufferHead key(*oset->objects.begin());
+ it = dirty_or_tx_bh.lower_bound(&key);
+ p = q = it;
+
+ bool backwards = true;
+ if (it != dirty_or_tx_bh.begin())
+ --it;
+ else
+ backwards = false;
+
+ for (; p != dirty_or_tx_bh.end(); p = q) {
+ ++q;
+ BufferHead *bh = *p;
+ if (bh->ob->oset != oset)
+ break;
+ waitfor_commit.insert(bh->ob);
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_back(bh);
+ } else {
+ bh_write(bh, {});
+ }
+ }
+ }
+
+ if (backwards) {
+ for(p = q = it; true; p = q) {
+ if (q != dirty_or_tx_bh.begin())
+ --q;
+ else
+ backwards = false;
+ BufferHead *bh = *p;
+ if (bh->ob->oset != oset)
+ break;
+ waitfor_commit.insert(bh->ob);
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_front(bh);
+ } else {
+ bh_write(bh, {});
+ }
+ }
+ if (!backwards)
+ break;
+ }
+ }
+
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
+ for (set<Object*>::iterator i = waitfor_commit.begin();
+ i != waitfor_commit.end(); ++i) {
+ Object *ob = *i;
+
+ // we'll need to gather...
+ ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+ << ob->last_write_tid << " on " << *ob << dendl;
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+ }
+
+ return _flush_set_finish(&gather, onfinish);
+}
+
+// flush. non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv,
+ ZTracer::Trace *trace, Context *onfinish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(trace != nullptr);
+ ceph_assert(onfinish != NULL);
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "flush_set on " << oset << " dne" << dendl;
+ onfinish->complete(0);
+ return true;
+ }
+
+ ldout(cct, 10) << "flush_set " << oset << " on " << exv.size()
+ << " ObjectExtents" << dendl;
+
+ // we'll need to wait for all objects to flush!
+ C_GatherBuilder gather(cct);
+
+ for (vector<ObjectExtent>::iterator p = exv.begin();
+ p != exv.end();
+ ++p) {
+ ObjectExtent &ex = *p;
+ sobject_t soid(ex.oid, CEPH_NOSNAP);
+ if (objects[oset->poolid].count(soid) == 0)
+ continue;
+ Object *ob = objects[oset->poolid][soid];
+
+ ldout(cct, 20) << "flush_set " << oset << " ex " << ex << " ob " << soid
+ << " " << ob << dendl;
+
+ if (!flush(ob, ex.offset, ex.length, trace)) {
+ // we'll need to gather...
+ ldout(cct, 10) << "flush_set " << oset << " will wait for ack tid "
+ << ob->last_write_tid << " on " << *ob << dendl;
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+ }
+ }
+
+ return _flush_set_finish(&gather, onfinish);
+}
+
+// flush all dirty data. non-blocking, takes callback.
+// returns true if already flushed
+bool ObjectCacher::flush_all(Context *onfinish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(onfinish != NULL);
+
+ ldout(cct, 10) << "flush_all " << dendl;
+
+ // we'll need to wait for all objects to flush!
+ C_GatherBuilder gather(cct);
+ set<Object*> waitfor_commit;
+
+ list<BufferHead*> blist;
+ Object *last_ob = NULL;
+ set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
+ next = it = dirty_or_tx_bh.begin();
+ while (it != dirty_or_tx_bh.end()) {
+ ++next;
+ BufferHead *bh = *it;
+ waitfor_commit.insert(bh->ob);
+
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_back(bh);
+ } else {
+ bh_write(bh, {});
+ }
+ }
+
+ it = next;
+ }
+
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
+ for (set<Object*>::iterator i = waitfor_commit.begin();
+ i != waitfor_commit.end();
+ ++i) {
+ Object *ob = *i;
+
+ // we'll need to gather...
+ ldout(cct, 10) << "flush_all will wait for ack tid "
+ << ob->last_write_tid << " on " << *ob << dendl;
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
+ }
+
+ return _flush_set_finish(&gather, onfinish);
+}
+
+void ObjectCacher::purge_set(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "purge_set on " << oset << " dne" << dendl;
+ return;
+ }
+
+ ldout(cct, 10) << "purge_set " << oset << dendl;
+ const bool were_dirty = oset->dirty_or_tx > 0;
+
+ for (xlist<Object*>::iterator i = oset->objects.begin();
+ !i.end(); ++i) {
+ Object *ob = *i;
+ purge(ob);
+ }
+
+ // Although we have purged rather than flushed, caller should still
+ // drop any resources associate with dirty data.
+ ceph_assert(oset->dirty_or_tx == 0);
+ if (flush_set_callback && were_dirty) {
+ flush_set_callback(flush_set_callback_arg, oset);
+ }
+}
+
+
+loff_t ObjectCacher::release(Object *ob)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ list<BufferHead*> clean;
+ loff_t o_unclean = 0;
+
+ for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
+ p != ob->data.end();
+ ++p) {
+ BufferHead *bh = p->second;
+ if (bh->is_clean() || bh->is_zero() || bh->is_error())
+ clean.push_back(bh);
+ else
+ o_unclean += bh->length();
+ }
+
+ for (list<BufferHead*>::iterator p = clean.begin();
+ p != clean.end();
+ ++p) {
+ bh_remove(ob, *p);
+ delete *p;
+ }
+
+ if (ob->can_close()) {
+ ldout(cct, 10) << "release trimming " << *ob << dendl;
+ close_object(ob);
+ ceph_assert(o_unclean == 0);
+ return 0;
+ }
+
+ if (ob->complete) {
+ ldout(cct, 10) << "release clearing complete on " << *ob << dendl;
+ ob->complete = false;
+ }
+ if (!ob->exists) {
+ ldout(cct, 10) << "release setting exists on " << *ob << dendl;
+ ob->exists = true;
+ }
+
+ return o_unclean;
+}
+
+loff_t ObjectCacher::release_set(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ // return # bytes not clean (and thus not released).
+ loff_t unclean = 0;
+
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << "release_set on " << oset << " dne" << dendl;
+ return 0;
+ }
+
+ ldout(cct, 10) << "release_set " << oset << dendl;
+
+ xlist<Object*>::iterator q;
+ for (xlist<Object*>::iterator p = oset->objects.begin();
+ !p.end(); ) {
+ q = p;
+ ++q;
+ Object *ob = *p;
+
+ loff_t o_unclean = release(ob);
+ unclean += o_unclean;
+
+ if (o_unclean)
+ ldout(cct, 10) << "release_set " << oset << " " << *ob
+ << " has " << o_unclean << " bytes left"
+ << dendl;
+ p = q;
+ }
+
+ if (unclean) {
+ ldout(cct, 10) << "release_set " << oset
+ << ", " << unclean << " bytes left" << dendl;
+ }
+
+ return unclean;
+}
+
+
+uint64_t ObjectCacher::release_all()
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "release_all" << dendl;
+ uint64_t unclean = 0;
+
+ vector<ceph::unordered_map<sobject_t, Object*> >::iterator i
+ = objects.begin();
+ while (i != objects.end()) {
+ ceph::unordered_map<sobject_t, Object*>::iterator p = i->begin();
+ while (p != i->end()) {
+ ceph::unordered_map<sobject_t, Object*>::iterator n = p;
+ ++n;
+
+ Object *ob = p->second;
+
+ loff_t o_unclean = release(ob);
+ unclean += o_unclean;
+
+ if (o_unclean)
+ ldout(cct, 10) << "release_all " << *ob
+ << " has " << o_unclean << " bytes left"
+ << dendl;
+ p = n;
+ }
+ ++i;
+ }
+
+ if (unclean) {
+ ldout(cct, 10) << "release_all unclean " << unclean << " bytes left"
+ << dendl;
+ }
+
+ return unclean;
+}
+
+void ObjectCacher::clear_nonexistence(ObjectSet *oset)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "clear_nonexistence() " << oset << dendl;
+
+ for (xlist<Object*>::iterator p = oset->objects.begin();
+ !p.end(); ++p) {
+ Object *ob = *p;
+ if (!ob->exists) {
+ ldout(cct, 10) << " setting exists and complete on " << *ob << dendl;
+ ob->exists = true;
+ ob->complete = false;
+ }
+ for (xlist<C_ReadFinish*>::iterator q = ob->reads.begin();
+ !q.end(); ++q) {
+ C_ReadFinish *comp = *q;
+ comp->distrust_enoent();
+ }
+ }
+}
+
+/**
+ * discard object extents from an ObjectSet by removing the objects in
+ * exls from the in-memory oset.
+ */
+void ObjectCacher::discard_set(ObjectSet *oset, const vector<ObjectExtent>& exls)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ bool was_dirty = oset->dirty_or_tx > 0;
+
+ _discard(oset, exls, nullptr);
+ _discard_finish(oset, was_dirty, nullptr);
+}
+
+/**
+ * discard object extents from an ObjectSet by removing the objects in
+ * exls from the in-memory oset. If the bh is in TX state, the discard
+ * will wait for the write to commit prior to invoking on_finish.
+ */
+void ObjectCacher::discard_writeback(ObjectSet *oset,
+ const vector<ObjectExtent>& exls,
+ Context* on_finish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ bool was_dirty = oset->dirty_or_tx > 0;
+
+ C_GatherBuilder gather(cct);
+ _discard(oset, exls, &gather);
+
+ if (gather.has_subs()) {
+ bool flushed = was_dirty && oset->dirty_or_tx == 0;
+ gather.set_finisher(new LambdaContext(
+ [this, oset, flushed, on_finish](int) {
+ ceph_assert(ceph_mutex_is_locked(lock));
+ if (flushed && flush_set_callback)
+ flush_set_callback(flush_set_callback_arg, oset);
+ if (on_finish)
+ on_finish->complete(0);
+ }));
+ gather.activate();
+ return;
+ }
+
+ _discard_finish(oset, was_dirty, on_finish);
+}
+
+void ObjectCacher::_discard(ObjectSet *oset, const vector<ObjectExtent>& exls,
+ C_GatherBuilder* gather)
+{
+ if (oset->objects.empty()) {
+ ldout(cct, 10) << __func__ << " on " << oset << " dne" << dendl;
+ return;
+ }
+
+ ldout(cct, 10) << __func__ << " " << oset << dendl;
+
+ for (auto& ex : exls) {
+ ldout(cct, 10) << __func__ << " " << oset << " ex " << ex << dendl;
+ sobject_t soid(ex.oid, CEPH_NOSNAP);
+ if (objects[oset->poolid].count(soid) == 0)
+ continue;
+ Object *ob = objects[oset->poolid][soid];
+
+ ob->discard(ex.offset, ex.length, gather);
+ }
+}
+
+void ObjectCacher::_discard_finish(ObjectSet *oset, bool was_dirty,
+ Context* on_finish)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+
+ // did we truncate off dirty data?
+ if (flush_set_callback && was_dirty && oset->dirty_or_tx == 0) {
+ flush_set_callback(flush_set_callback_arg, oset);
+ }
+
+ // notify that in-flight writeback has completed
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
+ }
+}
+
+void ObjectCacher::verify_stats() const
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 10) << "verify_stats" << dendl;
+
+ loff_t clean = 0, zero = 0, dirty = 0, rx = 0, tx = 0, missing = 0,
+ error = 0;
+ for (vector<ceph::unordered_map<sobject_t, Object*> >::const_iterator i
+ = objects.begin();
+ i != objects.end();
+ ++i) {
+ for (ceph::unordered_map<sobject_t, Object*>::const_iterator p
+ = i->begin();
+ p != i->end();
+ ++p) {
+ Object *ob = p->second;
+ for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
+ q != ob->data.end();
+ ++q) {
+ BufferHead *bh = q->second;
+ switch (bh->get_state()) {
+ case BufferHead::STATE_MISSING:
+ missing += bh->length();
+ break;
+ case BufferHead::STATE_CLEAN:
+ clean += bh->length();
+ break;
+ case BufferHead::STATE_ZERO:
+ zero += bh->length();
+ break;
+ case BufferHead::STATE_DIRTY:
+ dirty += bh->length();
+ break;
+ case BufferHead::STATE_TX:
+ tx += bh->length();
+ break;
+ case BufferHead::STATE_RX:
+ rx += bh->length();
+ break;
+ case BufferHead::STATE_ERROR:
+ error += bh->length();
+ break;
+ default:
+ ceph_abort();
+ }
+ }
+ }
+ }
+
+ ldout(cct, 10) << " clean " << clean << " rx " << rx << " tx " << tx
+ << " dirty " << dirty << " missing " << missing
+ << " error " << error << dendl;
+ ceph_assert(clean == stat_clean);
+ ceph_assert(rx == stat_rx);
+ ceph_assert(tx == stat_tx);
+ ceph_assert(dirty == stat_dirty);
+ ceph_assert(missing == stat_missing);
+ ceph_assert(zero == stat_zero);
+ ceph_assert(error == stat_error);
+}
+
+void ObjectCacher::bh_stat_add(BufferHead *bh)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ switch (bh->get_state()) {
+ case BufferHead::STATE_MISSING:
+ stat_missing += bh->length();
+ break;
+ case BufferHead::STATE_CLEAN:
+ stat_clean += bh->length();
+ break;
+ case BufferHead::STATE_ZERO:
+ stat_zero += bh->length();
+ break;
+ case BufferHead::STATE_DIRTY:
+ stat_dirty += bh->length();
+ bh->ob->dirty_or_tx += bh->length();
+ bh->ob->oset->dirty_or_tx += bh->length();
+ break;
+ case BufferHead::STATE_TX:
+ stat_tx += bh->length();
+ bh->ob->dirty_or_tx += bh->length();
+ bh->ob->oset->dirty_or_tx += bh->length();
+ break;
+ case BufferHead::STATE_RX:
+ stat_rx += bh->length();
+ break;
+ case BufferHead::STATE_ERROR:
+ stat_error += bh->length();
+ break;
+ default:
+ ceph_abort_msg("bh_stat_add: invalid bufferhead state");
+ }
+ if (get_stat_dirty_waiting() > 0)
+ stat_cond.notify_all();
+}
+
+void ObjectCacher::bh_stat_sub(BufferHead *bh)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ switch (bh->get_state()) {
+ case BufferHead::STATE_MISSING:
+ stat_missing -= bh->length();
+ break;
+ case BufferHead::STATE_CLEAN:
+ stat_clean -= bh->length();
+ break;
+ case BufferHead::STATE_ZERO:
+ stat_zero -= bh->length();
+ break;
+ case BufferHead::STATE_DIRTY:
+ stat_dirty -= bh->length();
+ bh->ob->dirty_or_tx -= bh->length();
+ bh->ob->oset->dirty_or_tx -= bh->length();
+ break;
+ case BufferHead::STATE_TX:
+ stat_tx -= bh->length();
+ bh->ob->dirty_or_tx -= bh->length();
+ bh->ob->oset->dirty_or_tx -= bh->length();
+ break;
+ case BufferHead::STATE_RX:
+ stat_rx -= bh->length();
+ break;
+ case BufferHead::STATE_ERROR:
+ stat_error -= bh->length();
+ break;
+ default:
+ ceph_abort_msg("bh_stat_sub: invalid bufferhead state");
+ }
+}
+
+void ObjectCacher::bh_set_state(BufferHead *bh, int s)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ int state = bh->get_state();
+ // move between lru lists?
+ if (s == BufferHead::STATE_DIRTY && state != BufferHead::STATE_DIRTY) {
+ bh_lru_rest.lru_remove(bh);
+ bh_lru_dirty.lru_insert_top(bh);
+ } else if (s != BufferHead::STATE_DIRTY &&state == BufferHead::STATE_DIRTY) {
+ bh_lru_dirty.lru_remove(bh);
+ if (bh->get_dontneed())
+ bh_lru_rest.lru_insert_bot(bh);
+ else
+ bh_lru_rest.lru_insert_top(bh);
+ }
+
+ if ((s == BufferHead::STATE_TX ||
+ s == BufferHead::STATE_DIRTY) &&
+ state != BufferHead::STATE_TX &&
+ state != BufferHead::STATE_DIRTY) {
+ dirty_or_tx_bh.insert(bh);
+ } else if ((state == BufferHead::STATE_TX ||
+ state == BufferHead::STATE_DIRTY) &&
+ s != BufferHead::STATE_TX &&
+ s != BufferHead::STATE_DIRTY) {
+ dirty_or_tx_bh.erase(bh);
+ }
+
+ if (s != BufferHead::STATE_ERROR &&
+ state == BufferHead::STATE_ERROR) {
+ bh->error = 0;
+ }
+
+ // set state
+ bh_stat_sub(bh);
+ bh->set_state(s);
+ bh_stat_add(bh);
+}
+
+void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ldout(cct, 30) << "bh_add " << *ob << " " << *bh << dendl;
+ ob->add_bh(bh);
+ if (bh->is_dirty()) {
+ bh_lru_dirty.lru_insert_top(bh);
+ dirty_or_tx_bh.insert(bh);
+ } else {
+ if (bh->get_dontneed())
+ bh_lru_rest.lru_insert_bot(bh);
+ else
+ bh_lru_rest.lru_insert_top(bh);
+ }
+
+ if (bh->is_tx()) {
+ dirty_or_tx_bh.insert(bh);
+ }
+ bh_stat_add(bh);
+}
+
+void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
+{
+ ceph_assert(ceph_mutex_is_locked(lock));
+ ceph_assert(bh->get_journal_tid() == 0);
+ ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
+ ob->remove_bh(bh);
+ if (bh->is_dirty()) {
+ bh_lru_dirty.lru_remove(bh);
+ dirty_or_tx_bh.erase(bh);
+ } else {
+ bh_lru_rest.lru_remove(bh);
+ }
+
+ if (bh->is_tx()) {
+ dirty_or_tx_bh.erase(bh);
+ }
+ bh_stat_sub(bh);
+ if (get_stat_dirty_waiting() > 0)
+ stat_cond.notify_all();
+}
+