diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/test/journal/test_ObjectRecorder.cc | |
parent | Initial commit. (diff) | |
download | ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/journal/test_ObjectRecorder.cc')
-rw-r--r-- | src/test/journal/test_ObjectRecorder.cc | 469 |
1 files changed, 469 insertions, 0 deletions
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc new file mode 100644 index 00000000..3cc8e893 --- /dev/null +++ b/src/test/journal/test_ObjectRecorder.cc @@ -0,0 +1,469 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "journal/ObjectRecorder.h" +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/Timer.h" +#include "gtest/gtest.h" +#include "test/librados/test.h" +#include "test/journal/RadosTestFixture.h" +#include <limits> + +using std::shared_ptr; + +class TestObjectRecorder : public RadosTestFixture { +public: + TestObjectRecorder() + : m_flush_interval(std::numeric_limits<uint32_t>::max()), + m_flush_bytes(std::numeric_limits<uint64_t>::max()), + m_flush_age(600) + { + } + + struct Handler : public journal::ObjectRecorder::Handler { + Mutex lock; + shared_ptr<Mutex> object_lock; + Cond cond; + bool is_closed = false; + uint32_t overflows = 0; + + Handler() : lock("lock") { + } + + void closed(journal::ObjectRecorder *object_recorder) override { + Mutex::Locker locker(lock); + is_closed = true; + cond.Signal(); + } + void overflow(journal::ObjectRecorder *object_recorder) override { + Mutex::Locker locker(lock); + journal::AppendBuffers append_buffers; + object_lock->Lock(); + object_recorder->claim_append_buffers(&append_buffers); + object_lock->Unlock(); + + ++overflows; + cond.Signal(); + } + }; + + typedef std::list<journal::ObjectRecorderPtr> ObjectRecorders; + typedef std::map<std::string, shared_ptr<Mutex>> ObjectRecorderLocksMap; + + ObjectRecorders m_object_recorders; + ObjectRecorderLocksMap m_object_recorder_locks; + + uint32_t m_flush_interval; + uint64_t m_flush_bytes; + double m_flush_age; + uint64_t m_max_in_flight_appends = 0; + Handler m_handler; + + void TearDown() override { + for (ObjectRecorders::iterator it = m_object_recorders.begin(); + it != m_object_recorders.end(); ++it) { + C_SaferCond cond; + (*it)->flush(&cond); + cond.wait(); + } + m_object_recorders.clear(); + + RadosTestFixture::TearDown(); + } + + inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes, + double flush_age, int max_in_flight) { + m_flush_interval = flush_interval; + m_flush_bytes = flush_bytes; + m_flush_age = flush_age; + m_max_in_flight_appends = max_in_flight; + } + + journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid, + const std::string &payload) { + journal::FutureImplPtr future(new journal::FutureImpl(tag_tid, entry_tid, + 456)); + future->init(journal::FutureImplPtr()); + + bufferlist bl; + bl.append(payload); + return std::make_pair(future, bl); + } + + journal::ObjectRecorderPtr create_object(const std::string &oid, + uint8_t order, shared_ptr<Mutex> lock) { + journal::ObjectRecorderPtr object(new journal::ObjectRecorder( + m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, + m_max_in_flight_appends)); + { + Mutex::Locker locker(*lock); + object->set_append_batch_options(m_flush_interval, m_flush_bytes, + m_flush_age); + } + m_object_recorders.push_back(object); + m_object_recorder_locks.insert(std::make_pair(oid, lock)); + m_handler.object_lock = lock; + return object; + } +}; + +TEST_F(TestObjectRecorder, Append) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(0, 0, 0, 0); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->flush(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, AppendFlushByCount) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(2, 0, 0, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, AppendFlushByBytes) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(0, 10, 0, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(0U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, AppendFlushByAge) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(0, 0, 0.1, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + "payload"); + append_buffers = {append_buffer2}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, AppendFilledObject) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 12, lock); + + std::string payload(2048, '1'); + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + payload); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + payload); + append_buffers = {append_buffer2}; + lock->Lock(); + ASSERT_TRUE(object->append(std::move(append_buffers))); + lock->Unlock(); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, Flush) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(0, 10, 0, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + C_SaferCond cond1; + object->flush(&cond1); + ASSERT_EQ(0, cond1.wait()); + + C_SaferCond cond2; + append_buffer1.first->wait(&cond2); + ASSERT_EQ(0, cond2.wait()); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, FlushFuture) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(0, 10, 0, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + C_SaferCond cond; + append_buffer.first->wait(&cond); + object->flush(append_buffer.first); + ASSERT_TRUE(append_buffer.first->is_flush_in_progress() || + append_buffer.first->is_complete()); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, FlushDetachedFuture) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer = create_append_buffer(234, 123, + "payload"); + + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer}; + + object->flush(append_buffer.first); + ASSERT_FALSE(append_buffer.first->is_flush_in_progress()); + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + + // should automatically flush once its attached to the object + C_SaferCond cond; + append_buffer.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); +} + +TEST_F(TestObjectRecorder, Close) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + set_batch_options(2, 0, 0, -1); + shared_ptr<Mutex> lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + "payload"); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1}; + lock->Lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock->Unlock(); + ASSERT_EQ(1U, object->get_pending_appends()); + + lock->Lock(); + ASSERT_FALSE(object->close()); + ASSERT_TRUE(lock->is_locked()); + lock->Unlock(); + + { + Mutex::Locker locker(m_handler.lock); + while (!m_handler.is_closed) { + if (m_handler.cond.WaitInterval( + m_handler.lock, utime_t(10, 0)) != 0) { + break; + } + } + } + + ASSERT_TRUE(m_handler.is_closed); + ASSERT_EQ(0U, object->get_pending_appends()); +} + +TEST_F(TestObjectRecorder, Overflow) { + std::string oid = get_temp_oid(); + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + shared_ptr<Mutex> lock1(new Mutex("object_recorder_lock_1")); + journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1); + + std::string payload(1 << 11, '1'); + journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, + payload); + journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, + payload); + journal::AppendBuffers append_buffers; + append_buffers = {append_buffer1, append_buffer2}; + lock1->Lock(); + ASSERT_TRUE(object1->append(std::move(append_buffers))); + lock1->Unlock(); + + C_SaferCond cond; + append_buffer2.first->wait(&cond); + ASSERT_EQ(0, cond.wait()); + ASSERT_EQ(0U, object1->get_pending_appends()); + + bool overflowed = false; + { + Mutex::Locker locker(m_handler.lock); + while (m_handler.overflows == 0) { + if (m_handler.cond.WaitInterval( + m_handler.lock, utime_t(10, 0)) != 0) { + break; + } + } + if (m_handler.overflows != 0) { + overflowed = true; + m_handler.overflows = 0; + } + } + + ASSERT_TRUE(overflowed); + + shared_ptr<Mutex> lock2(new Mutex("object_recorder_lock_2")); + journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2); + + journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123, + payload); + append_buffers = {append_buffer3}; + lock2->Lock(); + ASSERT_FALSE(object2->append(std::move(append_buffers))); + lock2->Unlock(); + append_buffer3.first->flush(NULL); + + overflowed = false; + { + Mutex::Locker locker(m_handler.lock); + while (m_handler.overflows == 0) { + if (m_handler.cond.WaitInterval( + m_handler.lock, utime_t(10, 0)) != 0) { + break; + } + } + if (m_handler.overflows != 0) { + overflowed = true; + } + } + + ASSERT_TRUE(overflowed); +} |