author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit     19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree       42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/tools/immutable_object_cache
parent     Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/immutable_object_cache')
19 files changed, 2399 insertions, 0 deletions
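The daemon and its clients talk over a unix domain socket using length-prefixed framing: every message begins with the 6-byte preamble that ENCODE_START writes (struct version, compat version, little-endian payload length), which is exactly what get_header_size() and get_data_len() in Types.h below parse before issuing a second exact-size read for the payload. As a rough standalone sketch of that framing (not part of the diff; WireHeader and frame() are illustrative names, and a little-endian host is assumed for brevity):

#include <cstdint>
#include <cstring>
#include <cstdio>
#include <vector>

// Mirrors the packed HeaderHelper in Types.h: one byte each for the
// struct version and compat version, then a 32-bit little-endian
// length of the bytes that follow.
struct WireHeader {
  uint8_t v;
  uint8_t c_v;
  uint32_t len;
} __attribute__((packed));

// Frame an already-encoded payload the way CacheClient/CacheSession
// exchange messages: a 6-byte header first, payload bytes after it.
// The peer reads sizeof(WireHeader) bytes, pulls 'len' out of them,
// then reads exactly 'len' more bytes.
std::vector<uint8_t> frame(uint8_t v, uint8_t c_v,
                           const std::vector<uint8_t>& payload) {
  WireHeader h{v, c_v, static_cast<uint32_t>(payload.size())};
  std::vector<uint8_t> out(sizeof(h));
  std::memcpy(out.data(), &h, sizeof(h));
  out.insert(out.end(), payload.begin(), payload.end());
  return out;
}

int main() {
  std::vector<uint8_t> payload = {0x01, 0x02, 0x03};
  auto msg = frame(2, 1, payload);
  std::printf("framed %zu bytes (6-byte header + %zu payload)\n",
              msg.size(), payload.size());
  return 0;
}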
diff --git a/src/tools/immutable_object_cache/CMakeLists.txt b/src/tools/immutable_object_cache/CMakeLists.txt new file mode 100644 index 000000000..ed118ed6f --- /dev/null +++ b/src/tools/immutable_object_cache/CMakeLists.txt @@ -0,0 +1,19 @@ +set(ceph_immutable_object_cache_files + ObjectCacheStore.cc + CacheController.cc + CacheServer.cc + CacheClient.cc + CacheSession.cc + SimplePolicy.cc + Types.cc + ) +add_library(ceph_immutable_object_cache_lib STATIC ${ceph_immutable_object_cache_files}) + +add_executable(ceph-immutable-object-cache + main.cc) +target_link_libraries(ceph-immutable-object-cache + ceph_immutable_object_cache_lib + librados + StdFilesystem::filesystem + global) +install(TARGETS ceph-immutable-object-cache DESTINATION bin) diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc new file mode 100644 index 000000000..2b837be51 --- /dev/null +++ b/src/tools/immutable_object_cache/CacheClient.cc @@ -0,0 +1,435 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/bind/bind.hpp> +#include "CacheClient.h" +#include "common/Cond.h" +#include "common/version.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \ + << __func__ << ": " + +namespace ceph { +namespace immutable_obj_cache { + + CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx) + : m_cct(ceph_ctx), m_io_service_work(m_io_service), + m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)), + m_io_thread(nullptr), m_session_work(false), m_writing(false), + m_reading(false), m_sequence_id(0) { + m_worker_thread_num = + m_cct->_conf.get_val<uint64_t>( + "immutable_object_cache_client_dedicated_thread_num"); + + if (m_worker_thread_num != 0) { + m_worker = new boost::asio::io_service(); + m_worker_io_service_work = new boost::asio::io_service::work(*m_worker); + for (uint64_t i = 0; i < m_worker_thread_num; i++) { + std::thread* thd = new std::thread([this](){m_worker->run();}); + m_worker_threads.push_back(thd); + } + } + m_bp_header = buffer::create(get_header_size()); + } + + CacheClient::~CacheClient() { + stop(); + } + + void CacheClient::run() { + m_io_thread.reset(new std::thread([this](){m_io_service.run(); })); + } + + bool CacheClient::is_session_work() { + return m_session_work.load() == true; + } + + int CacheClient::stop() { + m_session_work.store(false); + m_io_service.stop(); + + if (m_io_thread != nullptr) { + m_io_thread->join(); + } + if (m_worker_thread_num != 0) { + m_worker->stop(); + for (auto thd : m_worker_threads) { + thd->join(); + delete thd; + } + delete m_worker_io_service_work; + delete m_worker; + } + return 0; + } + + // close domain socket + void CacheClient::close() { + m_session_work.store(false); + boost::system::error_code close_ec; + m_dm_socket.close(close_ec); + if (close_ec) { + ldout(m_cct, 20) << "close: " << close_ec.message() << dendl; + } + } + + // sync connect + int CacheClient::connect() { + int ret = -1; + C_SaferCond cond; + Context* on_finish = new LambdaContext([&cond, &ret](int err) { + ret = err; + cond.complete(err); + }); + + connect(on_finish); + cond.wait(); + + return ret; + } + + // async connect + void CacheClient::connect(Context* on_finish) { + m_dm_socket.async_connect(m_ep, + boost::bind(&CacheClient::handle_connect, this, + on_finish, 
boost::asio::placeholders::error)); + } + + void CacheClient::handle_connect(Context* on_finish, + const boost::system::error_code& err) { + if (err) { + ldout(m_cct, 20) << "fails to connect to cache server. error : " + << err.message() << dendl; + fault(ASIO_ERROR_CONNECT, err); + on_finish->complete(-1); + return; + } + + ldout(m_cct, 20) << "successfully connected to cache server." << dendl; + on_finish->complete(0); + } + + void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, + std::string oid, + CacheGenContextURef&& on_finish) { + ldout(m_cct, 20) << dendl; + ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ, + ++m_sequence_id, 0, 0, pool_id, + snap_id, object_size, oid, pool_nspace); + req->process_msg = std::move(on_finish); + req->encode(); + + { + std::lock_guard locker{m_lock}; + m_outcoming_bl.append(req->get_payload_bufferlist()); + ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end()); + m_seq_to_req[req->seq] = req; + } + + // try to send message to server. + try_send(); + + // try to receive ack from server. + try_receive(); + } + + void CacheClient::try_send() { + ldout(m_cct, 20) << dendl; + if (!m_writing.load()) { + m_writing.store(true); + send_message(); + } + } + + void CacheClient::send_message() { + ldout(m_cct, 20) << dendl; + bufferlist bl; + { + std::lock_guard locker{m_lock}; + bl.swap(m_outcoming_bl); + ceph_assert(m_outcoming_bl.length() == 0); + } + + // send bytes as many as possible. + boost::asio::async_write(m_dm_socket, + boost::asio::buffer(bl.c_str(), bl.length()), + boost::asio::transfer_exactly(bl.length()), + [this, bl](const boost::system::error_code& err, size_t cb) { + if (err || cb != bl.length()) { + fault(ASIO_ERROR_WRITE, err); + return; + } + + ceph_assert(cb == bl.length()); + + { + std::lock_guard locker{m_lock}; + if (m_outcoming_bl.length() == 0) { + m_writing.store(false); + return; + } + } + + // still have left bytes, continue to send. 
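+      // m_writing stays true while queued bytes remain, so only this
+      // completion handler re-enters send_message(); concurrent
+      // lookup_object() calls simply append to m_outcoming_bl under
+      // m_lock and the next swap picks them up.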
+ send_message(); + }); + try_receive(); + } + + void CacheClient::try_receive() { + ldout(m_cct, 20) << dendl; + if (!m_reading.load()) { + m_reading.store(true); + receive_message(); + } + } + + void CacheClient::receive_message() { + ldout(m_cct, 20) << dendl; + ceph_assert(m_reading.load()); + read_reply_header(); + } + + void CacheClient::read_reply_header() { + ldout(m_cct, 20) << dendl; + /* create new head buffer for every reply */ + bufferptr bp_head(buffer::create(get_header_size())); + auto raw_ptr = bp_head.c_str(); + + boost::asio::async_read(m_dm_socket, + boost::asio::buffer(raw_ptr, get_header_size()), + boost::asio::transfer_exactly(get_header_size()), + boost::bind(&CacheClient::handle_reply_header, + this, bp_head, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void CacheClient::handle_reply_header(bufferptr bp_head, + const boost::system::error_code& ec, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (ec || bytes_transferred != get_header_size()) { + fault(ASIO_ERROR_READ, ec); + return; + } + + ceph_assert(bytes_transferred == bp_head.length()); + + uint32_t data_len = get_data_len(bp_head.c_str()); + + bufferptr bp_data(buffer::create(data_len)); + read_reply_data(std::move(bp_head), std::move(bp_data), data_len); + } + + void CacheClient::read_reply_data(bufferptr&& bp_head, + bufferptr&& bp_data, + const uint64_t data_len) { + ldout(m_cct, 20) << dendl; + auto raw_ptr = bp_data.c_str(); + boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len), + boost::asio::transfer_exactly(data_len), + boost::bind(&CacheClient::handle_reply_data, + this, std::move(bp_head), std::move(bp_data), data_len, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void CacheClient::handle_reply_data(bufferptr bp_head, + bufferptr bp_data, + const uint64_t data_len, + const boost::system::error_code& ec, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (ec || bytes_transferred != data_len) { + fault(ASIO_ERROR_WRITE, ec); + return; + } + ceph_assert(bp_data.length() == data_len); + + bufferlist data_buffer; + data_buffer.append(std::move(bp_head)); + data_buffer.append(std::move(bp_data)); + + ObjectCacheRequest* reply = decode_object_cache_request(data_buffer); + data_buffer.clear(); + ceph_assert(data_buffer.length() == 0); + + process(reply, reply->seq); + + { + std::lock_guard locker{m_lock}; + if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) { + m_reading.store(false); + return; + } + } + if (is_session_work()) { + receive_message(); + } + } + + void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) { + ldout(m_cct, 20) << dendl; + ObjectCacheRequest* current_request = nullptr; + { + std::lock_guard locker{m_lock}; + ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end()); + current_request = m_seq_to_req[seq_id]; + m_seq_to_req.erase(seq_id); + } + + ceph_assert(current_request != nullptr); + auto process_reply = new LambdaContext([current_request, reply] + (bool dedicated) { + if (dedicated) { + // dedicated thrad to execute this context. + } + current_request->process_msg.release()->complete(reply); + delete current_request; + delete reply; + }); + + if (m_worker_thread_num != 0) { + m_worker->post([process_reply]() { + process_reply->complete(true); + }); + } else { + process_reply->complete(false); + } + } + + // if there is one request fails, just execute fault, then shutdown RO. 
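+  // A connect error only logs and falls back to RADOS (the session never
+  // started); a read/write error on a live session stops it, closes the
+  // socket, and completes every pending request as RBDSC_READ_RADOS so
+  // the reads are re-issued against the cluster.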
+  void CacheClient::fault(const int err_type,
+                          const boost::system::error_code& ec) {
+    ldout(m_cct, 20) << "fault: " << ec.message() << dendl;
+
+    if (err_type == ASIO_ERROR_CONNECT) {
+      ceph_assert(!m_session_work.load());
+      if (ec == boost::asio::error::connection_refused) {
+        ldout(m_cct, 20) << "connecting to RO daemon failed: " << ec.message()
+                         << ". Is the immutable-object-cache daemon down? "
+                         << "Data will be read from the ceph cluster." << dendl;
+      } else {
+        ldout(m_cct, 20) << "connecting to RO daemon failed: "
+                         << ec.message() << dendl;
+      }
+
+      if (m_dm_socket.is_open()) {
+        // Set to indicate what error occurred, if any.
+        // Note that, even if the function indicates an error,
+        // the underlying descriptor is closed.
+        boost::system::error_code close_ec;
+        m_dm_socket.close(close_ec);
+        if (close_ec) {
+          ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+        }
+      }
+      return;
+    }
+
+    if (!m_session_work.load()) {
+      return;
+    }
+
+    /* Once the session stops working, ASIO accepts no new requests from the
+     * hook. Requests already pending inside ASIO are cancelled and their
+     * callbacks invoked; those cancelled requests are re-dispatched to the
+     * RADOS layer. Make sure only one thread executes the code below. */
+    m_session_work.store(false);
+
+    if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+      ldout(m_cct, 20) << "ASIO incomplete message." << ec.message() << dendl;
+      ceph_assert(0);
+    }
+
+    if (err_type == ASIO_ERROR_READ) {
+      ldout(m_cct, 20) << "ASIO async read failed: " << ec.message() << dendl;
+    }
+
+    if (err_type == ASIO_ERROR_WRITE) {
+      ldout(m_cct, 20) << "ASIO async write failed: " << ec.message() << dendl;
+      // CacheClient should never trigger this error.
+      ceph_assert(0);
+    }
+
+    // currently, for any ASIO error, just shut down RO.
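+    // close() also clears m_session_work, so is_session_work() callers
+    // stop queueing new lookups before the pending ones are completed
+    // below as RBDSC_READ_RADOS.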
+ close(); + + /* all pending request, which have entered into ASIO, + * will be re-dispatched to RADOS.*/ + { + std::lock_guard locker{m_lock}; + for (auto it : m_seq_to_req) { + it.second->type = RBDSC_READ_RADOS; + it.second->process_msg->complete(it.second); + } + m_seq_to_req.clear(); + } + + ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\ + Later all reading will be re-dispatched RADOS layer" + << ec.message() << dendl; + } + + // TODO : re-implement this method + int CacheClient::register_client(Context* on_finish) { + ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER, + m_sequence_id++, + ceph_version_to_str()); + reg_req->encode(); + + bufferlist bl; + bl.append(reg_req->get_payload_bufferlist()); + + uint64_t ret; + boost::system::error_code ec; + + ret = boost::asio::write(m_dm_socket, + boost::asio::buffer(bl.c_str(), bl.length()), ec); + + if (ec || ret != bl.length()) { + fault(ASIO_ERROR_WRITE, ec); + return -1; + } + delete reg_req; + + ret = boost::asio::read(m_dm_socket, + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec); + if (ec || ret != get_header_size()) { + fault(ASIO_ERROR_READ, ec); + return -1; + } + + uint64_t data_len = get_data_len(m_bp_header.c_str()); + bufferptr bp_data(buffer::create(data_len)); + + ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(), + data_len), ec); + if (ec || ret != data_len) { + fault(ASIO_ERROR_READ, ec); + return -1; + } + + bufferlist data_buffer; + data_buffer.append(m_bp_header); + data_buffer.append(std::move(bp_data)); + ObjectCacheRequest* req = decode_object_cache_request(data_buffer); + if (req->type == RBDSC_REGISTER_REPLY) { + m_session_work.store(true); + on_finish->complete(0); + } else { + on_finish->complete(-1); + } + + delete req; + return 0; + } + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheClient.h b/src/tools/immutable_object_cache/CacheClient.h new file mode 100644 index 000000000..b2f749631 --- /dev/null +++ b/src/tools/immutable_object_cache/CacheClient.h @@ -0,0 +1,84 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_CACHE_CLIENT_H +#define CEPH_CACHE_CACHE_CLIENT_H + +#include <atomic> +#include <boost/asio.hpp> +#include <boost/asio/error.hpp> +#include <boost/algorithm/string.hpp> + +#include "include/ceph_assert.h" +#include "common/ceph_mutex.h" +#include "include/Context.h" +#include "Types.h" +#include "SocketCommon.h" + + +using boost::asio::local::stream_protocol; + +namespace ceph { +namespace immutable_obj_cache { + +class CacheClient { + public: + CacheClient(const std::string& file, CephContext* ceph_ctx); + ~CacheClient(); + void run(); + bool is_session_work(); + void close(); + int stop(); + int connect(); + void connect(Context* on_finish); + void lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, std::string oid, + CacheGenContextURef&& on_finish); + int register_client(Context* on_finish); + + private: + void send_message(); + void try_send(); + void fault(const int err_type, const boost::system::error_code& err); + void handle_connect(Context* on_finish, const boost::system::error_code& err); + void try_receive(); + void receive_message(); + void process(ObjectCacheRequest* reply, uint64_t seq_id); + void read_reply_header(); + void handle_reply_header(bufferptr bp_head, + const boost::system::error_code& ec, + size_t 
bytes_transferred); + void read_reply_data(bufferptr&& bp_head, bufferptr&& bp_data, + const uint64_t data_len); + void handle_reply_data(bufferptr bp_head, bufferptr bp_data, + const uint64_t data_len, + const boost::system::error_code& ec, + size_t bytes_transferred); + + private: + CephContext* m_cct; + boost::asio::io_service m_io_service; + boost::asio::io_service::work m_io_service_work; + stream_protocol::socket m_dm_socket; + stream_protocol::endpoint m_ep; + std::shared_ptr<std::thread> m_io_thread; + std::atomic<bool> m_session_work; + + uint64_t m_worker_thread_num; + boost::asio::io_service* m_worker; + std::vector<std::thread*> m_worker_threads; + boost::asio::io_service::work* m_worker_io_service_work; + + std::atomic<bool> m_writing; + std::atomic<bool> m_reading; + std::atomic<uint64_t> m_sequence_id; + ceph::mutex m_lock = + ceph::make_mutex("ceph::cache::cacheclient::m_lock"); + std::map<uint64_t, ObjectCacheRequest*> m_seq_to_req; + bufferlist m_outcoming_bl; + bufferptr m_bp_header; +}; + +} // namespace immutable_obj_cache +} // namespace ceph +#endif // CEPH_CACHE_CACHE_CLIENT_H diff --git a/src/tools/immutable_object_cache/CacheController.cc b/src/tools/immutable_object_cache/CacheController.cc new file mode 100644 index 000000000..ae1636839 --- /dev/null +++ b/src/tools/immutable_object_cache/CacheController.cc @@ -0,0 +1,139 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "CacheController.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \ + << __func__ << ": " + +namespace ceph { +namespace immutable_obj_cache { + +CacheController::CacheController(CephContext *cct, + const std::vector<const char*> &args): + m_args(args), m_cct(cct) { + ldout(m_cct, 20) << dendl; +} + +CacheController::~CacheController() { + delete m_cache_server; + delete m_object_cache_store; +} + +int CacheController::init() { + ldout(m_cct, 20) << dendl; + m_object_cache_store = new ObjectCacheStore(m_cct); + // TODO(dehao): make this configurable + int r = m_object_cache_store->init(true); + if (r < 0) { + lderr(m_cct) << "init error\n" << dendl; + return r; + } + + r = m_object_cache_store->init_cache(); + if (r < 0) { + lderr(m_cct) << "init error\n" << dendl; + } + + return r; +} + +int CacheController::shutdown() { + ldout(m_cct, 20) << dendl; + + int r; + if (m_cache_server != nullptr) { + r = m_cache_server->stop(); + if (r < 0) { + lderr(m_cct) << "stop error\n" << dendl; + return r; + } + } + + r = m_object_cache_store->shutdown(); + if (r < 0) { + lderr(m_cct) << "stop error\n" << dendl; + return r; + } + + return r; +} + +void CacheController::handle_signal(int signum) { + shutdown(); +} + +int CacheController::run() { + try { + std::string controller_path = + m_cct->_conf.get_val<std::string>("immutable_object_cache_sock"); + if (controller_path.empty()) { + lderr(m_cct) << "'immutable_object_cache_sock' path not set" << dendl; + return -EINVAL; + } + + std::remove(controller_path.c_str()); + + m_cache_server = new CacheServer(m_cct, controller_path, + std::bind(&CacheController::handle_request, this, + std::placeholders::_1, std::placeholders::_2)); + + int ret = m_cache_server->run(); + if (ret != 0) { + return ret; + } + + return 0; + } catch (std::exception& e) { + lderr(m_cct) << "Exception: " << e.what() << dendl; + return -EFAULT; + } +} + +void 
CacheController::handle_request(CacheSession* session, + ObjectCacheRequest* req) { + ldout(m_cct, 20) << dendl; + + switch (req->get_request_type()) { + case RBDSC_REGISTER: { + // TODO(dehao): skip register and allow clients to lookup directly + + auto req_reg_data = reinterpret_cast <ObjectCacheRegData*> (req); + session->set_client_version(req_reg_data->version); + + ObjectCacheRequest* reply = new ObjectCacheRegReplyData( + RBDSC_REGISTER_REPLY, req->seq); + session->send(reply); + break; + } + case RBDSC_READ: { + // lookup object in local cache store + std::string cache_path; + ObjectCacheReadData* req_read_data = + reinterpret_cast <ObjectCacheReadData*> (req); + bool return_dne_path = session->client_version().empty(); + int ret = m_object_cache_store->lookup_object( + req_read_data->pool_namespace, req_read_data->pool_id, + req_read_data->snap_id, req_read_data->object_size, + req_read_data->oid, return_dne_path, cache_path); + ObjectCacheRequest* reply = nullptr; + if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) { + reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq); + } else { + reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, + req->seq, cache_path); + } + session->send(reply); + break; + } + default: + ldout(m_cct, 5) << "can't recongize request" << dendl; + ceph_assert(0); + } +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheController.h b/src/tools/immutable_object_cache/CacheController.h new file mode 100644 index 000000000..f70f6bb1c --- /dev/null +++ b/src/tools/immutable_object_cache/CacheController.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_CACHE_CONTROLLER_H +#define CEPH_CACHE_CACHE_CONTROLLER_H + +#include "common/ceph_context.h" +#include "common/WorkQueue.h" +#include "CacheServer.h" +#include "ObjectCacheStore.h" + +namespace ceph { +namespace immutable_obj_cache { + +class CacheController { + public: + CacheController(CephContext *cct, const std::vector<const char*> &args); + ~CacheController(); + + int init(); + + int shutdown(); + + void handle_signal(int sinnum); + + int run(); + + void handle_request(CacheSession* session, ObjectCacheRequest* msg); + + private: + CacheServer *m_cache_server = nullptr; + std::vector<const char*> m_args; + CephContext *m_cct; + ObjectCacheStore *m_object_cache_store = nullptr; +}; + +} // namespace immutable_obj_cache +} // namespace ceph + +#endif // CEPH_CACHE_CACHE_CONTROLLER_H diff --git a/src/tools/immutable_object_cache/CacheServer.cc b/src/tools/immutable_object_cache/CacheServer.cc new file mode 100644 index 000000000..e94a47c7a --- /dev/null +++ b/src/tools/immutable_object_cache/CacheServer.cc @@ -0,0 +1,106 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/bind/bind.hpp> +#include "common/debug.h" +#include "common/ceph_context.h" +#include "CacheServer.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::CacheServer: " << this << " " \ + << __func__ << ": " + + +namespace ceph { +namespace immutable_obj_cache { + +CacheServer::CacheServer(CephContext* cct, const std::string& file, + ProcessMsg processmsg) + : cct(cct), m_server_process_msg(processmsg), + m_local_path(file), m_acceptor(m_io_service) {} + +CacheServer::~CacheServer() { + stop(); +} + +int 
CacheServer::run() { + ldout(cct, 20) << dendl; + + int ret = start_accept(); + if (ret != 0) { + return ret; + } + + boost::system::error_code ec; + ret = m_io_service.run(ec); + if (ec) { + ldout(cct, 1) << "m_io_service run fails: " << ec.message() << dendl; + return -1; + } + return 0; +} + +int CacheServer::stop() { + m_io_service.stop(); + return 0; +} + +int CacheServer::start_accept() { + ldout(cct, 20) << dendl; + + boost::system::error_code ec; + m_acceptor.open(m_local_path.protocol(), ec); + if (ec) { + lderr(cct) << "failed to open domain socket: " << ec.message() << dendl; + return -ec.value(); + } + + m_acceptor.bind(m_local_path, ec); + if (ec) { + lderr(cct) << "failed to bind to domain socket '" + << m_local_path << "': " << ec.message() << dendl; + return -ec.value(); + } + + m_acceptor.listen(boost::asio::socket_base::max_connections, ec); + if (ec) { + lderr(cct) << "failed to listen on domain socket: " << ec.message() + << dendl; + return -ec.value(); + } + + accept(); + return 0; +} + +void CacheServer::accept() { + CacheSessionPtr new_session = nullptr; + + new_session.reset(new CacheSession(m_io_service, + m_server_process_msg, cct)); + + m_acceptor.async_accept(new_session->socket(), + boost::bind(&CacheServer::handle_accept, this, new_session, + boost::asio::placeholders::error)); +} + +void CacheServer::handle_accept(CacheSessionPtr new_session, + const boost::system::error_code& error) { + ldout(cct, 20) << dendl; + if (error) { + // operation_absort + lderr(cct) << "async accept fails : " << error.message() << dendl; + return; + } + + // TODO(dehao) : session setting + new_session->start(); + + // lanuch next accept + accept(); +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheServer.h b/src/tools/immutable_object_cache/CacheServer.h new file mode 100644 index 000000000..31d859934 --- /dev/null +++ b/src/tools/immutable_object_cache/CacheServer.h @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_CACHE_SERVER_H +#define CEPH_CACHE_CACHE_SERVER_H + +#include <boost/asio.hpp> +#include <boost/asio/error.hpp> + +#include "Types.h" +#include "SocketCommon.h" +#include "CacheSession.h" + + +using boost::asio::local::stream_protocol; + +namespace ceph { +namespace immutable_obj_cache { + +class CacheServer { + public: + CacheServer(CephContext* cct, const std::string& file, ProcessMsg processmsg); + ~CacheServer(); + + int run(); + int start_accept(); + int stop(); + + private: + void accept(); + void handle_accept(CacheSessionPtr new_session, + const boost::system::error_code& error); + + private: + CephContext* cct; + boost::asio::io_service m_io_service; + ProcessMsg m_server_process_msg; + stream_protocol::endpoint m_local_path; + stream_protocol::acceptor m_acceptor; +}; + +} // namespace immutable_obj_cache +} // namespace ceph + +#endif diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc new file mode 100644 index 000000000..38c38c97d --- /dev/null +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -0,0 +1,140 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/bind/bind.hpp> +#include "common/debug.h" +#include "common/ceph_context.h" +#include "CacheSession.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix 
+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ + << __func__ << ": " + + +namespace ceph { +namespace immutable_obj_cache { + +CacheSession::CacheSession(io_service& io_service, + ProcessMsg processmsg, + CephContext* cct) + : m_dm_socket(io_service), + m_server_process_msg(processmsg), m_cct(cct) { + m_bp_header = buffer::create(get_header_size()); +} + +CacheSession::~CacheSession() { + close(); +} + +stream_protocol::socket& CacheSession::socket() { + return m_dm_socket; +} + +void CacheSession::set_client_version(const std::string &version) { + m_client_version = version; +} + +const std::string &CacheSession::client_version() const { + return m_client_version; +} + +void CacheSession::close() { + if (m_dm_socket.is_open()) { + boost::system::error_code close_ec; + m_dm_socket.close(close_ec); + if (close_ec) { + ldout(m_cct, 20) << "close: " << close_ec.message() << dendl; + } + } +} + +void CacheSession::start() { + read_request_header(); +} + +void CacheSession::read_request_header() { + ldout(m_cct, 20) << dendl; + boost::asio::async_read(m_dm_socket, + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), + boost::asio::transfer_exactly(get_header_size()), + boost::bind(&CacheSession::handle_request_header, + shared_from_this(), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void CacheSession::handle_request_header(const boost::system::error_code& err, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (err || bytes_transferred != get_header_size()) { + fault(err); + return; + } + + read_request_data(get_data_len(m_bp_header.c_str())); +} + +void CacheSession::read_request_data(uint64_t data_len) { + ldout(m_cct, 20) << dendl; + bufferptr bp_data(buffer::create(data_len)); + boost::asio::async_read(m_dm_socket, + boost::asio::buffer(bp_data.c_str(), bp_data.length()), + boost::asio::transfer_exactly(data_len), + boost::bind(&CacheSession::handle_request_data, + shared_from_this(), bp_data, data_len, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, + const boost::system::error_code& err, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (err || bytes_transferred != data_len) { + fault(err); + return; + } + + bufferlist bl_data; + + bl_data.append(m_bp_header); + bl_data.append(std::move(bp)); + + ObjectCacheRequest* req = decode_object_cache_request(bl_data); + + process(req); + delete req; + read_request_header(); +} + +void CacheSession::process(ObjectCacheRequest* req) { + ldout(m_cct, 20) << dendl; + m_server_process_msg(this, req); +} + +void CacheSession::send(ObjectCacheRequest* reply) { + ldout(m_cct, 20) << dendl; + bufferlist bl; + reply->encode(); + bl.append(reply->get_payload_bufferlist()); + + boost::asio::async_write(m_dm_socket, + boost::asio::buffer(bl.c_str(), bl.length()), + boost::asio::transfer_exactly(bl.length()), + [this, bl, reply](const boost::system::error_code& err, + size_t bytes_transferred) { + delete reply; + if (err || bytes_transferred != bl.length()) { + fault(err); + return; + } + }); +} + +void CacheSession::fault(const boost::system::error_code& ec) { + ldout(m_cct, 20) << "session fault : " << ec.message() << dendl; +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/CacheSession.h b/src/tools/immutable_object_cache/CacheSession.h new file mode 100644 index 
000000000..0826e8a2b --- /dev/null +++ b/src/tools/immutable_object_cache/CacheSession.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_SESSION_H +#define CEPH_CACHE_SESSION_H + +#include <boost/asio.hpp> +#include <boost/asio/error.hpp> + +#include "Types.h" +#include "SocketCommon.h" + +using boost::asio::local::stream_protocol; +using boost::asio::io_service; + +namespace ceph { +namespace immutable_obj_cache { + +class CacheSession : public std::enable_shared_from_this<CacheSession> { + public: + CacheSession(io_service& io_service, ProcessMsg process_msg, + CephContext* ctx); + ~CacheSession(); + stream_protocol::socket& socket(); + void close(); + void start(); + void read_request_header(); + void handle_request_header(const boost::system::error_code& err, + size_t bytes_transferred); + void read_request_data(uint64_t data_len); + void handle_request_data(bufferptr bp, uint64_t data_len, + const boost::system::error_code& err, + size_t bytes_transferred); + void process(ObjectCacheRequest* req); + void fault(const boost::system::error_code& ec); + void send(ObjectCacheRequest* msg); + + void set_client_version(const std::string &version); + const std::string &client_version() const; + + private: + stream_protocol::socket m_dm_socket; + ProcessMsg m_server_process_msg; + CephContext* m_cct; + + std::string m_client_version; + + bufferptr m_bp_header; +}; + +typedef std::shared_ptr<CacheSession> CacheSessionPtr; + +} // namespace immutable_obj_cache +} // namespace ceph + +#endif // CEPH_CACHE_SESSION_H diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.cc b/src/tools/immutable_object_cache/ObjectCacheStore.cc new file mode 100644 index 000000000..1b1eef1e3 --- /dev/null +++ b/src/tools/immutable_object_cache/ObjectCacheStore.cc @@ -0,0 +1,466 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ObjectCacheStore.h" +#include "Utils.h" +#if __has_include(<filesystem>) +#include <filesystem> +namespace fs = std::filesystem; +#else +#include <experimental/filesystem> +namespace fs = std::experimental::filesystem; +#endif + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ + << __func__ << ": " + + +namespace ceph { +namespace immutable_obj_cache { + +namespace { + +class SafeTimerSingleton : public CommonSafeTimer<ceph::mutex> { +public: + ceph::mutex lock = ceph::make_mutex + ("ceph::immutable_object_cache::SafeTimerSingleton::lock"); + + explicit SafeTimerSingleton(CephContext *cct) + : CommonSafeTimer(cct, lock, true) { + init(); + } + ~SafeTimerSingleton() { + std::lock_guard locker{lock}; + shutdown(); + } +}; + +} // anonymous namespace + +enum ThrottleTargetCode { + ROC_QOS_IOPS_THROTTLE = 1, + ROC_QOS_BPS_THROTTLE = 2 +}; + +ObjectCacheStore::ObjectCacheStore(CephContext *cct) + : m_cct(cct), m_rados(new librados::Rados()) { + + m_cache_root_dir = + m_cct->_conf.get_val<std::string>("immutable_object_cache_path"); + + if (m_cache_root_dir.back() != '/') { + m_cache_root_dir += "/"; + } + + uint64_t cache_max_size = + m_cct->_conf.get_val<Option::size_t>("immutable_object_cache_max_size"); + + double cache_watermark = + m_cct->_conf.get_val<double>("immutable_object_cache_watermark"); + + uint64_t max_inflight_ops = + 
m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops"); + + uint64_t limit = 0; + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_iops_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_iops_burst_seconds")); + } + if ((limit = m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_limit")) != 0) { + apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE, + m_cct->_conf.get_val<std::chrono::milliseconds> + ("immutable_object_cache_qos_schedule_tick_min"), + limit, + m_cct->_conf.get_val<uint64_t> + ("immutable_object_cache_qos_bps_burst"), + m_cct->_conf.get_val<std::chrono::seconds> + ("immutable_object_cache_qos_bps_burst_seconds")); + } + + if ((cache_watermark <= 0) || (cache_watermark > 1)) { + lderr(m_cct) << "Invalid water mark provided, set it to default." << dendl; + cache_watermark = 0.9; + } + m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops, + cache_watermark); +} + +ObjectCacheStore::~ObjectCacheStore() { + delete m_policy; + if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_IOPS_THROTTLE]; + } + if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) { + ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr); + delete m_throttles[ROC_QOS_BPS_THROTTLE]; + } +} + +int ObjectCacheStore::init(bool reset) { + ldout(m_cct, 20) << dendl; + + int ret = m_rados->init_with_context(m_cct); + if (ret < 0) { + lderr(m_cct) << "fail to init Ceph context" << dendl; + return ret; + } + + ret = m_rados->connect(); + if (ret < 0) { + lderr(m_cct) << "fail to connect to cluster" << dendl; + return ret; + } + + // TODO(dehao): fsck and reuse existing cache objects + if (reset) { + try { + if (fs::exists(m_cache_root_dir)) { + // remove all sub folders + for (auto& p : fs::directory_iterator(m_cache_root_dir)) { + fs::remove_all(p.path()); + } + } else { + fs::create_directories(m_cache_root_dir); + } + } catch (const fs::filesystem_error& e) { + lderr(m_cct) << "failed to initialize cache store directory: " + << e.what() << dendl; + return -e.code().value(); + } + } + return 0; +} + +int ObjectCacheStore::shutdown() { + ldout(m_cct, 20) << dendl; + + m_rados->shutdown(); + return 0; +} + +int ObjectCacheStore::init_cache() { + ldout(m_cct, 20) << dendl; + std::string cache_dir = m_cache_root_dir; + + return 0; +} + +int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string object_name) { + ldout(m_cct, 20) << "to promote object: " << object_name + << " from pool id: " << pool_id + << " namespace: " << pool_nspace + << " snapshot: " << snap_id << dendl; + + int ret = 0; + std::string cache_file_name = + get_cache_file_name(pool_nspace, pool_id, snap_id, object_name); + librados::IoCtx ioctx; + { + std::lock_guard _locker{m_ioctx_map_lock}; + if (m_ioctx_map.find(pool_id) == m_ioctx_map.end()) { + ret = m_rados->ioctx_create2(pool_id, ioctx); + if (ret < 0) { + lderr(m_cct) << "fail to create ioctx" << dendl; + return ret; + } + m_ioctx_map.emplace(pool_id, ioctx); + } else { + ioctx = m_ioctx_map[pool_id]; + } + } + + ioctx.set_namespace(pool_nspace); + ioctx.snap_set_read(snap_id); + + librados::bufferlist* read_buf = new 
librados::bufferlist(); + + auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) { + handle_promote_callback(ret, read_buf, cache_file_name); + }); + + return promote_object(&ioctx, object_name, read_buf, ctx); +} + +int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf, + std::string cache_file_name) { + ldout(m_cct, 20) << " cache_file_name: " << cache_file_name << dendl; + + // rados read error + if (ret != -ENOENT && ret < 0) { + lderr(m_cct) << "fail to read from rados" << dendl; + + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); + delete read_buf; + return ret; + } + + auto state = OBJ_CACHE_PROMOTED; + if (ret == -ENOENT) { + // object is empty + state = OBJ_CACHE_DNE; + ret = 0; + } else { + std::string cache_file_path = get_cache_file_path(cache_file_name, true); + if (cache_file_path == "") { + lderr(m_cct) << "fail to write cache file" << dendl; + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); + delete read_buf; + return -ENOSPC; + } + + ret = read_buf->write_file(cache_file_path.c_str()); + if (ret < 0) { + lderr(m_cct) << "fail to write cache file" << dendl; + + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); + delete read_buf; + return ret; + } + } + + // update metadata + ceph_assert(OBJ_CACHE_SKIP == m_policy->get_status(cache_file_name)); + m_policy->update_status(cache_file_name, state, read_buf->length()); + ceph_assert(state == m_policy->get_status(cache_file_name)); + + delete read_buf; + + evict_objects(); + + return ret; +} + +int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, + std::string object_name, + bool return_dne_path, + std::string& target_cache_file_path) { + ldout(m_cct, 20) << "object name = " << object_name + << " in pool ID : " << pool_id << dendl; + + int pret = -1; + std::string cache_file_name = + get_cache_file_name(pool_nspace, pool_id, snap_id, object_name); + + cache_status_t ret = m_policy->lookup_object(cache_file_name); + + switch (ret) { + case OBJ_CACHE_NONE: { + if (take_token_from_throttle(object_size, 1)) { + pret = do_promote(pool_nspace, pool_id, snap_id, object_name); + if (pret < 0) { + lderr(m_cct) << "fail to start promote" << dendl; + } + } else { + m_policy->update_status(cache_file_name, OBJ_CACHE_NONE); + } + return ret; + } + case OBJ_CACHE_PROMOTED: + target_cache_file_path = get_cache_file_path(cache_file_name); + return ret; + case OBJ_CACHE_DNE: + if (return_dne_path) { + target_cache_file_path = get_cache_file_path(cache_file_name); + } + return ret; + case OBJ_CACHE_SKIP: + return ret; + default: + lderr(m_cct) << "unrecognized object cache status" << dendl; + ceph_assert(0); + } +} + +int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, + std::string object_name, + librados::bufferlist* read_buf, + Context* on_finish) { + ldout(m_cct, 20) << "object name = " << object_name << dendl; + + librados::AioCompletion* read_completion = create_rados_callback(on_finish); + // issue a zero-sized read req to get the entire obj + int ret = ioctx->aio_read(object_name, read_completion, read_buf, 0, 0); + if (ret < 0) { + lderr(m_cct) << "failed to read from rados" << dendl; + } + read_completion->release(); + + return ret; +} + +int ObjectCacheStore::evict_objects() { + ldout(m_cct, 20) << dendl; + + std::list<std::string> obj_list; + m_policy->get_evict_list(&obj_list); + for (auto& obj : obj_list) { + do_evict(obj); + } + return 0; +} + +int ObjectCacheStore::do_evict(std::string 
cache_file) {
+  ldout(m_cct, 20) << "file = " << cache_file << dendl;
+
+  if (cache_file == "") {
+    return 0;
+  }
+
+  std::string cache_file_path = get_cache_file_path(cache_file);
+
+  ldout(m_cct, 20) << "evict cache: " << cache_file_path << dendl;
+
+  // TODO(dehao): possible race on read?
+  int ret = std::remove(cache_file_path.c_str());
+  // evict metadata
+  if (ret == 0) {
+    m_policy->update_status(cache_file, OBJ_CACHE_SKIP);
+    m_policy->evict_entry(cache_file);
+  }
+
+  return ret;
+}
+
+std::string ObjectCacheStore::get_cache_file_name(std::string pool_nspace,
+                                                  uint64_t pool_id,
+                                                  uint64_t snap_id,
+                                                  std::string oid) {
+  return pool_nspace + ":" + std::to_string(pool_id) + ":" +
+         std::to_string(snap_id) + ":" + oid;
+}
+
+std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
+                                                  bool mkdir) {
+  ldout(m_cct, 20) << cache_file_name << dendl;
+
+  uint32_t crc = 0;
+  crc = ceph_crc32c(0, (unsigned char *)cache_file_name.c_str(),
+                    cache_file_name.length());
+
+  std::string cache_file_dir = std::to_string(crc % 100) + "/";
+
+  if (mkdir) {
+    ldout(m_cct, 20) << "creating cache dir: " << cache_file_dir << dendl;
+    std::error_code ec;
+    std::string new_dir = m_cache_root_dir + cache_file_dir;
+    if (fs::exists(new_dir, ec)) {
+      ldout(m_cct, 20) << "cache dir exists: " << cache_file_dir << dendl;
+      return new_dir + cache_file_name;
+    }
+
+    if (!fs::create_directories(new_dir, ec)) {
+      ldout(m_cct, 5) << "failed to create cache dir: " << new_dir
+                      << " error: " << ec.message() << dendl;
+      return "";
+    }
+  }
+
+  return m_cache_root_dir + cache_file_dir + cache_file_name;
+}
+
+void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
+  m_io_throttled = false;
+  std::lock_guard lock(m_throttle_lock);
+  if (type & ROC_QOS_IOPS_THROTTLE) {
+    m_iops_tokens += tokens;
+  } else if (type & ROC_QOS_BPS_THROTTLE) {
+    m_bps_tokens += tokens;
+  } else {
+    lderr(m_cct) << "unknown throttle type."
<< dendl; + } +} + +bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size, + uint64_t object_num) { + if (m_io_throttled == true) { + return false; + } + + int flag = 0; + bool wait = false; + if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_num > m_iops_tokens) { + wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this, + &ObjectCacheStore::handle_throttle_ready, object_num, + ROC_QOS_IOPS_THROTTLE); + } else { + m_iops_tokens -= object_num; + flag = 1; + } + } + if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) { + std::lock_guard lock(m_throttle_lock); + if (object_size > m_bps_tokens) { + wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this, + &ObjectCacheStore::handle_throttle_ready, object_size, + ROC_QOS_BPS_THROTTLE); + } else { + m_bps_tokens -= object_size; + } + } + + if (wait) { + m_io_throttled = true; + // when passing iops throttle, but limit in bps throttle, recovery + if (flag == 1) { + std::lock_guard lock(m_throttle_lock); + m_iops_tokens += object_num; + } + } + + return !wait; +} + +static const std::map<uint64_t, std::string> THROTTLE_FLAGS = { + { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" }, + { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" } +}; + +void ObjectCacheStore::apply_qos_tick_and_limit( + const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, + uint64_t burst, + std::chrono::seconds burst_seconds) { + SafeTimerSingleton* safe_timer_singleton = nullptr; + TokenBucketThrottle* throttle = nullptr; + safe_timer_singleton = + &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>( + "tools::immutable_object_cache", false, m_cct); + SafeTimer* timer = safe_timer_singleton; + ceph::mutex* timer_lock = &safe_timer_singleton->lock; + m_qos_enabled_flag |= flag; + auto throttle_flags_it = THROTTLE_FLAGS.find(flag); + ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end()); + throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second, + 0, 0, timer, timer_lock); + throttle->set_schedule_tick_min(min_tick.count()); + int ret = throttle->set_limit(limit, burst, burst_seconds.count()); + if (ret < 0) { + lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: " + << "burst(" << burst << ") is less than " + << "limit(" << limit << ")" << dendl; + throttle->set_limit(limit, 0, 1); + } + + ceph_assert(m_throttles.find(flag) == m_throttles.end()); + m_throttles.insert({flag, throttle}); +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/ObjectCacheStore.h b/src/tools/immutable_object_cache/ObjectCacheStore.h new file mode 100644 index 000000000..51e5a77b8 --- /dev/null +++ b/src/tools/immutable_object_cache/ObjectCacheStore.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H +#define CEPH_CACHE_OBJECT_CACHE_STORE_H + +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" +#include "common/Timer.h" +#include "common/Throttle.h" +#include "common/Cond.h" +#include "include/rados/librados.hpp" + +#include "SimplePolicy.h" + + +using librados::Rados; +using librados::IoCtx; +class Context; + +namespace ceph { +namespace immutable_obj_cache { + +typedef shared_ptr<librados::Rados> RadosRef; +typedef shared_ptr<librados::IoCtx> IoCtxRef; + +class ObjectCacheStore { + public: + ObjectCacheStore(CephContext *cct); + ~ObjectCacheStore(); + int 
init(bool reset); + int shutdown(); + int init_cache(); + int lookup_object(std::string pool_nspace, + uint64_t pool_id, uint64_t snap_id, + uint64_t object_size, + std::string object_name, + bool return_dne_path, + std::string& target_cache_file_path); + private: + enum ThrottleTypeCode { + THROTTLE_CODE_BYTE, + THROTTLE_CODE_OBJECT + }; + + std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string oid); + std::string get_cache_file_path(std::string cache_file_name, + bool mkdir = false); + int evict_objects(); + int do_promote(std::string pool_nspace, uint64_t pool_id, + uint64_t snap_id, std::string object_name); + int promote_object(librados::IoCtx*, std::string object_name, + librados::bufferlist* read_buf, + Context* on_finish); + int handle_promote_callback(int, bufferlist*, std::string); + int do_evict(std::string cache_file); + + bool take_token_from_throttle(uint64_t object_size, uint64_t object_num); + void handle_throttle_ready(uint64_t tokens, uint64_t type); + void apply_qos_tick_and_limit(const uint64_t flag, + std::chrono::milliseconds min_tick, + uint64_t limit, uint64_t burst, + std::chrono::seconds burst_seconds); + + CephContext *m_cct; + RadosRef m_rados; + std::map<uint64_t, librados::IoCtx> m_ioctx_map; + ceph::mutex m_ioctx_map_lock = + ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock"); + Policy* m_policy; + std::string m_cache_root_dir; + // throttle mechanism + uint64_t m_qos_enabled_flag{0}; + std::map<uint64_t, TokenBucketThrottle*> m_throttles; + bool m_io_throttled{false}; + ceph::mutex m_throttle_lock = + ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");; + uint64_t m_iops_tokens{0}; + uint64_t m_bps_tokens{0}; +}; + +} // namespace immutable_obj_cache +} // ceph +#endif // CEPH_CACHE_OBJECT_CACHE_STORE_H diff --git a/src/tools/immutable_object_cache/Policy.h b/src/tools/immutable_object_cache/Policy.h new file mode 100644 index 000000000..7924a8919 --- /dev/null +++ b/src/tools/immutable_object_cache/Policy.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_POLICY_H +#define CEPH_CACHE_POLICY_H + +#include <list> +#include <string> + +namespace ceph { +namespace immutable_obj_cache { + +typedef enum { + OBJ_CACHE_NONE = 0, + OBJ_CACHE_PROMOTED, + OBJ_CACHE_SKIP, + OBJ_CACHE_DNE, +} cache_status_t; + +class Policy { + public: + Policy() {} + virtual ~Policy() {} + virtual cache_status_t lookup_object(std::string) = 0; + virtual int evict_entry(std::string) = 0; + virtual void update_status(std::string, cache_status_t, + uint64_t size = 0) = 0; + virtual cache_status_t get_status(std::string) = 0; + virtual void get_evict_list(std::list<std::string>* obj_list) = 0; +}; + +} // namespace immutable_obj_cache +} // namespace ceph +#endif diff --git a/src/tools/immutable_object_cache/SimplePolicy.cc b/src/tools/immutable_object_cache/SimplePolicy.cc new file mode 100644 index 000000000..3a7375ba9 --- /dev/null +++ b/src/tools/immutable_object_cache/SimplePolicy.cc @@ -0,0 +1,216 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/debug.h" +#include "SimplePolicy.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::SimplePolicy: " << this << " " \ + << __func__ << ": " + +namespace ceph { +namespace immutable_obj_cache 
{ + +SimplePolicy::SimplePolicy(CephContext *cct, uint64_t cache_size, + uint64_t max_inflight, double watermark) + : cct(cct), m_watermark(watermark), m_max_inflight_ops(max_inflight), + m_max_cache_size(cache_size) { + + ldout(cct, 20) << "max cache size= " << m_max_cache_size + << " ,watermark= " << m_watermark + << " ,max inflight ops= " << m_max_inflight_ops << dendl; + + m_cache_size = 0; + +} + +SimplePolicy::~SimplePolicy() { + ldout(cct, 20) << dendl; + + for (auto it : m_cache_map) { + Entry* entry = (it.second); + delete entry; + } +} + +cache_status_t SimplePolicy::alloc_entry(std::string file_name) { + ldout(cct, 20) << "alloc entry for: " << file_name << dendl; + + std::unique_lock wlocker{m_cache_map_lock}; + + // cache hit when promoting + if (m_cache_map.find(file_name) != m_cache_map.end()) { + ldout(cct, 20) << "object is under promoting: " << file_name << dendl; + return OBJ_CACHE_SKIP; + } + + if ((m_cache_size < m_max_cache_size) && + (inflight_ops < m_max_inflight_ops)) { + Entry* entry = new Entry(); + ceph_assert(entry != nullptr); + m_cache_map[file_name] = entry; + wlocker.unlock(); + update_status(file_name, OBJ_CACHE_SKIP); + return OBJ_CACHE_NONE; // start promotion request + } + + // if there's no free entry, return skip to read from rados + return OBJ_CACHE_SKIP; +} + +cache_status_t SimplePolicy::lookup_object(std::string file_name) { + ldout(cct, 20) << "lookup: " << file_name << dendl; + + std::shared_lock rlocker{m_cache_map_lock}; + + auto entry_it = m_cache_map.find(file_name); + // simply promote on first lookup + if (entry_it == m_cache_map.end()) { + rlocker.unlock(); + return alloc_entry(file_name); + } + + Entry* entry = entry_it->second; + + if (entry->status == OBJ_CACHE_PROMOTED || entry->status == OBJ_CACHE_DNE) { + // bump pos in lru on hit + m_promoted_lru.lru_touch(entry); + } + + return entry->status; +} + +void SimplePolicy::update_status(std::string file_name, + cache_status_t new_status, uint64_t size) { + ldout(cct, 20) << "update status for: " << file_name + << " new status = " << new_status << dendl; + + std::unique_lock locker{m_cache_map_lock}; + + auto entry_it = m_cache_map.find(file_name); + if (entry_it == m_cache_map.end()) { + return; + } + + ceph_assert(entry_it != m_cache_map.end()); + Entry* entry = entry_it->second; + + // to promote + if (entry->status == OBJ_CACHE_NONE && new_status== OBJ_CACHE_SKIP) { + entry->status = new_status; + entry->file_name = file_name; + inflight_ops++; + return; + } + + // promoting done + if (entry->status == OBJ_CACHE_SKIP && (new_status== OBJ_CACHE_PROMOTED || + new_status== OBJ_CACHE_DNE)) { + m_promoted_lru.lru_insert_top(entry); + entry->status = new_status; + entry->size = size; + m_cache_size += entry->size; + inflight_ops--; + return; + } + + // promoting failed + if (entry->status == OBJ_CACHE_SKIP && new_status== OBJ_CACHE_NONE) { + // mark this entry as free + entry->file_name = ""; + entry->status = new_status; + + m_cache_map.erase(entry_it); + inflight_ops--; + delete entry; + return; + } + + // to evict + if ((entry->status == OBJ_CACHE_PROMOTED || entry->status == OBJ_CACHE_DNE) && + new_status== OBJ_CACHE_NONE) { + // mark this entry as free + uint64_t size = entry->size; + entry->file_name = ""; + entry->size = 0; + entry->status = new_status; + + m_promoted_lru.lru_remove(entry); + m_cache_map.erase(entry_it); + m_cache_size -= size; + delete entry; + return; + } +} + +int SimplePolicy::evict_entry(std::string file_name) { + ldout(cct, 20) << "to evict: " << file_name << 
dendl; + + update_status(file_name, OBJ_CACHE_NONE); + + return 0; +} + +cache_status_t SimplePolicy::get_status(std::string file_name) { + ldout(cct, 20) << file_name << dendl; + + std::shared_lock locker{m_cache_map_lock}; + auto entry_it = m_cache_map.find(file_name); + if (entry_it == m_cache_map.end()) { + return OBJ_CACHE_NONE; + } + + return entry_it->second->status; +} + +void SimplePolicy::get_evict_list(std::list<std::string>* obj_list) { + ldout(cct, 20) << dendl; + + std::unique_lock locker{m_cache_map_lock}; + // check free ratio, pop entries from LRU + if ((double)m_cache_size > m_max_cache_size * m_watermark) { + // TODO(dehao): make this configurable + int evict_num = m_cache_map.size() * 0.1; + for (int i = 0; i < evict_num; i++) { + Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); + if (entry == nullptr) { + continue; + } + std::string file_name = entry->file_name; + obj_list->push_back(file_name); + } + } +} + +// for unit test +uint64_t SimplePolicy::get_free_size() { + return m_max_cache_size - m_cache_size; +} + +uint64_t SimplePolicy::get_promoting_entry_num() { + uint64_t index = 0; + std::shared_lock rlocker{m_cache_map_lock}; + for (auto it : m_cache_map) { + if (it.second->status == OBJ_CACHE_SKIP) { + index++; + } + } + return index; +} + +uint64_t SimplePolicy::get_promoted_entry_num() { + return m_promoted_lru.lru_get_size(); +} + +std::string SimplePolicy::get_evict_entry() { + Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire()); + if (entry == nullptr) { + return ""; + } + return entry->file_name; +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/SimplePolicy.h b/src/tools/immutable_object_cache/SimplePolicy.h new file mode 100644 index 000000000..671cbd518 --- /dev/null +++ b/src/tools/immutable_object_cache/SimplePolicy.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_SIMPLE_POLICY_H +#define CEPH_CACHE_SIMPLE_POLICY_H + +#include "common/ceph_context.h" +#include "common/ceph_mutex.h" +#include "include/lru.h" +#include "Policy.h" + +#include <unordered_map> +#include <string> + +namespace ceph { +namespace immutable_obj_cache { + +class SimplePolicy : public Policy { + public: + SimplePolicy(CephContext *cct, uint64_t block_num, uint64_t max_inflight, + double watermark); + ~SimplePolicy(); + + cache_status_t lookup_object(std::string file_name); + cache_status_t get_status(std::string file_name); + + void update_status(std::string file_name, + cache_status_t new_status, + uint64_t size = 0); + + int evict_entry(std::string file_name); + + void get_evict_list(std::list<std::string>* obj_list); + + uint64_t get_free_size(); + uint64_t get_promoting_entry_num(); + uint64_t get_promoted_entry_num(); + std::string get_evict_entry(); + + private: + cache_status_t alloc_entry(std::string file_name); + + class Entry : public LRUObject { + public: + cache_status_t status; + Entry() : status(OBJ_CACHE_NONE) {} + std::string file_name; + uint64_t size; + }; + + CephContext* cct; + double m_watermark; + uint64_t m_max_inflight_ops; + uint64_t m_max_cache_size; + std::atomic<uint64_t> inflight_ops = 0; + + std::unordered_map<std::string, Entry*> m_cache_map; + ceph::shared_mutex m_cache_map_lock = + ceph::make_shared_mutex("rbd::cache::SimplePolicy::m_cache_map_lock"); + + std::atomic<uint64_t> m_cache_size; + + LRU m_promoted_lru; +}; + +} // namespace immutable_obj_cache 
+} // namespace ceph +#endif // CEPH_CACHE_SIMPLE_POLICY_H diff --git a/src/tools/immutable_object_cache/SocketCommon.h b/src/tools/immutable_object_cache/SocketCommon.h new file mode 100644 index 000000000..99acf3609 --- /dev/null +++ b/src/tools/immutable_object_cache/SocketCommon.h @@ -0,0 +1,31 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_SOCKET_COMMON_H +#define CEPH_CACHE_SOCKET_COMMON_H + +namespace ceph { +namespace immutable_obj_cache { + +static const int RBDSC_REGISTER = 0X11; +static const int RBDSC_READ = 0X12; +static const int RBDSC_REGISTER_REPLY = 0X13; +static const int RBDSC_READ_REPLY = 0X14; +static const int RBDSC_READ_RADOS = 0X15; + +static const int ASIO_ERROR_READ = 0X01; +static const int ASIO_ERROR_WRITE = 0X02; +static const int ASIO_ERROR_CONNECT = 0X03; +static const int ASIO_ERROR_ACCEPT = 0X04; +static const int ASIO_ERROR_MSG_INCOMPLETE = 0X05; + +class ObjectCacheRequest; +class CacheSession; + +typedef GenContextURef<ObjectCacheRequest*> CacheGenContextURef; + +typedef std::function<void(CacheSession*, ObjectCacheRequest*)> ProcessMsg; + +} // namespace immutable_obj_cache +} // namespace ceph +#endif // CEPH_CACHE_SOCKET_COMMON_H diff --git a/src/tools/immutable_object_cache/Types.cc b/src/tools/immutable_object_cache/Types.cc new file mode 100644 index 000000000..860017d6a --- /dev/null +++ b/src/tools/immutable_object_cache/Types.cc @@ -0,0 +1,184 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Types.h" +#include "SocketCommon.h" + +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::Types: " << __func__ << ": " + +namespace ceph { +namespace immutable_obj_cache { + +ObjectCacheRequest::ObjectCacheRequest() {} +ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s) + : type(t), seq(s) {} +ObjectCacheRequest::~ObjectCacheRequest() {} + +void ObjectCacheRequest::encode() { + ENCODE_START(2, 1, payload); + ceph::encode(type, payload); + ceph::encode(seq, payload); + if (!payload_empty()) { + encode_payload(); + } + ENCODE_FINISH(payload); +} + +void ObjectCacheRequest::decode(bufferlist& bl) { + auto i = bl.cbegin(); + DECODE_START(2, i); + ceph::decode(type, i); + ceph::decode(seq, i); + if (!payload_empty()) { + decode_payload(i, struct_v); + } + DECODE_FINISH(i); +} + +ObjectCacheRegData::ObjectCacheRegData() {} +ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} +ObjectCacheRegData::ObjectCacheRegData(uint16_t t, uint64_t s, + const std::string &version) + : ObjectCacheRequest(t, s), + version(version) { +} + +ObjectCacheRegData::~ObjectCacheRegData() {} + +void ObjectCacheRegData::encode_payload() { + ceph::encode(version, payload); +} + +void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { + if (i.end()) { + return; + } + ceph::decode(version, i); +} + +ObjectCacheRegReplyData::ObjectCacheRegReplyData() {} +ObjectCacheRegReplyData::ObjectCacheRegReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {} + +void ObjectCacheRegReplyData::encode_payload() {} + +void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) {} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s, + uint64_t read_offset, + uint64_t read_len, + uint64_t 
pool_id, uint64_t snap_id, + uint64_t object_size, + std::string oid, + std::string pool_namespace) + : ObjectCacheRequest(t, s), read_offset(read_offset), + read_len(read_len), pool_id(pool_id), snap_id(snap_id), + object_size(object_size), oid(oid), pool_namespace(pool_namespace) +{} + +ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadData::~ObjectCacheReadData() {} + +void ObjectCacheReadData::encode_payload() { + ceph::encode(read_offset, payload); + ceph::encode(read_len, payload); + ceph::encode(pool_id, payload); + ceph::encode(snap_id, payload); + ceph::encode(oid, payload); + ceph::encode(pool_namespace, payload); + ceph::encode(object_size, payload); +} + +void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { + ceph::decode(read_offset, i); + ceph::decode(read_len, i); + ceph::decode(pool_id, i); + ceph::decode(snap_id, i); + ceph::decode(oid, i); + ceph::decode(pool_namespace, i); + if (encode_version >= 2) { + ceph::decode(object_size, i); + } +} + +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s, + std::string cache_path) + : ObjectCacheRequest(t, s), cache_path(cache_path) {} +ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadReplyData::~ObjectCacheReadReplyData() {} + +void ObjectCacheReadReplyData::encode_payload() { + ceph::encode(cache_path, payload); +} + +void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) { + ceph::decode(cache_path, i); +} + +ObjectCacheReadRadosData::ObjectCacheReadRadosData() {} +ObjectCacheReadRadosData::ObjectCacheReadRadosData(uint16_t t, uint64_t s) + : ObjectCacheRequest(t, s) {} + +ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {} + +void ObjectCacheReadRadosData::encode_payload() {} + +void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i, + __u8 encode_version) {} + +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) { + ObjectCacheRequest* req = nullptr; + + uint16_t type; + uint64_t seq; + auto i = payload_buffer.cbegin(); + DECODE_START(1, i); + ceph::decode(type, i); + ceph::decode(seq, i); + DECODE_FINISH(i); + + switch (type) { + case RBDSC_REGISTER: { + req = new ObjectCacheRegData(type, seq); + break; + } + case RBDSC_READ: { + req = new ObjectCacheReadData(type, seq); + break; + } + case RBDSC_REGISTER_REPLY: { + req = new ObjectCacheRegReplyData(type, seq); + break; + } + case RBDSC_READ_REPLY: { + req = new ObjectCacheReadReplyData(type, seq); + break; + } + case RBDSC_READ_RADOS: { + req = new ObjectCacheReadRadosData(type, seq); + break; + } + default: + ceph_assert(0); + } + + req->decode(payload_buffer); + + return req; +} + +} // namespace immutable_obj_cache +} // namespace ceph diff --git a/src/tools/immutable_object_cache/Types.h b/src/tools/immutable_object_cache/Types.h new file mode 100644 index 000000000..05394d843 --- /dev/null +++ b/src/tools/immutable_object_cache/Types.h @@ -0,0 +1,136 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_TYPES_H +#define CEPH_CACHE_TYPES_H + +#include "include/encoding.h" +#include "include/Context.h" +#include "SocketCommon.h" + +namespace ceph { +namespace immutable_obj_cache { + +namespace { +struct HeaderHelper { + uint8_t v; + uint8_t c_v; + ceph_le32 len; +}__attribute__((packed)); + +inline uint8_t get_header_size() { + return sizeof(HeaderHelper); +} + +inline uint32_t get_data_len(char* buf) { + HeaderHelper* header = reinterpret_cast<HeaderHelper*>(buf); + return header->len; +} +} // namespace
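+
+// Wire-format note (illustrative): HeaderHelper mirrors the 6-byte preamble
+// that ENCODE_START()/DECODE_START() put in front of every message -- a
+// struct version byte, a compat version byte, and a little-endian 32-bit
+// payload length -- so a peer can peek at a partially received buffer:
+//
+//   char buf[] = {2, 1, 0x10, 0, 0, 0};  // v=2, compat=1, len=16
+//   uint32_t want = get_data_len(buf);   // -> 16 bytes still to read
+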
+class ObjectCacheRequest { + public: + uint16_t type; + uint64_t seq; + + bufferlist payload; + + CacheGenContextURef process_msg; + + ObjectCacheRequest(); + ObjectCacheRequest(uint16_t type, uint64_t seq); + virtual ~ObjectCacheRequest(); + + // Encoding happens in two steps: + // step 1 : encode the common fields via the encode method of the base + // class. + // step 2 : based on payload_empty(), decide whether additional fields + // need to be encoded; that part is implemented by the child class. + void encode(); + void decode(bufferlist& bl); + bufferlist get_payload_bufferlist() { return payload; } + + virtual void encode_payload() = 0; + virtual void decode_payload(bufferlist::const_iterator bl_it, + __u8 encode_version) = 0; + virtual uint16_t get_request_type() = 0; + virtual bool payload_empty() = 0; +}; + +class ObjectCacheRegData : public ObjectCacheRequest { + public: + std::string version; + ObjectCacheRegData(); + ObjectCacheRegData(uint16_t t, uint64_t s, const std::string &version); + ObjectCacheRegData(uint16_t t, uint64_t s); + ~ObjectCacheRegData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; + uint16_t get_request_type() override { return RBDSC_REGISTER; } + bool payload_empty() override { return false; } +}; + +class ObjectCacheRegReplyData : public ObjectCacheRequest { + public: + ObjectCacheRegReplyData(); + ObjectCacheRegReplyData(uint16_t t, uint64_t s); + ~ObjectCacheRegReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator iter, + __u8 encode_version) override; + uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; } + bool payload_empty() override { return true; } +}; + +class ObjectCacheReadData : public ObjectCacheRequest { + public: + uint64_t read_offset; + uint64_t read_len; + uint64_t pool_id; + uint64_t snap_id; + uint64_t object_size = 0; + std::string oid; + std::string pool_namespace; + ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset, + uint64_t read_len, uint64_t pool_id, + uint64_t snap_id, uint64_t object_size, + std::string oid, std::string pool_namespace); + ObjectCacheReadData(uint16_t t, uint64_t s); + ~ObjectCacheReadData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; + uint16_t get_request_type() override { return RBDSC_READ; } + bool payload_empty() override { return false; } +}; + +class ObjectCacheReadReplyData : public ObjectCacheRequest { + public: + std::string cache_path; + ObjectCacheReadReplyData(uint16_t t, uint64_t s, std::string cache_path); + ObjectCacheReadReplyData(uint16_t t, uint64_t s); + ~ObjectCacheReadReplyData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; + uint16_t get_request_type() override { return RBDSC_READ_REPLY; } + bool payload_empty() override { return false; } +}; + +class ObjectCacheReadRadosData : public ObjectCacheRequest { + public: + ObjectCacheReadRadosData(); + ObjectCacheReadRadosData(uint16_t t, uint64_t s); + ~ObjectCacheReadRadosData() override; + void encode_payload() override; + void decode_payload(bufferlist::const_iterator bl, + __u8 encode_version) override; + uint16_t 
get_request_type() override { return RBDSC_READ_RADOS; } + bool payload_empty() override { return true; } +}; + +ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer); + +} // namespace immutable_obj_cache +} // namespace ceph +#endif // CEPH_CACHE_TYPES_H diff --git a/src/tools/immutable_object_cache/Utils.h b/src/tools/immutable_object_cache/Utils.h new file mode 100644 index 000000000..3c68cfa7b --- /dev/null +++ b/src/tools/immutable_object_cache/Utils.h @@ -0,0 +1,31 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_CACHE_UTILS_H +#define CEPH_CACHE_UTILS_H + +#include "include/rados/librados.hpp" +#include "include/Context.h" + +namespace ceph { +namespace immutable_obj_cache { +namespace detail { + +template <typename T, void(T::*MF)(int)> +void rados_callback(rados_completion_t c, void *arg) { + T *obj = reinterpret_cast<T*>(arg); + int r = rados_aio_get_return_value(c); + (obj->*MF)(r); +} + +} // namespace detail + +template <typename T, void(T::*MF)(int)=&T::complete> +librados::AioCompletion *create_rados_callback(T *obj) { + return librados::Rados::aio_create_completion( + obj, &detail::rados_callback<T, MF>); +} + +} // namespace immutable_obj_cache +} // namespace ceph +#endif // CEPH_CACHE_UTILS_H diff --git a/src/tools/immutable_object_cache/main.cc b/src/tools/immutable_object_cache/main.cc new file mode 100644 index 000000000..55b0d087a --- /dev/null +++ b/src/tools/immutable_object_cache/main.cc @@ -0,0 +1,84 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "common/ceph_argparse.h" +#include "common/config.h" +#include "common/debug.h" +#include "common/errno.h" +#include "global/global_init.h" +#include "global/signal_handler.h" +#include "CacheController.h" + +#include <vector> + +ceph::immutable_obj_cache::CacheController *cachectl = nullptr; + +void usage() { + std::cout << "usage: ceph-immutable-object-cache [options...]" << std::endl; + std::cout << "options:\n"; + std::cout << " -m monaddress[:port] connect to specified monitor\n"; + std::cout << " --keyring=<path> path to keyring for local " + << "cluster\n"; + std::cout << " --log-file=<logfile> file to log debug output\n"; + std::cout << " --debug-immutable-obj-cache=<log-level>/<memory-level> " + << "set debug level\n"; + generic_server_usage(); +} + +static void handle_signal(int signum) { + if (cachectl) + cachectl->handle_signal(signum); +} + +int main(int argc, const char **argv) { + std::vector<const char*> args; + env_to_vec(args); + argv_to_vec(argc, argv, args); + + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + + if (g_conf()->daemonize) { + global_init_daemonize(g_ceph_context); + } + + common_init_finish(g_ceph_context); + global_init_chdir(g_ceph_context); + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + register_async_signal_handler_oneshot(SIGINT, handle_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_signal); + + std::vector<const char*> cmd_args; + argv_to_vec(argc, argv, cmd_args); + + cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, + cmd_args); + int r = cachectl->init(); + if (r < 0) { + std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; + goto cleanup; + } + + r = 
cachectl->run(); + if (r < 0) { + goto cleanup; + } + + cleanup: + unregister_async_signal_handler(SIGHUP, sighup_handler); + unregister_async_signal_handler(SIGINT, handle_signal); + unregister_async_signal_handler(SIGTERM, handle_signal); + shutdown_async_signal_handler(); + + delete cachectl; + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +}
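
A round-trip sketch of the request codec introduced above (illustrative only; it assumes the Types.h added by this commit is on the include path and that the program links against the ceph common libraries):

  #include "tools/immutable_object_cache/Types.h"

  using namespace ceph::immutable_obj_cache;

  int main() {
    // Encode a registration request carrying a version string.
    ObjectCacheRegData reg(RBDSC_REGISTER, 1 /* seq */, "16.2.11");
    reg.encode();

    // decode_object_cache_request() reads the common header, picks the
    // concrete subclass from the type field, then replays the payload.
    ObjectCacheRequest* req =
        decode_object_cache_request(reg.get_payload_bufferlist());
    ceph_assert(req->get_request_type() == RBDSC_REGISTER);
    ceph_assert(req->seq == 1);
    delete req;
    return 0;
  }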