summaryrefslogtreecommitdiffstats
path: root/src/test/osdc
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/osdc')
-rw-r--r--src/test/osdc/CMakeLists.txt13
-rw-r--r--src/test/osdc/FakeWriteback.cc93
-rw-r--r--src/test/osdc/FakeWriteback.h47
-rw-r--r--src/test/osdc/MemWriteback.cc166
-rw-r--r--src/test/osdc/MemWriteback.h52
-rw-r--r--src/test/osdc/object_cacher_stress.cc425
6 files changed, 796 insertions, 0 deletions
diff --git a/src/test/osdc/CMakeLists.txt b/src/test/osdc/CMakeLists.txt
new file mode 100644
index 000000000..297c2672c
--- /dev/null
+++ b/src/test/osdc/CMakeLists.txt
@@ -0,0 +1,13 @@
+add_executable(ceph_test_objectcacher_stress
+ object_cacher_stress.cc
+ FakeWriteback.cc
+ MemWriteback.cc
+ )
+target_link_libraries(ceph_test_objectcacher_stress
+ osdc
+ global
+ ${EXTRALIBS}
+ ${CMAKE_DL_LIBS}
+ )
+install(TARGETS ceph_test_objectcacher_stress
+ DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/test/osdc/FakeWriteback.cc b/src/test/osdc/FakeWriteback.cc
new file mode 100644
index 000000000..2f58965cc
--- /dev/null
+++ b/src/test/osdc/FakeWriteback.cc
@@ -0,0 +1,93 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <errno.h>
+#include <time.h>
+
+#include <thread>
+#include "common/debug.h"
+#include "common/Cond.h"
+#include "common/Finisher.h"
+#include "common/ceph_mutex.h"
+#include "include/ceph_assert.h"
+#include "common/ceph_time.h"
+
+#include "FakeWriteback.h"
+
+#define dout_subsys ceph_subsys_objectcacher
+#undef dout_prefix
+#define dout_prefix *_dout << "FakeWriteback(" << this << ") "
+
+class C_Delay : public Context {
+ CephContext *m_cct;
+ Context *m_con;
+ ceph::timespan m_delay;
+ ceph::mutex *m_lock;
+ bufferlist *m_bl;
+ uint64_t m_off;
+
+public:
+ C_Delay(CephContext *cct, Context *c, ceph::mutex *lock, uint64_t off,
+ bufferlist *pbl, uint64_t delay_ns=0)
+ : m_cct(cct), m_con(c), m_delay(delay_ns * std::chrono::nanoseconds(1)),
+ m_lock(lock), m_bl(pbl), m_off(off) {}
+ void finish(int r) override {
+ std::this_thread::sleep_for(m_delay);
+ if (m_bl) {
+ buffer::ptr bp(r);
+ bp.zero();
+ m_bl->append(bp);
+ ldout(m_cct, 20) << "finished read " << m_off << "~" << r << dendl;
+ }
+ std::lock_guard locker{*m_lock};
+ m_con->complete(r);
+ }
+};
+
+FakeWriteback::FakeWriteback(CephContext *cct, ceph::mutex *lock, uint64_t delay_ns)
+ : m_cct(cct), m_lock(lock), m_delay_ns(delay_ns)
+{
+ m_finisher = new Finisher(cct);
+ m_finisher->start();
+}
+
+FakeWriteback::~FakeWriteback()
+{
+ m_finisher->stop();
+ delete m_finisher;
+}
+
+void FakeWriteback::read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snapid,
+ bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, int op_flags,
+ const ZTracer::Trace &parent_trace,
+ Context *onfinish)
+{
+ C_Delay *wrapper = new C_Delay(m_cct, onfinish, m_lock, off, pbl,
+ m_delay_ns);
+ m_finisher->queue(wrapper, len);
+}
+
+ceph_tid_t FakeWriteback::write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc,
+ const bufferlist &bl, ceph::real_time mtime,
+ uint64_t trunc_size, __u32 trunc_seq,
+ ceph_tid_t journal_tid,
+ const ZTracer::Trace &parent_trace,
+ Context *oncommit)
+{
+ C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL,
+ m_delay_ns);
+ m_finisher->queue(wrapper, 0);
+ return ++m_tid;
+}
+
+bool FakeWriteback::may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t)
+{
+ return false;
+}
diff --git a/src/test/osdc/FakeWriteback.h b/src/test/osdc/FakeWriteback.h
new file mode 100644
index 000000000..11f78e813
--- /dev/null
+++ b/src/test/osdc/FakeWriteback.h
@@ -0,0 +1,47 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_TEST_OSDC_FAKEWRITEBACK_H
+#define CEPH_TEST_OSDC_FAKEWRITEBACK_H
+
+#include "include/Context.h"
+#include "include/types.h"
+#include "osd/osd_types.h"
+#include "osdc/WritebackHandler.h"
+
+#include <atomic>
+
+class Finisher;
+
+class FakeWriteback : public WritebackHandler {
+public:
+ FakeWriteback(CephContext *cct, ceph::mutex *lock, uint64_t delay_ns);
+ ~FakeWriteback() override;
+
+ void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, int op_flags,
+ const ZTracer::Trace &parent_trace,
+ Context *onfinish) override;
+
+ ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc, const bufferlist &bl,
+ ceph::real_time mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ const ZTracer::Trace &parent_trace,
+ Context *oncommit) override;
+
+ using WritebackHandler::write;
+
+ bool may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t) override;
+private:
+ CephContext *m_cct;
+ ceph::mutex *m_lock;
+ uint64_t m_delay_ns;
+ std::atomic<unsigned> m_tid = { 0 };
+ Finisher *m_finisher;
+};
+
+#endif
diff --git a/src/test/osdc/MemWriteback.cc b/src/test/osdc/MemWriteback.cc
new file mode 100644
index 000000000..4cb11291a
--- /dev/null
+++ b/src/test/osdc/MemWriteback.cc
@@ -0,0 +1,166 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <errno.h>
+#include <time.h>
+
+#include <thread>
+#include "common/debug.h"
+#include "common/Cond.h"
+#include "common/Finisher.h"
+#include "common/ceph_mutex.h"
+#include "include/ceph_assert.h"
+#include "common/ceph_time.h"
+
+#include "MemWriteback.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_objectcacher
+#undef dout_prefix
+#define dout_prefix *_dout << "MemWriteback(" << this << ") "
+
+class C_DelayRead : public Context {
+ MemWriteback *wb;
+ CephContext *m_cct;
+ Context *m_con;
+ ceph::timespan m_delay;
+ ceph::mutex *m_lock;
+ object_t m_oid;
+ uint64_t m_off;
+ uint64_t m_len;
+ bufferlist *m_bl;
+
+public:
+ C_DelayRead(MemWriteback *mwb, CephContext *cct, Context *c, ceph::mutex *lock,
+ const object_t& oid, uint64_t off, uint64_t len, bufferlist *pbl,
+ uint64_t delay_ns=0)
+ : wb(mwb), m_cct(cct), m_con(c),
+ m_delay(delay_ns * std::chrono::nanoseconds(1)),
+ m_lock(lock), m_oid(oid), m_off(off), m_len(len), m_bl(pbl) {}
+ void finish(int r) override {
+ std::this_thread::sleep_for(m_delay);
+ std::lock_guard locker{*m_lock};
+ r = wb->read_object_data(m_oid, m_off, m_len, m_bl);
+ if (m_con)
+ m_con->complete(r);
+ }
+};
+
+class C_DelayWrite : public Context {
+ MemWriteback *wb;
+ CephContext *m_cct;
+ Context *m_con;
+ ceph::timespan m_delay;
+ ceph::mutex *m_lock;
+ object_t m_oid;
+ uint64_t m_off;
+ uint64_t m_len;
+ const bufferlist& m_bl;
+
+public:
+ C_DelayWrite(MemWriteback *mwb, CephContext *cct, Context *c, ceph::mutex *lock,
+ const object_t& oid, uint64_t off, uint64_t len,
+ const bufferlist& bl, uint64_t delay_ns=0)
+ : wb(mwb), m_cct(cct), m_con(c),
+ m_delay(delay_ns * std::chrono::nanoseconds(1)),
+ m_lock(lock), m_oid(oid), m_off(off), m_len(len), m_bl(bl) {}
+ void finish(int r) override {
+ std::this_thread::sleep_for(m_delay);
+ std::lock_guard locker{*m_lock};
+ wb->write_object_data(m_oid, m_off, m_len, m_bl);
+ if (m_con)
+ m_con->complete(r);
+ }
+};
+
+MemWriteback::MemWriteback(CephContext *cct, ceph::mutex *lock, uint64_t delay_ns)
+ : m_cct(cct), m_lock(lock), m_delay_ns(delay_ns)
+{
+ m_finisher = new Finisher(cct);
+ m_finisher->start();
+}
+
+MemWriteback::~MemWriteback()
+{
+ m_finisher->stop();
+ delete m_finisher;
+}
+
+void MemWriteback::read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len, snapid_t snapid,
+ bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, int op_flags,
+ const ZTracer::Trace &parent_trace,
+ Context *onfinish)
+{
+ ceph_assert(snapid == CEPH_NOSNAP);
+ C_DelayRead *wrapper = new C_DelayRead(this, m_cct, onfinish, m_lock, oid,
+ off, len, pbl, m_delay_ns);
+ m_finisher->queue(wrapper, len);
+}
+
+ceph_tid_t MemWriteback::write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc,
+ const bufferlist &bl, ceph::real_time mtime,
+ uint64_t trunc_size, __u32 trunc_seq,
+ ceph_tid_t journal_tid,
+ const ZTracer::Trace &parent_trace,
+ Context *oncommit)
+{
+ ceph_assert(snapc.seq == 0);
+ C_DelayWrite *wrapper = new C_DelayWrite(this, m_cct, oncommit, m_lock, oid,
+ off, len, bl, m_delay_ns);
+ m_finisher->queue(wrapper, 0);
+ return ++m_tid;
+}
+
+void MemWriteback::write_object_data(const object_t& oid, uint64_t off, uint64_t len,
+ const bufferlist& data_bl)
+{
+ dout(1) << "writing " << oid << " " << off << "~" << len << dendl;
+ ceph_assert(len == data_bl.length());
+ bufferlist& obj_bl = object_data[oid];
+ bufferlist new_obj_bl;
+ // ensure size, or set it if new object
+ if (off + len > obj_bl.length()) {
+ obj_bl.append_zero(off + len - obj_bl.length());
+ }
+
+ // beginning
+ new_obj_bl.substr_of(obj_bl, 0, off);
+ // overwritten bit
+ new_obj_bl.append(data_bl);
+ // tail bit
+ bufferlist tmp;
+ tmp.substr_of(obj_bl, off+len, obj_bl.length()-(off+len));
+ new_obj_bl.append(tmp);
+ obj_bl.swap(new_obj_bl);
+ dout(1) << oid << " final size " << obj_bl.length() << dendl;
+}
+
+int MemWriteback::read_object_data(const object_t& oid, uint64_t off, uint64_t len,
+ bufferlist *data_bl)
+{
+ dout(1) << "reading " << oid << " " << off << "~" << len << dendl;
+ auto obj_i = object_data.find(oid);
+ if (obj_i == object_data.end()) {
+ dout(1) << oid << "DNE!" << dendl;
+ return -ENOENT;
+ }
+
+ const bufferlist& obj_bl = obj_i->second;
+ dout(1) << "reading " << oid << " from total size " << obj_bl.length() << dendl;
+
+ uint64_t read_len = std::min(len, obj_bl.length()-off);
+ data_bl->substr_of(obj_bl, off, read_len);
+ return 0;
+}
+
+bool MemWriteback::may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t)
+{
+ return false;
+}
diff --git a/src/test/osdc/MemWriteback.h b/src/test/osdc/MemWriteback.h
new file mode 100644
index 000000000..12c1ac3c0
--- /dev/null
+++ b/src/test/osdc/MemWriteback.h
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#ifndef CEPH_TEST_OSDC_MEMWRITEBACK_H
+#define CEPH_TEST_OSDC_MEMWRITEBACK_H
+
+#include "include/Context.h"
+#include "include/types.h"
+#include "osd/osd_types.h"
+#include "osdc/WritebackHandler.h"
+
+#include <atomic>
+
+class Finisher;
+
+class MemWriteback : public WritebackHandler {
+public:
+ MemWriteback(CephContext *cct, ceph::mutex *lock, uint64_t delay_ns);
+ ~MemWriteback() override;
+
+ void read(const object_t& oid, uint64_t object_no,
+ const object_locator_t& oloc, uint64_t off, uint64_t len,
+ snapid_t snapid, bufferlist *pbl, uint64_t trunc_size,
+ __u32 trunc_seq, int op_flags,
+ const ZTracer::Trace &parent_trace,
+ Context *onfinish) override;
+
+ ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc, const bufferlist &bl,
+ ceph::real_time mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ const ZTracer::Trace &parent_trace,
+ Context *oncommit) override;
+
+ using WritebackHandler::write;
+
+ bool may_copy_on_write(const object_t&, uint64_t, uint64_t,
+ snapid_t) override;
+ void write_object_data(const object_t& oid, uint64_t off, uint64_t len,
+ const bufferlist& data_bl);
+ int read_object_data(const object_t& oid, uint64_t off, uint64_t len,
+ bufferlist *data_bl);
+private:
+ std::map<object_t, bufferlist> object_data;
+ CephContext *m_cct;
+ ceph::mutex *m_lock;
+ uint64_t m_delay_ns;
+ std::atomic<unsigned> m_tid = { 0 };
+ Finisher *m_finisher;
+};
+
+#endif
diff --git a/src/test/osdc/object_cacher_stress.cc b/src/test/osdc/object_cacher_stress.cc
new file mode 100644
index 000000000..096f9b49e
--- /dev/null
+++ b/src/test/osdc/object_cacher_stress.cc
@@ -0,0 +1,425 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <cstdlib>
+#include <ctime>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+
+#include "common/ceph_argparse.h"
+#include "common/ceph_mutex.h"
+#include "common/common_init.h"
+#include "common/config.h"
+#include "common/snap_types.h"
+#include "global/global_init.h"
+#include "include/buffer.h"
+#include "include/Context.h"
+#include "include/stringify.h"
+#include "osdc/ObjectCacher.h"
+
+#include "FakeWriteback.h"
+#include "MemWriteback.h"
+
+#include <atomic>
+
+using namespace std;
+
+// XXX: Only tests default namespace
+struct op_data {
+ op_data(const std::string &oid, uint64_t offset, uint64_t len, bool read)
+ : extent(oid, 0, offset, len, 0), is_read(read)
+ {
+ extent.oloc.pool = 0;
+ extent.buffer_extents.push_back(make_pair(0, len));
+ }
+
+ ObjectExtent extent;
+ bool is_read;
+ ceph::bufferlist result;
+ std::atomic<unsigned> done = { 0 };
+};
+
+class C_Count : public Context {
+ op_data *m_op;
+ std::atomic<unsigned> *m_outstanding = nullptr;
+public:
+ C_Count(op_data *op, std::atomic<unsigned> *outstanding)
+ : m_op(op), m_outstanding(outstanding) {}
+ void finish(int r) override {
+ m_op->done++;
+ ceph_assert(*m_outstanding > 0);
+ (*m_outstanding)--;
+ }
+};
+
+int stress_test(uint64_t num_ops, uint64_t num_objs,
+ uint64_t max_obj_size, uint64_t delay_ns,
+ uint64_t max_op_len, float percent_reads)
+{
+ ceph::mutex lock = ceph::make_mutex("object_cacher_stress::object_cacher");
+ FakeWriteback writeback(g_ceph_context, &lock, delay_ns);
+
+ ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
+ g_conf()->client_oc_size,
+ g_conf()->client_oc_max_objects,
+ g_conf()->client_oc_max_dirty,
+ g_conf()->client_oc_target_dirty,
+ g_conf()->client_oc_max_dirty_age,
+ true);
+ obc.start();
+
+ std::atomic<unsigned> outstanding_reads = { 0 };
+ vector<std::shared_ptr<op_data> > ops;
+ ObjectCacher::ObjectSet object_set(NULL, 0, 0);
+ SnapContext snapc;
+ ceph::buffer::ptr bp(max_op_len);
+ ceph::bufferlist bl;
+ uint64_t journal_tid = 0;
+ bp.zero();
+ bl.append(bp);
+
+ // schedule ops
+ std::cout << "Test configuration:\n\n"
+ << setw(10) << "ops: " << num_ops << "\n"
+ << setw(10) << "objects: " << num_objs << "\n"
+ << setw(10) << "obj size: " << max_obj_size << "\n"
+ << setw(10) << "delay: " << delay_ns << "\n"
+ << setw(10) << "max op len: " << max_op_len << "\n"
+ << setw(10) << "percent reads: " << percent_reads << "\n\n";
+
+ for (uint64_t i = 0; i < num_ops; ++i) {
+ uint64_t offset = random() % max_obj_size;
+ uint64_t max_len = std::min(max_obj_size - offset, max_op_len);
+ // no zero-length operations
+ uint64_t length = random() % (std::max<uint64_t>(max_len - 1, 1)) + 1;
+ std::string oid = "test" + stringify(random() % num_objs);
+ bool is_read = random() < percent_reads * float(RAND_MAX);
+ std::shared_ptr<op_data> op(new op_data(oid, offset, length, is_read));
+ ops.push_back(op);
+ std::cout << "op " << i << " " << (is_read ? "read" : "write")
+ << " " << op->extent << "\n";
+ if (op->is_read) {
+ ObjectCacher::OSDRead *rd = obc.prepare_read(CEPH_NOSNAP, &op->result, 0);
+ rd->extents.push_back(op->extent);
+ outstanding_reads++;
+ Context *completion = new C_Count(op.get(), &outstanding_reads);
+ lock.lock();
+ int r = obc.readx(rd, &object_set, completion);
+ lock.unlock();
+ ceph_assert(r >= 0);
+ if ((uint64_t)r == length)
+ completion->complete(r);
+ else
+ ceph_assert(r == 0);
+ } else {
+ ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl,
+ ceph::real_time::min(), 0,
+ ++journal_tid);
+ wr->extents.push_back(op->extent);
+ lock.lock();
+ obc.writex(wr, &object_set, NULL);
+ lock.unlock();
+ }
+ }
+
+ // check that all reads completed
+ for (uint64_t i = 0; i < num_ops; ++i) {
+ if (!ops[i]->is_read)
+ continue;
+ std::cout << "waiting for read " << i << ops[i]->extent << std::endl;
+ uint64_t done = 0;
+ while (done == 0) {
+ done = ops[i]->done;
+ if (!done) {
+ usleep(500);
+ }
+ }
+ if (done > 1) {
+ std::cout << "completion called more than once!\n" << std::endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ lock.lock();
+ obc.release_set(&object_set);
+ lock.unlock();
+
+ int r = 0;
+ ceph::mutex mylock = ceph::make_mutex("librbd::ImageCtx::flush_cache");
+ ceph::condition_variable cond;
+ bool done;
+ Context *onfinish = new C_SafeCond(mylock, cond, &done, &r);
+ lock.lock();
+ bool already_flushed = obc.flush_set(&object_set, onfinish);
+ std::cout << "already flushed = " << already_flushed << std::endl;
+ lock.unlock();
+ {
+ std::unique_lock locker{mylock};
+ cond.wait(locker, [&done] { return done; });
+ }
+ lock.lock();
+ bool unclean = obc.release_set(&object_set);
+ lock.unlock();
+
+ if (unclean) {
+ std::cout << "unclean buffers left over!" << std::endl;
+ return EXIT_FAILURE;
+ }
+
+ obc.stop();
+
+ std::cout << "Test completed successfully." << std::endl;
+
+ return EXIT_SUCCESS;
+}
+
+int correctness_test(uint64_t delay_ns)
+{
+ std::cerr << "starting correctness test" << std::endl;
+ ceph::mutex lock = ceph::make_mutex("object_cacher_stress::object_cacher");
+ MemWriteback writeback(g_ceph_context, &lock, delay_ns);
+
+ ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
+ 1<<21, // max cache size, 2MB
+ 1, // max objects, just one
+ 1<<18, // max dirty, 256KB
+ 1<<17, // target dirty, 128KB
+ g_conf()->client_oc_max_dirty_age,
+ true);
+ obc.start();
+ std::cerr << "just start()ed ObjectCacher" << std::endl;
+
+ SnapContext snapc;
+ ceph_tid_t journal_tid = 0;
+ std::string oid("correctness_test_obj");
+ ObjectCacher::ObjectSet object_set(NULL, 0, 0);
+ ceph::bufferlist zeroes_bl;
+ zeroes_bl.append_zero(1<<20);
+
+ // set up a 4MB all-zero object
+ std::cerr << "writing 4x1MB object" << std::endl;
+ std::map<int, C_SaferCond> create_finishers;
+ for (int i = 0; i < 4; ++i) {
+ ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, zeroes_bl,
+ ceph::real_time::min(), 0,
+ ++journal_tid);
+ ObjectExtent extent(oid, 0, zeroes_bl.length()*i, zeroes_bl.length(), 0);
+ extent.oloc.pool = 0;
+ extent.buffer_extents.push_back(make_pair(0, 1<<20));
+ wr->extents.push_back(extent);
+ lock.lock();
+ obc.writex(wr, &object_set, &create_finishers[i]);
+ lock.unlock();
+ }
+
+ // write some 1-valued bits at 256-KB intervals for checking consistency
+ std::cerr << "Writing some 0xff values" << std::endl;
+ ceph::buffer::ptr ones(1<<16);
+ memset(ones.c_str(), 0xff, ones.length());
+ ceph::bufferlist ones_bl;
+ ones_bl.append(ones);
+ for (int i = 1<<18; i < 1<<22; i+=1<<18) {
+ ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, ones_bl,
+ ceph::real_time::min(), 0,
+ ++journal_tid);
+ ObjectExtent extent(oid, 0, i, ones_bl.length(), 0);
+ extent.oloc.pool = 0;
+ extent.buffer_extents.push_back(make_pair(0, 1<<16));
+ wr->extents.push_back(extent);
+ lock.lock();
+ obc.writex(wr, &object_set, &create_finishers[i]);
+ lock.unlock();
+ }
+
+ for (auto i = create_finishers.begin(); i != create_finishers.end(); ++i) {
+ i->second.wait();
+ }
+ std::cout << "Finished setting up object" << std::endl;
+ lock.lock();
+ C_SaferCond flushcond;
+ bool done = obc.flush_all(&flushcond);
+ if (!done) {
+ std::cout << "Waiting for flush" << std::endl;
+ lock.unlock();
+ flushcond.wait();
+ lock.lock();
+ }
+ lock.unlock();
+
+ /* now read the back half of the object in, check consistency,
+ */
+ std::cout << "Reading back half of object (1<<21~1<<21)" << std::endl;
+ bufferlist readbl;
+ C_SaferCond backreadcond;
+ ObjectCacher::OSDRead *back_half_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
+ ObjectExtent back_half_extent(oid, 0, 1<<21, 1<<21, 0);
+ back_half_extent.oloc.pool = 0;
+ back_half_extent.buffer_extents.push_back(make_pair(0, 1<<21));
+ back_half_rd->extents.push_back(back_half_extent);
+ lock.lock();
+ int r = obc.readx(back_half_rd, &object_set, &backreadcond);
+ lock.unlock();
+ ceph_assert(r >= 0);
+ if (r == 0) {
+ std::cout << "Waiting to read data into cache" << std::endl;
+ r = backreadcond.wait();
+ }
+
+ ceph_assert(r == 1<<21);
+
+ /* Read the whole object in,
+ * verify we have to wait for it to complete,
+ * overwrite a small piece, (http://tracker.ceph.com/issues/16002),
+ * and check consistency */
+
+ readbl.clear();
+ std::cout<< "Reading whole object (0~1<<22)" << std::endl;
+ C_SaferCond frontreadcond;
+ ObjectCacher::OSDRead *whole_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
+ ObjectExtent whole_extent(oid, 0, 0, 1<<22, 0);
+ whole_extent.oloc.pool = 0;
+ whole_extent.buffer_extents.push_back(make_pair(0, 1<<22));
+ whole_rd->extents.push_back(whole_extent);
+ lock.lock();
+ r = obc.readx(whole_rd, &object_set, &frontreadcond);
+ // we cleared out the cache by reading back half, it shouldn't pass immediately!
+ ceph_assert(r == 0);
+ std::cout << "Data (correctly) not available without fetching" << std::endl;
+
+ ObjectCacher::OSDWrite *verify_wr = obc.prepare_write(snapc, ones_bl,
+ ceph::real_time::min(), 0,
+ ++journal_tid);
+ ObjectExtent verify_extent(oid, 0, (1<<18)+(1<<16), ones_bl.length(), 0);
+ verify_extent.oloc.pool = 0;
+ verify_extent.buffer_extents.push_back(make_pair(0, 1<<16));
+ verify_wr->extents.push_back(verify_extent);
+ C_SaferCond verify_finisher;
+ obc.writex(verify_wr, &object_set, &verify_finisher);
+ lock.unlock();
+ std::cout << "wrote dirtying data" << std::endl;
+
+ std::cout << "Waiting to read data into cache" << std::endl;
+ frontreadcond.wait();
+ verify_finisher.wait();
+
+ std::cout << "Validating data" << std::endl;
+
+ for (int i = 1<<18; i < 1<<22; i+=1<<18) {
+ bufferlist ones_maybe;
+ ones_maybe.substr_of(readbl, i, ones_bl.length());
+ ceph_assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
+ }
+ bufferlist ones_maybe;
+ ones_maybe.substr_of(readbl, (1<<18)+(1<<16), ones_bl.length());
+ ceph_assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
+
+ std::cout << "validated that data is 0xff where it should be" << std::endl;
+
+ lock.lock();
+ C_SaferCond flushcond2;
+ done = obc.flush_all(&flushcond2);
+ if (!done) {
+ std::cout << "Waiting for final write flush" << std::endl;
+ lock.unlock();
+ flushcond2.wait();
+ lock.lock();
+ }
+
+ bool unclean = obc.release_set(&object_set);
+ if (unclean) {
+ std::cout << "unclean buffers left over!" << std::endl;
+ vector<ObjectExtent> discard_extents;
+ int i = 0;
+ for (auto oi = object_set.objects.begin(); !oi.end(); ++oi) {
+ discard_extents.emplace_back(oid, i++, 0, 1<<22, 0);
+ }
+ obc.discard_set(&object_set, discard_extents);
+ lock.unlock();
+ obc.stop();
+ goto fail;
+ }
+ lock.unlock();
+
+ obc.stop();
+
+ std::cout << "Testing ObjectCacher correctness complete" << std::endl;
+ return EXIT_SUCCESS;
+
+ fail:
+ return EXIT_FAILURE;
+}
+
+int main(int argc, const char **argv)
+{
+ auto args = argv_to_vec(argc, argv);
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_UTILITY,
+ CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+
+ long long delay_ns = 0;
+ long long num_ops = 1000;
+ long long obj_bytes = 4 << 20;
+ long long max_len = 128 << 10;
+ long long num_objs = 10;
+ float percent_reads = 0.90;
+ int seed = time(0) % 100000;
+ bool stress = false;
+ bool correctness = false;
+ std::ostringstream err;
+ std::vector<const char*>::iterator i;
+ for (i = args.begin(); i != args.end();) {
+ if (ceph_argparse_witharg(args, i, &delay_ns, err, "--delay-ns", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &num_ops, err, "--ops", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &num_objs, err, "--objects", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &obj_bytes, err, "--obj-size", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &max_len, err, "--max-op-size", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &percent_reads, err, "--percent-read", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_witharg(args, i, &seed, err, "--seed", (char*)NULL)) {
+ if (!err.str().empty()) {
+ cerr << argv[0] << ": " << err.str() << std::endl;
+ return EXIT_FAILURE;
+ }
+ } else if (ceph_argparse_flag(args, i, "--stress-test", NULL)) {
+ stress = true;
+ } else if (ceph_argparse_flag(args, i, "--correctness-test", NULL)) {
+ correctness = true;
+ } else {
+ cerr << "unknown option " << *i << std::endl;
+ return EXIT_FAILURE;
+ }
+ }
+
+ if (stress) {
+ srandom(seed);
+ return stress_test(num_ops, num_objs, obj_bytes, delay_ns, max_len, percent_reads);
+ }
+ if (correctness) {
+ return correctness_test(delay_ns);
+ }
+}