author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/immutable_object_cache
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/immutable_object_cache')
-rw-r--r--  src/tools/immutable_object_cache/CMakeLists.txt      |  19
-rw-r--r--  src/tools/immutable_object_cache/CacheClient.cc      | 435
-rw-r--r--  src/tools/immutable_object_cache/CacheClient.h       |  84
-rw-r--r--  src/tools/immutable_object_cache/CacheController.cc  | 139
-rw-r--r--  src/tools/immutable_object_cache/CacheController.h   |  40
-rw-r--r--  src/tools/immutable_object_cache/CacheServer.cc      | 106
-rw-r--r--  src/tools/immutable_object_cache/CacheServer.h       |  45
-rw-r--r--  src/tools/immutable_object_cache/CacheSession.cc     | 140
-rw-r--r--  src/tools/immutable_object_cache/CacheSession.h      |  56
-rw-r--r--  src/tools/immutable_object_cache/ObjectCacheStore.cc | 466
-rw-r--r--  src/tools/immutable_object_cache/ObjectCacheStore.h  |  85
-rw-r--r--  src/tools/immutable_object_cache/Policy.h            |  34
-rw-r--r--  src/tools/immutable_object_cache/SimplePolicy.cc     | 216
-rw-r--r--  src/tools/immutable_object_cache/SimplePolicy.h      |  68
-rw-r--r--  src/tools/immutable_object_cache/SocketCommon.h      |  31
-rw-r--r--  src/tools/immutable_object_cache/Types.cc            | 184
-rw-r--r--  src/tools/immutable_object_cache/Types.h             | 136
-rw-r--r--  src/tools/immutable_object_cache/Utils.h             |  31
-rw-r--r--  src/tools/immutable_object_cache/main.cc             |  84
19 files changed, 2399 insertions, 0 deletions
diff --git a/src/tools/immutable_object_cache/CMakeLists.txt b/src/tools/immutable_object_cache/CMakeLists.txt
new file mode 100644
index 000000000..ed118ed6f
--- /dev/null
+++ b/src/tools/immutable_object_cache/CMakeLists.txt
@@ -0,0 +1,19 @@
+set(ceph_immutable_object_cache_files
+ ObjectCacheStore.cc
+ CacheController.cc
+ CacheServer.cc
+ CacheClient.cc
+ CacheSession.cc
+ SimplePolicy.cc
+ Types.cc
+ )
+add_library(ceph_immutable_object_cache_lib STATIC ${ceph_immutable_object_cache_files})
+
+add_executable(ceph-immutable-object-cache
+ main.cc)
+target_link_libraries(ceph-immutable-object-cache
+ ceph_immutable_object_cache_lib
+ librados
+ StdFilesystem::filesystem
+ global)
+install(TARGETS ceph-immutable-object-cache DESTINATION bin)
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc
new file mode 100644
index 000000000..2b837be51
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheClient.cc
@@ -0,0 +1,435 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/bind/bind.hpp>
+#include "CacheClient.h"
+#include "common/Cond.h"
+#include "common/version.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
+ << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+ CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
+ : m_cct(ceph_ctx), m_io_service_work(m_io_service),
+ m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
+ m_io_thread(nullptr), m_session_work(false), m_writing(false),
+ m_reading(false), m_sequence_id(0) {
+ m_worker_thread_num =
+ m_cct->_conf.get_val<uint64_t>(
+ "immutable_object_cache_client_dedicated_thread_num");
+
+ if (m_worker_thread_num != 0) {
+ m_worker = new boost::asio::io_service();
+ m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
+ for (uint64_t i = 0; i < m_worker_thread_num; i++) {
+ std::thread* thd = new std::thread([this](){m_worker->run();});
+ m_worker_threads.push_back(thd);
+ }
+ }
+ m_bp_header = buffer::create(get_header_size());
+ }
+
+ CacheClient::~CacheClient() {
+ stop();
+ }
+
+ void CacheClient::run() {
+ m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
+ }
+
+ bool CacheClient::is_session_work() {
+ return m_session_work.load() == true;
+ }
+
+ int CacheClient::stop() {
+ m_session_work.store(false);
+ m_io_service.stop();
+
+ if (m_io_thread != nullptr) {
+ m_io_thread->join();
+ }
+ if (m_worker_thread_num != 0) {
+ m_worker->stop();
+ for (auto thd : m_worker_threads) {
+ thd->join();
+ delete thd;
+ }
+ delete m_worker_io_service_work;
+ delete m_worker;
+ }
+ return 0;
+ }
+
+ // close domain socket
+ void CacheClient::close() {
+ m_session_work.store(false);
+ boost::system::error_code close_ec;
+ m_dm_socket.close(close_ec);
+ if (close_ec) {
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+ }
+ }
+
+ // sync connect
+ int CacheClient::connect() {
+ int ret = -1;
+ C_SaferCond cond;
+ Context* on_finish = new LambdaContext([&cond, &ret](int err) {
+ ret = err;
+ cond.complete(err);
+ });
+
+ connect(on_finish);
+ cond.wait();
+
+ return ret;
+ }
+
+ // async connect
+ void CacheClient::connect(Context* on_finish) {
+ m_dm_socket.async_connect(m_ep,
+ boost::bind(&CacheClient::handle_connect, this,
+ on_finish, boost::asio::placeholders::error));
+ }
+
+ void CacheClient::handle_connect(Context* on_finish,
+ const boost::system::error_code& err) {
+ if (err) {
+ ldout(m_cct, 20) << "fails to connect to cache server. error : "
+ << err.message() << dendl;
+ fault(ASIO_ERROR_CONNECT, err);
+ on_finish->complete(-1);
+ return;
+ }
+
+ ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
+ on_finish->complete(0);
+ }
+
+ void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, uint64_t object_size,
+ std::string oid,
+ CacheGenContextURef&& on_finish) {
+ ldout(m_cct, 20) << dendl;
+ ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
+ ++m_sequence_id, 0, 0, pool_id,
+ snap_id, object_size, oid, pool_nspace);
+ req->process_msg = std::move(on_finish);
+ req->encode();
+
+ {
+ std::lock_guard locker{m_lock};
+ m_outcoming_bl.append(req->get_payload_bufferlist());
+ ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
+ m_seq_to_req[req->seq] = req;
+ }
+
+ // try to send message to server.
+ try_send();
+
+ // try to receive ack from server.
+ try_receive();
+ }
+
+ void CacheClient::try_send() {
+ ldout(m_cct, 20) << dendl;
+ if (!m_writing.load()) {
+ m_writing.store(true);
+ send_message();
+ }
+ }
+
+ void CacheClient::send_message() {
+ ldout(m_cct, 20) << dendl;
+ bufferlist bl;
+ {
+ std::lock_guard locker{m_lock};
+ bl.swap(m_outcoming_bl);
+ ceph_assert(m_outcoming_bl.length() == 0);
+ }
+
+ // send as many bytes as possible.
+ boost::asio::async_write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()),
+ boost::asio::transfer_exactly(bl.length()),
+ [this, bl](const boost::system::error_code& err, size_t cb) {
+ if (err || cb != bl.length()) {
+ fault(ASIO_ERROR_WRITE, err);
+ return;
+ }
+
+ ceph_assert(cb == bl.length());
+
+ {
+ std::lock_guard locker{m_lock};
+ if (m_outcoming_bl.length() == 0) {
+ m_writing.store(false);
+ return;
+ }
+ }
+
+ // still have left bytes, continue to send.
+ send_message();
+ });
+ try_receive();
+ }
+
+ void CacheClient::try_receive() {
+ ldout(m_cct, 20) << dendl;
+ if (!m_reading.load()) {
+ m_reading.store(true);
+ receive_message();
+ }
+ }
+
+ void CacheClient::receive_message() {
+ ldout(m_cct, 20) << dendl;
+ ceph_assert(m_reading.load());
+ read_reply_header();
+ }
+
+ void CacheClient::read_reply_header() {
+ ldout(m_cct, 20) << dendl;
+ /* create new head buffer for every reply */
+ bufferptr bp_head(buffer::create(get_header_size()));
+ auto raw_ptr = bp_head.c_str();
+
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(raw_ptr, get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
+ boost::bind(&CacheClient::handle_reply_header,
+ this, bp_head,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+
+ void CacheClient::handle_reply_header(bufferptr bp_head,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
+ if (ec || bytes_transferred != get_header_size()) {
+ fault(ASIO_ERROR_READ, ec);
+ return;
+ }
+
+ ceph_assert(bytes_transferred == bp_head.length());
+
+ uint32_t data_len = get_data_len(bp_head.c_str());
+
+ bufferptr bp_data(buffer::create(data_len));
+ read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
+ }
+
+ void CacheClient::read_reply_data(bufferptr&& bp_head,
+ bufferptr&& bp_data,
+ const uint64_t data_len) {
+ ldout(m_cct, 20) << dendl;
+ auto raw_ptr = bp_data.c_str();
+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheClient::handle_reply_data,
+ this, std::move(bp_head), std::move(bp_data), data_len,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+ }
+
+ void CacheClient::handle_reply_data(bufferptr bp_head,
+ bufferptr bp_data,
+ const uint64_t data_len,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
+ if (ec || bytes_transferred != data_len) {
+ fault(ASIO_ERROR_WRITE, ec);
+ return;
+ }
+ ceph_assert(bp_data.length() == data_len);
+
+ bufferlist data_buffer;
+ data_buffer.append(std::move(bp_head));
+ data_buffer.append(std::move(bp_data));
+
+ ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
+ data_buffer.clear();
+ ceph_assert(data_buffer.length() == 0);
+
+ process(reply, reply->seq);
+
+ {
+ std::lock_guard locker{m_lock};
+ if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
+ m_reading.store(false);
+ return;
+ }
+ }
+ if (is_session_work()) {
+ receive_message();
+ }
+ }
+
+ void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+ ldout(m_cct, 20) << dendl;
+ ObjectCacheRequest* current_request = nullptr;
+ {
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+ current_request = m_seq_to_req[seq_id];
+ m_seq_to_req.erase(seq_id);
+ }
+
+ ceph_assert(current_request != nullptr);
+ auto process_reply = new LambdaContext([current_request, reply]
+ (bool dedicated) {
+ if (dedicated) {
+ // a dedicated thread executes this context.
+ }
+ current_request->process_msg.release()->complete(reply);
+ delete current_request;
+ delete reply;
+ });
+
+ if (m_worker_thread_num != 0) {
+ m_worker->post([process_reply]() {
+ process_reply->complete(true);
+ });
+ } else {
+ process_reply->complete(false);
+ }
+ }
+
+ // if any request fails, run fault() and then shut down the RO cache.
+ void CacheClient::fault(const int err_type,
+ const boost::system::error_code& ec) {
+ ldout(m_cct, 20) << "fault." << ec.message() << dendl;
+
+ if (err_type == ASIO_ERROR_CONNECT) {
+ ceph_assert(!m_session_work.load());
+ if (ec == boost::asio::error::connection_refused) {
+ ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
+ << ". Immutable-object-cache daemon is down ? "
+ << "Data will be read from ceph cluster " << dendl;
+ } else {
+ ldout(m_cct, 20) << "Connecting RO daemon fails : "
+ << ec.message() << dendl;
+ }
+
+ if (m_dm_socket.is_open()) {
+ // Set to indicate what error occurred, if any.
+ // Note that, even if the function indicates an error,
+ // the underlying descriptor is closed.
+ boost::system::error_code close_ec;
+ m_dm_socket.close(close_ec);
+ if (close_ec) {
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+ }
+ }
+ return;
+ }
+
+ if (!m_session_work.load()) {
+ return;
+ }
+
+ /* When the current session stops working, ASIO no longer receives
+ * new requests from the hook. Pending requests already handed to ASIO
+ * are cancelled and their callbacks invoked; requests cancelled here
+ * will be re-dispatched to the RADOS layer.
+ * Make sure only one thread executes the code below. */
+ m_session_work.store(false);
+
+ if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+ ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+ ceph_assert(0);
+ }
+
+ if (err_type == ASIO_ERROR_READ) {
+ ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+ }
+
+ if (err_type == ASIO_ERROR_WRITE) {
+ ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+ // CacheClient should not occur this error.
+ ceph_assert(0);
+ }
+
+ // currently, for any asio error, just shut down the RO cache.
+ close();
+
+ /* all pending requests that have already entered ASIO
+ * will be re-dispatched to RADOS. */
+ {
+ std::lock_guard locker{m_lock};
+ for (auto it : m_seq_to_req) {
+ it.second->type = RBDSC_READ_RADOS;
+ it.second->process_msg->complete(it.second);
+ }
+ m_seq_to_req.clear();
+ }
+
+ ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
+ Later all reading will be re-dispatched RADOS layer"
+ << ec.message() << dendl;
+ }
+
+ // TODO : re-implement this method
+ int CacheClient::register_client(Context* on_finish) {
+ ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
+ m_sequence_id++,
+ ceph_version_to_str());
+ reg_req->encode();
+
+ bufferlist bl;
+ bl.append(reg_req->get_payload_bufferlist());
+
+ uint64_t ret;
+ boost::system::error_code ec;
+
+ ret = boost::asio::write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()), ec);
+
+ if (ec || ret != bl.length()) {
+ fault(ASIO_ERROR_WRITE, ec);
+ return -1;
+ }
+ delete reg_req;
+
+ ret = boost::asio::read(m_dm_socket,
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
+ if (ec || ret != get_header_size()) {
+ fault(ASIO_ERROR_READ, ec);
+ return -1;
+ }
+
+ uint64_t data_len = get_data_len(m_bp_header.c_str());
+ bufferptr bp_data(buffer::create(data_len));
+
+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
+ data_len), ec);
+ if (ec || ret != data_len) {
+ fault(ASIO_ERROR_READ, ec);
+ return -1;
+ }
+
+ bufferlist data_buffer;
+ data_buffer.append(m_bp_header);
+ data_buffer.append(std::move(bp_data));
+ ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
+ if (req->type == RBDSC_REGISTER_REPLY) {
+ m_session_work.store(true);
+ on_finish->complete(0);
+ } else {
+ on_finish->complete(-1);
+ }
+
+ delete req;
+ return 0;
+ }
+
+} // namespace immutable_obj_cache
+} // namespace ceph
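[Editor's note] A condensed usage sketch for the CacheClient API added above, roughly how an librbd-side hook would drive it. The socket path, pool/snap ids, object name, and the example function/struct names are illustrative assumptions, not part of the patch; error handling is trimmed for brevity.

// Editor's sketch (not part of the patch), assuming the Ceph build tree.
#include "tools/immutable_object_cache/CacheClient.h"
#include "common/Cond.h"

namespace example {

using namespace ceph::immutable_obj_cache;

// Invoked with the daemon reply: RBDSC_READ_REPLY carries the local cache
// file path, RBDSC_READ_RADOS means "fall back to reading from the cluster".
// The reply object is owned and freed by CacheClient after complete().
struct OnLookup : public GenContext<ObjectCacheRequest*> {
  void finish(ObjectCacheRequest* reply) override {
    if (reply->type == RBDSC_READ_REPLY) {
      // read the promoted object from the returned cache file path
    }
  }
};

void lookup_one_object(CephContext* cct) {
  CacheClient client("/var/run/ceph/immutable_object_cache.sock", cct);
  client.run();                      // spawn the io_service thread
  if (client.connect() < 0) {        // sync connect wraps the async variant
    return;                          // daemon down: read from RADOS instead
  }
  C_SaferCond registered;
  client.register_client(&registered);
  if (registered.wait() < 0 || !client.is_session_work()) {
    return;
  }
  client.lookup_object("", /*pool_id=*/2, /*snap_id=*/4,
                       /*object_size=*/4 << 20,
                       "rbd_data.abc123.0000000000000000",
                       CacheGenContextURef(new OnLookup()));
}

} // namespace example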
diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h
new file mode 100644
index 000000000..b2f749631
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheClient.h
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CACHE_CLIENT_H
+#define CEPH_CACHE_CACHE_CLIENT_H
+
+#include <atomic>
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/algorithm/string.hpp>
+
+#include "include/ceph_assert.h"
+#include "common/ceph_mutex.h"
+#include "include/Context.h"
+#include "Types.h"
+#include "SocketCommon.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheClient {
+ public:
+ CacheClient(const std::string& file, CephContext* ceph_ctx);
+ ~CacheClient();
+ void run();
+ bool is_session_work();
+ void close();
+ int stop();
+ int connect();
+ void connect(Context* on_finish);
+ void lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, uint64_t object_size, std::string oid,
+ CacheGenContextURef&& on_finish);
+ int register_client(Context* on_finish);
+
+ private:
+ void send_message();
+ void try_send();
+ void fault(const int err_type, const boost::system::error_code& err);
+ void handle_connect(Context* on_finish, const boost::system::error_code& err);
+ void try_receive();
+ void receive_message();
+ void process(ObjectCacheRequest* reply, uint64_t seq_id);
+ void read_reply_header();
+ void handle_reply_header(bufferptr bp_head,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
+ void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data,
+ const uint64_t data_len);
+ void handle_reply_data(bufferptr bp_head, bufferptr bp_data,
+ const uint64_t data_len,
+ const boost::system::error_code& ec,
+ size_t bytes_transferred);
+
+ private:
+ CephContext* m_cct;
+ boost::asio::io_service m_io_service;
+ boost::asio::io_service::work m_io_service_work;
+ stream_protocol::socket m_dm_socket;
+ stream_protocol::endpoint m_ep;
+ std::shared_ptr<std::thread> m_io_thread;
+ std::atomic<bool> m_session_work;
+
+ uint64_t m_worker_thread_num;
+ boost::asio::io_service* m_worker;
+ std::vector<std::thread*> m_worker_threads;
+ boost::asio::io_service::work* m_worker_io_service_work;
+
+ std::atomic<bool> m_writing;
+ std::atomic<bool> m_reading;
+ std::atomic<uint64_t> m_sequence_id;
+ ceph::mutex m_lock =
+ ceph::make_mutex("ceph::cache::cacheclient::m_lock");
+ std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req;
+ bufferlist m_outcoming_bl;
+ bufferptr m_bp_header;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_CACHE_CLIENT_H
diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc
new file mode 100644
index 000000000..ae1636839
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheController.cc
@@ -0,0 +1,139 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "CacheController.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
+ << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+CacheController::CacheController(CephContext *cct,
+ const std::vector<const char*> &args):
+ m_args(args), m_cct(cct) {
+ ldout(m_cct, 20) << dendl;
+}
+
+CacheController::~CacheController() {
+ delete m_cache_server;
+ delete m_object_cache_store;
+}
+
+int CacheController::init() {
+ ldout(m_cct, 20) << dendl;
+ m_object_cache_store = new ObjectCacheStore(m_cct);
+ // TODO(dehao): make this configurable
+ int r = m_object_cache_store->init(true);
+ if (r < 0) {
+ lderr(m_cct) << "init error\n" << dendl;
+ return r;
+ }
+
+ r = m_object_cache_store->init_cache();
+ if (r < 0) {
+ lderr(m_cct) << "init error\n" << dendl;
+ }
+
+ return r;
+}
+
+int CacheController::shutdown() {
+ ldout(m_cct, 20) << dendl;
+
+ int r;
+ if (m_cache_server != nullptr) {
+ r = m_cache_server->stop();
+ if (r < 0) {
+ lderr(m_cct) << "stop error\n" << dendl;
+ return r;
+ }
+ }
+
+ r = m_object_cache_store->shutdown();
+ if (r < 0) {
+ lderr(m_cct) << "stop error\n" << dendl;
+ return r;
+ }
+
+ return r;
+}
+
+void CacheController::handle_signal(int signum) {
+ shutdown();
+}
+
+int CacheController::run() {
+ try {
+ std::string controller_path =
+ m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
+ if (controller_path.empty()) {
+ lderr(m_cct) << "'immutable_object_cache_sock' path not set" << dendl;
+ return -EINVAL;
+ }
+
+ std::remove(controller_path.c_str());
+
+ m_cache_server = new CacheServer(m_cct, controller_path,
+ std::bind(&CacheController::handle_request, this,
+ std::placeholders::_1, std::placeholders::_2));
+
+ int ret = m_cache_server->run();
+ if (ret != 0) {
+ return ret;
+ }
+
+ return 0;
+ } catch (std::exception& e) {
+ lderr(m_cct) << "Exception: " << e.what() << dendl;
+ return -EFAULT;
+ }
+}
+
+void CacheController::handle_request(CacheSession* session,
+ ObjectCacheRequest* req) {
+ ldout(m_cct, 20) << dendl;
+
+ switch (req->get_request_type()) {
+ case RBDSC_REGISTER: {
+ // TODO(dehao): skip register and allow clients to lookup directly
+
+ auto req_reg_data = reinterpret_cast <ObjectCacheRegData*> (req);
+ session->set_client_version(req_reg_data->version);
+
+ ObjectCacheRequest* reply = new ObjectCacheRegReplyData(
+ RBDSC_REGISTER_REPLY, req->seq);
+ session->send(reply);
+ break;
+ }
+ case RBDSC_READ: {
+ // lookup object in local cache store
+ std::string cache_path;
+ ObjectCacheReadData* req_read_data =
+ reinterpret_cast <ObjectCacheReadData*> (req);
+ bool return_dne_path = session->client_version().empty();
+ int ret = m_object_cache_store->lookup_object(
+ req_read_data->pool_namespace, req_read_data->pool_id,
+ req_read_data->snap_id, req_read_data->object_size,
+ req_read_data->oid, return_dne_path, cache_path);
+ ObjectCacheRequest* reply = nullptr;
+ if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) {
+ reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
+ } else {
+ reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
+ req->seq, cache_path);
+ }
+ session->send(reply);
+ break;
+ }
+ default:
+ ldout(m_cct, 5) << "can't recongize request" << dendl;
+ ceph_assert(0);
+ }
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/CacheController.h b/src/tools/immutable_object_cache/CacheController.h
new file mode 100644
index 000000000..f70f6bb1c
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheController.h
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CACHE_CONTROLLER_H
+#define CEPH_CACHE_CACHE_CONTROLLER_H
+
+#include "common/ceph_context.h"
+#include "common/WorkQueue.h"
+#include "CacheServer.h"
+#include "ObjectCacheStore.h"
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheController {
+ public:
+ CacheController(CephContext *cct, const std::vector<const char*> &args);
+ ~CacheController();
+
+ int init();
+
+ int shutdown();
+
+ void handle_signal(int sinnum);
+
+ int run();
+
+ void handle_request(CacheSession* session, ObjectCacheRequest* msg);
+
+ private:
+ CacheServer *m_cache_server = nullptr;
+ std::vector<const char*> m_args;
+ CephContext *m_cct;
+ ObjectCacheStore *m_object_cache_store = nullptr;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif // CEPH_CACHE_CACHE_CONTROLLER_H
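[Editor's note] A minimal sketch of the daemon lifecycle implied by the CacheController interface above; main.cc later in this patch does the real wiring (global_init, argument parsing, signal registration), all of which is omitted here. The helper function name is an assumption.

// Editor's sketch (not part of the patch).
#include "tools/immutable_object_cache/CacheController.h"

int run_daemon(CephContext* cct, const std::vector<const char*>& args) {
  using ceph::immutable_obj_cache::CacheController;
  CacheController controller(cct, args);
  int r = controller.init();   // connect to RADOS, (re)create the cache dir
  if (r < 0) {
    return r;
  }
  // run() blocks: it creates the CacheServer on the unix domain socket
  // configured via "immutable_object_cache_sock" and services requests
  // until handle_signal()/shutdown() stops the io_service.
  r = controller.run();
  controller.shutdown();
  return r;
}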
diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc
new file mode 100644
index 000000000..e94a47c7a
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheServer.cc
@@ -0,0 +1,106 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/bind/bind.hpp>
+#include "common/debug.h"
+#include "common/ceph_context.h"
+#include "CacheServer.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheServer: " << this << " " \
+ << __func__ << ": "
+
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+CacheServer::CacheServer(CephContext* cct, const std::string& file,
+ ProcessMsg processmsg)
+ : cct(cct), m_server_process_msg(processmsg),
+ m_local_path(file), m_acceptor(m_io_service) {}
+
+CacheServer::~CacheServer() {
+ stop();
+}
+
+int CacheServer::run() {
+ ldout(cct, 20) << dendl;
+
+ int ret = start_accept();
+ if (ret != 0) {
+ return ret;
+ }
+
+ boost::system::error_code ec;
+ ret = m_io_service.run(ec);
+ if (ec) {
+ ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl;
+ return -1;
+ }
+ return 0;
+}
+
+int CacheServer::stop() {
+ m_io_service.stop();
+ return 0;
+}
+
+int CacheServer::start_accept() {
+ ldout(cct, 20) << dendl;
+
+ boost::system::error_code ec;
+ m_acceptor.open(m_local_path.protocol(), ec);
+ if (ec) {
+ lderr(cct) << "failed to open domain socket: " << ec.message() << dendl;
+ return -ec.value();
+ }
+
+ m_acceptor.bind(m_local_path, ec);
+ if (ec) {
+ lderr(cct) << "failed to bind to domain socket '"
+ << m_local_path << "': " << ec.message() << dendl;
+ return -ec.value();
+ }
+
+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
+ if (ec) {
+ lderr(cct) << "failed to listen on domain socket: " << ec.message()
+ << dendl;
+ return -ec.value();
+ }
+
+ accept();
+ return 0;
+}
+
+void CacheServer::accept() {
+ CacheSessionPtr new_session = nullptr;
+
+ new_session.reset(new CacheSession(m_io_service,
+ m_server_process_msg, cct));
+
+ m_acceptor.async_accept(new_session->socket(),
+ boost::bind(&CacheServer::handle_accept, this, new_session,
+ boost::asio::placeholders::error));
+}
+
+void CacheServer::handle_accept(CacheSessionPtr new_session,
+ const boost::system::error_code& error) {
+ ldout(cct, 20) << dendl;
+ if (error) {
+ // operation_aborted
+ lderr(cct) << "async accept failed: " << error.message() << dendl;
+ return;
+ }
+
+ // TODO(dehao) : session setting
+ new_session->start();
+
+ // launch the next accept
+ accept();
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/CacheServer.h b/src/tools/immutable_object_cache/CacheServer.h
new file mode 100644
index 000000000..31d859934
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheServer.h
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_CACHE_SERVER_H
+#define CEPH_CACHE_CACHE_SERVER_H
+
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+
+#include "Types.h"
+#include "SocketCommon.h"
+#include "CacheSession.h"
+
+
+using boost::asio::local::stream_protocol;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheServer {
+ public:
+ CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg);
+ ~CacheServer();
+
+ int run();
+ int start_accept();
+ int stop();
+
+ private:
+ void accept();
+ void handle_accept(CacheSessionPtr new_session,
+ const boost::system::error_code& error);
+
+ private:
+ CephContext* cct;
+ boost::asio::io_service m_io_service;
+ ProcessMsg m_server_process_msg;
+ stream_protocol::endpoint m_local_path;
+ stream_protocol::acceptor m_acceptor;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif
diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc
new file mode 100644
index 000000000..38c38c97d
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheSession.cc
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/bind/bind.hpp>
+#include "common/debug.h"
+#include "common/ceph_context.h"
+#include "CacheSession.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
+ << __func__ << ": "
+
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+CacheSession::CacheSession(io_service& io_service,
+ ProcessMsg processmsg,
+ CephContext* cct)
+ : m_dm_socket(io_service),
+ m_server_process_msg(processmsg), m_cct(cct) {
+ m_bp_header = buffer::create(get_header_size());
+}
+
+CacheSession::~CacheSession() {
+ close();
+}
+
+stream_protocol::socket& CacheSession::socket() {
+ return m_dm_socket;
+}
+
+void CacheSession::set_client_version(const std::string &version) {
+ m_client_version = version;
+}
+
+const std::string &CacheSession::client_version() const {
+ return m_client_version;
+}
+
+void CacheSession::close() {
+ if (m_dm_socket.is_open()) {
+ boost::system::error_code close_ec;
+ m_dm_socket.close(close_ec);
+ if (close_ec) {
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+ }
+ }
+}
+
+void CacheSession::start() {
+ read_request_header();
+}
+
+void CacheSession::read_request_header() {
+ ldout(m_cct, 20) << dendl;
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
+ boost::asio::transfer_exactly(get_header_size()),
+ boost::bind(&CacheSession::handle_request_header,
+ shared_from_this(), boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void CacheSession::handle_request_header(const boost::system::error_code& err,
+ size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
+ if (err || bytes_transferred != get_header_size()) {
+ fault(err);
+ return;
+ }
+
+ read_request_data(get_data_len(m_bp_header.c_str()));
+}
+
+void CacheSession::read_request_data(uint64_t data_len) {
+ ldout(m_cct, 20) << dendl;
+ bufferptr bp_data(buffer::create(data_len));
+ boost::asio::async_read(m_dm_socket,
+ boost::asio::buffer(bp_data.c_str(), bp_data.length()),
+ boost::asio::transfer_exactly(data_len),
+ boost::bind(&CacheSession::handle_request_data,
+ shared_from_this(), bp_data, data_len,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
+ const boost::system::error_code& err,
+ size_t bytes_transferred) {
+ ldout(m_cct, 20) << dendl;
+ if (err || bytes_transferred != data_len) {
+ fault(err);
+ return;
+ }
+
+ bufferlist bl_data;
+
+ bl_data.append(m_bp_header);
+ bl_data.append(std::move(bp));
+
+ ObjectCacheRequest* req = decode_object_cache_request(bl_data);
+
+ process(req);
+ delete req;
+ read_request_header();
+}
+
+void CacheSession::process(ObjectCacheRequest* req) {
+ ldout(m_cct, 20) << dendl;
+ m_server_process_msg(this, req);
+}
+
+void CacheSession::send(ObjectCacheRequest* reply) {
+ ldout(m_cct, 20) << dendl;
+ bufferlist bl;
+ reply->encode();
+ bl.append(reply->get_payload_bufferlist());
+
+ boost::asio::async_write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()),
+ boost::asio::transfer_exactly(bl.length()),
+ [this, bl, reply](const boost::system::error_code& err,
+ size_t bytes_transferred) {
+ delete reply;
+ if (err || bytes_transferred != bl.length()) {
+ fault(err);
+ return;
+ }
+ });
+}
+
+void CacheSession::fault(const boost::system::error_code& ec) {
+ ldout(m_cct, 20) << "session fault : " << ec.message() << dendl;
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h
new file mode 100644
index 000000000..0826e8a2b
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheSession.h
@@ -0,0 +1,56 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SESSION_H
+#define CEPH_CACHE_SESSION_H
+
+#include <boost/asio.hpp>
+#include <boost/asio/error.hpp>
+
+#include "Types.h"
+#include "SocketCommon.h"
+
+using boost::asio::local::stream_protocol;
+using boost::asio::io_service;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class CacheSession : public std::enable_shared_from_this<CacheSession> {
+ public:
+ CacheSession(io_service& io_service, ProcessMsg process_msg,
+ CephContext* ctx);
+ ~CacheSession();
+ stream_protocol::socket& socket();
+ void close();
+ void start();
+ void read_request_header();
+ void handle_request_header(const boost::system::error_code& err,
+ size_t bytes_transferred);
+ void read_request_data(uint64_t data_len);
+ void handle_request_data(bufferptr bp, uint64_t data_len,
+ const boost::system::error_code& err,
+ size_t bytes_transferred);
+ void process(ObjectCacheRequest* req);
+ void fault(const boost::system::error_code& ec);
+ void send(ObjectCacheRequest* msg);
+
+ void set_client_version(const std::string &version);
+ const std::string &client_version() const;
+
+ private:
+ stream_protocol::socket m_dm_socket;
+ ProcessMsg m_server_process_msg;
+ CephContext* m_cct;
+
+ std::string m_client_version;
+
+ bufferptr m_bp_header;
+};
+
+typedef std::shared_ptr<CacheSession> CacheSessionPtr;
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+
+#endif // CEPH_CACHE_SESSION_H
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc
new file mode 100644
index 000000000..1b1eef1e3
--- /dev/null
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc
@@ -0,0 +1,466 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ObjectCacheStore.h"
+#include "Utils.h"
+#if __has_include(<filesystem>)
+#include <filesystem>
+namespace fs = std::filesystem;
+#else
+#include <experimental/filesystem>
+namespace fs = std::experimental::filesystem;
+#endif
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
+ << __func__ << ": "
+
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+namespace {
+
+class SafeTimerSingleton : public CommonSafeTimer<ceph::mutex> {
+public:
+ ceph::mutex lock = ceph::make_mutex
+ ("ceph::immutable_object_cache::SafeTimerSingleton::lock");
+
+ explicit SafeTimerSingleton(CephContext *cct)
+ : CommonSafeTimer(cct, lock, true) {
+ init();
+ }
+ ~SafeTimerSingleton() {
+ std::lock_guard locker{lock};
+ shutdown();
+ }
+};
+
+} // anonymous namespace
+
+enum ThrottleTargetCode {
+ ROC_QOS_IOPS_THROTTLE = 1,
+ ROC_QOS_BPS_THROTTLE = 2
+};
+
+ObjectCacheStore::ObjectCacheStore(CephContext *cct)
+ : m_cct(cct), m_rados(new librados::Rados()) {
+
+ m_cache_root_dir =
+ m_cct->_conf.get_val<std::string>("immutable_object_cache_path");
+
+ if (m_cache_root_dir.back() != '/') {
+ m_cache_root_dir += "/";
+ }
+
+ uint64_t cache_max_size =
+ m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size");
+
+ double cache_watermark =
+ m_cct->_conf.get_val<double>("immutable_object_cache_watermark");
+
+ uint64_t max_inflight_ops =
+ m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
+
+ uint64_t limit = 0;
+ if ((limit = m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_iops_limit")) != 0) {
+ apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE,
+ m_cct->_conf.get_val<std::chrono::milliseconds>
+ ("immutable_object_cache_qos_schedule_tick_min"),
+ limit,
+ m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_iops_burst"),
+ m_cct->_conf.get_val<std::chrono::seconds>
+ ("immutable_object_cache_qos_iops_burst_seconds"));
+ }
+ if ((limit = m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_bps_limit")) != 0) {
+ apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE,
+ m_cct->_conf.get_val<std::chrono::milliseconds>
+ ("immutable_object_cache_qos_schedule_tick_min"),
+ limit,
+ m_cct->_conf.get_val<uint64_t>
+ ("immutable_object_cache_qos_bps_burst"),
+ m_cct->_conf.get_val<std::chrono::seconds>
+ ("immutable_object_cache_qos_bps_burst_seconds"));
+ }
+
+ if ((cache_watermark <= 0) || (cache_watermark > 1)) {
+ lderr(m_cct) << "Invalid water mark provided, set it to default." << dendl;
+ cache_watermark = 0.9;
+ }
+ m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
+ cache_watermark);
+}
+
+ObjectCacheStore::~ObjectCacheStore() {
+ delete m_policy;
+ if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) {
+ ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr);
+ delete m_throttles[ROC_QOS_IOPS_THROTTLE];
+ }
+ if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) {
+ ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr);
+ delete m_throttles[ROC_QOS_BPS_THROTTLE];
+ }
+}
+
+int ObjectCacheStore::init(bool reset) {
+ ldout(m_cct, 20) << dendl;
+
+ int ret = m_rados->init_with_context(m_cct);
+ if (ret < 0) {
+ lderr(m_cct) << "fail to init Ceph context" << dendl;
+ return ret;
+ }
+
+ ret = m_rados->connect();
+ if (ret < 0) {
+ lderr(m_cct) << "fail to connect to cluster" << dendl;
+ return ret;
+ }
+
+ // TODO(dehao): fsck and reuse existing cache objects
+ if (reset) {
+ try {
+ if (fs::exists(m_cache_root_dir)) {
+ // remove all sub folders
+ for (auto& p : fs::directory_iterator(m_cache_root_dir)) {
+ fs::remove_all(p.path());
+ }
+ } else {
+ fs::create_directories(m_cache_root_dir);
+ }
+ } catch (const fs::filesystem_error& e) {
+ lderr(m_cct) << "failed to initialize cache store directory: "
+ << e.what() << dendl;
+ return -e.code().value();
+ }
+ }
+ return 0;
+}
+
+int ObjectCacheStore::shutdown() {
+ ldout(m_cct, 20) << dendl;
+
+ m_rados->shutdown();
+ return 0;
+}
+
+int ObjectCacheStore::init_cache() {
+ ldout(m_cct, 20) << dendl;
+ std::string cache_dir = m_cache_root_dir;
+
+ return 0;
+}
+
+int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name) {
+ ldout(m_cct, 20) << "to promote object: " << object_name
+ << " from pool id: " << pool_id
+ << " namespace: " << pool_nspace
+ << " snapshot: " << snap_id << dendl;
+
+ int ret = 0;
+ std::string cache_file_name =
+ get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
+ librados::IoCtx ioctx;
+ {
+ std::lock_guard _locker{m_ioctx_map_lock};
+ if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) {
+ ret = m_rados->ioctx_create2(pool_id, ioctx);
+ if (ret < 0) {
+ lderr(m_cct) << "fail to create ioctx" << dendl;
+ return ret;
+ }
+ m_ioctx_map.emplace(pool_id, ioctx);
+ } else {
+ ioctx = m_ioctx_map[pool_id];
+ }
+ }
+
+ ioctx.set_namespace(pool_nspace);
+ ioctx.snap_set_read(snap_id);
+
+ librados::bufferlist* read_buf = new librados::bufferlist();
+
+ auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) {
+ handle_promote_callback(ret, read_buf, cache_file_name);
+ });
+
+ return promote_object(&ioctx, object_name, read_buf, ctx);
+}
+
+int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
+ std::string cache_file_name) {
+ ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl;
+
+ // rados read error
+ if (ret != -ENOENT && ret < 0) {
+ lderr(m_cct) << "fail to read from rados" << dendl;
+
+ m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+ delete read_buf;
+ return ret;
+ }
+
+ auto state = OBJ_CACHE_PROMOTED;
+ if (ret == -ENOENT) {
+ // object does not exist in RADOS
+ state = OBJ_CACHE_DNE;
+ ret = 0;
+ } else {
+ std::string cache_file_path = get_cache_file_path(cache_file_name, true);
+ if (cache_file_path == "") {
+ lderr(m_cct) << "fail to write cache file" << dendl;
+ m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+ delete read_buf;
+ return -ENOSPC;
+ }
+
+ ret = read_buf->write_file(cache_file_path.c_str());
+ if (ret < 0) {
+ lderr(m_cct) << "fail to write cache file" << dendl;
+
+ m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+ delete read_buf;
+ return ret;
+ }
+ }
+
+ // update metadata
+ ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name));
+ m_policy->update_status(cache_file_name, state, read_buf->length());
+ ceph_assert(state == m_policy->get_status(cache_file_name));
+
+ delete read_buf;
+
+ evict_objects();
+
+ return ret;
+}
+
+int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, uint64_t object_size,
+ std::string object_name,
+ bool return_dne_path,
+ std::string& target_cache_file_path) {
+ ldout(m_cct, 20) << "object name = " << object_name
+ << " in pool ID : " << pool_id << dendl;
+
+ int pret = -1;
+ std::string cache_file_name =
+ get_cache_file_name(pool_nspace, pool_id, snap_id, object_name);
+
+ cache_status_t ret = m_policy->lookup_object(cache_file_name);
+
+ switch (ret) {
+ case OBJ_CACHE_NONE: {
+ if (take_token_from_throttle(object_size, 1)) {
+ pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
+ if (pret < 0) {
+ lderr(m_cct) << "fail to start promote" << dendl;
+ }
+ } else {
+ m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
+ }
+ return ret;
+ }
+ case OBJ_CACHE_PROMOTED:
+ target_cache_file_path = get_cache_file_path(cache_file_name);
+ return ret;
+ case OBJ_CACHE_DNE:
+ if (return_dne_path) {
+ target_cache_file_path = get_cache_file_path(cache_file_name);
+ }
+ return ret;
+ case OBJ_CACHE_SKIP:
+ return ret;
+ default:
+ lderr(m_cct) << "unrecognized object cache status" << dendl;
+ ceph_assert(0);
+ }
+}
+
+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx,
+ std::string object_name,
+ librados::bufferlist* read_buf,
+ Context* on_finish) {
+ ldout(m_cct, 20) << "object name = " << object_name << dendl;
+
+ librados::AioCompletion* read_completion = create_rados_callback(on_finish);
+ // issue a zero-sized read req to get the entire obj
+ int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0);
+ if (ret < 0) {
+ lderr(m_cct) << "failed to read from rados" << dendl;
+ }
+ read_completion->release();
+
+ return ret;
+}
+
+int ObjectCacheStore::evict_objects() {
+ ldout(m_cct, 20) << dendl;
+
+ std::list<std::string> obj_list;
+ m_policy->get_evict_list(&obj_list);
+ for (auto& obj : obj_list) {
+ do_evict(obj);
+ }
+ return 0;
+}
+
+int ObjectCacheStore::do_evict(std::string cache_file) {
+ ldout(m_cct, 20) << "file = " << cache_file << dendl;
+
+ if (cache_file == "") {
+ return 0;
+ }
+
+ std::string cache_file_path = get_cache_file_path(cache_file);
+
+ ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
+
+ // TODO(dehao): possible race on read?
+ int ret = std::remove(cache_file_path.c_str());
+ // evict metadata
+ if (ret == 0) {
+ m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
+ m_policy->evict_entry(cache_file);
+ }
+
+ return ret;
+}
+
+std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
+ uint64_t pool_id,
+ uint64_t snap_id,
+ std::string oid) {
+ return pool_nspace + ":" + std::to_string(pool_id) + ":" +
+ std::to_string(snap_id) + ":" + oid;
+}
+
+std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
+ bool mkdir) {
+ ldout(m_cct, 20) << cache_file_name <<dendl;
+
+ uint32_t crc = 0;
+ crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
+ cache_file_name.length());
+
+ std::string cache_file_dir = std::to_string(crc % 100) + "/";
+
+ if (mkdir) {
+ ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir <<dendl;
+ std::error_code ec;
+ std::string new_dir = m_cache_root_dir + cache_file_dir;
+ if (fs::exists(new_dir, ec)) {
+ ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir <<dendl;
+ return new_dir + cache_file_name;
+ }
+
+ if (!fs::create_directories(new_dir, ec)) {
+ ldout(m_cct, 5) << "fail to create cache dir: " << new_dir
+ << "error: " << ec.message() << dendl;
+ return "";
+ }
+ }
+
+ return m_cache_root_dir + cache_file_dir + cache_file_name;
+}
+
+void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
+ m_io_throttled = false;
+ std::lock_guard lock(m_throttle_lock);
+ if (type & ROC_QOS_IOPS_THROTTLE){
+ m_iops_tokens += tokens;
+ } else if (type & ROC_QOS_BPS_THROTTLE){
+ m_bps_tokens += tokens;
+ } else {
+ lderr(m_cct) << "unknow throttle type." << dendl;
+ }
+}
+
+bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
+ uint64_t object_num) {
+ if (m_io_throttled == true) {
+ return false;
+ }
+
+ int flag = 0;
+ bool wait = false;
+ if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
+ std::lock_guard lock(m_throttle_lock);
+ if (object_num > m_iops_tokens) {
+ wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
+ &ObjectCacheStore::handle_throttle_ready, object_num,
+ ROC_QOS_IOPS_THROTTLE);
+ } else {
+ m_iops_tokens -= object_num;
+ flag = 1;
+ }
+ }
+ if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
+ std::lock_guard lock(m_throttle_lock);
+ if (object_size > m_bps_tokens) {
+ wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
+ &ObjectCacheStore::handle_throttle_ready, object_size,
+ ROC_QOS_BPS_THROTTLE);
+ } else {
+ m_bps_tokens -= object_size;
+ }
+ }
+
+ if (wait) {
+ m_io_throttled = true;
+ // the IOPS throttle was passed but the BPS throttle blocks: return the IOPS tokens
+ if (flag == 1) {
+ std::lock_guard lock(m_throttle_lock);
+ m_iops_tokens += object_num;
+ }
+ }
+
+ return !wait;
+}
+
+static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {
+ { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
+ { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
+};
+
+void ObjectCacheStore::apply_qos_tick_and_limit(
+ const uint64_t flag,
+ std::chrono::milliseconds min_tick,
+ uint64_t limit,
+ uint64_t burst,
+ std::chrono::seconds burst_seconds) {
+ SafeTimerSingleton* safe_timer_singleton = nullptr;
+ TokenBucketThrottle* throttle = nullptr;
+ safe_timer_singleton =
+ &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+ "tools::immutable_object_cache", false, m_cct);
+ SafeTimer* timer = safe_timer_singleton;
+ ceph::mutex* timer_lock = &safe_timer_singleton->lock;
+ m_qos_enabled_flag |= flag;
+ auto throttle_flags_it = THROTTLE_FLAGS.find(flag);
+ ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end());
+ throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second,
+ 0, 0, timer, timer_lock);
+ throttle->set_schedule_tick_min(min_tick.count());
+ int ret = throttle->set_limit(limit, burst, burst_seconds.count());
+ if (ret < 0) {
+ lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: "
+ << "burst(" << burst << ") is less than "
+ << "limit(" << limit << ")" << dendl;
+ throttle->set_limit(limit, 0, 1);
+ }
+
+ ceph_assert(m_throttles.find(flag) == m_throttles.end());
+ m_throttles.insert({flag, throttle});
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
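[Editor's note] A standalone sketch of the cache-file naming and directory sharding used by get_cache_file_name()/get_cache_file_path() above. The helper function name is an assumption; ceph_crc32c() is the Ceph CRC32C helper the real code already calls.

// Editor's sketch (not part of the patch).
#include <string>
#include "include/crc32c.h"

std::string sharded_cache_path(const std::string& root,   // cache root dir ending in '/'
                               const std::string& pool_nspace,
                               uint64_t pool_id, uint64_t snap_id,
                               const std::string& oid) {
  // file name: "<namespace>:<pool id>:<snap id>:<object id>"
  std::string name = pool_nspace + ":" + std::to_string(pool_id) + ":" +
                     std::to_string(snap_id) + ":" + oid;
  // files are spread over 100 sub-directories keyed by crc32c(name) % 100
  uint32_t crc = ceph_crc32c(0, (unsigned char*)name.c_str(), name.length());
  return root + std::to_string(crc % 100) + "/" + name;
}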
diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h
new file mode 100644
index 000000000..51e5a77b8
--- /dev/null
+++ b/src/tools/immutable_object_cache/ObjectCacheStore.h
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
+#define CEPH_CACHE_OBJECT_CACHE_STORE_H
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+#include "common/Timer.h"
+#include "common/Throttle.h"
+#include "common/Cond.h"
+#include "include/rados/librados.hpp"
+
+#include "SimplePolicy.h"
+
+
+using librados::Rados;
+using librados::IoCtx;
+class Context;
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+typedef shared_ptr<librados::Rados> RadosRef;
+typedef shared_ptr<librados::IoCtx> IoCtxRef;
+
+class ObjectCacheStore {
+ public:
+ ObjectCacheStore(CephContext *cct);
+ ~ObjectCacheStore();
+ int init(bool reset);
+ int shutdown();
+ int init_cache();
+ int lookup_object(std::string pool_nspace,
+ uint64_t pool_id, uint64_t snap_id,
+ uint64_t object_size,
+ std::string object_name,
+ bool return_dne_path,
+ std::string& target_cache_file_path);
+ private:
+ enum ThrottleTypeCode {
+ THROTTLE_CODE_BYTE,
+ THROTTLE_CODE_OBJECT
+ };
+
+ std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string oid);
+ std::string get_cache_file_path(std::string cache_file_name,
+ bool mkdir = false);
+ int evict_objects();
+ int do_promote(std::string pool_nspace, uint64_t pool_id,
+ uint64_t snap_id, std::string object_name);
+ int promote_object(librados::IoCtx*, std::string object_name,
+ librados::bufferlist* read_buf,
+ Context* on_finish);
+ int handle_promote_callback(int, bufferlist*, std::string);
+ int do_evict(std::string cache_file);
+
+ bool take_token_from_throttle(uint64_t object_size, uint64_t object_num);
+ void handle_throttle_ready(uint64_t tokens, uint64_t type);
+ void apply_qos_tick_and_limit(const uint64_t flag,
+ std::chrono::milliseconds min_tick,
+ uint64_t limit, uint64_t burst,
+ std::chrono::seconds burst_seconds);
+
+ CephContext *m_cct;
+ RadosRef m_rados;
+ std::map<uint64_t, librados::IoCtx> m_ioctx_map;
+ ceph::mutex m_ioctx_map_lock =
+ ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock");
+ Policy* m_policy;
+ std::string m_cache_root_dir;
+ // throttle mechanism
+ uint64_t m_qos_enabled_flag{0};
+ std::map<uint64_t, TokenBucketThrottle*> m_throttles;
+ bool m_io_throttled{false};
+ ceph::mutex m_throttle_lock =
+ ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");;
+ uint64_t m_iops_tokens{0};
+ uint64_t m_bps_tokens{0};
+};
+
+} // namespace immutable_obj_cache
+} // ceph
+#endif // CEPH_CACHE_OBJECT_CACHE_STORE_H
diff --git a/src/tools/immutable_object_cache/Policy.h b/src/tools/immutable_object_cache/Policy.h
new file mode 100644
index 000000000..7924a8919
--- /dev/null
+++ b/src/tools/immutable_object_cache/Policy.h
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_POLICY_H
+#define CEPH_CACHE_POLICY_H
+
+#include <list>
+#include <string>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+typedef enum {
+ OBJ_CACHE_NONE = 0,
+ OBJ_CACHE_PROMOTED,
+ OBJ_CACHE_SKIP,
+ OBJ_CACHE_DNE,
+} cache_status_t;
+
+class Policy {
+ public:
+ Policy() {}
+ virtual ~Policy() {}
+ virtual cache_status_t lookup_object(std::string) = 0;
+ virtual int evict_entry(std::string) = 0;
+ virtual void update_status(std::string, cache_status_t,
+ uint64_t size = 0) = 0;
+ virtual cache_status_t get_status(std::string) = 0;
+ virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif
diff --git a/src/tools/immutable_object_cache/SimplePolicy.cc b/src/tools/immutable_object_cache/SimplePolicy.cc
new file mode 100644
index 000000000..3a7375ba9
--- /dev/null
+++ b/src/tools/immutable_object_cache/SimplePolicy.cc
@@ -0,0 +1,216 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/debug.h"
+#include "SimplePolicy.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::SimplePolicy: " << this << " " \
+ << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+SimplePolicy::SimplePolicy(CephContext *cct, uint64_t cache_size,
+ uint64_t max_inflight, double watermark)
+ : cct(cct), m_watermark(watermark), m_max_inflight_ops(max_inflight),
+ m_max_cache_size(cache_size) {
+
+ ldout(cct, 20) << "max cache size= " << m_max_cache_size
+ << " ,watermark= " << m_watermark
+ << " ,max inflight ops= " << m_max_inflight_ops << dendl;
+
+ m_cache_size = 0;
+
+}
+
+SimplePolicy::~SimplePolicy() {
+ ldout(cct, 20) << dendl;
+
+ for (auto it : m_cache_map) {
+ Entry* entry = (it.second);
+ delete entry;
+ }
+}
+
+cache_status_t SimplePolicy::alloc_entry(std::string file_name) {
+ ldout(cct, 20) << "alloc entry for: " << file_name << dendl;
+
+ std::unique_lock wlocker{m_cache_map_lock};
+
+ // cache hit when promoting
+ if (m_cache_map.find(file_name) != m_cache_map.end()) {
+ ldout(cct, 20) << "object is under promoting: " << file_name << dendl;
+ return OBJ_CACHE_SKIP;
+ }
+
+ if ((m_cache_size < m_max_cache_size) &&
+ (inflight_ops < m_max_inflight_ops)) {
+ Entry* entry = new Entry();
+ ceph_assert(entry != nullptr);
+ m_cache_map[file_name] = entry;
+ wlocker.unlock();
+ update_status(file_name, OBJ_CACHE_SKIP);
+ return OBJ_CACHE_NONE; // start promotion request
+ }
+
+ // if there's no free entry, return skip to read from rados
+ return OBJ_CACHE_SKIP;
+}
+
+cache_status_t SimplePolicy::lookup_object(std::string file_name) {
+ ldout(cct, 20) << "lookup: " << file_name << dendl;
+
+ std::shared_lock rlocker{m_cache_map_lock};
+
+ auto entry_it = m_cache_map.find(file_name);
+ // simply promote on first lookup
+ if (entry_it == m_cache_map.end()) {
+ rlocker.unlock();
+ return alloc_entry(file_name);
+ }
+
+ Entry* entry = entry_it->second;
+
+ if (entry->status == OBJ_CACHE_PROMOTED || entry->status == OBJ_CACHE_DNE) {
+ // bump pos in lru on hit
+ m_promoted_lru.lru_touch(entry);
+ }
+
+ return entry->status;
+}
+
+void SimplePolicy::update_status(std::string file_name,
+ cache_status_t new_status, uint64_t size) {
+ ldout(cct, 20) << "update status for: " << file_name
+ << " new status = " << new_status << dendl;
+
+ std::unique_lock locker{m_cache_map_lock};
+
+ auto entry_it = m_cache_map.find(file_name);
+ if (entry_it == m_cache_map.end()) {
+ return;
+ }
+
+ ceph_assert(entry_it != m_cache_map.end());
+ Entry* entry = entry_it->second;
+
+ // to promote
+ if (entry->status == OBJ_CACHE_NONE && new_status== OBJ_CACHE_SKIP) {
+ entry->status = new_status;
+ entry->file_name = file_name;
+ inflight_ops++;
+ return;
+ }
+
+ // promoting done
+ if (entry->status == OBJ_CACHE_SKIP && (new_status== OBJ_CACHE_PROMOTED ||
+ new_status== OBJ_CACHE_DNE)) {
+ m_promoted_lru.lru_insert_top(entry);
+ entry->status = new_status;
+ entry->size = size;
+ m_cache_size += entry->size;
+ inflight_ops--;
+ return;
+ }
+
+ // promoting failed
+ if (entry->status == OBJ_CACHE_SKIP && new_status== OBJ_CACHE_NONE) {
+ // mark this entry as free
+ entry->file_name = "";
+ entry->status = new_status;
+
+ m_cache_map.erase(entry_it);
+ inflight_ops--;
+ delete entry;
+ return;
+ }
+
+ // to evict
+ if ((entry->status == OBJ_CACHE_PROMOTED || entry->status == OBJ_CACHE_DNE) &&
+ new_status== OBJ_CACHE_NONE) {
+ // mark this entry as free
+ uint64_t size = entry->size;
+ entry->file_name = "";
+ entry->size = 0;
+ entry->status = new_status;
+
+ m_promoted_lru.lru_remove(entry);
+ m_cache_map.erase(entry_it);
+ m_cache_size -= size;
+ delete entry;
+ return;
+ }
+}
+
+int SimplePolicy::evict_entry(std::string file_name) {
+ ldout(cct, 20) << "to evict: " << file_name << dendl;
+
+ update_status(file_name, OBJ_CACHE_NONE);
+
+ return 0;
+}
+
+cache_status_t SimplePolicy::get_status(std::string file_name) {
+ ldout(cct, 20) << file_name << dendl;
+
+ std::shared_lock locker{m_cache_map_lock};
+ auto entry_it = m_cache_map.find(file_name);
+ if (entry_it == m_cache_map.end()) {
+ return OBJ_CACHE_NONE;
+ }
+
+ return entry_it->second->status;
+}
+
+void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) {
+ ldout(cct, 20) << dendl;
+
+ std::unique_lock locker{m_cache_map_lock};
+ // check free ratio, pop entries from LRU
+ if ((double)m_cache_size > m_max_cache_size * m_watermark) {
+ // TODO(dehao): make this configurable
+ int evict_num = m_cache_map.size() * 0.1;
+ for (int i = 0; i < evict_num; i++) {
+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
+ if (entry == nullptr) {
+ continue;
+ }
+ std::string file_name = entry->file_name;
+ obj_list->push_back(file_name);
+ }
+ }
+}
+
+// for unit test
+uint64_t SimplePolicy::get_free_size() {
+ return m_max_cache_size - m_cache_size;
+}
+
+uint64_t SimplePolicy::get_promoting_entry_num() {
+ uint64_t index = 0;
+ std::shared_lock rlocker{m_cache_map_lock};
+ for (auto it : m_cache_map) {
+ if (it.second->status == OBJ_CACHE_SKIP) {
+ index++;
+ }
+ }
+ return index;
+}
+
+uint64_t SimplePolicy::get_promoted_entry_num() {
+ return m_promoted_lru.lru_get_size();
+}
+
+std::string SimplePolicy::get_evict_entry() {
+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire());
+ if (entry == nullptr) {
+ return "";
+ }
+ return entry->file_name;
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
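[Editor's note] The eviction trigger from get_evict_list() above, isolated for clarity. The helper name and the example figures are assumptions; the 10% batch size mirrors the hard-coded factor in the policy (marked TODO: configurable there).

// Editor's sketch (not part of the patch).
#include <cstddef>
#include <cstdint>

// Returns how many promoted entries one eviction pass pops from the LRU.
size_t entries_to_evict(uint64_t cache_size, uint64_t max_cache_size,
                        double watermark, size_t cached_entries) {
  if (static_cast<double>(cache_size) <= max_cache_size * watermark) {
    return 0;                    // still under the high-water mark
  }
  return cached_entries / 10;    // evict roughly 10% of entries per pass
}

// For example, a 1 GiB cache with the default 0.9 watermark only starts
// evicting once more than 0.9 * 2^30 bytes (about 966 MB) are promoted.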
diff --git a/src/tools/immutable_object_cache/SimplePolicy.h b/src/tools/immutable_object_cache/SimplePolicy.h
new file mode 100644
index 000000000..671cbd518
--- /dev/null
+++ b/src/tools/immutable_object_cache/SimplePolicy.h
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SIMPLE_POLICY_H
+#define CEPH_CACHE_SIMPLE_POLICY_H
+
+#include "common/ceph_context.h"
+#include "common/ceph_mutex.h"
+#include "include/lru.h"
+#include "Policy.h"
+
+#include <unordered_map>
+#include <string>
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+class SimplePolicy : public Policy {
+ public:
+ SimplePolicy(CephContext *cct, uint64_t block_num, uint64_t max_inflight,
+ double watermark);
+ ~SimplePolicy();
+
+ cache_status_t lookup_object(std::string file_name);
+ cache_status_t get_status(std::string file_name);
+
+ void update_status(std::string file_name,
+ cache_status_t new_status,
+ uint64_t size = 0);
+
+ int evict_entry(std::string file_name);
+
+ void get_evict_list(std::list<std::string>* obj_list);
+
+ uint64_t get_free_size();
+ uint64_t get_promoting_entry_num();
+ uint64_t get_promoted_entry_num();
+ std::string get_evict_entry();
+
+ private:
+ cache_status_t alloc_entry(std::string file_name);
+
+ class Entry : public LRUObject {
+ public:
+ cache_status_t status;
+ Entry() : status(OBJ_CACHE_NONE) {}
+ std::string file_name;
+ uint64_t size;
+ };
+
+ CephContext* cct;
+ double m_watermark;
+ uint64_t m_max_inflight_ops;
+ uint64_t m_max_cache_size;
+ std::atomic<uint64_t> inflight_ops = 0;
+
+ std::unordered_map<std::string, Entry*> m_cache_map;
+ ceph::shared_mutex m_cache_map_lock =
+ ceph::make_shared_mutex("rbd::cache::SimplePolicy::m_cache_map_lock");
+
+ std::atomic<uint64_t> m_cache_size;
+
+ LRU m_promoted_lru;
+};
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_SIMPLE_POLICY_H
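To illustrate how a caller is expected to drive SimplePolicy, here is a
hypothetical sketch (not part of the patch): the object name and sizes are
made up, policy_sketch() is an illustrative function, and OBJ_CACHE_PROMOTED
is assumed to come from Policy.h, which declares the cache_status_t states.

  #include <list>
  #include <string>
  #include "SimplePolicy.h"

  using namespace ceph::immutable_obj_cache;

  void policy_sketch(CephContext* cct) {
    // ~100 MiB capacity, at most 128 in-flight promotions, evict above 90%
    SimplePolicy policy(cct, 100 << 20, 128, 0.9);

    // register the object, mark its promotion as in flight, then record the
    // promoted size once the local cache file has been written
    policy.lookup_object("obj_name");
    policy.update_status("obj_name", OBJ_CACHE_SKIP);
    policy.update_status("obj_name", OBJ_CACHE_PROMOTED, 4 << 20);

    // once usage crosses the watermark, expire LRU victims and evict them
    std::list<std::string> victims;
    policy.get_evict_list(&victims);
    for (auto& name : victims) {
      policy.evict_entry(name);
    }
  }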
diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h
new file mode 100644
index 000000000..99acf3609
--- /dev/null
+++ b/src/tools/immutable_object_cache/SocketCommon.h
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_SOCKET_COMMON_H
+#define CEPH_CACHE_SOCKET_COMMON_H
+
+#include <functional>
+
+#include "include/Context.h"
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+static const int RBDSC_REGISTER = 0X11;
+static const int RBDSC_READ = 0X12;
+static const int RBDSC_REGISTER_REPLY = 0X13;
+static const int RBDSC_READ_REPLY = 0X14;
+static const int RBDSC_READ_RADOS = 0X15;
+
+static const int ASIO_ERROR_READ = 0X01;
+static const int ASIO_ERROR_WRITE = 0X02;
+static const int ASIO_ERROR_CONNECT = 0X03;
+static const int ASIO_ERROR_ACCEPT = 0X04;
+static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05;
+
+class ObjectCacheRequest;
+class CacheSession;
+
+typedef GenContextURef<ObjectCacheRequest*> CacheGenContextURef;
+
+typedef std::function<void(CacheSession*, ObjectCacheRequest*)> ProcessMsg;
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_SOCKET_COMMON_H
diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc
new file mode 100644
index 000000000..860017d6a
--- /dev/null
+++ b/src/tools/immutable_object_cache/Types.cc
@@ -0,0 +1,184 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Types.h"
+#include "SocketCommon.h"
+
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::Types: " << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+ObjectCacheRequest::ObjectCacheRequest() {}
+ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
+ : type(t), seq(s) {}
+ObjectCacheRequest::~ObjectCacheRequest() {}
+
+void ObjectCacheRequest::encode() {
+ ENCODE_START(2, 1, payload);
+ ceph::encode(type, payload);
+ ceph::encode(seq, payload);
+ if (!payload_empty()) {
+ encode_payload();
+ }
+ ENCODE_FINISH(payload);
+}
+
+void ObjectCacheRequest::decode(bufferlist& bl) {
+ auto i = bl.cbegin();
+ DECODE_START(2, i);
+ ceph::decode(type, i);
+ ceph::decode(seq, i);
+ if (!payload_empty()) {
+ decode_payload(i, struct_v);
+ }
+ DECODE_FINISH(i);
+}
+
+ObjectCacheRegData::ObjectCacheRegData() {}
+ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s,
+ const std::string &version)
+ : ObjectCacheRequest(t, s),
+ version(version) {
+}
+
+ObjectCacheRegData::~ObjectCacheRegData() {}
+
+void ObjectCacheRegData::encode_payload() {
+ ceph::encode(version, payload);
+}
+
+void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
+ if (i.end()) {
+ return;
+ }
+ ceph::decode(version, i);
+}
+
+ObjectCacheRegReplyData::ObjectCacheRegReplyData() {}
+ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
+
+void ObjectCacheRegReplyData::encode_payload() {}
+
+void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) {}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
+ uint64_t read_offset,
+ uint64_t read_len,
+ uint64_t pool_id, uint64_t snap_id,
+ uint64_t object_size,
+ std::string oid,
+ std::string pool_namespace)
+ : ObjectCacheRequest(t, s), read_offset(read_offset),
+ read_len(read_len), pool_id(pool_id), snap_id(snap_id),
+ object_size(object_size), oid(oid), pool_namespace(pool_namespace)
+{}
+
+ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadData::~ObjectCacheReadData() {}
+
+void ObjectCacheReadData::encode_payload() {
+ ceph::encode(read_offset, payload);
+ ceph::encode(read_len, payload);
+ ceph::encode(pool_id, payload);
+ ceph::encode(snap_id, payload);
+ ceph::encode(oid, payload);
+ ceph::encode(pool_namespace, payload);
+ ceph::encode(object_size, payload);
+}
+
+void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
+ ceph::decode(read_offset, i);
+ ceph::decode(read_len, i);
+ ceph::decode(pool_id, i);
+ ceph::decode(snap_id, i);
+ ceph::decode(oid, i);
+ ceph::decode(pool_namespace, i);
+ if (encode_version >= 2) {
+ ceph::decode(object_size, i);
+ }
+}
+
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
+                                                   std::string cache_path)
+ : ObjectCacheRequest(t, s), cache_path(cache_path) {}
+ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {}
+
+void ObjectCacheReadReplyData::encode_payload() {
+ ceph::encode(cache_path, payload);
+}
+
+void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {
+ ceph::decode(cache_path, i);
+}
+
+ObjectCacheReadRadosData::ObjectCacheReadRadosData() {}
+ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s)
+ : ObjectCacheRequest(t, s) {}
+
+ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
+
+void ObjectCacheReadRadosData::encode_payload() {}
+
+void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i,
+ __u8 encode_version) {}
+
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
+ ObjectCacheRequest* req = nullptr;
+
+ uint16_t type;
+ uint64_t seq;
+ auto i = payload_buffer.cbegin();
+ DECODE_START(1, i);
+ ceph::decode(type, i);
+ ceph::decode(seq, i);
+ DECODE_FINISH(i);
+
+ switch (type) {
+ case RBDSC_REGISTER: {
+ req = new ObjectCacheRegData(type, seq);
+ break;
+ }
+ case RBDSC_READ: {
+ req = new ObjectCacheReadData(type, seq);
+ break;
+ }
+ case RBDSC_REGISTER_REPLY: {
+ req = new ObjectCacheRegReplyData(type, seq);
+ break;
+ }
+ case RBDSC_READ_REPLY: {
+ req = new ObjectCacheReadReplyData(type, seq);
+ break;
+ }
+ case RBDSC_READ_RADOS: {
+ req = new ObjectCacheReadRadosData(type, seq);
+ break;
+ }
+ default:
+ ceph_assert(0);
+ }
+
+ req->decode(payload_buffer);
+
+ return req;
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h
new file mode 100644
index 000000000..05394d843
--- /dev/null
+++ b/src/tools/immutable_object_cache/Types.h
@@ -0,0 +1,136 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_TYPES_H
+#define CEPH_CACHE_TYPES_H
+
+#include "include/encoding.h"
+#include "include/Context.h"
+#include "SocketCommon.h"
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+namespace {
+struct HeaderHelper {
+ uint8_t v;
+ uint8_t c_v;
+ ceph_le32 len;
+}__attribute__((packed));
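+// with __attribute__((packed)) the header occupies exactly 6 bytes on the
+// wire: 1 byte struct version, 1 byte compat version, 4 byte little-endian
+// payload length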
+
+inline uint8_t get_header_size() {
+ return sizeof(HeaderHelper);
+}
+
+inline uint32_t get_data_len(char* buf) {
+ HeaderHelper* header = reinterpret_cast<HeaderHelper*>(buf);
+ return header->len;
+}
+} // namespace
+
+class ObjectCacheRequest {
+ public:
+ uint16_t type;
+ uint64_t seq;
+
+ bufferlist payload;
+
+ CacheGenContextURef process_msg;
+
+ ObjectCacheRequest();
+ ObjectCacheRequest(uint16_t type, uint64_t seq);
+ virtual ~ObjectCacheRequest();
+
+  // encode consists of two steps
+  // step 1 : directly encode the common fields using the base class's
+  //          encode method.
+  // step 2 : depending on payload_empty(), encode the additional fields;
+  //          this part is implemented by the child class.
+ void encode();
+ void decode(bufferlist& bl);
+ bufferlist get_payload_bufferlist() { return payload; }
+
+ virtual void encode_payload() = 0;
+ virtual void decode_payload(bufferlist::const_iterator bl_it,
+ __u8 encode_version) = 0;
+ virtual uint16_t get_request_type() = 0;
+ virtual bool payload_empty() = 0;
+};
+
+class ObjectCacheRegData : public ObjectCacheRequest {
+ public:
+ std::string version;
+ ObjectCacheRegData();
+ ObjectCacheRegData(uint16_t t, uint64_t s, const std::string &version);
+ ObjectCacheRegData(uint16_t t, uint64_t s);
+ ~ObjectCacheRegData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
+ uint16_t get_request_type() override { return RBDSC_REGISTER; }
+ bool payload_empty() override { return false; }
+};
+
+class ObjectCacheRegReplyData : public ObjectCacheRequest {
+ public:
+ ObjectCacheRegReplyData();
+ ObjectCacheRegReplyData(uint16_t t, uint64_t s);
+ ~ObjectCacheRegReplyData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator iter,
+ __u8 encode_version) override;
+ uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
+ bool payload_empty() override { return true; }
+};
+
+class ObjectCacheReadData : public ObjectCacheRequest {
+ public:
+ uint64_t read_offset;
+ uint64_t read_len;
+ uint64_t pool_id;
+ uint64_t snap_id;
+ uint64_t object_size = 0;
+ std::string oid;
+ std::string pool_namespace;
+ ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
+ uint64_t read_len, uint64_t pool_id,
+ uint64_t snap_id, uint64_t object_size,
+ std::string oid, std::string pool_namespace);
+ ObjectCacheReadData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
+ uint16_t get_request_type() override { return RBDSC_READ; }
+ bool payload_empty() override { return false; }
+};
+
+class ObjectCacheReadReplyData : public ObjectCacheRequest {
+ public:
+ std::string cache_path;
+ ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path);
+ ObjectCacheReadReplyData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadReplyData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
+ uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
+ bool payload_empty() override { return false; }
+};
+
+class ObjectCacheReadRadosData : public ObjectCacheRequest {
+ public:
+ ObjectCacheReadRadosData();
+ ObjectCacheReadRadosData(uint16_t t, uint64_t s);
+ ~ObjectCacheReadRadosData() override;
+ void encode_payload() override;
+ void decode_payload(bufferlist::const_iterator bl,
+ __u8 encode_version) override;
+ uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
+ bool payload_empty() override { return true; }
+};
+
+ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer);
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_TYPES_H
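A minimal, hypothetical round trip over the message format defined in
Types.h/Types.cc (not part of the patch): round_trip_sketch() and the oid,
pool and size values are illustrative only.

  #include "Types.h"
  #include "SocketCommon.h"

  using namespace ceph::immutable_obj_cache;

  void round_trip_sketch() {
    // client side: describe a 4 KiB read of one RADOS object and serialize it
    ObjectCacheReadData req(RBDSC_READ, /*seq=*/1,
                            /*read_offset=*/0, /*read_len=*/4096,
                            /*pool_id=*/2, /*snap_id=*/0,
                            /*object_size=*/4194304,
                            "rbd_data.abc123.0000000000000000",
                            /*pool_namespace=*/"");
    req.encode();
    ceph::bufferlist wire = req.get_payload_bufferlist();

    // server side: rebuild the typed request from the raw buffer;
    // decode_object_cache_request() dispatches on the RBDSC_* type field
    ObjectCacheRequest* decoded = decode_object_cache_request(wire);
    ceph_assert(decoded->get_request_type() == RBDSC_READ);
    delete decoded;
  }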
diff --git a/src/tools/immutable_object_cache/Utils.h b/src/tools/immutable_object_cache/Utils.h
new file mode 100644
index 000000000..3c68cfa7b
--- /dev/null
+++ b/src/tools/immutable_object_cache/Utils.h
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_CACHE_UTILS_H
+#define CEPH_CACHE_UTILS_H
+
+#include "include/rados/librados.hpp"
+#include "include/Context.h"
+
+namespace ceph {
+namespace immutable_obj_cache {
+namespace detail {
+
+template <typename T, void(T::*MF)(int)>
+void rados_callback(rados_completion_t c, void *arg) {
+ T *obj = reinterpret_cast<T*>(arg);
+ int r = rados_aio_get_return_value(c);
+ (obj->*MF)(r);
+}
+
+} // namespace detail
+
+template <typename T, void(T::*MF)(int)=&T::complete>
+librados::AioCompletion *create_rados_callback(T *obj) {
+ return librados::Rados::aio_create_completion(
+ obj, &detail::rados_callback<T, MF>);
+}
+
+} // namespace immutable_obj_cache
+} // namespace ceph
+#endif // CEPH_CACHE_UTILS_H
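For context, create_rados_callback() is typically used to adapt a member
function as an AIO completion, roughly like the following sketch (not part
of the patch): C_ReadObject and read_one_object() are illustrative names.

  #include "include/rados/librados.hpp"
  #include "Utils.h"

  struct C_ReadObject {
    void complete(int r) {
      // r is the return value of the finished aio_read (bytes read or -errno)
    }
  };

  void read_one_object(librados::IoCtx& ioctx, const std::string& oid,
                       ceph::bufferlist* out, C_ReadObject* ctx) {
    // the default template argument binds &C_ReadObject::complete;
    // *out must stay valid until the completion fires
    librados::AioCompletion* comp =
        ceph::immutable_obj_cache::create_rados_callback(ctx);
    ioctx.aio_read(oid, comp, out, /*len=*/4096, /*off=*/0);
    comp->release();
  }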
diff --git a/src/tools/immutable_object_cache/main.cc b/src/tools/immutable_object_cache/main.cc
new file mode 100644
index 000000000..55b0d087a
--- /dev/null
+++ b/src/tools/immutable_object_cache/main.cc
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/ceph_argparse.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "global/global_init.h"
+#include "global/signal_handler.h"
+#include "CacheController.h"
+
+#include <vector>
+
+ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
+
+void usage() {
+ std::cout << "usage: ceph-immutable-object-cache [options...]" << std::endl;
+ std::cout << "options:\n";
+ std::cout << " -m monaddress[:port] connect to specified monitor\n";
+ std::cout << " --keyring=<path> path to keyring for local "
+ << "cluster\n";
+ std::cout << " --log-file=<logfile> file to log debug output\n";
+ std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> "
+ << "set debug level\n";
+ generic_server_usage();
+}
+
+static void handle_signal(int signum) {
+ if (cachectl)
+ cachectl->handle_signal(signum);
+}
+
+int main(int argc, const char **argv) {
+ std::vector<const char*> args;
+ env_to_vec(args);
+ argv_to_vec(argc, argv, args);
+
+ if (ceph_argparse_need_usage(args)) {
+ usage();
+ exit(0);
+ }
+
+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+ CODE_ENVIRONMENT_DAEMON,
+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+
+ if (g_conf()->daemonize) {
+ global_init_daemonize(g_ceph_context);
+ }
+
+ common_init_finish(g_ceph_context);
+ global_init_chdir(g_ceph_context);
+
+ init_async_signal_handler();
+ register_async_signal_handler(SIGHUP, sighup_handler);
+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+
+ std::vector<const char*> cmd_args;
+ argv_to_vec(argc, argv, cmd_args);
+
+ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context,
+ cmd_args);
+ int r = cachectl->init();
+ if (r < 0) {
+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
+ goto cleanup;
+ }
+
+ r = cachectl->run();
+ if (r < 0) {
+ goto cleanup;
+ }
+
+ cleanup:
+ unregister_async_signal_handler(SIGHUP, sighup_handler);
+ unregister_async_signal_handler(SIGINT, handle_signal);
+ unregister_async_signal_handler(SIGTERM, handle_signal);
+ shutdown_async_signal_handler();
+
+ delete cachectl;
+
+  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
+}