summaryrefslogtreecommitdiffstats
path: root/src/test/journal/test_ObjectRecorder.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/journal/test_ObjectRecorder.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/journal/test_ObjectRecorder.cc')
-rw-r--r--src/test/journal/test_ObjectRecorder.cc469
1 files changed, 469 insertions, 0 deletions
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc
new file mode 100644
index 00000000..3cc8e893
--- /dev/null
+++ b/src/test/journal/test_ObjectRecorder.cc
@@ -0,0 +1,469 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+
+using std::shared_ptr;
+
+class TestObjectRecorder : public RadosTestFixture {
+public:
+ TestObjectRecorder()
+ : m_flush_interval(std::numeric_limits<uint32_t>::max()),
+ m_flush_bytes(std::numeric_limits<uint64_t>::max()),
+ m_flush_age(600)
+ {
+ }
+
+ struct Handler : public journal::ObjectRecorder::Handler {
+ Mutex lock;
+ shared_ptr<Mutex> object_lock;
+ Cond cond;
+ bool is_closed = false;
+ uint32_t overflows = 0;
+
+ Handler() : lock("lock") {
+ }
+
+ void closed(journal::ObjectRecorder *object_recorder) override {
+ Mutex::Locker locker(lock);
+ is_closed = true;
+ cond.Signal();
+ }
+ void overflow(journal::ObjectRecorder *object_recorder) override {
+ Mutex::Locker locker(lock);
+ journal::AppendBuffers append_buffers;
+ object_lock->Lock();
+ object_recorder->claim_append_buffers(&append_buffers);
+ object_lock->Unlock();
+
+ ++overflows;
+ cond.Signal();
+ }
+ };
+
+ typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders;
+ typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap;
+
+ ObjectRecorders m_object_recorders;
+ ObjectRecorderLocksMap m_object_recorder_locks;
+
+ uint32_t m_flush_interval;
+ uint64_t m_flush_bytes;
+ double m_flush_age;
+ uint64_t m_max_in_flight_appends = 0;
+ Handler m_handler;
+
+ void TearDown() override {
+ for (ObjectRecorders::iterator it = m_object_recorders.begin();
+ it != m_object_recorders.end(); ++it) {
+ C_SaferCond cond;
+ (*it)->flush(&cond);
+ cond.wait();
+ }
+ m_object_recorders.clear();
+
+ RadosTestFixture::TearDown();
+ }
+
+ inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age, int max_in_flight) {
+ m_flush_interval = flush_interval;
+ m_flush_bytes = flush_bytes;
+ m_flush_age = flush_age;
+ m_max_in_flight_appends = max_in_flight;
+ }
+
+ journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
+ const std::string &payload) {
+ journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid,
+ 456));
+ future->init(journal::FutureImplPtr());
+
+ bufferlist bl;
+ bl.append(payload);
+ return std::make_pair(future, bl);
+ }
+
+ journal::ObjectRecorderPtr create_object(const std::string &oid,
+ uint8_t order, shared_ptr<Mutex> lock) {
+ journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
+ m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
+ m_max_in_flight_appends));
+ {
+ Mutex::Locker locker(*lock);
+ object->set_append_batch_options(m_flush_interval, m_flush_bytes,
+ m_flush_age);
+ }
+ m_object_recorders.push_back(object);
+ m_object_recorder_locks.insert(std::make_pair(oid, lock));
+ m_handler.object_lock = lock;
+ return object;
+ }
+};
+
+TEST_F(TestObjectRecorder, Append) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(0, 0, 0, 0);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByCount) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(2, 0, 0, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByBytes) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(0, 10, 0, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByAge) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(0, 0, 0.1, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ "payload");
+ append_buffers = {append_buffer2};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFilledObject) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 12, lock);
+
+ std::string payload(2048, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ payload);
+ append_buffers = {append_buffer2};
+ lock->Lock();
+ ASSERT_TRUE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Flush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(0, 10, 0, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ C_SaferCond cond1;
+ object->flush(&cond1);
+ ASSERT_EQ(0, cond1.wait());
+
+ C_SaferCond cond2;
+ append_buffer1.first->wait(&cond2);
+ ASSERT_EQ(0, cond2.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, FlushFuture) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(0, 10, 0, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ object->flush(append_buffer.first);
+ ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
+ append_buffer.first->is_complete());
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, FlushDetachedFuture) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
+ "payload");
+
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer};
+
+ object->flush(append_buffer.first);
+ ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+
+ // should automatically flush once its attached to the object
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, Close) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ set_batch_options(2, 0, 0, -1);
+ shared_ptr<Mutex> lock(new Mutex("object_recorder_lock"));
+ journal::ObjectRecorderPtr object = create_object(oid, 24, lock);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1};
+ lock->Lock();
+ ASSERT_FALSE(object->append(std::move(append_buffers)));
+ lock->Unlock();
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ lock->Lock();
+ ASSERT_FALSE(object->close());
+ ASSERT_TRUE(lock->is_locked());
+ lock->Unlock();
+
+ {
+ Mutex::Locker locker(m_handler.lock);
+ while (!m_handler.is_closed) {
+ if (m_handler.cond.WaitInterval(
+ m_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ }
+
+ ASSERT_TRUE(m_handler.is_closed);
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Overflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1"));
+ journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1);
+
+ std::string payload(1 << 11, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
+ payload);
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = {append_buffer1, append_buffer2};
+ lock1->Lock();
+ ASSERT_TRUE(object1->append(std::move(append_buffers)));
+ lock1->Unlock();
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object1->get_pending_appends());
+
+ bool overflowed = false;
+ {
+ Mutex::Locker locker(m_handler.lock);
+ while (m_handler.overflows == 0) {
+ if (m_handler.cond.WaitInterval(
+ m_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_handler.overflows != 0) {
+ overflowed = true;
+ m_handler.overflows = 0;
+ }
+ }
+
+ ASSERT_TRUE(overflowed);
+
+ shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2"));
+ journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2);
+
+ journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
+ payload);
+ append_buffers = {append_buffer3};
+ lock2->Lock();
+ ASSERT_FALSE(object2->append(std::move(append_buffers)));
+ lock2->Unlock();
+ append_buffer3.first->flush(NULL);
+
+ overflowed = false;
+ {
+ Mutex::Locker locker(m_handler.lock);
+ while (m_handler.overflows == 0) {
+ if (m_handler.cond.WaitInterval(
+ m_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_handler.overflows != 0) {
+ overflowed = true;
+ }
+ }
+
+ ASSERT_TRUE(overflowed);
+}